Ho scritto un codice core.async in Clojure e quando l'ho eseguito ha consumato tutta la memoria disponibile e non è riuscito con un errore. Sembra che l'utilizzo di mapcat
in una conduttura core.async interrompa la pressione. (Che è un peccato per motivi indipendenti dalla portata di questa domanda.)Dove si verifica la perdita di memoria quando mapcat interrompe la contropressione in core.async?
Ecco po 'di codice che illustra il problema contando :x
s dentro e fuori di un mapcat
ing trasduttore:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
Le gare produttori lontano davanti al consumatore.
Sembra che io non sia la prima persona a scoprirlo. Ma la spiegazione data here non sembra coprirlo. (Anche se fornisce una soluzione alternativa.) Concettualmente, mi aspetto che il produttore sia in vantaggio, ma solo dalla lunghezza dei pochi messaggi che potrebbero essere memorizzati nei canali.
La mia domanda è, dove sono tutti gli altri messaggi? Dalla quarta riga di output 7000 :x
s non sono presenti.
Nel link che hai fornito, Alex ha detto che questo è un dilemma tra il risultato sbagliato e la violazione del limite del buffer. Chiaramente [ASYNC-124] (http://dev.clojure.org/jira/browse/ASYNC-124) preferisce una risposta corretta – Davyzhu
Quindi, per quanto riguarda la tua domanda, gli altri messaggi potrebbero essere trattenuti nel 'takers' di riferimento [qui ] (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86). Non ne siamo così sicuri, quindi aspettiamo una risposta più sicura. – Davyzhu