2012-11-13 4 views
6

Sto scrivendo un'applicazione che comporta la scrittura di considerevoli blocchi di dati su un OutputStream (appartenente a un socket). La cosa che rende questo un po 'complicato è che di solito ci sono più thread che tentano di scrivere sullo stesso OutputStream. Attualmente, l'ho progettato in modo che l'OutputStream a cui i dati vengono scritti sia nella sua stessa thread. Il thread contiene una coda (LinkedList) che esegue il polling degli array di byte e li scrive al più presto possibile.Scrittura simultanea su uno standard OutputStream

private class OutputStreamWriter implements Runnable { 

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>(); 

    public void run() { 
     OutputStream outputStream = User.this.outputStream; 
     while (true) { 
      try { 
       if (chunkQueue.isEmpty()) { 
        Thread.sleep(100); 
        continue; 
       } 
       outputStream.write(chunkQueue.poll()); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

Il problema di questo progetto è che, come si verificano sempre più operazioni di scrittura, sempre più code di dati su e non è sempre scritto più velocemente. Inizialmente, quando i dati vengono messi in coda, vengono scritti praticamente immediatamente. Quindi, dopo circa 15 secondi circa, i dati iniziano a rimanere indietro; un ritardo si sviluppa dal momento in cui i dati vengono accodati al momento in cui i dati vengono effettivamente scritti. Col passare del tempo, questo ritardo diventa sempre più lungo. È molto notevole.

Un modo per risolvere il problema sarebbe una sorta di implementazione ConcurrentOutputStream che consente di inviare i dati senza bloccare in modo che le scritture non inizino a essere sottoposte a backup (diamine, quindi la coda non sarebbe necessaria). Non so se ci sia una tale implementazione - non sono stato in grado di trovarne una - e personalmente non credo sia nemmeno possibile scriverne una.

Quindi, qualcuno ha qualche suggerimento su come posso riprogettare questo?

+4

Non è molto costruttivo. Che cosa c'è che non va? –

+0

BufferedOutputStream? –

+0

Come parte, stai sincronizzando le modifiche alla tua lista collegata? Perché non è thread-safe dal design. Inoltre, che tipo di flusso di output stai sovrapponendo all'output del socket e quanti dati stai trasmettendo? – Perception

risposta

4

Il throughput della presa è limitato; se è più lento del throughput di generazione dei dati, i dati devono essere memorizzati nel buffer, non c'è modo di aggirarli. Scrivere "simultaneamente" non aiuterà affatto.

È possibile sospendere la generazione dei dati quando i dati in coda superano determinati limiti, per ridurre il consumo di memoria.

+0

Sto solo gettando le cose al muro qui, ma, che dire di un SocketChannel? –

+0

Non credo che possa essere d'aiuto.Il collo di bottiglia è la larghezza di banda della rete. – irreputable

0

Sono d'accordo con @irreputable che la scrittura simultanea non aiuterà in alcun modo. Invece dovresti guardare al lato produttore, cioè a quello che hai già.

  1. Utilizzare un BlockingQueue anziché una LinkedList.

  2. Utilizzare l'operazione di interrogazione di blocco della coda, piuttosto che un sonno cieco per 100 msl, che, per definizione, sprecherà il 50% del tempo in media. Per un lungo periodo che potrebbe davvero aggiungere.

0

avevo bisogno di un filtro per intercettare connessioni lente in cui ho bisogno di chiudere le connessioni DB ASAP così ho inizialmente usato tubi di Java, ma quando guardò più da vicino la loro attuazione, è tutti sincronizzati così ho finito per creare il mio QueueInputStream utilizzando un piccolo buffer e una coda di blocco per mettere il buffer in coda una volta era pieno, è libero da lock tranne quando per le condizioni di lock usate su LinkedBlockingQueue che con l'aiuto del piccolo buffer dovrebbe essere economico, questa classe è pensata per essere utilizzato per un singolo produttore e consumatore per istanza e si dovrebbe passare un ExecutorService per avviare lo streaming dei byte accodati all'output finale finale:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    }