ho avuto lo stesso problema e trovato una soluzione per la creazione di una sottoclasse di InputDStream. È necessario definire i metodi start()
e compute()
.
start()
può essere utilizzato per la preparazione. La logica principale risiede in compute()
. Restituisce Option[RDD[T]]
. Per rendere flessibile la classe, è definito il tratto InputStreamQuery
.
trait InputStreamQuery[T] {
// where clause condition for partition key
def partitionCond : (String, Any)
// function to return next partition key
def nextValue(v:Any) : Option[Any]
// where clause condition for clustering key
def whereCond : (String, (T) => Any)
// batch size
def batchSize : Int
}
Per la tabella Cassandra keyspace.test
, creare test_by_date
che riorganizza la tabella per il partizionamento chiave date
.
CREATE TABLE IF NOT exists keyspace.test
(id timeuuid, date text, value text, primary key (id))
CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS
SELECT *
FROM keyspace.test
WHERE id IS NOT NULL
PRIMARY KEY (date, id)
WITH CLUSTERING ORDER BY (id ASC);
Una possibile implementazione per test
tabella è
class class Test(id:UUID, date:String, value:String)
trait InputStreamQueryTest extends InputStreamQuery[Test] {
val dateFormat = "uuuu-MM-dd"
// set batch size as 10 records
override def batchSize: Int = 10
// partitioning key conditions, query string and initial value
override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")
// clustering key condition, query string and function to get clustering key from the instance
override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)
// return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01'
override def nextValue(v: Any): Option[Any] = {
import java.time.format.DateTimeFormatter
val formatter = DateTimeFormatter.ofPattern(dateFormat)
val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1)
if (nextDate.isAfter(LocalDate.now())) None
else Some(nextDate.format(formatter))
}
}
che può essere usata nella classe CassandraInputStream
come segue.
class CassandraInputStream[T: ClassTag]
(_ssc: StreamingContext, keyspace:String, table:String)
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T])
extends InputDStream[T](_ssc) with InputStreamQuery[T] {
var lastElm:Option[T] = None
var partitionKey : Any = _
override def start(): Unit = {
// find a partition key which stores some records
def findStartValue(cql : String, value:Any): Any = {
val rdd = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)
if (rdd.cassandraCount() > 0) value
else {
nextValue(value).map(findStartValue(cql, _)).getOrElse(value)
}
}
// get query string and initial value from partitionCond method
val (cql, value) = partitionCond
partitionKey = findStartValue(cql, value)
}
override def stop(): Unit = {}
override def compute(validTime: Time): Option[RDD[T]] = {
val (cql, _) = partitionCond
val (wh, whKey) = whereCond
def fetchNext(patKey: Any) : Option[CassandraTableScanRDD[T]] = {
// query with partitioning condition
val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, patKey)
val rdd = lastElm.map{ x =>
query.where(wh, whKey(x)).withAscOrder.limit(batchSize)
}.getOrElse(query.withAscOrder.limit(batchSize))
if (rdd.cassandraCount() > 0) {
// store the last element of this RDD
lastElm = Some(rdd.collect.last)
Some(rdd)
}
else {
// find the next partition key which stores data
nextValue(patKey).flatMap{ k =>
partitionKey = k
fetchNext(k)}
}
}
fetchNext(partitionKey)
}
}
Combinando tutte le classi,
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))
val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest
dstream.map(println).saveToCassandra(...)
ssc.start()
ssc.awaitTermination()
Potresti descrivere che cosa si vuole raggiungere? Leggi la tabella completa su ogni intervallo? Da dove provengono i dati di streaming? – maasg
@maasg Voglio leggere la tabella su ogni intervallo (come 10s) per interrogare alcuni record che sono correlati al tempo. Significa che voglio lasciare che la Cassandra sia la fonte di Spark Streaming. In una parola, sono bloccato alla creazione di DStream. Vuoi dare alcuni suggerimenti ed esempi? Grazie mille! –