È possibile creare una pipeline che legge dati da Pub/Sub e scrive su Datastore? Nel mio codice specificherò PubsubIO come input e applicherò la windowing per ottenere una PColection limitata, ma sembra che non sia possibile usare DatastoreIO.writeTo con options.setStreaming come true, mentre è necessario per usare PubsubIO come input. C'è un modo per aggirare questo? O semplicemente non è possibile leggere da pubsub e scrivere su datastore?Lettura da PubsubIO scrittura su DatastoreIO
Ecco il mio codice:
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
PCollection<String> input = p.apply(PubsubIO.Read.topic("projects/"+projectName+"/topics/event-streaming"));
PCollection<String> inputWindow = input.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))).triggering(AfterPane.elementCountAtLeast(1)).discardingFiredPanes().withAllowedLateness(Duration.standardHours(1)));
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
private static final long serialVersionUID = 1L;
public void processElement(ProcessContext c) {
String msg = c.element();
byte[] decoded = Base64.decodeBase64(msg.getBytes());
String outmsg = new String(decoded);
c.output(outmsg);
}
}));
PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));
inputEntity.apply(DatastoreIO.writeTo(datasetid));
p.run();
E questa è l'eccezione ottengo:
Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)
Grazie, questo è stato utile. Ma ora sto affrontando problemi con la chiamata dell'API Datastore dall'app Dataflow, che non è un'app AppEngine, e apparentemente l'API del datastore si basa molto sulla funzionalità AppEngine disponibile solo per le app in esecuzione su AppEngine. Poi ho trovato l'API Remote che sembra fornire esattamente ciò di cui ho bisogno, ma ho ancora difficoltà ad usarlo. Devo autenticarmi con un account di servizio? Ho seguito il codice di esempio su questa [pagina] (https://cloud.google.com/appengine/docs/java/tools/remoteapi) ma sto ricevendo una HttpResponseException, 302 – lilline
Stai provando a scrivere su un'istanza di Datastore appartiene a un progetto diverso dalla tua pipeline Dataflow? In tal caso, dai un'occhiata a https://cloud.google.com/dataflow/security-and-permissions#cross-project per come configurarlo – danielm
No, l'istanza del datastore fa parte dello stesso progetto del flusso di dati, Ho superato il problema 302. MA, come è possibile utilizzare l'API remota in un ParDo, quando (sto indovinando qui) ParDo esegue la funzione DoFn in thread o istanze diverse rispetto alla pipeline padre e l'installer dell'API remota non è serializzabile e l'installer è disponibile solo sul thread in cui è stato creato? Non sono sicuro se questo è il problema, ma in ogni caso, sto ricevendo diverse eccezioni a seconda di dove provo a creare e ad accedere all'installer .. – lilline