Ho un database Cassandra da cui ho analizzato i dati utilizzando SparkSQL tramite Apache Spark. Ora voglio inserire quei dati analizzati in PostgreSQL. C'è qualche modo per ottenere questo direttamente oltre all'utilizzo del driver PostgreSQL (l'ho realizzato usando postREST e Driver voglio sapere se esistono metodi come saveToCassandra()
)?Inserimento di dati analitici da Spark a Postgres
risposta
Al momento non esiste un'implementazione nativa di scrivere l'RDD su alcun DBMS. Ecco i link alle discussioni connessi nella lista degli utenti Spark: one, two
In generale, l'approccio più performante sarebbe la seguente:
- Convalida il numero di partizioni in RDD, non dovrebbe essere troppo basso e troppo alto 20-50 partizioni dovrebbero andare bene, se il numero è inferiore - chiamare
repartition
con 20 partizioni, se superiore - chiamarecoalesce
a 50 partizioni - chiamata la
mapPartition
trasformazione, all'interno di esso chiamare la funzione per inserire i record per DBMS utilizzando JDBC. In questa funzione si apre la connessione al database e utilizza il comando COPY con this API, permetterebbe di eliminare la necessità di un comando separato per ogni record - in questo modo l'inserto verrà elaborato molto più veloce
Questo modo in cui si inseriscono i dati in Postgres in modo parallelo utilizzando fino a 50 connessioni parallele (dipende dalla dimensione del cluster Spark e dalla sua configurazione). L'intero approccio potrebbe essere implementato come una funzione Java/Scala che accetta l'RDD e la stringa di connessione
La risposta di 0x0FFF è buona. Ecco un altro punto che sarebbe utile.
Io uso foreachPartition
per persistere nel negozio esterno. Questo è anche in linea con il modello di progettazione Design Patterns for using foreachRDD
dato nella documentazione Spark https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams
Esempio:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
È possibile utilizzare Postgres copiare API per scrivere, la sua molto più veloce in questo modo. Vedere i seguenti due metodi: uno itera su RDD per riempire il buffer che può essere salvato dalla copia di API. L'unica cosa di cui ti devi occupare è creare un'istruzione corretta in formato CSV che verrà utilizzata da copy api.
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
val sb = mutable.StringBuilder.newBuilder
val now = System.currentTimeMillis()
rdd.collect().foreach(itr => {
itr.foreach(_.createCSV(sb, now).append("\n"))
})
copyIn("myTable", new StringReader(sb.toString), "statement")
sb.clear
}
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
val conn = connectionPool.getConnection()
try {
conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => logWarning(se.getMessage)
case t: Throwable => logWarning(t.getMessage)
} finally {
conn.close()
}
}
non sarà il buffer StringBuilder sb crescere senza limite come per il numero di record nel EventModel RDD? perché non ti manca la memoria? – nont
Ho usato questo per la mia soluzione che è stata in esecuzione da mesi e non ho visto fino ad ora la memoria. Il volume di dati che ho è abbastanza consistente - 100000/sec. Inoltre, se sei preoccupato di questo, puoi sempre avere un altro controllo in base al quale chiami copyIn e cancelli il buffer. – smishra