2015-06-10 15 views
6

Sto provando a verificare se l'applicazione sta riprovando.Come testare nuovamente l'applicazione Celery in Python?

@celery.task(bind=False, default_retry_delay=30) 
def convert_video(gif_url, webhook): 
    // doing something 
    VideoManager().convert(gif_url) 
    return 
    except Exception as exc: 
     raise convert_video.retry(exc=exc) 

E sto beffardo il test

@patch('src.video_manager.VideoManager.convert') 
@patch('requests.post') 
def test_retry_failed_task(self, mock_video_manager, mock_requests): 
    mock_video_manager.return_value= {'webm':'file.webm', 'mp4':'file.mp4', 'ogv' : 'file.ogv', 'snapshot':'snapshot.png'} 
    mock_video_manager.side_effect = Exception('some error') 

    server.convert_video.retry = MagicMock() 

    server.convert_video('gif_url', 'http://www.company.com/webhook?attachment_id=1234') 
    server.convert_video.retry.assert_called_with(ANY) 

e sto ottenendo questo errore

TypeError: exceptions must be old-style classes or derived from BaseException, not MagicMock

che è ovvio, ma non so come fare altrimenti prova se il metodo viene chiamato.

risposta

3

Non è riuscito a farlo funzionare solo con il nuovo tentativo incorporato, quindi devo usare una simulazione con l'effetto collaterale del reale Riprova, questo rende possibile prenderlo in una prova. ho fatto in questo modo:

from celery.exceptions import Retry 
from mock import MagicMock 
from nose.plugins.attrib import attr 

# Set it for for every task-call (or per task below with @patch) 
task.retry = MagicMock(side_effect=Retry) 

#@patch('task.retry', MagicMock(side_effect=Retry) 
def test_task(self): 
    with assert_raises(Retry): 
     task() # Note, no delay or things like that 

# and the task, I don't know if it works without bind. 
@Celery.task(bind=True) 
def task(self): 
    raise self.retry() 

Se qualcuno sa come posso liberarmi del passaggio aggiuntivo nel beffardo il "eccezione" Riprova Sarei felice di sentirlo!

1
from mock import patch 

import pytest  


@patch('tasks.convert_video.retry') 
@patch('tasks.VideoManager') 
def test_retry_on_exception(mock_video_manger, mock_retry): 
    mock_video_manger.convert.side_effect = error = Exception() 
    with pytest.raises(Exception): 
     tasks.convert_video('foo', 'bar') 
    mock_retry.assert_called_with(exc=error) 

si sta anche mancano alcune cose nel vostro compito:

@celery.task(bind=False, default_retry_delay=30) 
def convert_video(gif_url, webhook): 
    try: 
     return VideoManager().convert(gif_url) 
    except Exception as exc: 
     convert_video.retry(exc=exc)