Sto cercando di scrivere una topologia che esegue le operazioni seguenti:raggruppamento in una topologia tempesta semplice aggregazione
- un beccuccio che sottoscrive un feed Twitter (basata su una parola chiave)
- Un bullone di aggregazione che aggrega una serie di tweet (ad esempio N) in una raccolta e invia loro il bullone della stampante
- Un semplice bullone che stampa la raccolta sulla console in una sola volta.
In realtà voglio fare un po 'più di elaborazione sulla collezione.
L'ho provato localmente e sembra che funzioni. Tuttavia, non sono sicuro di aver impostato correttamente i raggruppamenti sui bulloni e se ciò funzionasse correttamente quando è stato distribuito su un effettivo cluster di temporali. Sarei grato se qualcuno possa contribuire a rivedere questa topologia e suggerire eventuali errori, modifiche o miglioramenti.
Grazie.
Ecco come si presenta la mia topologia.
builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
.shuffleGrouping("spout");
builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
aggregazione Bolt
public class SampleAggregatorBolt implements IRichBolt {
protected OutputCollector collector;
protected Tuple currentTuple;
protected Logger log;
/**
* Holds the messages in the bolt till you are ready to send them out
*/
protected List<Status> statusCache;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
log = Logger.getLogger(getClass().getName());
statusCache = new ArrayList<Status>();
}
@Override
public void execute(Tuple tuple) {
currentTuple = tuple;
Status currentStatus = null;
try {
currentStatus = (Status) tuple.getValue(0);
} catch (ClassCastException e) {
}
if (currentStatus != null) {
//add it to the status cache
statusCache.add(currentStatus);
collector.ack(tuple);
//check the size of the status cache and pass it to the next stage if you have enough messages to emit
if (statusCache.size() > 10) {
collector.emit(new Values(statusCache));
}
}
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweets"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
protected void setupNonSerializableAttributes() {
}
}
Bolt stampante
public class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.size() + " " + tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
Ho fornito il codice per il bullone di aggregazione sopra (vedere il metodo di esecuzione). Per ora è in attesa che abbia accumulato N (10 nell'esempio sopra) e li divida non appena ha 10 messaggi. Ho appena trovato un bug che risolverò. Ho bisogno di cancellare la cache una volta che ho emesso i valori. Quindi, quali modifiche dovrebbero essere necessarie se ho bisogno di utilizzare più di un aggregatore. –