Sto usando Firebase nella mia app, insieme a RxJava. Firebase è in grado di notificare la tua app ogni volta che qualcosa è cambiato nei dati di back-end (aggiunte, rimozioni, modifiche, ...). Sto cercando di combinare la funzionalità di Firebase con RxJava.Combina listener di dati in tempo reale di Firebase con RxJava
I dati che sto ascoltando per si chiama Leisure
, e il Observable
emette LeisureUpdate
che contiene un Leisure
e il tipo di aggiornamento (aggiungere, rimuovere, spostato, cambiato).
Ecco il mio metodo che consente di iscriversi a questi eventi.
private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;
@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
@Override
public void call(final Subscriber<? super LeisureUpdate> subscriber) {
leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
}
@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
}
@Override
public void onChildMoved(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
}
@Override
public void onCancelled(FirebaseError firebaseError) {
subscriber.onError(new Error(firebaseError.getMessage()));
}
});
}
});
}
leisureUpdatesSubscriptionsCount++;
return leisureUpdatesObservable;
}
Prima di tutto, vorrei utilizzare Observable.fromCallable()
metodo al fine di creare il Observable
, ma credo sia impossibile, dal momento che Firebase utilizza callback, giusto?
Conservo una singola istanza di Observable
in modo da avere sempre uno Observable
in cui più Subscriber
possono iscriversi.
Il problema si presenta quando tutti si disconnettono e devo smettere di ascoltare gli eventi in Firebase. Non ho trovato comunque che lo Observable
capisse se c'è ancora un abbonamento. Quindi continuo a contare quante chiamate ho avuto a subscribeToLeisuresUpdates()
, con leisureUpdatesSubscriptionsCount
.
Poi ogni volta che qualcuno vuole cancellare l'iscrizione deve chiamare
@Override
public void unsubscribeFromLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
return;
}
leisureUpdatesSubscriptionsCount--;
if (leisureUpdatesSubscriptionsCount == 0) {
firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
leisureUpdatesObservable = null;
}
}
Questo è l'unico modo che ho trovato per rendere l'Observable
emette elementi quando v'è un abbonato, ma mi sento come se ci deve essere un più facile modo, specialmente capire quando non ci sono più abbonati che ascoltano l'osservabile.
Chiunque abbia riscontrato un problema simile o abbia un approccio diverso?
Sono abbastanza nuovo per RX e volevo dare a Firestore - il nuovo database in tempo reale per Firebase - una prova. Tuttavia, mi sto imbattendo negli stessi problemi asincroni che la tua libreria mi ha aiutato a risolvere con Firebase RTDB - grazie mille! Un wrapper rx su Firestore sarebbe simile a quello che hai fatto qui? –