Attualmente sto cercando di addestrare una serie di vettori Word2Vec su UMBS Webbase Corpus (circa 30 GB di testo in 400 file).Modalità di unione efficiente della memoria una sequenza di RDD dai file in Apache Spark
Spesso mi imbatto in situazioni di memoria anche su 100 GB più macchine. Eseguo Spark nell'applicazione stessa. Ho provato a modificare un po ', ma non sono in grado di eseguire questa operazione su più di 10 GB di dati testuali. Il collo di bottiglia chiaro della mia implementazione è l'unione degli RDD precedentemente calcolati, da cui proviene l'eccezione di memoria esaurita.
Forse uno avete l'esperienza a venire con una memoria implementazione più efficiente di questo:
object SparkJobs {
val conf = new SparkConf()
.setAppName("TestApp")
.setMaster("local[*]")
.set("spark.executor.memory", "100g")
.set("spark.rdd.compress", "true")
val sc = new SparkContext(conf)
def trainBasedOnWebBaseFiles(path: String): Unit = {
val folder: File = new File(path)
val files: ParSeq[File] = folder.listFiles(new TxtFileFilter).toIndexedSeq.par
var i = 0;
val props = new Properties();
props.setProperty("annotators", "tokenize, ssplit");
props.setProperty("nthreads","2")
val pipeline = new StanfordCoreNLP(props);
//preprocess files parallel
val training_data_raw: ParSeq[RDD[Seq[String]]] = files.map(file => {
//preprocess line of file
println(file.getName() +"-" + file.getTotalSpace())
val rdd_lines: Iterator[Option[Seq[String]]] = for (line <- Source.fromFile(file,"utf-8").getLines) yield {
//performs some preprocessing like tokenization, stop word filtering etc.
processWebBaseLine(pipeline, line)
}
val filtered_rdd_lines = rdd_lines.filter(line => line.isDefined).map(line => line.get).toList
println(s"File $i done")
i = i + 1
sc.parallelize(filtered_rdd_lines).persist(StorageLevel.MEMORY_ONLY_SER)
})
val rdd_file = sc.union(training_data_raw.seq)
val starttime = System.currentTimeMillis()
println("Start Training")
val word2vec = new Word2Vec()
word2vec.setVectorSize(100)
val model: Word2VecModel = word2vec.fit(rdd_file)
println("Training time: " + (System.currentTimeMillis() - starttime))
ModelUtil.storeWord2VecModel(model, Config.WORD2VEC_MODEL_PATH)
}}
}
30 GB di dati nei file ... creeranno sicuramente più di 100 GB di oggetti Java ... Farlo in modo che un solo file sia in memoria contemporaneamente ... elaborarlo ... quindi caricare il successivo uno. –
Inoltre ... non farlo -> 'StorageLevel.MEMORY_ONLY_SER' –
Ho bisogno di elaborarli in una volta perché nella fase di adattamento del modello tutti i dati devono essere presenti – dice89