Sto eseguendo Spark Streaming con due finestre diverse (sulla finestra per addestrare un modello con SKLearn e l'altro per prevedere i valori basati su quel modello) e mi chiedo come evitare una finestra (la finestra di allenamento "lenta") per addestrare un modello, senza "bloccare" la finestra di previsione "veloce".
Il mio codice semplificato appare come segue:Come evitare che una finestra di Spark Streaming blocchi un'altra finestra con entrambi eseguendo del codice Python nativo
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())
# train test split etc...
model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)
(Nota: il Custom_ModelContainer è una classe che ho scritto per salvare e recuperare il modello addestrato)
La mia configurazione in generale funziona bene, con l'eccezione che ogni volta un nuovo modello viene addestrato nella seconda finestra (che impiega circa un minuto), le prime finestre non calcolano le previsioni finché l'allenamento del modello non è terminato. In realtà, immagino che questo abbia senso, dal momento che l'adattamento del modello e le previsioni sono entrambi calcolati sul nodo master (in un'impostazione non distribuita - a causa di SKLearn).
Quindi la mia domanda è la seguente: sarebbe possibile addestrare il modello su un singolo nodo di lavoro (invece del nodo principale)? In tal caso, come potrei conseguire quest'ultimo e risolverebbe effettivamente il mio problema?
In caso negativo, qualsiasi altro suggerimento su come posso eseguire una tale configurazione senza ritardare i calcoli nella finestra 1?
Qualsiasi aiuto è molto apprezzato.
MODIFICA: Credo che la domanda più generale sarebbe: Come posso eseguire due attività diverse su due diversi lavoratori in parallelo?