2015-04-09 16 views
5

Quindi, in pratica ho un bel flusso di lavoro complesso, che è simile a questo:Sedano: accedere a tutti i risultati precedenti in una catena

>>> res = (add.si(2, 2) | add.s(4) | add.s(8))() 
>>> res.get() 
16 

Successivamente è piuttosto banale per me a piedi la catena del risultato e raccogliere tutti i singoli risultati:

>>> res.parent.get() 
8 

>>> res.parent.parent.get() 
4 

il mio problema è, che cosa se il mio terzo compito dipende conoscere il risultato della prima, ma in questo esempio riceve solo il risultato del secondo?

Anche le catene sono piuttosto lunghe ei risultati non sono così piccoli, quindi il solo passaggio attraverso l'input come risultato potrebbe inquinare inutilmente il risultato-archivio. Che è Redis, quindi le limitazioni quando si usa RabbitMQ, ZeroMQ, ... non si applicano.

risposta

2

Assegno ad ogni catena un ID di lavoro e rintraccio questo lavoro salvando i dati in un database.

Avvio coda

if __name__ == "__main__": 
    # Generate unique id for the job 
    job_id = uuid.uuid4().hex 
    # This is the root parent 
    parent_level = 1 
    # Pack the data. The last value is your value to add 
    parameters = job_id, parent_level, 2 
    # Build the chain. I added an clean task that removes the data 
    # created during the process (if you want it) 
    add_chain = add.s(parameters, 2) | add.s(4) | add.s(8)| clean.s() 
    add_chain.apply_async() 

Ora i compiti

#Function for store the result. I used sqlalchemy (mysql) but you can 
# change it for whatever you want (distributed file system for example) 
@inject.params(entity_manager=EntityManager) 
def save_result(job_id, level, result, entity_manager): 
    r = Result() 
    r.job_id = job_id 
    r.level = level 
    r.result = result 
    entity_manager.add(r) 
    entity_manager.commit() 

#Restore a result from one parent 
@inject.params(entity_manager=EntityManager) 
def get_result(job_id, level, entity_manager): 
    result = entity_manager.query(Result).filter_by(job_id=job_id, level=level).one() 
    return result.result 

#Clear the data or do something with the final result 
@inject.params(entity_manager=EntityManager) 
    def clear(job_id, entity_manager): 
    entity_manager.query(Result).filter_by(job_id=job_id).delete() 

@app.task() 
def add(parameters, number): 
    # Extract data from parameters list 
    job_id, level, other_number = parameters 

    #Load result from your second parent (level - 2) 
    #For level 3 parent level - 3 and so on 
    #second_parent_result = get_result(job_id, level - 2) 

    # do your stuff, I guess you want to add numbers 
    result = number + other_number 
    save_result(job_id, level, result) 

    #Return the result of the sum or anything you want, but you have to send something because the "add" function expects 3 values 
    #Of course your should return the actual job and increment the parent level 
    return job_id, level + 1, result 

@app.task() 
def clean(parameters): 
    job_id, level, result = parameters 
    #Do something with final result or not 
    #Clear the data 
    clear(job_id) 

Io uso un entity_manager alle gestisce le operazioni del database. Il mio gestore di entità utilizza sql alchemy e mysql. Ho anche usato una tabella "risultato" per memorizzare i risultati parziali. Questa parte dovrebbe essere cambiamento per il sistema di storage migliore (o usare questo se mysql è ok per voi)

from sqlalchemy.orm import sessionmaker 
from sqlalchemy import create_engine 
import inject 

class EntityManager(): 

    session = None 

    @inject.params(config=Configuration) 
    def __init__(self, config): 
    conf = config['persistence'] 
    uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database'] 

    engine = create_engine(uri, echo=conf['debug']) 

    Session = sessionmaker(bind=engine) 
    self.session = Session() 

    def query(self, entity_type): 
    return self.session.query(entity_type) 

    def add(self, entity): 
    return self.session.add(entity) 

    def flush(self): 
    return self.session.flush() 

    def commit(self): 
    return self.session.commit() 

class Configuration: 
    def __init__(self, params): 
    f = open(os.environ.get('PYTHONPATH') + '/conf/config.yml') 
    self.configMap = yaml.safe_load(f) 
    f.close() 

    def __getitem__(self, key: str): 
    return self.configMap[key] 

class Result(Base): 
    __tablename__ = 'result' 

    id = Column(Integer, primary_key=True) 
    job_id = Column(String(255)) 
    level = Column(Integer) 
    result = Column(Integer) 

    def __repr__(self): 
    return "<Result (job='%s', level='%s', result='%s')>" % (self.job_id, str(self.level), str(self.result)) 

ho usato il pacchetto di iniettare per ottenere un iniettore di dipendenza. Il pacchetto inject riutilizzerà l'oggetto in modo da poter inserire l'accesso al database ogni volta che lo desideri e non preoccuparti della connessione.

La configurazione della classe consiste nel caricare i dati di accesso al database in un file di configurazione. È possibile sostituirlo e utilizzare i dati statici (una mappa con hardcoded) per il test.

Modificare l'iniezione di dipendenza per qualsiasi altra cosa adatta a te. Questa è solo la mia soluzione. L'ho appena aggiunto per un test veloce.

La chiave qui è salvare i risultati parziali da qualche parte dal sistema di coda e nelle attività restituire i dati per l'accesso a questi risultati (ID job e livello padre). Manderai questo extra (ma piccolo) dato che è un indirizzo (job_id + livello genitore) che punta ai dati reali (alcune cose importanti).

Questa soluzione che quello che sto utilizzando nel mio software

+0

Grazie! Onestamente penso che tutte e tre le risposte siano grandi e meritino la generosità. Sono andato per il tuo perché produce il minimo mal di testa mantenendo i risultati precedenti dalla mia archiviazione dei risultati effettivi. – WhatIsName

1

Un lavoro semplice consiste nel memorizzare i risultati delle attività in un elenco e utilizzarli nelle attività.

from celery import Celery, chain 
from celery.signals import task_success 

results = [] 

app = Celery('tasks', backend='amqp', broker='amqp://') 


@task_success.connect() 
def store_result(**kwargs): 
    sender = kwargs.pop('sender') 
    result = kwargs.pop('result') 
    results.append((sender.name, result)) 


@app.task 
def add(x, y): 
    print("previous results", results) 
    return x + y 

Ora, nella catena, è possibile accedere a tutti i risultati precedenti da qualsiasi attività in qualsiasi ordine.

1

Forse la configurazione è troppo complesso per questo, ma mi piace usare group combinato con un compito noop di realizzare qualcosa di simile. Lo faccio in questo modo perché voglio evidenziare le aree che sono ancora sincrone nella mia pipeline (di solito in modo che possano essere rimosse).

Usando qualcosa di simile al tuo esempio, inizio con una serie di compiti che simile a questa:

tasks.py:

from celery import Celery 

app = Celery('tasks', backend="redis", broker='redis://localhost') 


@app.task 
def add(x, y): 
     return x + y 


@app.task 
def xsum(elements): 
    return sum(elements) 


@app.task 
def noop(ignored): 
    return ignored 

Con questi compiti creo una catena con un gruppo per controllare i risultati che dipendono dai risultati sincroni:

In [1]: from tasks import add,xsum,noop 
In [2]: from celery import group 

# First I run the task which I need the value of later, then I send that result to a group where the first task does nothing and the other tasks are my pipeline. 
In [3]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8))) 
Out[3]: [4, 16] 

# At this point I have a list where the first element is the result of my original task and the second element has the result of my workflow. 
In [4]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s()) 
Out[4]: 20 

# From here, things can go back to a normal chain 
In [5]: ~(add.si(2, 2) | group(noop.s(), add.s(4) | add.s(8)) | xsum.s() | add.s(1) | add.s(1)) 
Out[5]: 22 

Spero sia utile!

+0

Questo è geniale! Il pensiero fuori dagli schemi e l'impronta più piccola di tutte le risposte. Sfortunatamente è un bug di sedano aperto, che impedisce i gruppi nidificati, quindi non posso usarlo al momento, ma non vedo l'ora di passare a questo un giorno! – WhatIsName

+0

Oh, interessante. Hai per caso un link al bug? Uso occasionalmente gruppi annidati e vorrei trovare ulteriori informazioni. –

+1

https://github.com/celery/celery/issues/2573 – WhatIsName