2016-01-27 17 views
10

Sto eseguendo Spark Streaming con due finestre diverse (sulla finestra per addestrare un modello con SKLearn e l'altro per prevedere i valori basati su quel modello) e mi chiedo come evitare una finestra (la finestra di allenamento "lenta") per addestrare un modello, senza "bloccare" la finestra di previsione "veloce".
Il mio codice semplificato appare come segue:Come evitare che una finestra di Spark Streaming blocchi un'altra finestra con entrambi eseguendo del codice Python nativo

conf = SparkConf() 
conf.setMaster("local[4]") 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc, 1) 

stream = ssc.socketTextStream("localhost", 7000) 


import Custom_ModelContainer 

### Window 1 ### 
### predict data based on model computed in window 2 ### 

def predict(time, rdd): 
    try: 
     # ... rdd conversion to df, feature extraction etc... 

     # regular python code 
     X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
     pred = Custom_ModelContainer.getmodel().predict(X) 

     # send prediction to GUI 

    except Exception, e: print e 

predictionStream = stream.window(60,60) 
predictionStream.foreachRDD(predict) 


### Window 2 ### 
### fit new model ### 

def trainModel(time, rdd): 
try: 
    # ... rdd conversion to df, feature extraction etc... 

    X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
    y = np.array(df.map(lambda lp: lp.label).collect()) 

    # train test split etc... 

    model = SVR().fit(X_train, y_train) 
    Custom_ModelContainer.setModel(model) 

except Exception, e: print e 

modelTrainingStream = stream.window(600,600) 
modelTrainingStream.foreachRDD(trainModel) 

(Nota: il Custom_ModelContainer è una classe che ho scritto per salvare e recuperare il modello addestrato)

La mia configurazione in generale funziona bene, con l'eccezione che ogni volta un nuovo modello viene addestrato nella seconda finestra (che impiega circa un minuto), le prime finestre non calcolano le previsioni finché l'allenamento del modello non è terminato. In realtà, immagino che questo abbia senso, dal momento che l'adattamento del modello e le previsioni sono entrambi calcolati sul nodo master (in un'impostazione non distribuita - a causa di SKLearn).

Quindi la mia domanda è la seguente: sarebbe possibile addestrare il modello su un singolo nodo di lavoro (invece del nodo principale)? In tal caso, come potrei conseguire quest'ultimo e risolverebbe effettivamente il mio problema?

In caso negativo, qualsiasi altro suggerimento su come posso eseguire una tale configurazione senza ritardare i calcoli nella finestra 1?

Qualsiasi aiuto è molto apprezzato.

MODIFICA: Credo che la domanda più generale sarebbe: Come posso eseguire due attività diverse su due diversi lavoratori in parallelo?

risposta

2

Disclaimer: Questa è solo una serie di idee. Nessuno di questi è stato testato in pratica.


Un paio di cose che si possono provare:

  1. Non collect a predict. scikit-learn modelli sono tipicamente serializzabili così processo predizione possono essere facilmente manipolati nel cluster:

    def predict(time, rdd): 
        ... 
    
        model = Custom_ModelContainer.getmodel() 
        pred = (df.rdd.map(lambda lp: lp.features.toArray()) 
         .mapPartitions(lambda iter: model.predict(np.array(list(iter))))) 
        ... 
    

    Dovrebbe non solo parallelizzare previsioni ma anche, se i dati grezzi non viene passato alla GUI, ridurre quantità di dati che devono essere raccolti .

  2. Provare a collect e inviare dati in modo asincrono. PySpark non fornisce collectAsync metodo, ma si può provare a realizzare qualcosa di simile con concurrent.futures:

    from pyspark.rdd import RDD 
    from concurrent.futures import ThreadPoolExecutor 
    
    executor = ThreadPoolExecutor(max_workers=4) 
    
    def submit_to_gui(*args): ... 
    
    def submit_if_success(f): 
        if not f.exception(): 
         executor.submit(submit_to_gui, f.result()) 
    

    continuano dal 1.

    def predict(time, rdd): 
        ... 
        f = executor.submit(RDD.collect, pred) 
        f.add_done_callback(submit_if_success) 
        ... 
    
  3. Se davvero si vuole utilizzare locale scikit-learn modello di cercare di collect e fit ricorrendo a future come sopra. Si può anche cercare di raccogliere solo una volta, soprattutto se i dati non vengono memorizzati nella cache: processo di addestramento

    def collect_and_train(df): 
        y, X = zip(*((p.label, p.features.toArray()) for p in df.collect())) 
        ... 
        return SVR().fit(X_train, y_train) 
    
    def set_if_success(f): 
        if not f.exception(): 
         Custom_ModelContainer.setModel(f.result()) 
    
    def trainModel(time, rdd): 
        ... 
        f = excutor.submit(collect_and_train, df) 
        f.add_done_callback(set_if_success) 
        ... 
    
  4. Spostare al cluster sia utilizzando soluzioni già esistenti come spark-sklearn o un approccio personalizzato:

    • soluzione ingenua - preparare i dati, coalesce(1) e formare un modello singolo utilizzando mapPartitions.
    • soluzione distribuita: creare e convalidare un modello separato per partizione utilizzando mapPartitions, raccogliere modelli e utilizzarli come un insieme, ad esempio, prendendo una previsione media o mediana.
  5. Gettare via scikit-learn e utilizzare un modello che può essere addestrato e mantenuto in un ambiente distribuito, in streaming (ad esempio StreamingLinearRegressionWithSGD).

    Il tuo approccio attuale rende Spark obsoleto. Se riesci a formare il modello a livello locale, ci sono buone probabilità che tu possa eseguire tutte le altre attività molto più velocemente sulla macchina locale. Altrimenti il ​​tuo programma fallirà semplicemente su collect.

1

Penso che quello che stai cercando è la proprietà: "spark.streaming.concurrentJobs", che per impostazione predefinita 1. Aumentare questo dovrebbe consentire di eseguire più funzioni foreachRDD in parallelo.

In JobScheduler.scala:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 

Solo un promemoria per essere anche consapevoli del filo di sicurezza sul contenitore modello personalizzato se si sta andando ad essere mutare e la lettura in parallelo. :)