2016-06-21 48 views
9

Ho scritto un codice core.async in Clojure e quando l'ho eseguito ha consumato tutta la memoria disponibile e non è riuscito con un errore. Sembra che l'utilizzo di mapcat in una conduttura core.async interrompa la pressione. (Che è un peccato per motivi indipendenti dalla portata di questa domanda.)Dove si verifica la perdita di memoria quando mapcat interrompe la contropressione in core.async?

Ecco po 'di codice che illustra il problema contando :x s dentro e fuori di un mapcat ing trasduttore:

(ns mapcat.core 
    (:require [clojure.core.async :as async])) 

(defn test-backpressure [n length] 
    (let [message (repeat length :x) 
     input (async/chan) 
     transform (async/chan 1 (mapcat seq)) 
     output (async/chan) 
     sent (atom 0)] 
    (async/pipe input transform) 
    (async/pipe transform output) 
    (async/go 
     (dotimes [_ n] 
     (async/>! input message) 
     (swap! sent inc)) 
     (async/close! input)) 
    (async/go-loop [x 0] 
     (when (= 0 (mod x (/ (* n length) 10))) 
     (println "in:" (* @sent length) "out:" x)) 
     (when-let [_ (async/<! output)] 
     (recur (inc x)))))) 

=> (test-backpressure 1000 10) 
in: 10 out: 0 
in: 2680 out: 1000 
in: 7410 out: 2000 
in: 10000 out: 3000 ; Where are the other 7000 characters? 
in: 10000 out: 4000 
in: 10000 out: 5000 
in: 10000 out: 6000 
in: 10000 out: 7000 
in: 10000 out: 8000 
in: 10000 out: 9000 
in: 10000 out: 10000 

Le gare produttori lontano davanti al consumatore.

Sembra che io non sia la prima persona a scoprirlo. Ma la spiegazione data here non sembra coprirlo. (Anche se fornisce una soluzione alternativa.) Concettualmente, mi aspetto che il produttore sia in vantaggio, ma solo dalla lunghezza dei pochi messaggi che potrebbero essere memorizzati nei canali.

La mia domanda è, dove sono tutti gli altri messaggi? Dalla quarta riga di output 7000 :x s non sono presenti.

+0

Nel link che hai fornito, Alex ha detto che questo è un dilemma tra il risultato sbagliato e la violazione del limite del buffer. Chiaramente [ASYNC-124] (http://dev.clojure.org/jira/browse/ASYNC-124) preferisce una risposta corretta – Davyzhu

+0

Quindi, per quanto riguarda la tua domanda, gli altri messaggi potrebbero essere trattenuti nel 'takers' di riferimento [qui ] (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86). Non ne siamo così sicuri, quindi aspettiamo una risposta più sicura. – Davyzhu

risposta

2

Esistono due possibili interpretazioni della domanda "Dov'è la perdita di memoria?"

In primo luogo, dove vengono conservati i dati? La risposta sembra essere nel buffer di canale immediatamente a valle della trasformazione in espansione.

I canali di default utilizzano uno FixedBuffer (clojure.core.async.impl.buffers/FixedBuffer) che può dire se è pieno ma non si oppone a essere troppo pieno.

In secondo luogo, quale passaggio di codice causa il sovraccarico del buffer? Questo (correggimi se ho torto) sembra essere in the take! method di ManyToManyChannel (clojure.core.async.impl.channels/ManyToManyChannel) dove lo first call to add! sul buffer si verifica prima che qualsiasi calls to full? abbia avuto luogo.

Sembra che take! presuppone che è possibile aggiungere almeno un elemento al buffer per ogni elemento che rimuove. Nel caso di trasduttori espandenti di lunga durata come mapcat questo non è sempre un presupposto sicuro.

Modificando this line a (when (and (.hasNext iter) (not (impl/full? buf))) in una copia locale di core.async, posso fare in modo che il codice nella domanda si comporti come previsto. (NB: La mia comprensione di core.async non è sufficiente per me per garantire che questa è una soluzione robusta per vostro caso uso.)

UPDATE 2016/09/17: ora c'è un problema per questo: http://dev.clojure.org/jira/browse/ASYNC-178