2016-05-26 43 views
5

Sto utilizzando una variabile di trasmissione di circa 100 MB in salamoia in termini di dimensioni, che sto approssimare con:Suggerimenti per utilizzare correttamente le variabili di trasmissione di grandi dimensioni?

>>> data = list(range(int(10*1e6))) 
>>> import cPickle as pickle 
>>> len(pickle.dumps(data)) 
98888896 

esecuzione su un cluster con 3 esecutori c3.2xlarge, e un driver m3.large, con il seguente comando lanciando la sessione interattiva:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g 

In un RDD, se insisto un riferimento a questa variabile trasmissione, esplode utilizzo della memoria. Per 100 riferimenti a una variabile da 100 MB, anche se fosse stata copiata 100 volte, mi aspetto che l'utilizzo dei dati non sia superiore a 10 GB totali (figuriamoci 30 GB su 3 nodi). Tuttavia, vedo errori di memoria quando si esegue il seguente test:

data = list(range(int(10*1e6))) 
metadata = sc.broadcast(data) 
ids = sc.parallelize(zip(range(100), range(100))) 
joined_rdd = ids.mapValues(lambda _: metadata.value) 
joined_rdd.persist() 
print('count: {}'.format(joined_rdd.count())) 

L'analisi dello stack: le discussioni precedenti

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): 

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func 
    return f(iterator) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream 
    yield self._read_with_length(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
MemoryError 


    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-1-7a262fdfa561> in <module>() 
     7 joined_rdd.persist() 
     8 print('persist called') 
----> 9 print('count: {}'.format(joined_rdd.count())) 

/usr/lib/spark/python/pyspark/rdd.py in count(self) 
    1004   3 
    1005   """ 
-> 1006   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    1007 
    1008  def stats(self): 

/usr/lib/spark/python/pyspark/rdd.py in sum(self) 
    995   6.0 
    996   """ 
--> 997   return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    998 
    999  def count(self): 

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op) 
    869   # zeroValue provided to each partition is unique from the one provided 
    870   # to the final reduce call 
--> 871   vals = self.mapPartitions(func).collect() 
    872   return reduce(op, vals, zeroValue) 
    873 

/usr/lib/spark/python/pyspark/rdd.py in collect(self) 
    771   """ 
    772   with SCCallSiteSync(self.context) as css: 
--> 773    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    774   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    775 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

che ho visto circa l'utilizzo della memoria di deserializzazione salamoia essere un problema. Tuttavia, mi aspetto che una variabile di trasmissione venga deserializzata (e caricata in memoria su un executor) solo una volta e i successivi riferimenti a .value per fare riferimento a tale indirizzo in memoria. Ciò non sembra essere il caso, comunque. Mi sto perdendo qualcosa?

Gli esempi che ho visto con le variabili di trasmissione li hanno come dizionari, usati una volta per trasformare un insieme di dati (cioè sostituire gli acronimi aeroportuali con i nomi degli aeroporti). La motivazione alla base della loro persistenza è quella di creare oggetti con la conoscenza di una variabile di trasmissione e di come interagire con essa, perseguire quegli oggetti ed eseguire più calcoli che li utilizzano (con la scintilla avendo cura di tenerli in memoria).

Quali sono alcuni suggerimenti per l'utilizzo di grandi variabili di trasmissione (100 MB +)? Sta persistendo una variabile di trasmissione fuorviata? Si tratta di un problema che è forse specifico per PySpark?

Grazie! Il tuo aiuto è apprezzato.

nota, ho anche postato questa domanda sul databricks forums

Edit - domanda followup:

è stato suggerito che il serializzatore predefinito Spark ha una dimensione del lotto di 65337. oggetti serializzati in diversi i lotti non sono identificati come identici e vengono assegnati indirizzi di memoria diversi, qui esaminati tramite la funzione integrata id. Tuttavia, anche con una variabile di trasmissione più ampia che in teoria richiederebbe la serializzazione di 256 lotti, vedo ancora solo 2 copie distinte. Non dovrei vedere molti di più? La mia comprensione di come la serializzazione batch funziona in modo errato?

>>> sc.serializer.bestSize 
65536 
>>> import cPickle as pickle 
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))} 
>>> len(pickle.dumps(broadcast_data)) 
16777786 
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize 
256 
>>> bd = sc.broadcast(broadcast_data) 
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value) 
>>> rdd.map(id).distinct().count() 
1 
>>> rdd.cache().count() 
100 
>>> rdd.map(id).distinct().count() 
2 

risposta

5

Bene, il diavolo è nei dettagli. Per capire il motivo per cui ciò potrebbe accadere dovremo dare un'occhiata più da vicino ai serializzatori PySpark.In primo luogo permette di creare SparkContext con le impostazioni predefinite:

from pyspark import SparkContext 

sc = SparkContext("local", "foo") 

e verificare che cosa è un serializzatore predefinito:

sc.serializer 
## AutoBatchedSerializer(PickleSerializer()) 

sc.serializer.bestSize 
## 65536 

Ci dice tre cose diverse:

  • questo è AutoBatchedSerializer serializzatore
  • utilizza PickleSerializer per eseguire il lavoro effettivo
  • bestSize del serializzato dosato è 65536 byte

una rapida occhiata at the source code vi mostrerà che questo serializzare regola numero di record serializzato al momento del runtime e cerca di mantenere la dimensione del lotto meno di 10 * bestSize. Il punto importante è che non tutti i record nella singola partizione vengono serializzati nello stesso momento.

possiamo verificare che sperimentalmente come segue:

from operator import add 

bd = sc.broadcast({}) 

rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value) 
rdd.map(id).distinct().count() 
## 1 

rdd.cache().count() 
## 10 

rdd.map(id).distinct().count() 
## 2 

Come si può vedere anche in questo semplice esempio, dopo la serializzazione-deserializzazione otteniamo due oggetti distinti. È possibile osservare il comportamento simile lavorando direttamente con pickle:

v = {} 
vs = [v, v, v, v] 

v1, *_, v4 = pickle.loads(pickle.dumps(vs)) 
v1 is v4 
## True 

(v1_, v2_), (v3_, v4_) = (
    pickle.loads(pickle.dumps(vs[:2])), 
    pickle.loads(pickle.dumps(vs[2:])) 
) 

v1_ is v4_ 
## False 

v3_ is v4_ 
## True 

valori serializzati nello stesso riferimento batch, dopo deserializzazione, lo stesso oggetto. I valori di lotti diversi indicano oggetti diversi.

In pratica Spark ha multiple serializzazioni e diverse strategie di serializzazione. È possibile ad esempio utilizzare lotti di dimensione infinita:

from pyspark.serializers import BatchedSerializer, PickleSerializer 

rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value) 
    ._reserialize(BatchedSerializer(PickleSerializer()))) 
rdd_.cache().count() 

rdd_.map(id).distinct().count() 
## 1 

È possibile modificare serializzatore passando serializer e/o batchSize parametri SparkContext costruttore:

sc = SparkContext(
    "local", "bar", 
    serializer=PickleSerializer(), # Default serializer 
    # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer 
    batchSize=-1 
) 

sc.serializer 
## BatchedSerializer(PickleSerializer(), -1) 

Scegliere diverse serializzatori e dosaggio di strategie risultati in commercio diversi -offs (velocità, capacità di serializzare oggetti arbitrari, requisiti di memoria, ecc.).

Si dovrebbe anche ricordare che le variabili di trasmissione in Spark non sono condivise tra i thread di executor in modo che sullo stesso worker possano esistere più copie deserializzate allo stesso tempo.

Inoltre si vedrà un comportamento simile a questo se si esegue una trasformazione che richiede mescolamento.

+0

Ti dispiace commentare maggiormente i trade-off tra le strategie di serializzazione? Con una maggiore dimensione del batch, dovremmo aspettarci più memoria necessaria per la serializzazione? In che modo influenzerà la velocità della serializzazione? Perché scegliere un serializzatore che non può serializzare oggetti arbitrari? – captaincapsaicin

+0

Bene, la partizione completa potrebbe non adattarsi alla memoria, quindi se il batch è infinito non c'è garanzia che riuscirà. Questo è per i principianti. Un maggiore utilizzo della memoria può portare a problemi di GC diversi. Per quanto riguarda le tue ultime domande praticamente nessun serializzatore può elaborare un oggetto arbitrario, specialmente se questo si interfaccia con un codice nativo. Ci sono alcuni costrutti linguistici che non possono essere serializzati di default in base alla progettazione (come le espressioni lambda) e richiedono strumenti specializzati. D'altra parte la serializzazione di chiusure complesse può essere lenta. – zero323

+0

Ho anche modificato una domanda successiva alla mia domanda originale. Puoi dare un'occhiata e chiarire in che modo la dimensione del batch è correlata al numero di oggetti distinti che vediamo nella memoria Spark? – captaincapsaicin