Sto utilizzando una variabile di trasmissione di circa 100 MB in salamoia in termini di dimensioni, che sto approssimare con:Suggerimenti per utilizzare correttamente le variabili di trasmissione di grandi dimensioni?
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
esecuzione su un cluster con 3 esecutori c3.2xlarge, e un driver m3.large, con il seguente comando lanciando la sessione interattiva:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
In un RDD, se insisto un riferimento a questa variabile trasmissione, esplode utilizzo della memoria. Per 100 riferimenti a una variabile da 100 MB, anche se fosse stata copiata 100 volte, mi aspetto che l'utilizzo dei dati non sia superiore a 10 GB totali (figuriamoci 30 GB su 3 nodi). Tuttavia, vedo errori di memoria quando si esegue il seguente test:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
L'analisi dello stack: le discussioni precedenti
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
7 joined_rdd.persist()
8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))
/usr/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/usr/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
che ho visto circa l'utilizzo della memoria di deserializzazione salamoia essere un problema. Tuttavia, mi aspetto che una variabile di trasmissione venga deserializzata (e caricata in memoria su un executor) solo una volta e i successivi riferimenti a .value
per fare riferimento a tale indirizzo in memoria. Ciò non sembra essere il caso, comunque. Mi sto perdendo qualcosa?
Gli esempi che ho visto con le variabili di trasmissione li hanno come dizionari, usati una volta per trasformare un insieme di dati (cioè sostituire gli acronimi aeroportuali con i nomi degli aeroporti). La motivazione alla base della loro persistenza è quella di creare oggetti con la conoscenza di una variabile di trasmissione e di come interagire con essa, perseguire quegli oggetti ed eseguire più calcoli che li utilizzano (con la scintilla avendo cura di tenerli in memoria).
Quali sono alcuni suggerimenti per l'utilizzo di grandi variabili di trasmissione (100 MB +)? Sta persistendo una variabile di trasmissione fuorviata? Si tratta di un problema che è forse specifico per PySpark?
Grazie! Il tuo aiuto è apprezzato.
nota, ho anche postato questa domanda sul databricks forums
Edit - domanda followup:
è stato suggerito che il serializzatore predefinito Spark ha una dimensione del lotto di 65337. oggetti serializzati in diversi i lotti non sono identificati come identici e vengono assegnati indirizzi di memoria diversi, qui esaminati tramite la funzione integrata id
. Tuttavia, anche con una variabile di trasmissione più ampia che in teoria richiederebbe la serializzazione di 256 lotti, vedo ancora solo 2 copie distinte. Non dovrei vedere molti di più? La mia comprensione di come la serializzazione batch funziona in modo errato?
>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2
Ti dispiace commentare maggiormente i trade-off tra le strategie di serializzazione? Con una maggiore dimensione del batch, dovremmo aspettarci più memoria necessaria per la serializzazione? In che modo influenzerà la velocità della serializzazione? Perché scegliere un serializzatore che non può serializzare oggetti arbitrari? – captaincapsaicin
Bene, la partizione completa potrebbe non adattarsi alla memoria, quindi se il batch è infinito non c'è garanzia che riuscirà. Questo è per i principianti. Un maggiore utilizzo della memoria può portare a problemi di GC diversi. Per quanto riguarda le tue ultime domande praticamente nessun serializzatore può elaborare un oggetto arbitrario, specialmente se questo si interfaccia con un codice nativo. Ci sono alcuni costrutti linguistici che non possono essere serializzati di default in base alla progettazione (come le espressioni lambda) e richiedono strumenti specializzati. D'altra parte la serializzazione di chiusure complesse può essere lenta. – zero323
Ho anche modificato una domanda successiva alla mia domanda originale. Puoi dare un'occhiata e chiarire in che modo la dimensione del batch è correlata al numero di oggetti distinti che vediamo nella memoria Spark? – captaincapsaicin