2016-01-26 38 views
5

Ho il seguente lavoro scintilla:Come abilitare lo streaming da Cassandra a Spark?

from __future__ import print_function 

import os 
import sys 
import time 
from random import random 
from operator import add 
from pyspark.streaming import StreamingContext 
from pyspark import SparkContext,SparkConf 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark.sql import SQLContext, Row 
from pyspark.streaming import StreamingContext 
from pyspark_cassandra import streaming,CassandraSparkContext 

if __name__ == "__main__": 

    conf = SparkConf().setAppName("PySpark Cassandra Test") 
    sc = CassandraSparkContext(conf=conf) 
    stream = StreamingContext(sc, 2) 

    rdd=sc.cassandraTable("keyspace2","users").collect() 
    #print rdd 
    stream.start() 
    stream.awaitTermination() 
    sc.stop() 

Quando ho eseguito questo, mi dà il seguente errore:

ERROR StreamingContext: Error starting the context, marking it as stopped 
java.lang.IllegalArgumentException: requirement failed: \ 
No output operations registered, so nothing to execute 

lo script shell corro:

./bin/spark-submit --packages TargetHolding:pyspark-cassandra:0.2.4 example 
s/src/main/python/test/reading-cassandra.py 

Confronto longherone k streaming con Kafka, ho questa linea manca dal codice di cui sopra:

kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", {'topic':1}) 

dove realtà sto usando createStream ma per Cassandra, non riesco a vedere nulla di simile sui documenti. Come posso avviare lo streaming tra lo streaming di scintille e cassandra?

Versioni:

Cassandra v2.1.12 
Spark v1.4.1 
Scala 2.10 
+0

Vuoi trasmettere in streaming da Cassandra a Spark? Non penso che sia supportato al momento. Salvataggio dei dati di streaming * su * cassandra è supportato: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md – maasg

+0

Sì, voglio trasmettere da CASSANDRA a SPARK. Pensavo di essere abbastanza vicino alla sceneggiatura che ho scritto, avevo solo bisogno di registrare un'operazione con lo stream, forse "createStream". So come passare dalla scintilla a cassandra. – HackCode

+0

Vuoi trasmettere l'intera tabella ('cassandraTable (" keyspace2 "," users ")') ogni intervallo di tempo? – maasg

risposta

0

per creare DSTREAM da una tabella di Cassandra, è possibile utilizzare un ConstantInputDStream fornire il RDD creato dal tavolo Cassandra come input. Ciò comporterà l'RDD materializzato su ciascun intervallo DStream.

Tieni presente che tabelle o tabelle di grandi dimensioni che crescono continuamente di dimensioni incideranno negativamente sulle prestazioni del tuo lavoro di streaming.

Vedere anche: Reading from Cassandra using Spark Streaming per un esempio.

+0

grazie per la risposta, ma ho provato a cercare la sua implementazione con pyspark ma non sono riuscito a trovarne. È supportato con Python? – HackCode

+0

@HackCode dopo aver controllato l'API Python, sembra che 'ConstantInputDStream' non esiste per i collegamenti Python: http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#module-pyspark .streaming – maasg

+0

@HackCode Hai mai trovato una soluzione per questo? Se 'ConstantInputDStream' non esiste nell'API Python, in che modo PySpark Streaming può funzionare con Cassandra? – user2361174