2013-03-20 13 views
6

Capisco che questa domanda duplica interrogazioni using rabbitmq to send a message not string but structInvia un oggetto utilizzando RabbitMQ

se a farlo usando il primo modo

first way

ho la seguente traccia:

java.io.EOFException 
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304) 
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773) 
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798) 
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298) 
at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78) 
at com.mdnaRabbit.worker.App.main(App.java:41) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:601) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) 

Ho verificato e verificato che il messaggio sia trasformato in byte in modo assolutamente soddisfacente nella classe mittente, ma il consumatore non può eccomi.

qui è la mia classe di produttore:

package com.mdnaRabbit.newt; 

import java.io.IOException; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.MessageProperties; 
import org.apache.commons.lang.SerializationUtils; 
import com.mdnaRabbit.worker.data.Data; 

public class App { 

    private static final String TASK_QUEUE_NAME = "task_queue"; 

    public static void main(String[] argv) throws IOException{ 

     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 

     int i = 0; 

     do { 
      Data message = getMessage(); 
      byte [] byteMessage = message.getBytes(); 
      //System.out.println(byteMessage); 
      channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage); 
      System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody()); 
      i++; 
     } while (i<15); 

     channel.close(); 
     connection.close(); 
    } 

    private static Data getMessage(){ 
     Data data = new Data(); 
     data.setHeader("header"); 
     data.setDomainId("abc.com"); 
     data.setReceiver("me"); 
     data.setSender("he"); 
     data.setBody("body"); 
     return data; 
    } 

    private static String joinStrings(String[] strings, String delimiter){ 
     int length = strings.length; 
     if (length == 0) return ""; 
     StringBuilder words = new StringBuilder(strings[0]); 
     for (int i = 1; i < length; i++){ 
      words.append(delimiter).append(strings[i]); 
     } 
     return words.toString(); 
    } 
} 

qui è la mia classe di consumatori:

package com.mdnaRabbit.worker; 

import java.io.IOException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import com.mdnaRabbit.worker.data.Data; 
import org.apache.commons.lang.SerializationUtils; 

public class App { 

    private static final String TASK_QUEUE_NAME = "task_queue"; 
    private static int i = 0; 
    public static void main(String[] argv) 
      throws IOException, 
      InterruptedException{ 

     ExecutorService threader = Executors.newFixedThreadPool(20); 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(threader); 
     final Channel channel = connection.createChannel(); 

     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     channel.basicQos(20); 

     final QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(TASK_QUEUE_NAME, false, consumer); 

     try { 

      while (true) { 

         try {QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
          Data message = Data.fromBytes(delivery.getBody()); 
          //Data message = (Data) SerializationUtils.deserialize(delivery.getBody()); 

          System.out.println(" [" + (i++) +"] Received" + message.getBody()); 

          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
         }catch (Exception e){ 
         } 
        } 
     } catch (Exception e){ 
      e.printStackTrace(); 
     } 
     channel.close(); 
     connection.close(); 
    } 

} 

qui è la mia classe di dati:

package com.mdnaRabbit.worker.data; 

import java.io.*; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

public class Data implements Serializable{ 

    public String header; 
    public String body; 
    public String domainId; 
    public String sender; 
    public String receiver; 

    public void setHeader(String head){ 
     this.header = head; 
    } 

    public String getHeader(){ 
     return header; 
    } 

    public void setBody(String body){ 
     this.body = body; 
    } 

    public String getBody(){ 
     return body; 
    } 

    public void setDomainId(String domainId){ 
     this.domainId = domainId; 
    } 

    public String getDomainId(){ 
     return domainId; 
    } 

    public void setSender(String sender){ 
     this.sender = sender; 
    } 

    public String getSender(){ 
     return sender; 
    } 

    public String getReceiver(){ 
     return receiver; 
    } 

    public void setReceiver(String receiver){ 
     this.receiver = receiver; 
    } 


    public byte[] getBytes() { 
     byte[]bytes; 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     try{ 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(this); 
      oos.flush(); 
      oos.reset(); 
      bytes = baos.toByteArray(); 
      oos.close(); 
      baos.close(); 
     } catch(IOException e){ 
      bytes = new byte[] {}; 
      Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e); 
     } 
     return bytes; 
    } 

    public static Data fromBytes(byte[] body) { 
     Data obj = null; 
     try { 
      ByteArrayInputStream bis = new ByteArrayInputStream(body); 
      ObjectInputStream ois = new ObjectInputStream(bis); 
      obj = (Data) ois.readObject(); 
      ois.close(); 
      bis.close(); 
     } 
     catch (IOException e) { 
      e.printStackTrace(); 
     } 
     catch (ClassNotFoundException ex) { 
      ex.printStackTrace(); 
     } 
     return obj; 
    } 
} 

ho sempre sembra che dei consumatori riceve messaggi , perché quando non sto cercando di trasformarlo nell'oggetto e basta scrivere mostra byte

+0

Ho fissato l'errore nel mio altra risposta che portano alla confusione con il messaggio e DataMessage – robthewolf

risposta

4

Sembra che l'array di byte che si riceve sia vuoto. Questo accade a causa di questo:

} catch(IOException e){ 
     bytes = new byte[] {}; 
    } 

Quando si produce un'eccezione, il codice non si avverte che qualcosa è rotto e solo invia un array vuoto, invece. È necessario almeno registrare l'errore.

L'eccezione viene prodotta probabilmente perché si sta tentando di serializzare una classe che non è serializzabile. Per fare una classe serializzabile è necessario aggiungere "implementa Serializable" per la sua dichiarazione:

public class Data implements Serializable { 
+0

making classe serializzabile non ha causato alcun miglioramento –

+0

Ho trovato un errore nel mio codice: non ho impostato i valori nella classe Data. Ma anche dopo questo ho lo stesso problema –

+0

Ho trovato la soluzione qui: [collegamento] (http://stackoverflow.com/a/13174951/2082631), ma non funziona comunque –