2015-02-05 15 views
6

Attualmente sto cercando di addestrare una serie di vettori Word2Vec su UMBS Webbase Corpus (circa 30 GB di testo in 400 file).Modalità di unione efficiente della memoria una sequenza di RDD dai file in Apache Spark

Spesso mi imbatto in situazioni di memoria anche su 100 GB più macchine. Eseguo Spark nell'applicazione stessa. Ho provato a modificare un po ', ma non sono in grado di eseguire questa operazione su più di 10 GB di dati testuali. Il collo di bottiglia chiaro della mia implementazione è l'unione degli RDD precedentemente calcolati, da cui proviene l'eccezione di memoria esaurita.

Forse uno avete l'esperienza a venire con una memoria implementazione più efficiente di questo:

object SparkJobs { 
    val conf = new SparkConf() 
    .setAppName("TestApp") 
    .setMaster("local[*]") 
    .set("spark.executor.memory", "100g") 
    .set("spark.rdd.compress", "true") 

    val sc = new SparkContext(conf) 


    def trainBasedOnWebBaseFiles(path: String): Unit = { 
    val folder: File = new File(path) 

    val files: ParSeq[File] = folder.listFiles(new TxtFileFilter).toIndexedSeq.par 


    var i = 0; 
    val props = new Properties(); 
    props.setProperty("annotators", "tokenize, ssplit"); 
    props.setProperty("nthreads","2") 
    val pipeline = new StanfordCoreNLP(props); 

    //preprocess files parallel 
    val training_data_raw: ParSeq[RDD[Seq[String]]] = files.map(file => { 
     //preprocess line of file 
     println(file.getName() +"-" + file.getTotalSpace()) 
     val rdd_lines: Iterator[Option[Seq[String]]] = for (line <- Source.fromFile(file,"utf-8").getLines) yield { 
      //performs some preprocessing like tokenization, stop word filtering etc. 
      processWebBaseLine(pipeline, line)  
     } 
     val filtered_rdd_lines = rdd_lines.filter(line => line.isDefined).map(line => line.get).toList 
     println(s"File $i done") 
     i = i + 1 
     sc.parallelize(filtered_rdd_lines).persist(StorageLevel.MEMORY_ONLY_SER) 

    }) 

    val rdd_file = sc.union(training_data_raw.seq) 

    val starttime = System.currentTimeMillis() 
    println("Start Training") 
    val word2vec = new Word2Vec() 

    word2vec.setVectorSize(100) 
    val model: Word2VecModel = word2vec.fit(rdd_file) 

    println("Training time: " + (System.currentTimeMillis() - starttime)) 
    ModelUtil.storeWord2VecModel(model, Config.WORD2VEC_MODEL_PATH) 
    }} 
} 
+1

30 GB di dati nei file ... creeranno sicuramente più di 100 GB di oggetti Java ... Farlo in modo che un solo file sia in memoria contemporaneamente ... elaborarlo ... quindi caricare il successivo uno. –

+1

Inoltre ... non farlo -> 'StorageLevel.MEMORY_ONLY_SER' –

+0

Ho bisogno di elaborarli in una volta perché nella fase di adattamento del modello tutti i dati devono essere presenti – dice89

risposta

1

Come Sarvesh sottolinea nei commenti, è probabilmente troppi dati per una singola macchina. Usa più macchine. Generalmente vediamo la necessità di 20 – 30 GB di memoria per lavorare con un file di 1 GB. Con questa stima (estremamente approssimativa) occorrono 600 – 800 GB di memoria per l'ingresso da 30 GB. (È possibile ottenere una stima più accurata caricando una parte dei dati.)

Come commento più generale, suggerirei di evitare l'uso di rdd.union e sc.parallelize. Utilizzare invece sc.textFile con un carattere jolly per caricare tutti i file in un singolo RDD.

0

Hai provato a ottenere i vettori word2vec da un corpus più piccolo? Ti dico che stavo eseguendo l'implementazione word2vec spark su uno molto più piccolo e ho avuto problemi perché c'è questo problema: http://mail-archives.apache.org/mod_mbox/spark-issues/201412.mbox/%[email protected]%3E

Quindi, per il mio caso di utilizzo, il problema ha reso l'implementazione della parola2vec spark un po 'inutile. Così ho usato la scintilla per massaggiare il mio corpus ma non per aver effettivamente ottenuto i vettori.

  • Come altri suggerito stare lontano dal chiamare rdd.union.
  • Inoltre penso che il numero .toList probabilmente raccoglierà ogni riga dall'RDD e la raccoglierà nel tuo Driver Machine (quello utilizzato per inviare l'attività) probabilmente è per questo che stai uscendo dalla memoria. Dovresti assolutamente evitare di trasformare l'RDD in una lista!