2016-03-02 15 views
6

Sto implementando un livello di collegamento dati utilizzando uno producer/consumer pattern. Il livello di collegamento dati ha il proprio thread e macchina di stato per comunicare il protocollo di collegamento dati via cavo (Ethernet, RS-232 ...). L'interfaccia con il livello fisico è rappresentata come System.IO.Stream. Un altro thread scrive messaggi e legge i messaggi dall'oggetto collegamento dati.C# In attesa di più eventi in Producer/Consumer

L'oggetto collegamento dati ha uno stato di inattività che deve attendere per una delle quattro condizioni:

  1. Un byte ricevuto
  2. Un messaggio è disponibile dal thread rete
  3. Il timer keep-alive è scaduto
  4. Tutte le comunicazioni è stato annullato dal livello di rete

sto avendo un momento difficile figu risuona il modo migliore per farlo senza dividere la comunicazione in un thread di lettura/scrittura (aumentando così in modo significativo la complessità). Ecco come posso ottenere 3 su 4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. 
stream.ReadTimeout = 10000; 
await stream.ReadAsync(buf, 0, 1, cts.Token); 

o

BlockingCollection<byte[]> SendQueue = new ...; 
... 
// Check for a message from network layer. Timeout after 10 seconds. 
// Monitor cancellation token. 
SendQueue.TryTake(out msg, 10000, cts.Token); 

Cosa devo fare per bloccare il filo, in attesa che tutte e quattro le condizioni? Tutti i consigli sono ben accetti Non sono impostato su alcuna architettura o struttura di dati.

EDIT: ******** Grazie per l'aiuto di tutti. Ecco la mia soluzione ********

Per prima cosa non penso ci sia stata un'implementazione asincrona della coda produttore/consumatore. Quindi ho implementato qualcosa di simile a this stackoverflow post.

Avevo bisogno di una fonte di cancellazione esterna e interna per interrompere il thread del consumatore e annullare le attività intermedie, rispettivamente, similar to this article.

byte[] buf = new byte[1]; 
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) 
{ 
    CancellationToken internalToken = internalTokenSource.Token; 
    CancellationToken stopToken = stopTokenSource.Token; 
    using (CancellationTokenSource linkedCts = 
     CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)) 
    { 
     CancellationToken ct = linkedCts.Token; 
     Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct); 
     Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); 
     Task keepAliveTask = Task.Delay(m_keepAliveTime, ct); 

     // Wait for at least one task to complete 
     await Task.WhenAny(readTask, msgTask, keepAliveTask); 

     // Next cancel the other tasks 
     internalTokenSource.Cancel(); 
     try { 
      await Task.WhenAll(readTask, msgTask, keepAliveTask); 
     } catch (OperationCanceledException e) { 
      if (e.CancellationToken == stopToken) 
       throw; 
     } 

     if (msgTask.IsCompleted) 
      // Send the network layer message 
     else if (readTask.IsCompleted) 
      // Process the byte from the physical layer 
     else 
      Contract.Assert(keepAliveTask.IsCompleted); 
      // Send a keep alive message 
    } 
} 
+3

['Attendere Task.WhenAny (...)'] (https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx) potrebbe aiutare. –

risposta

2

In questo caso, vorrei usare solo i token di annullamento per la cancellazione. Un timeout ripetuto come un timer keep-alive è meglio rappresentato come un timer.

Quindi, modifico questo come tre attività cancellabili. In primo luogo, il token di cancellazione:

Ogni comunicazione è stata annullata dal livello di rete

CancellationToken token = ...; 

Poi, tre operazioni simultanee:

Un byte ricevuto

var readByteTask = stream.ReadAsync(buf, 0, 1, token); 

Il timer keep-alive è scaduto

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 

Un messaggio è disponibile presso la rete di filo

Questo è un po 'più complicato. Il codice corrente utilizza BlockingCollection<T>, che non è compatibile asincrono. Raccomando di passare a TPL Dataflow's BufferBlock<T> o my own AsyncProducerConsumerQueue<T>, ognuno dei quali può essere utilizzato come coda compatibile produttore/consumatore asincrono (il che significa che il produttore può essere sincronizzato o asincrono e che il consumatore può essere sincronizzato o asincrono).

BufferBlock<byte[]> SendQueue = new ...; 
... 
var messageTask = SendQueue.ReceiveAsync(token); 

Quindi è possibile utilizzare Task.WhenAny per determinare quale di queste attività completato:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask); 

Ora, è possibile recuperare i risultati confrontando completedTask agli altri e await li ing:

if (completedTask == readByteTask) 
{ 
    // Throw an exception if there was a read error or cancellation. 
    await readByteTask; 
    var byte = buf[0]; 
    ... 
    // Continue reading 
    readByteTask = stream.ReadAsync(buf, 0, 1, token); 
} 
else if (completedTask == keepAliveTimerTask) 
{ 
    // Throw an exception if there was a cancellation. 
    await keepAliveTimerTask; 
    ... 
    // Restart keepalive timer. 
    keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 
} 
else if (completedTask == messageTask) 
{ 
    // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) 
    byte[] message = await messageTask; 
    ... 
    // Continue reading 
    messageTask = SendQueue.ReceiveAsync(token); 
} 
+0

Vorrei averlo visto prima mentre andavo a implementare il mio AsyncQueue, anche se non così generale come il tuo. Perché aspetti ogni singola attività? Non dovrebbe già essere completato? –

+0

@BrianHeilig: è necessario recuperare i risultati. Queste attività sono state completate ma i loro risultati non sono stati osservati. I risultati possono essere dati, ad esempio 'messageTask', ma i risultati possono anche essere eccezioni. 'keepAliveTimerTask' può avere un'eccezione se è cancellata, e' readByteTask' può avere un'eccezione se è cancellata o se c'è qualche errore di lettura I/O. 'await' (ri) solleverà quelle eccezioni se presenti. –

1

L'annullamento di una lettura non consente di sapere se i dati sono stati letti o meno. Annullare e leggere non sono atomici l'uno rispetto all'altro. Questo approccio funziona solo se si chiude il flusso dopo la cancellazione.

L'approccio alla coda è migliore. È possibile creare un collegamento CancellationTokenSource che viene annullato ogni volta che lo si desidera. Invece di passare cts.Token si passa un token che controlli.

Puoi quindi segnalare quel token in base al tempo, a un altro token ea qualsiasi altro evento che ti piace. Se si utilizza il timeout incorporato, la coda farà internamente la stessa cosa per collegare il token in entrata con un timeout.

+0

Mi dispiace, ero un po 'corto sui dettagli. Il cts è una CancellationTokenSource di proprietà del livello di collegamento dati. Anche l'annullamento è controllato dal livello del collegamento dati; la chiamata al thread di rete si interrompe che annulla tutte le comunicazioni e attende il completamento del thread. L'obiettivo è quello di fermare in modo elegante e rapido il livello del collegamento dati. –

+1

E perché questa risposta non funziona? Puoi annullare l'opzione 'TryTake' in * qualsiasi * condizione che ti piace. – usr

+0

Penso di aver capito. Stai suggerendo di segnalare il token di cancellazione quando il thread di rete cancella o un byte è disponibile, giusto? Credo che avrei bisogno di un altro thread per leggere in modo sincrono dal flusso, quindi segnalare il token di cancellazione quando la lettura è completa. Avrò bisogno di una variabile per determinare cosa è stato cancellato (leggi o interrompi). Dovrò anche cancellare il thread letto una volta che un messaggio è stato preso, giusto? O mi sta sfuggendo qualcosa? –

3

Vorrei andare con l'opzione numero due, in attesa che si verificasse una delle 4 condizioni. Supponendo di aver i 4 compiti come metodi awaitable già:

var task1 = WaitForByteReceivedAsync(); 
var task2 = WaitForMessageAvailableAsync(); 
var task3 = WaitForKeepAliveTimerAsync(); 
var task4 = WaitForCommunicationCancelledAsync(); 

// now gather them 
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{ 
task1, task2, task3, task4 
}; 

// Wait for any of the things to complete 
var result = await Task.WhenAny(theTasks); 

Il codice qui sopra riprendere immediatamente dopo la prima operazione completata, e ignorare gli altri 3.

Nota:

In the documentation for WhenAny, si dice:

Il compito tornato terminerà sempre nello stato RanToCompletion con il suo set di risultati il ​​primo compito da completare. Questo è vero anche se il primo compito da completare termina nello stato Cancellato o Faultato.

quindi assicuratevi di farlo controllo finale prima fidarsi di quello che è successo:

if(result.Result.Result == true) 
... // First Result is the task, the second is the bool that the task returns 
+1

Penso che intendessi WhenAny() –

+1

No. WhenAny restituisce il momento in cui una delle attività restituisce. QuandoTutto è in attesa di ogni attività assegnata al completamento e restituisce di per sé un'attività (al contrario di WaitAll() –

+1

Questo non funziona perché il risultato del byte letto potrebbe essere scartato. Suppongo che gli importi che i dati non vengano persi. – usr