2013-06-04 7 views
10

Sto cercando di scrivere una topologia che esegue le operazioni seguenti:raggruppamento in una topologia tempesta semplice aggregazione

  1. un beccuccio che sottoscrive un feed Twitter (basata su una parola chiave)
  2. Un bullone di aggregazione che aggrega una serie di tweet (ad esempio N) in una raccolta e invia loro il bullone della stampante
  3. Un semplice bullone che stampa la raccolta sulla console in una sola volta.

In realtà voglio fare un po 'più di elaborazione sulla collezione.

L'ho provato localmente e sembra che funzioni. Tuttavia, non sono sicuro di aver impostato correttamente i raggruppamenti sui bulloni e se ciò funzionasse correttamente quando è stato distribuito su un effettivo cluster di temporali. Sarei grato se qualcuno possa contribuire a rivedere questa topologia e suggerire eventuali errori, modifiche o miglioramenti.

Grazie.

Ecco come si presenta la mia topologia.

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh")); 
    builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
       .shuffleGrouping("spout"); 
    builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 

aggregazione Bolt

public class SampleAggregatorBolt implements IRichBolt { 

    protected OutputCollector collector; 
    protected Tuple currentTuple; 
    protected Logger log; 
    /** 
    * Holds the messages in the bolt till you are ready to send them out 
    */ 
    protected List<Status> statusCache; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
         OutputCollector collector) { 
     this.collector = collector; 

     log = Logger.getLogger(getClass().getName()); 
     statusCache = new ArrayList<Status>(); 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     currentTuple = tuple; 

     Status currentStatus = null; 
     try { 
      currentStatus = (Status) tuple.getValue(0); 
     } catch (ClassCastException e) { 
     } 
     if (currentStatus != null) { 

      //add it to the status cache 
      statusCache.add(currentStatus); 
      collector.ack(tuple); 


      //check the size of the status cache and pass it to the next stage if you have enough messages to emit 
      if (statusCache.size() > 10) { 
       collector.emit(new Values(statusCache)); 
      } 

     } 
    } 

    @Override 
    public void cleanup() { 


    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     declarer.declare(new Fields("tweets")); 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; //To change body of implemented methods use File | Settings | File Templates. 
    } 


    protected void setupNonSerializableAttributes() { 

    } 

} 

Bolt stampante

public class PrinterBolt extends BaseBasicBolt { 

    @Override 
    public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple.size() + " " + tuple); 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer ofd) { 
    } 

} 

risposta

4

Da quello che posso vedere si guarda bene. Il diavolo è nei dettagli, però. Non sono sicuro di ciò che fa il tuo aggregatore di bulloni, ma se fa qualche ipotesi sui valori che gli vengono passati allora dovresti considerare un raggruppamento di campi appropriato. Questo potrebbe non fare una grande differenza poiché stai usando il suggerimento di parallelismo predefinito di 1, ma se dovessi decidere di ridimensionare con più istanze di bolt aggregate, le ipotesi implicite della logica potrebbero richiedere un raggruppamento non shuffle.

+0

Ho fornito il codice per il bullone di aggregazione sopra (vedere il metodo di esecuzione). Per ora è in attesa che abbia accumulato N (10 nell'esempio sopra) e li divida non appena ha 10 messaggi. Ho appena trovato un bug che risolverò. Ho bisogno di cancellare la cache una volta che ho emesso i valori. Quindi, quali modifiche dovrebbero essere necessarie se ho bisogno di utilizzare più di un aggregatore. –

0

Salve non appena si sta tentando di iscriversi a più di una parola chiave si incontreranno problemi. Suggerisco che il beccuccio emetta anche la parola chiave originale che è stata usata per filtrare.

Poi invece di fare shuffleGrouping Farei un fieldsGrouping

builder.setBolt("sampleaggregate", new SampleAggregatorBolt()) 
      .shuffleGrouping("spout", new Fields("keyword")); 

In questo modo si assicura i risultati di una singola parola finiscono sullo stesso bullone di ogni tempo. Tale che è possibile calcolare correttamente gli aggregati. Se ometti i campi, Storming Storming può istanziare qualsiasi quantità del tuo bullone aggregato e inviare tutti i messaggi dallo spout a qualsiasi istanza del bullone aggregato che nel caso contrario restituirebbe risultati errati.