2016-02-29 17 views
5

Ho decine di milioni di righe di dati. È possibile analizzare tutto questo in una settimana o un giorno utilizzando lo streaming spark? Qual è il limite per stimolare lo streaming in termini di quantità di dati? Non sono sicuro di quale sia il limite superiore e quando dovrei inserirli nel mio database poiché Stream probabilmente non può più gestirli. Ho anche finestre di tempo differenti 1,3, 6 ore ecc. Dove utilizzo le operazioni della finestra per separare i dati.Qual è il limite per stimolare lo streaming in termini di quantità di dati?

Si prega di trovare il mio codice qui sotto:

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc,300) 
sqlContext = SQLContext(sc) 
channels = sc.cassandraTable("abc","channels") 
topic = 'abc.crawled_articles' 
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"} 

category = 'abc.crawled_article' 
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams) 
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x)) 


article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) 
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x)) 

#axes topic integration the article and the axes 
axes_topic = 'abc.crawled_axes' 
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams) 
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).map(lambda x:(str(x['id']),x)).map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']})) 
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint() 

#join 
statistics = article_join_stream.window(1*60*60,5*60).cogroup(category_join_stream.window(1*60*60,60)).cogroup((axes_join_stream.window(24*60*60,5*60))) 
statistics.transform(joinstream).pprint() 

ssc.start() # Start the computation ssc.awaitTermination() 
ssc.awaitTermination() 
+1

Ci sono più domande qui, sarebbe di aiuto rispondere se le hai chiaramente separate. Inoltre, sarebbe utile se si minimizza il codice incluso nel campione più piccolo sufficiente per illustrare il problema – etov

risposta

1

Uno alla volta:

  • E 'possibile analizzare [qualche grande numero di righe] entro [un determinato periodo di tempo]?

In generale, sì - Spark consente di scalare fuori attraverso molte macchine, quindi in linea di principio si dovrebbe essere in grado di avviare un grande cluster e macinare grandi quantità di dati in tempi relativamente brevi (assumendo che stiamo parlando di ore o giorni, non secondi o meno, che potrebbero essere problematici a causa di spese generali).

In particolare, l'esecuzione del tipo di elaborazione illustrato nelle domande su decine di milioni di record mi sembra fattibile in un lasso di tempo ragionevole (ovvero senza utilizzare un cluster estremamente grande).

  • Qual è il limite di Spark Streaming in termini di quantità di dati?

Non so, ma avrete difficoltà a raggiungerlo. Esistono esempi di distribuzioni estremamente grandi, ad es. in ebay ("centinaia di metriche su una media di 30 TB al giorno"). Inoltre, vedere lo FAQ, che menziona un cluster di 8000 macchine e l'elaborazione di PB di dati.

  • Quando devono essere scritti i risultati in [una sorta di archivio]?

Secondo lo standard basic model di Spark-Streaming, i dati vengono elaborati in micro-lotti. Se i tuoi dati sono effettivamente un flusso (cioè non ha un finale definito), allora l'approccio più semplice sarebbe quello di memorizzare i risultati di elaborazione di ciascun RDD (cioè microbatch).

Se i dati NON sono un flusso, ad es. stai elaborando una serie di file statici di tanto in tanto, dovresti probabilmente considerare di rinunciare alla parte dello stream (ad esempio usando solo Spark come processore batch).

Poiché la tua domanda menziona le dimensioni delle finestre di alcune ore, sospetto che potresti voler prendere in considerazione l'opzione batch.

  • Come posso elaborare gli stessi dati in finestre temporali diverse?

Se stai usando Spark-streaming, è possibile mantenere più stati (per esempio usando mapWithState) - una per ogni finestra temporale.

Un'altra idea (più semplice nel codice, più complicata in termini di operazioni): è possibile avviare più cluster, ciascuno con una propria finestra, leggendo dallo stesso flusso.

Se si esegue l'elaborazione in batch, è possibile eseguire la stessa operazione più volte con finestre temporali diverse, ad es. reduceByWindow con più dimensioni della finestra.