2012-12-26 9 views
9

Un server TCP viene sviluppato utilizzando SocketAsyncEventArgs ed è async come un servizio di Windows. Ho queste 2 linee di codice all'inizio di Main:20 Ricevute al secondo con SocketAsyncEventArgs

ThreadPool.SetMaxThreads(15000, 30000); 
ThreadPool.SetMinThreads(10000, 20000); 

E entrambi restituiscono true (i valori restituiti vengono registrati). Ora da 2000 a 3000 client iniziano a inviare messaggi a questo server e inizia ad accettare connessioni (conto il numero di connessioni ed è come previsto - C'è un pool di connessioni). Il numero di thread del processo del server aumenterà da ~ 2050 a ~ 3050. Fin qui tutto bene!

Ora è presente un metodo Received che verrà chiamato sia dopo che ReceiveAsync restituisce true o da evento Completato di SocketAsyncEventArgs.

E qui iniziano i problemi: non importa quanti client sono connessi e quanti messaggi inviano, i messaggi ricevuti verranno chiamati al massimo 20 volte in un secondo! E con l'aumentare del numero di clienti, questo numero (20) scende a ~ 10.

Ambiente: TCP Server e client vengono simulati sulla stessa macchina. Ho testato il codice su 2 macchine, una con CPU 2-core e 4GB RAM e l'altra con CPU 8-core e 12GB RAM. Non c'è alcuna perdita di dati (ancora) e a volte ricevo più di 1 messaggio in ciascuna operazione di ricezione. Va bene. Ma come si può aumentare il numero di operazioni di ricezione?

Note aggiuntive sull'implementazione: il codice è di grandi dimensioni e include molte logiche diverse. Una descrizione generale sarebbe: Ho un singolo SocketAsyncEventArgs per accettare nuove connessioni. Funziona alla grande. Ora per ogni nuova connessione accettata creo un nuovo SocketAsyncEventArgs per la ricezione dei dati. Ho messo questo (il SocketAsyncEventArgs creato per ricevere) in un pool. Non verrà riutilizzato, ma viene utilizzato UserToken per tracciare le connessioni; per esempio quelle connessioni che sono disconnesse o quelle che non hanno inviato dati per 7 minuti saranno chiuse e disposte (Il AcceptSocket di SocketAsyncEventArgs sarà shutdown (entrambi), chiuso e smaltito e così sarà l'oggetto SocketAsyncEventArgs stesso). Qui è una classe Sudo che svolge questi compiti, ma tutti gli altri la logica e la registrazione e il controllo degli errori e quant'altro viene rimosso per rendere più semplice e chiaro (forse allora è più facile da individuare il codice problematico):

class Sudo 
{ 
    Socket _listener; 
    int _port = 8797; 

    public Sudo() 
    { 
     var ipEndPoint = new IPEndPoint(IPAddress.Any, _port); 
     _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 
     _listener.Bind(ipEndPoint); 

     _listener.Listen(100); 

     Accept(null); 
    } 

    void Accept(SocketAsyncEventArgs acceptEventArg) 
    { 
     if (acceptEventArg == null) 
     { 
      acceptEventArg = new SocketAsyncEventArgs(); 
      acceptEventArg.Completed += AcceptCompleted; 
     } 
     else acceptEventArg.AcceptSocket = null; 

     bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ; 

     if (!willRaiseEvent) Accepted(acceptEventArg); 
    } 

    void AcceptCompleted(object sender, SocketAsyncEventArgs e) 
    { 
     Accepted(e); 
    } 

    void Accepted(SocketAsyncEventArgs e) 
    { 
     var acceptSocket = e.AcceptSocket; 
     var readEventArgs = CreateArg(acceptSocket); 

     var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs); 

     Accept(e); 

     if (!willRaiseEvent) Received(readEventArgs); 
    } 

    SocketAsyncEventArgs CreateArg(Socket acceptSocket) 
    { 
     var arg = new SocketAsyncEventArgs(); 
     arg.Completed += IOCompleted; 

     var buffer = new byte[64 * 1024]; 
     arg.SetBuffer(buffer, 0, buffer.Length); 

     arg.AcceptSocket = acceptSocket; 

     arg.SocketFlags = SocketFlags.None; 

     return arg; 
    } 

    void IOCompleted(object sender, SocketAsyncEventArgs e) 
    { 
     switch (e.LastOperation) 
     { 
      case SocketAsyncOperation.Receive: 
       Received(e); 
       break; 
      default: break; 
     } 
    } 

    void Received(SocketAsyncEventArgs e) 
    { 
     if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0) 
     { 
      // Kill(e); 
      return; 
     } 

     var bytesList = new List<byte>(); 
     for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]); 

     var bytes = bytesList.ToArray(); 

     Process(bytes); 

     ReceiveRest(e); 

     Perf.IncOp(); 
    } 

    void ReceiveRest(SocketAsyncEventArgs e) 
    { 
     e.SocketFlags = SocketFlags.None; 
     for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0; 
     e.SetBuffer(0, e.Buffer.Length); 

     var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e); 
     if (!willRaiseEvent) Received(e); 
    } 

    void Process(byte[] bytes) { } 
} 
+0

MB RAM? È questa I-cache? Spero che tu abbia diversi GB di memoria. Hai usato Resource Monitor o Process Explorer per cercare i colli di bottiglia? – HABO

+0

Hmm ... Qual è l'architettura asincrona sul tuo server? Potresti postare qualche pseudo codice forse per descrivere come accade l'ascolto e come le connessioni vengono accettate, ecc.? –

+0

@HABO Grazie! L'ho corretto. –

risposta

4

Il La ragione per cui sta rallentando è perché ognuno di questi thread ha bisogno di cambiare contesto, e questa è un'operazione relativamente costosa. Più thread aggiungi, maggiore è la percentuale della tua CPU trascorsa semplicemente sul cambio di contesto e non sul tuo codice reale.

Hai colto il collo di bottiglia di un thread-per-cliente in un modo piuttosto strano. L'intero punto di async lato server è ridurre il numero di thread - per non avere un thread per client, ma idealmente solo uno o due per ciascuno dei processori logici nel sistema.

Il codice asincrono che hai postato guarda bene, quindi posso solo immaginare il metodo Process ha qualche facile trascurare non asincrono, il blocco di I/O in esso, vale a dire un database o file di accesso. Quando I/O blocca, il pool di thread .NET rileva questo e avvia automaticamente un nuovo thread: in questo caso è praticamente fuori controllo con I/O in Process come collo di bottiglia.

Una pipeline asincrona deve essere al 100% asincrona per trarne un vantaggio significativo. Half-in ti farà scrivere un codice complesso che funziona male come un semplice codice di sincronizzazione.

Se non si riesce assolutamente a rendere il metodo Process puramente asincrono, si potrebbe avere un po 'di fortuna a simularlo.In attesa di una coda per l'elaborazione da parte di un piccolo pool di thread di dimensioni limitate.

+0

In questo servizio di Windows eseguo alcune elaborazioni sui dati ricevuti in memoria e l'unico posto al quale sto parlando al di fuori del mio programma è un metodo che inserisce i dati elaborati in una coda MSMQ. –

+0

Cosa hai sottolineato su database e file (e nel mio caso MSMQ). Avevi ragione. Ho dimenticato il maledetto MSMQ con l'opzione recuperabile nei miei altri progetti (ha prestazioni pessime rispetto a RabbitMQ per esempio, ma qui sono costretto a usarlo - anche i commenti sulle prestazioni di MSMQ sono ben accetti). Quindi, per favore, modifica la tua risposta per indicare correttamente il problema causato da altre attività fuori dal flusso di lavoro asincrono principale; quindi posso contrassegnarlo come risposta. Grazie ancora! –

+0

Fantastico, felice che tu abbia trovato una risposta. Sì, MSMQ fa davvero schifo quando la modalità ripristinabile è attiva :). –