14

Sto cercando di utilizzare la scintilla per un semplice compito di apprendimento automatico. Ho usato pyspark e spark 1.2.0 per fare un semplice problema di regressione logistica. Ho 1,2 milioni di record per la formazione e ho cancellato le funzionalità dei record. Quando ho impostato il numero di funzioni hash come 1024, il programma funziona bene, ma quando ho impostato il numero di funzioni hash come 16384, il programma non riesce più volte con il seguente errore:Errore Spark Java: la dimensione supera Integer.MAX_VALUE

Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) 
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) 
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) 
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) 
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) 
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:745) 

    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93) 
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Questo errore si verifica quando Formazione del LogisticRegressionWithSGD dopo aver trasferito i dati in LabeledPoint.

Qualcuno ha un'idea in merito?

Il mio codice è il seguente (sto usando un notebook IPython per questo):

from pyspark.mllib.regression import LabeledPoint 
from pyspark.mllib.classification import LogisticRegressionWithSGD 
from numpy import array 
from sklearn.feature_extraction import FeatureHasher 
from pyspark import SparkContext 
sf = SparkConf().setAppName("test").set("spark.executor.memory", "50g").set("spark.cores.max", 30) 
sc = SparkContext(conf=sf) 
training_file = sc.textFile("train_small.txt") 
def hash_feature(line): 
    values = [0, dict()] 
    for index, x in enumerate(line.strip("\n").split('\t')): 
     if index == 0: 
      values[0] = float(x) 
     else: 
      values[1][str(index)+"_"+x] = 1 
    return values 
n_feature = 2**14 
hasher = FeatureHasher(n_features=n_feature) 
training_file_hashed = training_file.map(lambda line: [hash_feature(line)[0], hasher.transform([hash_feature(line)[1]])]) 
def build_lable_points(line): 
    values = [0.0] * n_feature 
    for index, value in zip(line[1].indices, line[1].data): 
     values[index] = value 
    return LabeledPoint(line[0], values) 
parsed_training_data = training_file_hashed.map(lambda line: build_lable_points(line)) 
model = LogisticRegressionWithSGD.train(parsed_training_data) 

L'errore si verifica durante l'esecuzione l'ultima riga.

+1

Puoi mostrare il tuo codice? –

+0

il codice viene aggiunto al post originale, grazie – peng

+0

Puoi provare più partizioni? (Penso che più partizioni significano meno dati per partizione, quindi dovrebbe fare il trucco). –

risposta

1

A un certo punto, tenta di memorizzare le funzionalità e 1.2M * 16384 è maggiore di Integer.MAX_INT in modo che si stia tentando di memorizzare più della dimensione massima delle funzionalità supportate da Spark.

Probabilmente stai correndo nei limiti di Apache Spark.

+1

Grazie. Puoi approfondire questo? Non ho mai sentito parlare delle dimensioni massime delle funzioni supportate dalla scintilla. So che c'è una limitazione sulla dimensione del blocco per la scintilla, vedi https://issues.apache.org/jira/browse/SPARK-1476, non sono sicuro se lo sto colpendo, ma se lo colpisco, mi chiedo come Posso evitarlo senza ridurre il numero di funzioni e il numero di record – peng

11

La limitazione Integer.MAX_INT è sulla dimensione di un file in fase di memorizzazione. Le file da 1,2 milioni non sono una grande cosa, per non essere sicuro che il tuo problema sia "i limiti della scintilla". Più probabilmente, parte del tuo lavoro sta creando qualcosa di troppo grande per essere gestito da un determinato esecutore.

Non sono un codificatore Python, ma quando "hai cancellato le caratteristiche dei record" potresti prendere un set di record molto scarso per un campione e creare un array non sparse. Ciò significa molta memoria per le funzioni 16384. In particolare, quando si esegue zip(line[1].indices, line[1].data). L'unica ragione per cui non riesci a liberarti della memoria è proprio il carico che hai configurato (50G).

Un'altra cosa che potrebbe aiutare è aumentare il partizionamento. Quindi, se non puoi fare in modo che le tue righe usino meno memoria, almeno puoi provare ad avere un minor numero di righe su una determinata attività. È probabile che qualsiasi file temporaneo creato dipenda da questo, quindi è molto improbabile che i limiti dei file vengano raggiunti.


E, totalmente estranei l'errore ma rilevanti per ciò che si sta cercando di fare:

16384 è davvero un gran numero di funzioni, nel caso ottimistico in cui ognuno è solo una funzione booleana, hai un totale di 2^16384 possibili permutazioni da cui imparare, questo è un numero enorme (provalo qui: https://defuse.ca/big-number-calculator.htm).

È MOLTO, MOLTO probabile che nessun algoritmo sarà in grado di apprendere un limite di decisione con soli 1,2 milioni di campioni, probabilmente occorrerebbero almeno alcuni trilioni di miliardi di esempi per intaccare tale spazio di funzionalità. L'apprendimento automatico ha i suoi limiti, quindi non sorprenderti se non ottieni una precisione migliore del caso.

Consiglio vivamente di provare prima una riduzione della dimensionalità !!

+1

Grazie. Questo problema viene corretto utilizzando più partizioni durante il caricamento dei dati. Stiamo solo testando su un piccolo set di dati e ottenendo qualche idea, quindi applicheremo il big data set con una macchina molto potente. – peng