2016-06-30 40 views
5

Volevo sfruttare la nuova funzionalità BigQuery delle tabelle partizionate nel tempo, ma non sono sicuro che ciò sia attualmente possibile nella versione 1.6 dell'SDK Dataflow.Creazione/scrittura nella tabella BigQuery parititoned tramite Google Cloud Dataflow

Guardando il BigQuery JSON API, per creare una tabella partizionata giorno si ha la necessità di passare a un'opzione di

"timePartitioning": { "type": "DAY" } 

, ma l'interfaccia com.google.cloud.dataflow.sdk.io.BigQueryIO solo consente di specificare un TableReference.

Ho pensato che forse potevo pre-creare il tavolo e intrufolarmi in un decoratore di partizioni tramite un lambda BigQueryIO.Write.toTableReference? Qualcun altro sta avendo successo con la creazione/scrittura di tabelle partizionate tramite Dataflow?

Questo sembra un problema simile all'impostazione di table expiration time che non è attualmente disponibile.

risposta

6

Come dice Pavan, è sicuramente possibile scrivere su tabelle di partizione con Dataflow. Stai utilizzando lo DataflowPipelineRunner in modalità streaming o batch?

La soluzione che hai proposto dovrebbe funzionare. In particolare, se si crea una tabella con la configurazione del partizionamento della data, è possibile utilizzare un lambda BigQueryIO.Write.toTableReference per scrivere in una partizione data. Per esempio:

/** 
* A Joda-time formatter that prints a date in format like {@code "20160101"}. 
* Threadsafe. 
*/ 
private static final DateTimeFormatter FORMATTER = 
    DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC); 

// This code generates a valid BigQuery partition name: 
Instant instant = Instant.now(); // any Joda instant in a reasonable time range 
String baseTableName = "project:dataset.table"; // a valid BigQuery table name 
String partitionName = 
    String.format("%s$%s", baseTableName, FORMATTER.print(instant)); 
+3

Questo metodo è molto bello, ma consentirà solo di controllare il timbro data con i parametri all'esterno della pipeline. E se volessimo utilizzare i timestamp dai dati stessi per suddividerli per date e poi scrivere in tabelle corrispondenti? – nembleton

+3

@nembleton: Se gli elementi hanno data/ora, tu può utilizzare la finestra per mapparli in finestre giornaliere.Modificare questo codice: 'PCollection windowedItems = items.apply ( Window. in (FixedWindows.of (Duration.standardMinutes (10))));'. Quindi il TableSpecFun che legge le finestre mapperanno gli elementi nei giorni corretti.Il codice proviene da [FixedWindows javadoc] (https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/windowing/FixedWindows) –

+1

Grazie a @DanHalperin è praticamente quello che sto facendo, inclusa la finestra, ma usando '.apply (Window.into (CalendarWindows.days (1)))' L'unico problema è dato dal fatto che i dati possono essere in fusi orari diversi e vogliamo che BigQuery restituisca i dati nel fuso orario originale, facciamo un po 'di funkiness in un precedente PTransform con una chiamata 'outputWithTimestamp' – ptf

3

Credo che dovrebbe essere possibile utilizzare il decoratore di partizioni quando non si utilizza lo streaming. Stiamo lavorando attivamente per supportare i decoratori di partizioni attraverso lo streaming. Facci sapere se stai riscontrando degli errori oggi con la modalità non streaming.

+1

Hi @Pavan, stiamo utilizzando BlockingDataflowPipelineRunner ed è in esecuzione in modalità batch, ma il passaggio di BigQueryIO.Write fallisce con '400 Bad Request' e' "I decoratori di tabelle non possono essere utilizzati con lo streaming insert." ' C'è un modo di non utilizzare le scritture di streaming su BigQuery? Pensavo che avrebbe effettivamente fatto un carico grosso. E c'è una linea temporale per supportare la modalità streaming? – ptf

+0

Ah, sembra che una funzione di riferimento tabella lo faccia andare in modalità streaming :( – ptf

+0

Ciao @Pavan, qualsiasi timeline quando i decoratori di tavoli saranno supportati durante lo streaming? – manishpal

6

L'approccio che ho preso (funziona in modalità streaming, troppo):

  • Definire una finestra personalizzata per il record in entrata
  • Convertire la finestra nella tabella partizione/nome

    p.apply(PubsubIO.Read 
          .subscription(subscription) 
          .withCoder(TableRowJsonCoder.of()) 
         ) 
         .apply(Window.into(new TablePartitionWindowFn())) 
         .apply(BigQueryIO.Write 
             .to(new DayPartitionFunc(dataset, table)) 
             .withSchema(schema) 
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
         ); 
    

Impostazione della finestra in base i dati in arrivo, il fine immediato possono essere ignorati, come il valore iniziale viene utilizzato per impostare la partizione:

public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> { 

private IntervalWindow assignWindow(AssignContext context) { 
    TableRow source = (TableRow) context.element(); 
    String dttm_str = (String) source.get("DTTM"); 

    DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC(); 

    Instant start_point = Instant.parse(dttm_str,formatter); 
    Instant end_point = start_point.withDurationAdded(1000, 1); 

    return new IntervalWindow(start_point, end_point); 
}; 

@Override 
public Coder<IntervalWindow> windowCoder() { 
    return IntervalWindow.getCoder(); 
} 

@Override 
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { 
    return Arrays.asList(assignWindow(c)); 
} 

@Override 
public boolean isCompatible(WindowFn<?, ?> other) { 
    return false; 
} 

@Override 
public IntervalWindow getSideInputWindow(BoundedWindow window) { 
    if (window instanceof GlobalWindow) { 
     throw new IllegalArgumentException(
       "Attempted to get side input window for GlobalWindow from non-global WindowFn"); 
    } 
    return null; 
} 

Impostazione della tabella delle partizioni in modo dinamico:

public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> { 

String destination = ""; 

public DayPartitionFunc(String dataset, String table) { 
    this.destination = dataset + "." + table+ "$"; 
} 

@Override 
public String apply(BoundedWindow boundedWindow) { 
    // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows. 
    String dayString = DateTimeFormat.forPattern("yyyyMMdd") 
            .withZone(DateTimeZone.UTC) 
            .print(((IntervalWindow) boundedWindow).start()); 
    return destination + dayString; 
}} 

c'è un modo migliore per raggiungere il stesso risultato?

+0

quale versione della libreria a fascio di Apache hai usato per configurare il flusso di dati sopra? –

1

Apache Beam versione 2.0 supporta lo sharding delle tabelle di output BigQuery out of the box.