2015-11-16 9 views
5

Sto solo cercando di capire come funziona il modello di programmazione. Scenario sto utilizzando Pub/Sub + Dataflow per l'analisi degli strumenti per un forum web. Ho un flusso di dati provenienti da Pub/Sub che assomiglia a:Stato longevo con Google Dataflow

ID | TS | EventType 
1 | 1 | Create 
1 | 2 | Comment 
2 | 2 | Create 
1 | 4 | Comment 

E voglio finire con un flusso proveniente dal flusso di dati che assomiglia:

ID | TS | num_comments 
1 | 1 | 0 
1 | 2 | 1 
2 | 2 | 0 
1 | 4 | 2 

voglio il lavoro che questo rollup viene eseguito come un processo di flusso, con nuovi conteggi che vengono compilati quando arrivano nuovi eventi. La mia domanda è: dov'è il posto idiomatico in cui il lavoro memorizza lo stato per l'argomento corrente e il conteggio dei commenti? Supponendo che gli argomenti possano vivere per anni. idee correnti sono:

  • scrivere una voce 'corrente' per il tema id per BigTable e in una query DoFn ciò che l'attuale numero di commento per l'argomento ID è in arrivo anche mentre scrivo questo io non lo sono. fan.
  • Utilizzare gli ingressi laterali in qualche modo? Sembra che forse questa è la risposta, ma se così fosse non capisco del tutto.
  • Configurare un processo di streaming con una finestra globale, con un trigger che si attiva ogni volta che ottiene un record e fare affidamento su Dataflow per mantenere l'intera cronologia del riquadro da qualche parte. (Requisiti di archiviazione illimitata?)

EDIT: Giusto per chiarire, non avrei avuto alcun problema in atto qualsiasi di queste tre strategie, o un milione di diversi altri modi di farlo, io sono più interessato a ciò che è il migliore modo di farlo con Dataflow. Ciò che sarà più resistente al fallimento, dovendo rielaborare la cronologia per un backfill, ecc. Ecc.

EDIT2: c'è attualmente un bug con il servizio dataflow in cui gli errori hanno esito negativo se si aggiungono input ad una trasformazione flatten, che significherà è necessario scartare e ricostruire qualsiasi stato accumulato nel lavoro se si apporta una modifica a un lavoro che include l'aggiunta di qualcosa a un'operazione di appiattimento.

risposta

7

Dovresti essere in grado di utilizzare i trigger e una mietitrebbia per ottenere ciò.

PCollection<ID> comments = /* IDs from the source */; 
PCollection<KV<ID, Long>> commentCounts = comments 
    // Produce speculative results by triggering as data comes in. 
    // Note that this won't trigger after *every* element, but it will 
    // trigger relatively quickly (as the system divides incoming data 
    // into work units). You could also throttle this with something 
    // like: 
    // AfterProcessingTime.pastFirstElementInPane() 
    //  .plusDelayOf(Duration.standardMinutes(5)) 
    // which will produce output every 5 minutes 
    .apply(Window.triggering(
      Repeatedly.forever(AfterPane.elementCountAtLeast(1))) 
     .accumulatingFiredPanes()) 
    // Count the occurrences of each ID 
    .apply(Count.perElement()); 

// Produce an output String -- in your use case you'd want to produce 
// a row and write it to the appropriate source 
commentCounts.apply(new DoFn<KV<ID, Long>, String>() { 
    public void processElement(ProcessContext c) { 
    KV<ID, Long> element = c.element(); 
    // This includes details about the pane of the window being 
    // processed, and including a strictly increasing index of the 
    // number of panes that have been produced for the key.   
    PaneInfo pane = c.pane(); 
    return element.key() + " | " + pane.getIndex() + " | " + element.value(); 
    } 
}); 

A seconda dei dati, si potrebbe anche leggere i commenti interi dalla sorgente, estrarre l'ID, e quindi utilizzare Count.perKey() per ottenere i conteggi per ciascun ID. Se si desidera una combinazione più complicata, è possibile considerare la definizione di una personalizzata CombineFn e l'utilizzo di Combine.perKey.

+0

giusto, quindi questo è il numero 3 nella mia lista di potenziali implementazioni. La mia domanda è, è una * buona idea *? Lo stato qui viene mantenuto implicitamente dal flusso di dati. Cosa succede se ho bisogno di riavviare il lavoro? Come si implementerebbe un backfill storico? – bfabry

+1

In base alle modifiche apportate, potresti essere in grado di [Aggiornare la pipeline esistente] (https://cloud.google.com/dataflow/pipelines/updating-a-pipeline). Se le modifiche sono più sostanziali, l'approccio citato funziona se si sta utilizzando un'origine personalizzata che consente di leggere tutti i vecchi dati. –

+0

Un'origine personalizzata come metodo per gestire i backfill ecc. È un'idea interessante. Sembrerebbe risolvere quella domanda. È una buona idea avere quello stato che cresce solo per sempre? Cosa succede se un argomento del forum può essere chiuso, c'è un modo per dire "non ci saranno più eventi a cui teniamo conto per questo ID" in modo che venga scartato? – bfabry

2

Dato BigQuery non supporta file sovrascrittura, un modo per andare su questo è quello di scrivere gli eventi al BigQuery, e interrogare i dati utilizzando COUNT:

SELECT MATR, COUNT (NUM_COMMENTS) dalla tabella GROUP BY ID;

È inoltre possibile eseguire aggregazioni per finestra di num_comments all'interno di Dataflow prima di scrivere le voci in BigQuery; la query sopra continuerà a funzionare.

+0

Grazie per averci dato un colpo :-). Ai fini del problema, puoi ignorare che la destinazione è BQ. La destinazione dovrebbe essere aggiunta solo se. Il calcolo nella vita reale è anche più complicato di una semplice somma e preferiremmo essere espressi nel nostro ETL nel flusso di dati piuttosto che nelle query BQ complicate (e costose da eseguire). Inoltre, questa soluzione non ci fornisce la cronologia delle serie temporali del numero di commenti su un argomento. – bfabry