2010-10-21 4 views
6

Io uso un BlockingCollection per implementare un modello produttore-consumatore in C# 4.0.BlockingCollection in Task Parallel Library non rilascia automaticamente il riferimento delle istanze sottostanti

Il BlockingCollection contiene elementi che occupano una notevole quantità di memoria. Vorrei lasciare che il produttore estrae un elemento da BlockingCollection alla volta e lo elabora.

pensavo che utilizzando foreach su BlockingCollection.GetConsumingEnumerable(), ogni volta, il BlockingCollection rimuoverà l'elemento dalla coda sottostante (che significa tutti insieme con il riferimento) così alla fine del metodo Process() che elabora l'articolo , l'oggetto può essere raccolto dalla spazzatura.

Ma questo non è vero. Sembra che il ciclo di foreach su BlockingCollection.GetConsumingEnumerable() trattiene tutti i riferimenti degli elementi inseriti nella coda. Tutti gli articoli sono trattenuti (quindi impediti dalla raccolta dei rifiuti) fino all'uscita dal ciclo di foreach.

Invece di utilizzare il ciclo foreach semplice su BlockingCollection.GetConsumingEnumerable(), utilizzo un ciclo while per il test BlockingCollection.IsComplete flag e all'interno del loop utilizzo BlockingCollection.Take() per prelevare un articolo di consumo. Suppongo che BlockingCollection.Take() abbia un effetto simile a List.Remove(), che rimuoverà il riferimento dell'articolo da BlockingCollection. Ma di nuovo questo è sbagliato. Tutti gli elementi sono solo garbage collection al di fuori del ciclo while.

Quindi la mia domanda è: in che modo è possibile implementare facilmente il requisito in modo che BlockingCollection conservi potenzialmente gli elementi che consumano memoria e che ciascun elemento possa essere raccolto automaticamente una volta che viene consumato dal consumatore? Grazie mille per qualsiasi aiuto.

EDIT: come richiesto, viene aggiunto un codice demo semplice:

// Entity is what we are going to process. 
// The finalizer will tell us when Entity is going to be garbage collected. 
class Entity 
{ 
    private static int counter_; 
    private int id_; 
    public int ID { get{ return id_; } } 
    public Entity() { id_ = counter++; } 
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); } 
} 

... 

private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>(); 
private List<Task> tasks_ = new List<Task>(); 

// This is the method to launch and wait for the tasks to finish the work. 
void Run() 
{ 
    tasks_.Add(Task.Factory.StartNew(ProduceEntity); 
    Console.WriteLine("Start processing."); 
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity); 
    Task.WaitAll(tasks_.ToArray()); 
} 

// The producer creates Entity instances and add them to BlockingCollection. 
void ProduceEntity() 
{ 
    for(int i = 0; i < 10; i ++) // We are adding totally 10 entities. 
    { 
     var newEntity = new Entity(); 
     Console.WriteLine("Create entity {0}.", newEntity.ID); 
     jobQueue_.Add(newEntity); 
    } 
    jobQueue_.CompleteAdding(); 
} 

// The consumer takes entity, process it (and what I need: destroy it). 
void ConsumeEntity() 
{ 
    while(!jobQueue_.IsCompleted){ 
     Entity entity; 
     if(jobQueue_.TryTake(entity)) 
     { 
      Console.WriteLine("Process entity {0}.", entity.ID); 
      entity = null; 

      // I would assume after GC, the entity will be finalized and garbage collected, but NOT. 
      GC.Collect(); 
      GC.WaitForPendingFinalizers(); 
      GC.Collect(); 
     } 
    } 
    Console.WriteLine("Finish processing."); 
} 

L'output è che tutti i messaggi di creazione e di processo, seguito da "completare l'elaborazione." e seguito da tutti i messaggi di distruzione dalle entità. E entità creazione del messaggio che mostra Entity.ID 0-9 ed i messaggi di distruzione che mostrano Entity.ID da 9 a 0.

EDIT:

Anche quando ho impostato la capacità limite del BlockingCollection, tutte le voci mai entrarvi sono finalizzati solo quando il loop termina, il che è strano.

+0

Solo perché non v'è Rif detenuti non significa che il GC farà un passo in subito e raccoglierlo ... il codice di esempio che illustra il problema con i metodi ecc GC.Collect appropriata sarebbe essere utile –

risposta

2

Se BlockingCollection continua a contenere i riferimenti dipende dal tipo di raccolta che sta utilizzando.

Il default collection type per BlockingCollection<T> è ConcurrentQueue<T>.

Quindi il comportamento della raccolta dati inutili dipende dal tipo di raccolta. Nel caso dello ConcurrentQueue<T> questa è una struttura FIFO, quindi sarei estremamente sorpreso se questo non rilasciasse riferimenti dalla struttura dati dopo che sono stati rimossi dalla coda (è una specie di definizione di una coda)!

In che modo si determina esattamente che gli oggetti non vengono raccolti?

6

ConcurrentQueue contiene segmenti con un array interno di 32 elementi. Gli elementi Entity non saranno raccolti prima che il segmento sia spazzato via. Ciò si verificherà dopo che tutti e 32 gli articoli sono stati estratti dalla coda. Se cambi il tuo esempio per aggiungere 32 elementi, vedrai i messaggi "Entità distruttiva" prima di "Termina l'elaborazione."

+0

Hm. Questo è un comportamento strano per me. Non posso assicurarmi ogni volta che abbiamo tanti elementi. Ma l'ho provato come hai descritto. Penso di dover liberare memoria da ciascun elemento manualmente ogni volta che consumare un oggetto. La tua è molto vicina alla risposta finale, ma aspetterò solo di vedere se qualcun altro può trovare una soluzione per far fronte a questo strano comportamento. – Steve