2016-03-31 9 views
9

Sto usando Firebase nella mia app, insieme a RxJava. Firebase è in grado di notificare la tua app ogni volta che qualcosa è cambiato nei dati di back-end (aggiunte, rimozioni, modifiche, ...). Sto cercando di combinare la funzionalità di Firebase con RxJava.Combina listener di dati in tempo reale di Firebase con RxJava

I dati che sto ascoltando per si chiama Leisure, e il Observable emette LeisureUpdate che contiene un Leisure e il tipo di aggiornamento (aggiungere, rimuovere, spostato, cambiato).

Ecco il mio metodo che consente di iscriversi a questi eventi.

private Observable<LeisureUpdate> leisureUpdatesObservable; 
private ChildEventListener leisureUpdatesListener; 
private int leisureUpdatesSubscriptionsCount; 

@NonNull 
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() { 
    if (leisureUpdatesObservable == null) { 
     leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() { 

      @Override 
      public void call(final Subscriber<? super LeisureUpdate> subscriber) { 
       leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() { 
        @Override 
        public void onChildAdded(DataSnapshot dataSnapshot, String s) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED)); 
        } 

        @Override 
        public void onChildChanged(DataSnapshot dataSnapshot, String s) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED)); 
        } 

        @Override 
        public void onChildRemoved(DataSnapshot dataSnapshot) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED)); 
        } 

        @Override 
        public void onChildMoved(DataSnapshot dataSnapshot, String s) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED)); 
        } 

        @Override 
        public void onCancelled(FirebaseError firebaseError) { 
         subscriber.onError(new Error(firebaseError.getMessage())); 
        } 
       }); 
      } 
     }); 
    } 
    leisureUpdatesSubscriptionsCount++; 
    return leisureUpdatesObservable; 
} 

Prima di tutto, vorrei utilizzare Observable.fromCallable() metodo al fine di creare il Observable, ma credo sia impossibile, dal momento che Firebase utilizza callback, giusto?

Conservo una singola istanza di Observable in modo da avere sempre uno Observable in cui più Subscriber possono iscriversi.

Il problema si presenta quando tutti si disconnettono e devo smettere di ascoltare gli eventi in Firebase. Non ho trovato comunque che lo Observable capisse se c'è ancora un abbonamento. Quindi continuo a contare quante chiamate ho avuto a subscribeToLeisuresUpdates(), con leisureUpdatesSubscriptionsCount.

Poi ogni volta che qualcuno vuole cancellare l'iscrizione deve chiamare

@Override 
public void unsubscribeFromLeisuresUpdates() { 
    if (leisureUpdatesObservable == null) { 
     return; 
    } 
    leisureUpdatesSubscriptionsCount--; 
    if (leisureUpdatesSubscriptionsCount == 0) { 
     firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener); 
     leisureUpdatesObservable = null; 
    } 
} 

Questo è l'unico modo che ho trovato per rendere l'Observable emette elementi quando v'è un abbonato, ma mi sento come se ci deve essere un più facile modo, specialmente capire quando non ci sono più abbonati che ascoltano l'osservabile.

Chiunque abbia riscontrato un problema simile o abbia un approccio diverso?

risposta

2

Metti questo nel tuo Observable.create() alla fine.

subscriber.add(Subscriptions.create(new Action0() { 
        @Override public void call() { 
         ref.removeEventListener(leisureUpdatesListener); 
        } 
       })); 
6

Inoltre, è possibile utilizzare la mia libreria rxFirebase

+0

Sono abbastanza nuovo per RX e volevo dare a Firestore - il nuovo database in tempo reale per Firebase - una prova. Tuttavia, mi sto imbattendo negli stessi problemi asincroni che la tua libreria mi ha aiutato a risolvere con Firebase RTDB - grazie mille! Un wrapper rx su Firestore sarebbe simile a quello che hai fatto qui? –

5

È possibile utilizzare Observable.fromEmitter, qualcosa in questo senso

return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() { 
     @Override 
     public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) { 
      final ValueEventListener listener = new ValueEventListener() { 
       @Override 
       public void onDataChange(DataSnapshot dataSnapshot) { 
        // process update 
        LeisureUpdate leisureUpdate = ... 
        leisureUpdateEmitter.onNext(leisureUpdate); 
       } 

       @Override 
       public void onCancelled(DatabaseError databaseError) { 
        leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage())); 
        mDatabaseReference.removeEventListener(this); 
       } 
      }; 
      mDatabaseReference.addValueEventListener(listener); 
      leisureUpdateEmitter.setCancellation(new Cancellable() { 
       @Override 
       public void cancel() throws Exception { 
        mDatabaseReference.removeEventListener(listener); 
       } 
      }); 
     } 
    }, Emitter.BackpressureMode.BUFFER); 
+0

qual è l'equivalente in Rx Java 2.0? – Mike6679

+0

Dovresti utilizzare Flowable.create, vedere https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0. – Francesc

0

Ecco un codice di esempio per l'utilizzo di RxJava2 con CompletionListener di Firebase:

Completable.create(new CompletableOnSubscribe() { 
     @Override 
     public void subscribe(final CompletableEmitter e) throws Exception { 
      String orderKey = FirebaseDatabase.getInstance().getReference().child("orders").push().getKey(); 
      FirebaseDatabase.getInstance().getReference().child("orders").child(orderKey).setValue(order, 
        new DatabaseReference.CompletionListener() { 
         @Override 
         public void onComplete(DatabaseError databaseError, DatabaseReference databaseReference) { 
          if (e.isDisposed()) { 
           return; 
          } 
          if (databaseError == null) { 
           e.onComplete(); 
          } else { 
           e.onError(new Throwable(databaseError.getMessage())); 
          } 
         } 
        }); 

     } 
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 
0

Un'altra libreria sorprendente che vi aiuterà ad avvolgere ogni logica di database in tempo reale Firebase sotto modelli RX.

https://github.com/Link184/Respiration

Qui è possibile creare il repository Firebase ed estenderlo da GeneralRepository ad esempio:

@RespirationRepository(dataSnapshotType = Leisure.class) 
public class LeisureRepository extends GeneralRepository<Leisure>{ 
    protected LeisureRepository(Configuration<Leisure> repositoryConfig) { 
     super(repositoryConfig); 
    } 

    // observable will never emit any events if there are no more subscribers 
    public void killRepository() { 
     if (!behaviorSubject.hasObservers()) { 
      //protected fields 
      behaviorSubject.onComplete(); 
      databaseReference.removeEventListener(valueListener); 
     } 
    } 
} 

Si può "uccidere" il repository in questo modo:

// LeisureRepositoryBuilder is a generated class by annotation processor, will appear after a successful gradle build 
LeisureRepositoryBuilder.getInstance().killRepository(); 

Ma Penso che per la tua situazione sarà meglio estendere com.link184.respiration.repository.ListRepository per evitare il mapping dei dati da java.util.Map a Leisur e model through LeisureUpdate

+1

Dovresti anche spiegare come usare questa libreria. – Taslim

+1

Sebbene questo collegamento possa rispondere alla domanda, è meglio includere qui le parti essenziali della risposta e fornire il link per riferimento. Le risposte di solo collegamento possono diventare non valide se la pagina collegata cambia. - [Dalla recensione] (/ recensione/post di bassa qualità/18966092) –