2012-05-23 18 views
5

Attualmente sto utilizzando java.nio.channel.Selectors & SocketChannels per un'applicazione che aprirà le connessioni 1-a-molti per lo streaming continuo su un server. Ho tre thread per la mia applicazione: StreamWriteWorker - esegue operazioni di scrittura su SocketChannel, StreamReadWorker - legge i byte dal buffer e analizza il contenuto e StreamTaskDispatcher - esegue la selezione del selettore per readyOps e invia nuovi runnable per i thread worker.java.nio Selettori e SocketChannel per lo streaming continua

Problema: il richiamo al metodo di selezione del selettore restituisce solo un valore> 0 (readyOps validi) al primo richiamo; Sono in grado di eseguire una scrittura e inviare dati su tutti i canali pronti una volta, ma restituisce tutto il seguente richiamo del metodo di selezione del selettore 0.

Domanda: Devo invocare Chiudi su SocketChannel dopo ogni lettura/Scrivi (spero di no!)? In caso contrario, quale potrebbe essere la causa per cui SocketChannels non è disponibile per eventuali operazioni di lettura/scrittura?

Mi dispiace non posso pubblicare il codice, ma spero di aver spiegato il problema in modo abbastanza chiaro che qualcuno possa aiutarlo. Ho cercato le risposte e vedo che non è possibile riutilizzare una connessione SocketChannel dopo la chiusura, ma il mio canale non deve essere chiuso, il server non riceve mai il risultato del flusso EOF.

Ho fatto alcuni progressi e ho capito che l'operazione di scrittura non stava avvenendo sull'app del server a causa dell'errore di analisi json. Così ora il mio SocketChannel sul codice dell'app client diventa pronto per un'altra operazione di scrittura dopo che ha elaborato un'operazione di lettura. Immagino che questa sia la natura TCP di SocketChannels. Tuttavia, SocketChannel non diventa disponibile per un'altra operazione di lettura sul lato dell'app server ,. Questo comportamento normale è per SocketChannels? Devo chiudere la connessione sul lato client dopo l'operazione di lettura e stabilire una nuova connessione?

Ecco un esempio di codice di quello che sto cercando di fare:

package org.stream.socket; 

import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.net.ServerSocket; 
import java.nio.ByteBuffer; 
import java.nio.CharBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.nio.charset.CodingErrorAction; 
import java.util.HashMap; 
import java.util.Iterator; 
import java.util.UUID; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 

import org.apache.commons.lang3.RandomStringUtils; 

import com.google.gson.Gson; 
import com.google.gson.JsonElement; 
import com.google.gson.JsonObject; 
import com.google.gson.JsonParser; 
import com.google.gson.JsonPrimitive; 
import com.google.gson.stream.JsonToken; 

public class ClientServerTest { 

    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>(); 
    private ExecutorService executor = Executors.newFixedThreadPool(1); 
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>(); 

    private class StreamWriteTask implements Runnable { 
     private ByteBuffer buffer; 
     private SelectionKey key; 
     private Selector selector; 

     private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) { 
      this.buffer = buffer; 
      this.key = key; 
      this.selector = selector; 
     } 

     @Override 
     public void run() { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      byte[] data = (byte[]) key.attachment(); 
      buffer.clear(); 
      buffer.put(data); 
      buffer.flip(); 
      int results = 0; 
      while (buffer.hasRemaining()) { 
       try { 
        results = sc.write(buffer); 
       } catch (IOException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 

       if (results == 0) { 
        buffer.compact(); 
        buffer.flip(); 
        data = new byte[buffer.remaining()]; 
        buffer.get(data); 
        key.interestOps(SelectionKey.OP_WRITE); 
        key.attach(data); 
        selector.wakeup(); 
        return; 
       } 
      } 

      key.interestOps(SelectionKey.OP_READ); 
      key.attach(null); 
      selector.wakeup(); 
     } 

    } 

    private class StreamReadTask implements Runnable { 
     private ByteBuffer buffer; 
     private SelectionKey key; 
     private Selector selector; 

     private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) { 
      this.buffer = buffer; 
      this.key = key; 
      this.selector = selector; 
     } 

     private boolean checkUUID(byte[] data) { 
      return uuidToSize.containsKey(new String(data)); 
     } 

     @Override 
     public void run() { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      buffer.clear(); 
      byte[] data = (byte[]) key.attachment(); 
      if (data != null) { 
       buffer.put(data); 
      } 
      int count = 0; 
      int readAttempts = 0; 
      try { 
       while ((count = sc.read(buffer)) > 0) { 
        readAttempts++; 
       } 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 

      if (count == 0) { 
       buffer.flip(); 
       data = new byte[buffer.limit()]; 
       buffer.get(data); 
       if (checkUUID(data)) { 
        key.interestOps(SelectionKey.OP_READ); 
        key.attach(data); 
       } else { 
        System.out.println("Clinet Read - uuid ~~~~ " + new String(data)); 
        key.interestOps(SelectionKey.OP_WRITE); 
        key.attach(null); 
       } 
      } 

      if (count == -1) { 
       try { 
        sc.close(); 
       } catch (IOException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } 

      selector.wakeup(); 
     } 

    } 

    private class ClientWorker implements Runnable { 

     @Override 
     public void run() { 
      try { 
       Selector selector = Selector.open(); 
       SocketChannel sc = SocketChannel.open(); 
       sc.configureBlocking(false); 
       sc.connect(new InetSocketAddress("127.0.0.1", 9001)); 
       sc.register(selector, SelectionKey.OP_CONNECT); 
       ByteBuffer buffer = ByteBuffer.allocateDirect(65535); 

       while (selector.isOpen()) { 
        int count = selector.select(10); 

        if (count == 0) { 
         continue; 
        } 

        Iterator<SelectionKey> it = selector.selectedKeys().iterator(); 

        while (it.hasNext()) { 
         final SelectionKey key = it.next(); 
         it.remove(); 
         if (!key.isValid()) { 
          continue; 
         } 

         if (key.isConnectable()) { 
          sc = (SocketChannel) key.channel(); 
          if (!sc.finishConnect()) { 
           continue; 
          } 
          sc.register(selector, SelectionKey.OP_WRITE); 
         } 

         if (key.isReadable()) { 
          key.interestOps(0); 
          executor.execute(new StreamReadTask(buffer, key, selector)); 
         } 
         if (key.isWritable()) { 
          key.interestOps(0); 
          if(key.attachment() == null){ 
           key.attach(dataQueue.take()); 
          } 
          executor.execute(new StreamWriteTask(buffer, key, selector)); 
         } 
        } 
       } 
      } catch (IOException ex) { 
       // Handle Exception 
      }catch(InterruptedException ex){ 

      } 

     } 
    } 

    private class ServerWorker implements Runnable { 
     @Override 
     public void run() { 
      try { 
       Selector selector = Selector.open(); 
       ServerSocketChannel ssc = ServerSocketChannel.open(); 
       ServerSocket socket = ssc.socket(); 
       socket.bind(new InetSocketAddress(9001)); 
       ssc.configureBlocking(false); 
       ssc.register(selector, SelectionKey.OP_ACCEPT); 
       ByteBuffer buffer = ByteBuffer.allocateDirect(65535); 
       DataHandler handler = new DataHandler(); 

       while (selector.isOpen()) { 
        int count = selector.select(10); 

        if (count == 0) { 
         continue; 
        } 

        Iterator<SelectionKey> it = selector.selectedKeys().iterator(); 

        while (it.hasNext()) { 
         final SelectionKey key = it.next(); 
         it.remove(); 
         if (!key.isValid()) { 
          continue; 
         } 

         if (key.isAcceptable()) { 
          ssc = (ServerSocketChannel) key.channel(); 
          SocketChannel sc = ssc.accept(); 
          sc.configureBlocking(false); 
          sc.register(selector, SelectionKey.OP_READ); 
         } 
         if (key.isReadable()) { 
          handler.readSocket(buffer, key); 
         } 
         if (key.isWritable()) { 
          handler.writeToSocket(buffer, key); 
         } 
        } 
       } 

      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

    } 

    private class DataHandler { 

     private JsonObject parseData(StringBuilder builder) { 
      if (!builder.toString().endsWith("}")) { 
       return null; 
      } 

      JsonParser parser = new JsonParser(); 
      JsonObject obj = (JsonObject) parser.parse(builder.toString()); 
      return obj; 
     } 

     private void readSocket(ByteBuffer buffer, SelectionKey key) 
       throws IOException { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      buffer.clear(); 
      int count = Integer.MAX_VALUE; 
      int readAttempts = 0; 
      try { 
       while ((count = sc.read(buffer)) > 0) { 
        readAttempts++; 
       } 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 

      if (count == 0) { 
       buffer.flip(); 
       StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key 
         .attachment() : new StringBuilder(); 
       Charset charset = Charset.forName("UTF-8"); 
       CharsetDecoder decoder = charset.newDecoder(); 
       decoder.onMalformedInput(CodingErrorAction.IGNORE); 
       System.out.println(buffer); 
       CharBuffer charBuffer = decoder.decode(buffer); 
       String content = charBuffer.toString(); 
       charBuffer = null; 
       builder.append(content);  
       System.out.println(content); 
       JsonObject obj = parseData(builder); 
       if (obj == null) { 
        key.attach(builder); 
        key.interestOps(SelectionKey.OP_READ); 
       } else { 
        System.out.println("data ~~~~~~~ " + builder.toString()); 
        JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive(); 
        key.attach(uuid.toString().getBytes()); 
        key.interestOps(SelectionKey.OP_WRITE); 
       } 
      } 

      if (count == -1) { 
       key.attach(null); 
       sc.close(); 
      } 
     } 

     private void writeToSocket(ByteBuffer buffer, SelectionKey key) 
       throws IOException { 
      SocketChannel sc = (SocketChannel) key.channel(); 
      byte[] data = (byte[]) key.attachment(); 
      buffer.clear(); 
      buffer.put(data); 
      buffer.flip(); 
      int writeAttempts = 0; 
      while (buffer.hasRemaining()) { 
       int results = sc.write(buffer); 
       writeAttempts++; 
       System.out.println("Write Attempt #" + writeAttempts); 
       if (results == 0) { 
        buffer.compact(); 
        buffer.flip(); 
        data = new byte[buffer.remaining()]; 
        buffer.get(data); 
        key.attach(data); 
        key.interestOps(SelectionKey.OP_WRITE); 
        break; 
       } 
      } 

      key.interestOps(SelectionKey.OP_READ); 
      key.attach(null); 
     } 
    } 

    public ClientServerTest() { 
     for (int index = 0; index < 1000; index++) { 
      JsonObject obj = new JsonObject(); 
      String uuid = UUID.randomUUID().toString(); 
      uuidToSize.put(uuid, uuid.length()); 
      obj.addProperty("uuid", uuid); 
      String data = RandomStringUtils.randomAlphanumeric(10000); 
      obj.addProperty("event", data); 
      dataQueue.add(obj.toString().getBytes()); 
     } 

     Thread serverWorker = new Thread(new ServerWorker()); 
     serverWorker.start(); 

     Thread clientWorker = new Thread(new ClientWorker()); 
     clientWorker.start(); 

    } 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     ClientServerTest test = new ClientServerTest(); 
     for(;;){ 

     } 
    } 

} 
+2

Perché pensi di aver bisogno di questi tre fili? Certamente non hai bisogno del thread di scrittura, e con un po 'di ristrutturazione lungo le linee pensate da NIO puoi anche liberarti del thread di lettura. Multithreading e NIO davvero non si mescolano. Se vuoi il multithreading, usa java.net e bloccando I/O. – EJP

+0

Grazie per il tuo commento. Ho iniziato con i tre thread perché la tempestività è un fattore importante; Non volevo leggere Read waiting on Write o viceversa, e lavorerò con una grande quantità di dati. Hai una risposta riguardo al problema che ho affermato? –

+0

Non sono sicuro che il problema che hai affermato che motiva i tre thread esiste persino. Sei in * modalità non bloccante. * Niente attenderà, tranne la chiamata select(). Ma se select() restituisce zero, nulla è pronto. Chiudere i canali a caso non cambierà quello. – EJP

risposta

4
  1. Il modo corretto per elaborare OP_CONNECT è tentare finishConnect() una volta, e se riesce annullare la registrazione OP_CONNECT e registrare OP_READ o OP_WRITE , probabilmente quest'ultimo dato che sei un cliente. Fare il ciclo e dormire in modalità non bloccante non ha senso. Se finishConnect() restituisce false, OP_CONNECT verrà attivato nuovamente.

  2. L'elaborazione di !key.isAcceptable(), !key.isReadable() e !key.isWriteable() non ha assolutamente senso. Se la chiave è accettabile, chiamare accept(). Se è leggibile, chiama read(). Se è scrivibile, chiama write(). E 'così semplice.

  3. È necessario essere consapevoli del fatto che i canali sono quasi sempre scrivibili, ad eccezione dei brevi periodi in cui il buffer di invio del socket è pieno. Quindi registrati solo per OP_WRITE quando hai qualcosa da scrivere, o meglio ancora dopo il hai provato una scrittura e hai ottenuto un rendimento pari a zero; quindi quando viene attivato lo OP_WRITE, riprovare a scrivere e annullare la registrazione OP_WRITE a meno che non si ottenga un altro zero.

  4. Sei troppo economico con il tuo ByteBuffer. In pratica è necessario uno uno per canale. Puoi salvarlo come allegato chiave in modo da poterlo riavere quando ne hai bisogno.Altrimenti non hai alcun modo di accumulare letture parziali, che sono certe che accada, o qualsiasi altro modo di riprovare le scritture.

+0

Ok, quindi non penso che OP_CONNECT sia un grosso problema, ma lo cambio; nel mio codice al lavoro di solito inizializzo le mie connessioni in precedenza, ma proverò a modo tuo e vedrò se è d'aiuto. Per quanto riguarda il tuo secondo proiettile che era ovviamente un refuso, stavo mettendo insieme il campione in ritardo. Per quanto riguarda il nostro terzo proiettile, questo è intrigante per me perché il mio OP_WRITE si attiva inizialmente, ma non si attiva fino a quando un OP_READ non è stato attivato da un incendio successivo. Sto anche scrivendo il contenuto completo del byte [] e non sta superando il limite del mio buffer. –

+0

Infine, non sono sicuro che se seguirò su di me di non essere in grado di gestire una lettura o scrittura parziale, il mio codice gestirà tale caso. –

+0

Dopo aver lavorato, tutto ciò che hai detto ha avuto senso. Il mio codice funziona come dovrebbe funzionare. Grazie per la tua pazienza e aiuto. Pubblicherò una nuova domanda in merito alle prestazioni e alla gestione della memoria, attendo con impazienza la tua saggezza. –