2015-06-23 17 views
5

Ho un TransformManyBlock con il seguente disegno:blocco TPL Dataflow consuma tutta la memoria disponibile

  • ingresso: Percorso al file
  • uscita: IEnumerable del contenuto del file, una riga alla volta

Sto eseguendo questo blocco su un file enorme (61 GB), che è troppo grande per adattarsi alla RAM. Per evitare una crescita illimitata della memoria, ho impostato BoundedCapacity su un valore molto basso (ad es. 1) per questo blocco e su tutti i blocchi downstream. Nondimeno, il blocco apparentemente preannuncia avidamente l'IEnumerable, che consuma tutta la memoria disponibile sul computer, interrompendo ogni processo. L'OutputCount del blocco continua ad aumentare senza vincoli finché non uccido il processo.

Cosa posso fare per impedire al blocco di consumare il IEnumerable in questo modo?

EDIT: Ecco un programma di esempio che illustra il problema:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

class Program 
{ 
    static IEnumerable<string> GetSequence(char c) 
    { 
     for (var i = 0; i < 1024 * 1024; ++i) 
      yield return new string(c, 1024 * 1024); 
    } 

    static void Main(string[] args) 
    { 
     var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }; 
     var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options); 
     var secondBlock = new ActionBlock<string>(str => 
      { 
       Console.WriteLine(str.Substring(0, 10)); 
       Thread.Sleep(1000); 
      }, options); 

     firstBlock.LinkTo(secondBlock); 
     firstBlock.Completion.ContinueWith(task => 
      { 
       if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception); 
       else secondBlock.Complete(); 
      }); 

     firstBlock.Post('A'); 
     firstBlock.Complete(); 
     for (; ;) 
     { 
      Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount); 
      Thread.Sleep(3000); 
     } 
    } 
} 

Se siete su una scatola a 64 bit, assicurarsi di deselezionare l'opzione "Preferisco a 32 bit" in Visual Studio. Ho 16 GB di RAM sul mio computer e questo programma consuma immediatamente ogni byte disponibile.

+0

ben TBH: non ho tempo per discutere con voi qui - buona fortuna – Carsten

+0

OK, grazie per l'input comunque. – brianberns

+0

se leggi attentamente il resto della sezione vedrai che non funziona come pensi tu - il tuo 'firstBlock' offre sempre tutto ciò che può produrre - se leghi il secondo si rifiuterà il secondo input e lo recupererà in seguito – Carsten

risposta

3

Ti sembra di fraintendere come funziona TPL Dataflow.

BoundedCapacity limita la quantità di elementi che è possibile caricare in un blocco. Nel tuo caso questo significa un singolo char nel TransformManyBlock e il singolo string nello ActionBlock.

Quindi pubblichi un singolo articolo su TransformManyBlock che restituisce le stringhe 1024*1024 e prova a passarle sullo ActionBlock che accetta solo un singolo alla volta. Il resto delle stringhe resterà semplicemente seduto nella coda di output di TransformManyBlock.

Quello che probabilmente vuole fare è creare un unico elementi di blocco e post in esso in modo di streaming da attesa (in modo sincrono o altro) quando si tratta di capacità è raggiunto:

private static void Main() 
{ 
    MainAsync().Wait(); 
} 

private static async Task MainAsync() 
{ 
    var block = new ActionBlock<string>(async item => 
    { 
     Console.WriteLine(item.Substring(0, 10)); 
     await Task.Delay(1000); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 

    foreach (var item in GetSequence('A')) 
    { 
     await block.SendAsync(item); 
    } 

    block.Complete(); 
    await block.Completion; 
} 
+0

Grazie. Ho finito per creare un nuovo blocco che incapsula una sorgente ActionBlock e una destinazione BufferBlock. Il blocco azione utilizza SendAsync come suggerisci di popolare il buffer. Per il mondo esterno, si comporta come un TransformManyBlock con il comportamento che voglio. – brianberns

+0

@brianberns: Scusa se questa è una domanda stupida, ma qual è la differenza tra "attendi block.SendAsync (item)" e "block.Post (item)"? – Bugmaster

+0

@Bugmaster Non è una domanda stupida: http://stackoverflow.com/a/13605979/885318 – i3arnon