2015-02-03 2 views
8

Ho un database Cassandra da cui ho analizzato i dati utilizzando SparkSQL tramite Apache Spark. Ora voglio inserire quei dati analizzati in PostgreSQL. C'è qualche modo per ottenere questo direttamente oltre all'utilizzo del driver PostgreSQL (l'ho realizzato usando postREST e Driver voglio sapere se esistono metodi come saveToCassandra())?Inserimento di dati analitici da Spark a Postgres

risposta

13

Al momento non esiste un'implementazione nativa di scrivere l'RDD su alcun DBMS. Ecco i link alle discussioni connessi nella lista degli utenti Spark: one, two

In generale, l'approccio più performante sarebbe la seguente:

  1. Convalida il numero di partizioni in RDD, non dovrebbe essere troppo basso e troppo alto 20-50 partizioni dovrebbero andare bene, se il numero è inferiore - chiamare repartition con 20 partizioni, se superiore - chiamare coalesce a 50 partizioni
  2. chiamata la mapPartition trasformazione, all'interno di esso chiamare la funzione per inserire i record per DBMS utilizzando JDBC. In questa funzione si apre la connessione al database e utilizza il comando COPY con this API, permetterebbe di eliminare la necessità di un comando separato per ogni record - in questo modo l'inserto verrà elaborato molto più veloce

Questo modo in cui si inseriscono i dati in Postgres in modo parallelo utilizzando fino a 50 connessioni parallele (dipende dalla dimensione del cluster Spark e dalla sua configurazione). L'intero approccio potrebbe essere implementato come una funzione Java/Scala che accetta l'RDD e la stringa di connessione

1

La risposta di 0x0FFF è buona. Ecco un altro punto che sarebbe utile.

Io uso foreachPartition per persistere nel negozio esterno. Questo è anche in linea con il modello di progettazione Design Patterns for using foreachRDD dato nella documentazione Spark https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams

Esempio:

dstream.foreachRDD { rdd => 
    rdd.foreachPartition { partitionOfRecords => 
    // ConnectionPool is a static, lazily initialized pool of connections 
    val connection = ConnectionPool.getConnection() 
    partitionOfRecords.foreach(record => connection.send(record)) 
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse 
    } 
} 
1

È possibile utilizzare Postgres copiare API per scrivere, la sua molto più veloce in questo modo. Vedere i seguenti due metodi: uno itera su RDD per riempire il buffer che può essere salvato dalla copia di API. L'unica cosa di cui ti devi occupare è creare un'istruzione corretta in formato CSV che verrà utilizzata da copy api.

def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = { 
     val sb = mutable.StringBuilder.newBuilder 
     val now = System.currentTimeMillis() 

     rdd.collect().foreach(itr => { 
      itr.foreach(_.createCSV(sb, now).append("\n")) 
     }) 

     copyIn("myTable", new StringReader(sb.toString), "statement") 
     sb.clear 
    } 


def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = { 
     val conn = connectionPool.getConnection() 
     try { 
      conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader) 
     } catch { 
      case se: SQLException => logWarning(se.getMessage) 
      case t: Throwable => logWarning(t.getMessage) 
     } finally { 
      conn.close() 
     } 
    } 
+0

non sarà il buffer StringBuilder sb crescere senza limite come per il numero di record nel EventModel RDD? perché non ti manca la memoria? – nont

+0

Ho usato questo per la mia soluzione che è stata in esecuzione da mesi e non ho visto fino ad ora la memoria. Il volume di dati che ho è abbastanza consistente - 100000/sec. Inoltre, se sei preoccupato di questo, puoi sempre avere un altro controllo in base al quale chiami copyIn e cancelli il buffer. – smishra