2016-02-10 9 views
5

Utilizzo un client di gioco multiplayer chiamato AppWarp (http://appwarp.shephertz.com), in cui è possibile aggiungere i listener di eventi da richiamare quando si verifica l'evento, supponiamo che parleremo del Listener di connessioni, in cui è necessario implementare questa interfaccia :Come convertire correttamente gli ascoltatori in Reactive (Observables) usando RxJava?

public interface ConnectionRequestListener { 
    void onConnectDone(ConnectEvent var1); 
    void onDisconnectDone(ConnectEvent var1); 
    void onInitUDPDone(byte var1); 
} 

il mio obiettivo è quello di creare principalmente una versione reattiva di questo client da utilizzare nel mio Apps Internamente invece di utilizzare il client stesso direttamente (sarò anche contare su interfacce tardi invece di a seconda della WarpClient stesso come nell'esempio, ma non è questo il punto importante, per favore leggi la mia domanda alla fine).

Quindi quello che ho fatto è la seguente:

1) ho introdotto un nuovo evento, la chiamò RxConnectionEvent (Quale relativi alla connessione di eventi per lo più gruppi) come segue:

public class RxConnectionEvent { 
    // This is the original connection event from the source client 
    private final ConnectEvent connectEvent; 
    // this is to identify if it was Connection/Disconnection 
    private final int eventType; 

    public RxConnectionEvent(ConnectEvent connectEvent, int eventType) { 
     this.connectEvent = connectEvent; 
     this.eventType = eventType; 
    } 

    public ConnectEvent getConnectEvent() { 
     return connectEvent; 
    } 

    public int getEventType() { 
     return eventType; 
    } 
} 

2) Creato alcuni tipi di eventi come segue:

public class RxEventType { 
    // Connection Events 
    public final static int CONNECTION_CONNECTED = 20; 
    public final static int CONNECTION_DISCONNECTED = 30; 
} 

3) creato il seguente osservabile che emette la mia nuova RxConnectionEvent

import com.shephertz.app42.gaming.multiplayer.client.WarpClient; 
import com.shephertz.app42.gaming.multiplayer.client.events.ConnectEvent; 
import rx.Observable; 
import rx.Subscriber; 
import rx.functions.Action0; 
import rx.subscriptions.Subscriptions; 

public class ConnectionObservable extends BaseObservable<RxConnectionEvent> { 

    private ConnectionRequestListener connectionListener; 

    // This is going to be called from my ReactiveWarpClient (Factory) Later. 
    public static Observable<RxConnectionEvent> createConnectionListener(WarpClient warpClient) { 
     return Observable.create(new ConnectionObservable(warpClient)); 
    } 

    private ConnectionObservable(WarpClient warpClient) { 
     super(warpClient); 
    } 

    @Override 
    public void call(final Subscriber<? super RxConnectionEvent> subscriber) { 
     subscriber.onStart(); 
     connectionListener = new ConnectionRequestListener() { 
      @Override 
      public void onConnectDone(ConnectEvent connectEvent) { 
       super.onConnectDone(connectEvent); 
       callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_CONNECTED)); 
      } 

      @Override 
      public void onDisconnectDone(ConnectEvent connectEvent) { 
       super.onDisconnectDone(connectEvent); 
       callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_DISCONNECTED)); 
      } 

      // not interested in this method (for now) 
      @Override 
      public void onInitUDPDone(byte var1) { } 

      private void callback(RxConnectionEvent rxConnectionEvent) 
      { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(rxConnectionEvent); 
       } else { 
        warpClient.removeConnectionRequestListener(connectionListener); 
       } 
      } 
     }; 

     warpClient.addConnectionRequestListener(connectionListener); 
     subscriber.add(Subscriptions.create(new Action0() { 
      @Override 
      public void call() { 
       onUnsubscribed(warpClient); 
      } 
     })); 
    } 

    @Override 
    protected void onUnsubscribed(WarpClient warpClient) { 
     warpClient.removeConnectionRequestListener(connectionListener); 
    } 
} 

4) e, infine, il mio BaseObservable è simile al seguente:

public abstract class BaseObservable<T> implements Observable.OnSubscribe<T> { 

    protected WarpClient warpClient; 

    protected BaseObservable (WarpClient warpClient) 
    { 
     this.warpClient = warpClient; 
    } 

    @Override 
    public abstract void call(Subscriber<? super T> subscriber); 

    protected abstract void onUnsubscribed(WarpClient warpClient); 
} 

La mia domanda è principalmente: è la mia realizzazione sopra corretta o dovrei invece creare osservabili separati per ogni evento, ma in tal caso, questo cliente ha più di 40-50 eventi devo creare osservabili separati per ogni evento?

Io uso anche il codice di cui sopra come segue (usato in un semplice test di integrazione "non-finale"):

public void testConnectDisconnect() { 
    connectionSubscription = reactiveWarpClient.createOnConnectObservable(client) 
      .subscribe(new Action1<RxConnectionEvent>() { 
       @Override 
       public void call(RxConnectionEvent rxEvent) { 
        assertEquals(WarpResponseResultCode.SUCCESS, rxEvent.getConnectEvent().getResult()); 
        if (rxEvent.getEventType() == RxEventType.CONNECTION_CONNECTED) { 
         connectionStatus = connectionStatus | 0b0001; 
         client.disconnect(); 
        } else { 
         connectionStatus = connectionStatus | 0b0010; 
         connectionSubscription.unsubscribe(); 
         haltExecution = true; 
        } 
       } 
      }, new Action1<Throwable>() { 
       @Override 
       public void call(Throwable throwable) { 
        fail("Unexpected error: " + throwable.getMessage()); 
        haltExecution = true; 
       } 
      }); 

    client.connectWithUserName("test user"); 
    waitForSomeTime(); 
    assertEquals(0b0011, connectionStatus); 
    assertEquals(true, connectionSubscription.isUnsubscribed()); 
} 

risposta

2

Vi suggerisco di evitare l'estensione del BaseObservable direttamente dal momento che è molto soggetto ad errori. Invece, prova a usare gli strumenti che Rx ti dà per creare il tuo osservabile.

La soluzione più semplice è l'utilizzo di un PublishSubject, che è sia un osservabile che un Sottoscrittore. L'ascoltatore deve semplicemente richiamare il soggetto onNext e il soggetto emetterà l'evento. Ecco un esempio di lavoro semplificato:

public class PublishSubjectWarpperDemo { 

    public interface ConnectionRequestListener { 
     void onConnectDone(); 

     void onDisconnectDone(); 

     void onInitUDPDone(); 
    } 

    public static class RxConnectionEvent { 
     private int type; 

     public RxConnectionEvent(int type) { 
      this.type = type; 
     } 

     public int getType() { 
      return type; 
     } 

     public String toString() { 
      return "Event of Type " + type; 
     } 
    } 

    public static class SimpleCallbackWrapper { 
     private final PublishSubject<RxConnectionEvent> subject = PublishSubject.create(); 

     public ConnectionRequestListener getListener() { 
      return new ConnectionRequestListener() { 

       @Override 
       public void onConnectDone() { 
        subject.onNext(new RxConnectionEvent(1)); 
       } 

       @Override 
       public void onDisconnectDone() { 
        subject.onNext(new RxConnectionEvent(2)); 
       } 

       @Override 
       public void onInitUDPDone() { 
        subject.onNext(new RxConnectionEvent(3)); 
       } 
      }; 
     } 

     public Observable<RxConnectionEvent> getObservable() { 
      return subject; 
     } 

    } 

    public static void main(String[] args) throws IOException { 
     SimpleCallbackWrapper myWrapper = new SimpleCallbackWrapper(); 
     ConnectionRequestListener listner = myWrapper.getListener();// Get the listener and attach it to the game here. 
     myWrapper.getObservable().observeOn(Schedulers.newThread()).subscribe(event -> System.out.println(event)); 

     listner.onConnectDone(); // Call the listener a few times, the observable should print the event 
     listner.onDisconnectDone(); 
     listner.onInitUDPDone(); 

     System.in.read(); // Wait for enter 
    } 
} 

Una soluzione più complessa sarebbe quella di utilizzare uno dei onSubscribe implementazioni di creare un osservabile utilizzando Observable.create(). Ad esempio AsyncOnSubscibe. Questa soluzione ha il vantaggio di gestire correttamente il backperssure, quindi il tuo abbonato all'evento non si lascia sopraffare dagli eventi. Ma nel tuo caso, sembra uno scenario improbabile, quindi la complessità aggiunta probabilmente non ne vale la pena.