Ho decine di milioni di righe di dati. È possibile analizzare tutto questo in una settimana o un giorno utilizzando lo streaming spark? Qual è il limite per stimolare lo streaming in termini di quantità di dati? Non sono sicuro di quale sia il limite superiore e quando dovrei inserirli nel mio database poiché Stream probabilmente non può più gestirli. Ho anche finestre di tempo differenti 1,3, 6 ore ecc. Dove utilizzo le operazioni della finestra per separare i dati.Qual è il limite per stimolare lo streaming in termini di quantità di dati?
Si prega di trovare il mio codice qui sotto:
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,300)
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("abc","channels")
topic = 'abc.crawled_articles'
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"}
category = 'abc.crawled_article'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams)
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x))
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x))
#axes topic integration the article and the axes
axes_topic = 'abc.crawled_axes'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = axes_stream.filter(lambda x:'delete' not in str(x)).map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).map(lambda x:(str(x['id']),x)).map(lambda x:(x[0],{'id':x[0], 'attitudes':x[1]['likes'],'reposts':0,'comments':x[1]['comments'],'speed':x[1]['comments']}))
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint()
#join
statistics = article_join_stream.window(1*60*60,5*60).cogroup(category_join_stream.window(1*60*60,60)).cogroup((axes_join_stream.window(24*60*60,5*60)))
statistics.transform(joinstream).pprint()
ssc.start() # Start the computation ssc.awaitTermination()
ssc.awaitTermination()
Ci sono più domande qui, sarebbe di aiuto rispondere se le hai chiaramente separate. Inoltre, sarebbe utile se si minimizza il codice incluso nel campione più piccolo sufficiente per illustrare il problema – etov