Assegno ad ogni catena un ID di lavoro e rintraccio questo lavoro salvando i dati in un database.
Avvio coda
if __name__ == "__main__":
# Generate unique id for the job
job_id = uuid.uuid4().hex
# This is the root parent
parent_level = 1
# Pack the data. The last value is your value to add
parameters = job_id, parent_level, 2
# Build the chain. I added an clean task that removes the data
# created during the process (if you want it)
add_chain = add.s(parameters, 2) | add.s(4) | add.s(8)| clean.s()
add_chain.apply_async()
Ora i compiti
#Function for store the result. I used sqlalchemy (mysql) but you can
# change it for whatever you want (distributed file system for example)
@inject.params(entity_manager=EntityManager)
def save_result(job_id, level, result, entity_manager):
r = Result()
r.job_id = job_id
r.level = level
r.result = result
entity_manager.add(r)
entity_manager.commit()
#Restore a result from one parent
@inject.params(entity_manager=EntityManager)
def get_result(job_id, level, entity_manager):
result = entity_manager.query(Result).filter_by(job_id=job_id, level=level).one()
return result.result
#Clear the data or do something with the final result
@inject.params(entity_manager=EntityManager)
def clear(job_id, entity_manager):
entity_manager.query(Result).filter_by(job_id=job_id).delete()
@app.task()
def add(parameters, number):
# Extract data from parameters list
job_id, level, other_number = parameters
#Load result from your second parent (level - 2)
#For level 3 parent level - 3 and so on
#second_parent_result = get_result(job_id, level - 2)
# do your stuff, I guess you want to add numbers
result = number + other_number
save_result(job_id, level, result)
#Return the result of the sum or anything you want, but you have to send something because the "add" function expects 3 values
#Of course your should return the actual job and increment the parent level
return job_id, level + 1, result
@app.task()
def clean(parameters):
job_id, level, result = parameters
#Do something with final result or not
#Clear the data
clear(job_id)
Io uso un entity_manager alle gestisce le operazioni del database. Il mio gestore di entità utilizza sql alchemy e mysql. Ho anche usato una tabella "risultato" per memorizzare i risultati parziali. Questa parte dovrebbe essere cambiamento per il sistema di storage migliore (o usare questo se mysql è ok per voi)
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import inject
class EntityManager():
session = None
@inject.params(config=Configuration)
def __init__(self, config):
conf = config['persistence']
uri = conf['driver'] + "://" + conf['username'] + ":@" + conf['host'] + "/" + conf['database']
engine = create_engine(uri, echo=conf['debug'])
Session = sessionmaker(bind=engine)
self.session = Session()
def query(self, entity_type):
return self.session.query(entity_type)
def add(self, entity):
return self.session.add(entity)
def flush(self):
return self.session.flush()
def commit(self):
return self.session.commit()
class Configuration:
def __init__(self, params):
f = open(os.environ.get('PYTHONPATH') + '/conf/config.yml')
self.configMap = yaml.safe_load(f)
f.close()
def __getitem__(self, key: str):
return self.configMap[key]
class Result(Base):
__tablename__ = 'result'
id = Column(Integer, primary_key=True)
job_id = Column(String(255))
level = Column(Integer)
result = Column(Integer)
def __repr__(self):
return "<Result (job='%s', level='%s', result='%s')>" % (self.job_id, str(self.level), str(self.result))
ho usato il pacchetto di iniettare per ottenere un iniettore di dipendenza. Il pacchetto inject riutilizzerà l'oggetto in modo da poter inserire l'accesso al database ogni volta che lo desideri e non preoccuparti della connessione.
La configurazione della classe consiste nel caricare i dati di accesso al database in un file di configurazione. È possibile sostituirlo e utilizzare i dati statici (una mappa con hardcoded) per il test.
Modificare l'iniezione di dipendenza per qualsiasi altra cosa adatta a te. Questa è solo la mia soluzione. L'ho appena aggiunto per un test veloce.
La chiave qui è salvare i risultati parziali da qualche parte dal sistema di coda e nelle attività restituire i dati per l'accesso a questi risultati (ID job e livello padre). Manderai questo extra (ma piccolo) dato che è un indirizzo (job_id + livello genitore) che punta ai dati reali (alcune cose importanti).
Questa soluzione che quello che sto utilizzando nel mio software
Grazie! Onestamente penso che tutte e tre le risposte siano grandi e meritino la generosità. Sono andato per il tuo perché produce il minimo mal di testa mantenendo i risultati precedenti dalla mia archiviazione dei risultati effettivi. – WhatIsName