2015-08-02 15 views
6

Sto cercando di filtrare su una piccola parte di un enorme tavolo Cassandra utilizzando:Spark joinWithCassandraTable() sulla mappa chiave di partizione multipla ERRORE

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b") 

voglio mappare le righe nella tabella cassandra su ' creato 'colonna che fa parte della chiave di partizione.

La mia chiave tavolo (la chiave di partizione della tabella) definito come:

case class TableKey(imei: String, created: Long, when: Long) 

Il risultato è un errore:

[error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey. [error] Unspecified value parameter created. [error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b") [error] ^ [error] one error found [error] (compile:compile) Compilation failed

Ha funzionato con un solo oggetto nella chiave di partizione come in il Documentation.

Perché c'è un problema con la chiave di partizione multipla? - ha risposto.

EDIT: ho provato ad usare il joinWithCassandraTable nella giusta forma:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c") 

Quando ho cercato di eseguire sul Spark non c'è nessun errore ma è bloccato su "[stadio 0:> (0 + 2)/2] "per sempre ...

Cosa non va?

risposta

5

L'errore sta dicendo che la classe TableKey richiede l'inizializzazione di 3 componenti eppure è stato passato un solo argomento. Questo è un errore di compilazione Scala e non è correlato a C * o Spark.

val snapshotsFiltered = sc.parallelize(startDate to endDate) 
    .map(TableKey(_2)) /// Table Key does not have a single element constructor so this will fail 
    .joinWithCassandraTable("listener","snapshots_test_b") 

In generale, comunque, C * utilizza l'intero partition key non determinare se una particolare vite fila. Per questo motivo è possibile estrarre i dati in modo efficiente solo se si conosce l'intero partition key in modo tale che solo una parte di esso non ha valore.

Il joinWithCassandraTable richiede i valori completi partition key in modo che sia possibile farlo funzionare. se si dispone solo di una parte di parition key, verrà richiesto di eseguire una scansione completa della tabella e utilizzare Spark per filtrare.

Se si desidera solo filtro basato su un clustering column è possibile farlo premendo verso il basso una clausola where a C * come

sc.cassandraTable("ks","test").where("clustering_key > someValue")