13

Sto tentando di scrivere messaggi Google PubSub su Google Cloud Storage utilizzando Google Cloud Dataflow. So che TextIO/AvroIO non supportano le condotte di streaming. Tuttavia, ho letto in [1] che è possibile scrivere su GCS in una pipeline di streaming da un ParDo/DoFn in un commento dell'autore. Ho costruito un oleodotto seguendo il loro articolo il più vicino possibile.Scrittura su Google Cloud Storage da PubSub utilizzando Cloud Dataflow tramite DoFn

stavo puntando per questo comportamento:

  • messaggi scritti in un batch di fino a 100 per gli oggetti in GCS (uno per vetro di una finestra) sotto un percorso che corrisponde al tempo il messaggio è stato pubblicato nel dataflow-requests/[isodate-time]/[paneIndex].

ottengo risultati diversi:

  • C'è solo un singolo riquadro in ogni finestra oraria. Pertanto, ottengo solo un file in ogni "bucket" orario (in realtà è un percorso oggetto in GCS). Ridurre MAX_EVENTS_IN_FILE a 10 non ha fatto alcuna differenza, ancora solo un riquadro/file.
  • C'è un solo messaggio in ogni oggetto GCS che è stato scritto
  • La pipeline genera occasionalmente un errore CRC durante la scrittura su GCS.

Come risolvere questi problemi e ottenere il comportamento che mi aspetto?

Esempio di uscita di registro:

21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0 
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0 
21:30:07.773 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0 
21:30:07.846 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0 
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0 

Ecco il mio codice:

package com.example.dataflow; 

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.transforms.windowing.*; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.gcloud.storage.BlobId; 
import com.google.gcloud.storage.BlobInfo; 
import com.google.gcloud.storage.Storage; 
import com.google.gcloud.storage.StorageOptions; 
import org.joda.time.Duration; 
import org.joda.time.format.ISODateTimeFormat; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.IOException; 

public class PubSubGcsSSCCEPipepline { 

    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class); 

    public static final String BUCKET_PATH = "dataflow-requests"; 

    public static final String BUCKET_NAME = "myBucketName"; 

    public static final Duration ONE_DAY = Duration.standardDays(1); 
    public static final Duration ONE_HOUR = Duration.standardHours(1); 
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10); 

    public static final int MAX_EVENTS_IN_FILE = 100; 

    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow"; 

    private static class DoGCSWrite extends DoFn<String, Void> 
     implements DoFn.RequiresWindowAccess { 

     public transient Storage storage; 

     { init(); } 

     public void init() { storage = StorageOptions.defaultInstance().service(); } 

     private void readObject(java.io.ObjectInputStream in) 
       throws IOException, ClassNotFoundException { 
      init(); 
     } 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp()); 
      String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, c.pane().getIndex()); 

      BlobId blobId = BlobId.of(BUCKET_NAME, blobName); 
      LOG.info("writing pane {} to blob {}", c.pane().getIndex(), blobName); 
      storage.create(BlobInfo.builder(blobId).contentType("text/plain").build(), c.element().getBytes()); 
      LOG.info("sucessfully write pane {} to blob {}", c.pane().getIndex(), blobName); 
     } 
    } 

    public static void main(String[] args) { 
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 
     options.as(DataflowPipelineOptions.class).setStreaming(true); 
     Pipeline p = Pipeline.create(options); 

     PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub") 
       .subscription(PUBSUB_SUBSCRIPTION); 

     PCollection<String> streamData = p.apply(readFromPubsub); 

     PCollection<String> windows = streamData.apply(Window.<String>into(FixedWindows.of(ONE_HOUR)) 
       .withAllowedLateness(ONE_DAY) 
       .triggering(AfterWatermark.pastEndOfWindow() 
         .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE)) 
         .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE), 
           AfterProcessingTime.pastFirstElementInPane() 
             .plusDelayOf(TEN_SECONDS)))) 
       .discardingFiredPanes()); 

     windows.apply(ParDo.of(new DoGCSWrite())); 

     p.run(); 
    } 


} 

[1] https://labs.spotify.com/2016/03/10/spotifys-event-delivery-the-road-to-the-cloud-part-iii/

Grazie a Sam McVeety per la soluzione. Ecco il codice corretto per la lettura a chiunque:

package com.example.dataflow; 

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.transforms.*; 
import com.google.cloud.dataflow.sdk.transforms.windowing.*; 
import com.google.cloud.dataflow.sdk.values.KV; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.gcloud.WriteChannel; 
import com.google.gcloud.storage.BlobId; 
import com.google.gcloud.storage.BlobInfo; 
import com.google.gcloud.storage.Storage; 
import com.google.gcloud.storage.StorageOptions; 
import org.joda.time.Duration; 
import org.joda.time.format.ISODateTimeFormat; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.util.Iterator; 

public class PubSubGcsSSCCEPipepline { 

    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class); 

    public static final String BUCKET_PATH = "dataflow-requests"; 

    public static final String BUCKET_NAME = "myBucketName"; 

    public static final Duration ONE_DAY = Duration.standardDays(1); 
    public static final Duration ONE_HOUR = Duration.standardHours(1); 
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10); 

    public static final int MAX_EVENTS_IN_FILE = 100; 

    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow"; 

    private static class DoGCSWrite extends DoFn<Iterable<String>, Void> 
     implements DoFn.RequiresWindowAccess { 

     public transient Storage storage; 

     { init(); } 

     public void init() { storage = StorageOptions.defaultInstance().service(); } 

     private void readObject(java.io.ObjectInputStream in) 
       throws IOException, ClassNotFoundException { 
      init(); 
     } 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp()); 
      long paneIndex = c.pane().getIndex(); 
      String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, paneIndex); 

      BlobId blobId = BlobId.of(BUCKET_NAME, blobName); 

      LOG.info("writing pane {} to blob {}", paneIndex, blobName); 
      WriteChannel writer = storage.writer(BlobInfo.builder(blobId).contentType("text/plain").build()); 
      LOG.info("blob stream opened for pane {} to blob {} ", paneIndex, blobName); 
      int i=0; 
      for (Iterator<String> it = c.element().iterator(); it.hasNext();) { 
       i++; 
       writer.write(ByteBuffer.wrap(it.next().getBytes())); 
       LOG.info("wrote {} elements to blob {}", i, blobName); 
      } 
      writer.close(); 
      LOG.info("sucessfully write pane {} to blob {}", paneIndex, blobName); 
     } 
    } 

    public static void main(String[] args) { 
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); 
     options.as(DataflowPipelineOptions.class).setStreaming(true); 
     Pipeline p = Pipeline.create(options); 

     PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub") 
       .subscription(PUBSUB_SUBSCRIPTION); 

     PCollection<String> streamData = p.apply(readFromPubsub); 
     PCollection<KV<String, String>> keyedStream = 
       streamData.apply(WithKeys.of(new SerializableFunction<String, String>() { 
        public String apply(String s) { return "constant"; } })); 

     PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream 
       .apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_HOUR)) 
         .withAllowedLateness(ONE_DAY) 
         .triggering(AfterWatermark.pastEndOfWindow() 
           .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE)) 
           .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE), 
             AfterProcessingTime.pastFirstElementInPane() 
               .plusDelayOf(TEN_SECONDS)))) 
         .discardingFiredPanes()) 
       .apply(GroupByKey.create()); 


     PCollection<Iterable<String>> windows = keyedWindows 
       .apply(Values.<Iterable<String>>create()); 


     windows.apply(ParDo.of(new DoGCSWrite())); 

     p.run(); 
    } 

} 
+1

Sto preparando una risposta più lunga, ma una cosa che volevo confermare: stai tentando di scrivere 100 elementi in un singolo oggetto GCS, giusto? –

+2

Esatto. Mi piacerebbe evitare il problema dei "file di piccole dimensioni" quando procedo allo elaborare questi file in modalità batch con flusso di dati o qualche altro framework in un lavoro separato. –

risposta

7

C'è un Gotcha qui, che è che avrete bisogno di un GroupByKey in modo che i vetri per essere aggregati appropriata. L'esempio di Spotify fa riferimento a questo come "La materializzazione dei riquadri viene eseguita nella trasformazione" Eventi aggregati "che non è altro che una trasformazione di GroupByKey", ma è un punto sottile. Dovrai fornire una chiave per farlo e, nel tuo caso, sembra che un valore costante funzionerà.

PCollection<String> streamData = p.apply(readFromPubsub); 
    PCollection<KV<String, String>> keyedStream = 
     streamData.apply(WithKeys.of(new SerializableFunction<String, String>() { 
      public Integer apply(String s) { return "constant"; } })); 

A questo punto, è possibile applicare la funzione a finestre, e poi un finale GroupByKey per ottenere il comportamento desiderato:

PCollection<String, Iterable<String>> keyedWindows = keyedStream.apply(...) 
     .apply(GroupByKey.create()); 
    PCollection<Iterable<String>> windows = keyedWindows 
     .apply(Values.<Iterable<String>>create()); 

Ora gli elementi in processElement saranno Iterable<String>, con dimensioni di 100 o più .

Abbiamo archiviato https://issues.apache.org/jira/browse/BEAM-184 per rendere questo comportamento più chiaro.

+0

Grazie per questo. La cosa chiave dummy funziona. –

2

A partire dal fascio 2.0, TextIO/AvroIOdo supporto scrittura collezioni illimitati - vedi documentation, in particolare, è necessario specificare withWindowedWrites().