2014-10-20 10 views
7

Quando uno dei miei test di unità elimina un oggetto SQLAlchemy, l'oggetto attiva un evento after_delete che attiva un'attività di Celery per eliminare un file dall'unità.La connessione viene chiusa quando un evento SQLAlchemy attiva un'attività di Celeria

L'attività è CELERY_ALWAYS_EAGER = True durante il test.

gist to reproduce the issue easily

L'esempio ha due prove. Si attiva l'attività nell'evento, l'altra all'esterno dell'evento. Solo quello nell'evento chiude la connessione.

Per riprodurre rapidamente l'errore è possibile eseguire:

git clone https://gist.github.com/5762792fc1d628843697.git 
cd 5762792fc1d628843697 
virtualenv venv 
. venv/bin/activate 
pip install -r requirements.txt 
python test.py 

Lo stack:

$  python test.py 
E 
====================================================================== 
ERROR: test_delete_task (__main__.CeleryTestCase) 
---------------------------------------------------------------------- 
Traceback (most recent call last): 
    File "test.py", line 73, in test_delete_task 
    db.session.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do 
    return getattr(self.registry(), name)(*args, **kwargs) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit 
    self.transaction.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit 
    self._prepare_impl() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl 
    self.session.flush() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush 
    self._flush(objects) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__ 
    compat.reraise(type_, value, traceback) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback 
    self._assert_active(prepared_ok=True, rollback_ok=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active 
    raise sa_exc.ResourceClosedError(closed_msg) 
ResourceClosedError: This transaction is closed 

---------------------------------------------------------------------- 
Ran 1 test in 0.014s 

FAILED (errors=1) 
+0

Si prega di inviare l'errore e la traccia dello stack. – ACV

+0

Ho appena aggiornato con l'output di test. Ho anche aggiunto i comandi per riprodurli facilmente su una macchina locale * nix. – deBrice

risposta

8

Credo di aver trovato il problema - è nel modo in cui si imposta il compito di sedano. Se si rimuove la chiamata contesto app dalla vostra configurazione di sedano, tutto funziona bene:

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     # deleted --> with app.app_context(): 
     return TaskBase.__call__(self, *args, **kwargs) 

C'è un grande allarme nella documentazione SQLAlchemy mai la modifica della sessione durante after_delete eventi: http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

così ho il sospetto il with app.app_context(): viene chiamato durante l'eliminazione, cercando di collegare e/o modificare la sessione che Flask-SQLAlchemy memorizza nell'oggetto app e quindi l'intera cosa sta bombardando.

Flask-SQlAlchemy fa un sacco di magia dietro le quinte per te, ma puoi ignorarlo e usare SQLAlchemy direttamente. Se hai bisogno di parlare al database durante l'evento di eliminazione, è possibile creare una nuova sessione al db:

@celery.task() 
def my_task(): 
    # obviously here I create a new object 
    session = db.create_scoped_session() 
    session.add(User(id=13, value="random string")) 
    session.commit() 
    return 

Ma sembra che tu non hai bisogno di questo, si sta solo cercando di eliminare un'immagine sentiero. In tal caso, vorrei solo cambiare il vostro compito quindi ci vuole un percorso:

# instance will call the task 
@event.listens_for(User, "after_delete") 
def after_delete(mapper, connection, target): 
    my_task.delay(target.value) 

@celery.task() 
def my_task(image_path): 
    os.remove(image_path) 

Speriamo che sia utile - fatemi sapere se qualcuno di questo non funziona per voi. Grazie per l'installazione molto dettagliata, ha davvero aiutato nel debug.

+0

Lo hai inchiodato, il problema è stato ridefinire l'attività da eseguire all'interno del contesto dell'app. Vedrò se è possibile definire un decoratore 'celery.task_with_context' oltre al' celery.task' esistente. Grazie mille! – deBrice

+0

Nessun problema! Sono contento che ci abbia aiutato. –

+1

Oh hai @RachelSanders, voglia di correre qui dentro. – ACV

0

ASK, il creatore di sedano, ha suggerito che la soluzione on github

from celery import signals 

def make_celery(app): 
    ... 

    @signals.task_prerun.connect 
    def add_task_flask_context(sender, **kwargs): 
     if not sender.request.is_eager: 
      sender.request.flask_context = app.app_context().__enter__() 

    @signals.task_postrun.connect 
    def cleanup_task_flask_context(sender, **kwargs): 
     flask_context = getattr(sender.request, 'flask_context', None) 
     if flask_context is not None: 
      flask_context.__exit__(None, None, None) 
1

Simile alla risposta suggerita da deBrice, ma utilizzando l'approccio simile a Rachel.

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     import flask 
     # tests will be run in unittest app context 
     if flask.current_app: 
      return TaskBase.__call__(self, *args, **kwargs) 
     else: 
      # actual workers need to enter worker app context 
      with app.app_context(): 
       return TaskBase.__call__(self, *args, **kwargs)