2015-10-10 20 views
8

Ho un'app di elaborazione clojure che è una pipeline di canali. Ogni fase di elaborazione esegue i suoi calcoli in modo asincrono (ad es. Fa una richiesta http usando http-kit o qualcosa del genere) e la mette sul canale di uscita. In questo modo il prossimo passo può leggere da quel canale e fare il suo calcolo.Come arrestare al meglio una pipeline di processi core.async di clojure

mia funzione principale è simile al seguente

(defn -main [args] 
(-> file/tmp-dir 
    (schedule/scheduler) 
    (search/searcher) 
    (process/resultprocessor) 
    (buy/buyer) 
    (report/reporter))) 

Attualmente, il passo di pianificazione spinge la pipeline (non ha un canale di ingresso), e fornisce la catena con carico di lavoro.

Quando ho eseguito questo nel REPL:

(-main "some args") 

Si corre essenzialmente per sempre a causa della infinità dello scheduler. Qual è il modo migliore per cambiare questa architettura in modo tale da poter spegnere l'intero sistema dal REPL? La chiusura di ciascun canale significa che il sistema termina?

Potrebbe essere utile un canale di trasmissione?

+0

'(Sistema/uscita 0)'? – Bill

+0

Anche questo uccide la REPL, purtroppo. Proverò l'approccio Component –

risposta

6

Si potrebbe avere il programmatore delle alts!/alts!! su un canale kill e il canale di ingresso del gasdotto:

(def kill-channel (async/chan)) 

(defn scheduler [input output-ch kill-ch] 
    (loop [] 
    (let [[v p] (async/alts!! [kill-ch [out-ch (preprocess input)]] 
        :priority true)] 
     (if-not (= p kill-ch) 
     (recur)))) 

Mettere un valore su kill-channel poi terminare il ciclo.

Tecnicamente è anche possibile utilizzare output-ch per controllare il processo (passa ai canali chiusi restituire false), ma normalmente trovo i pulitori di uccisione espliciti, almeno per le pipeline di livello superiore.

Per rendere le cose simultaneamente più eleganti e più comode da usare (sia REPL che in produzione), è possibile utilizzare Stuart Sierra's component, avviare il ciclo di pianificazione (su un thread separato) e assoc il canale di eliminazione sul componente in il metodo start del componente e quindi close! il canale kill (e quindi terminano il ciclo) nel metodo stop del componente.

4

Suggerirei di utilizzare qualcosa come https://github.com/stuartsierra/component per gestire l'installazione del sistema. Garantisce che è possibile avviare e arrestare facilmente il sistema in REPL. Usando questa libreria, la configureresti in modo che ogni fase di elaborazione fosse un componente e ogni componente gestisse l'installazione e la rimozione dei canali nei loro protocolli e stop. Inoltre, potresti probabilmente creare un protocollo IStream per ciascun componente da implementare e fare in modo che ciascun componente dipenda da componenti che implementano tale protocollo. Ti offre una modularità molto semplice.

Faresti finisce con un sistema che è simile al seguente:

(component/system-map 
:scheduler (schedule/new-scheduler file/tmp-dir) 
:searcher (component/using (search/searcher) 
          {:in :scheduler}) 
:processor (component/using (process/resultprocessor) 
          {:in :searcher}) 
:buyer  (component/using (buy/buyer) 
          {:in :processor}) 
:report (component/using (report/reporter) 
          {:in :buyer})) 

Una cosa bella, con questo tipo di approccio è che si potrebbe facilmente aggiungere componenti se si basano su un canale pure. Ad esempio, se ogni componente crea il proprio canale esterno utilizzando tap su un mult interno, è possibile aggiungere un logger per il processore solo da un componente di registrazione che accetta il processore come dipendenza.

:processor (component/using (process/resultprocessor) 
          {:in :searcher}) 
:processor-logger (component/using (log/logger) 
            {:in processor}) 

avrei consiglio di guardare il suo talk anche per avere un'idea di come funziona.

1

Si dovrebbe considerare l'utilizzo di Stuart Sierra's reloaded workflow, che dipende dal modellare elementi tuo 'conduttura' come components, in questo modo è possibile modellare i vostri singletons logiche come 'classi', significa che è possibile controllare la costruzione e la distruzione (start/stop) logica ognuno di loro.