Ho una connessione JDBC con Apache Spark e PostgreSQL e voglio inserire alcuni dati nel mio database. Quando utilizzo la modalità append
, devo specificare id
per ogni DataFrame.Row
. C'è un modo per Spark di creare chiavi primarie?Chiavi primarie con Apache Spark
risposta
Scala:
Se tutto ciò che serve è numeri univoci è possibile utilizzare zipWithUniqueId
e ricreare dataframe. In primo luogo alcune importazioni e dati fittizi:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
schema estratto per un ulteriore utilizzo:
val schema = df.schema
campo id Aggiungi:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Crea dataframe:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Lo stesso cosa in Python:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Se si preferisce il tuo numero progressivo può sostituire zipWithUniqueId
con zipWithIndex
ma è un po 'più costoso.
Direttamente con DataFrame
API:
(universale Scala, Python, Java, R con più o meno la stessa sintassi)
In precedenza ho perso monotonicallyIncreasingId
funzione che dovrebbe funzionare bene come fino a quando non si richiedono numeri consecutivi:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Mentre utile monotonicallyIncreasingId
non è deterministico. Non solo gli ID possono essere diversi dall'esecuzione all'esecuzione, ma senza trucchi aggiuntivi non possono essere utilizzati per identificare le righe quando le operazioni successive contengono filtri.
Nota:
E 'anche possibile utilizzare rowNumber
funzione finestra:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Sfortunatamente:
WARN Window: No partizione definita per il funzionamento della finestra! Spostando tutti i dati su una singola partizione, ciò può causare un serio peggioramento delle prestazioni.
Quindi, a meno che non si abbia un modo naturale di partizionare i dati e garantire l'unicità non è particolarmente utile in questo momento.
funzioneranno solo con R? so che hai usato scala sopra, ma tutto quello che posso trovare su questo 'zipWithUniqueId' è solo nei documenti SparkR – Nhor
In realtà è Scala. Hai bisogno di una soluzione Python? SQL semplice? – zero323
no no, posso capire il tuo codice, stavo solo chiedendo se c'è qualcosa nei documenti di pyspark su 'zipWithUniqueId', ma sembra che fossi solo pigro, perché alla fine l'ho trovato, grazie mille per la tua soluzione! – Nhor
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("id", monotonically_increasing_id()).show()
Nota che il secondo argomento della df.withColumn è monotonically_increasing_id() non monotonically_increasing_id.
Ho trovato la seguente soluzione relativamente semplice per il caso in cui zipWithIndex() è il comportamento desiderato, vale a dire per quelli che desiderano numeri interi consecutivi.
In questo caso, utilizziamo pyspark e ci basiamo sulla comprensione del dizionario per mappare l'oggetto riga originale in un nuovo dizionario che si adatta a un nuovo schema incluso l'indice univoco.
# read the initial dataframe without index
dfNoIndex = sqlContext.read.parquet(dataframePath)
# Need to zip together with a unique integer
# First create a new schema with uuid field appended
newSchema = StructType([StructField("uuid", IntegerType(), False)]
+ dfNoIndex.schema.fields)
# zip with the index, map it to a dictionary which includes new field
df = dfNoIndex.rdd.zipWithIndex()\
.map(lambda (row, id): {k:v
for k, v
in row.asDict().items() + [("uuid", id)]})\
.toDF(newSchema)
Avete esigenze particolari? Tipo di dati, valori consecutivi, qualcos'altro? – zero323
no, solo i vecchi interi unici validi – Nhor