2014-11-21 5 views
8

Sto riscontrando problemi nel trovare qualcosa che potrebbe sembrare relativamente semplice per me.Inizializzazione di un worker con argomenti utilizzando Celery

Sto usando Celery 3.1 con Python 3 e sto iniziando a inizializzare i miei worker con argomenti in modo che possano usare questi dettagli per l'installazione.

In particolare: questi lavoratori consumano attività che richiedono l'interazione con un'API di terze parti utilizzando le credenziali di autenticazione. È necessario che l'operatore trasmetta i dettagli di autenticazione al server API prima di consumare qualsiasi attività (i dettagli di autenticazione sono memorizzati nei cookie dopo la prima richiesta di autenticazione).

Desidero trasferire queste credenziali di accesso al worker quando viene avviato dalla CLI. Vorrei quindi che l'operatore autentificasse l'utilizzo di tali file e memorizzasse la sessione per l'utilizzo quando si consumano attività future (idealmente questo sarebbe archiviato in un attributo a cui è possibile accedere dalle attività).

E 'possibile con Celery?

Come nota a margine, ho considerato di passare un oggetto requests.session (dalla libreria Python requests) come argomento di attività ma che richiederebbe la serializzazione a cui sembra che sia disapprovato.

risposta

16

Suggerirei di utilizzare una classe di base attività astratta e di memorizzare nella cache requests.session.

Dalla documentazione di sedano:

Un compito non viene creata un'istanza per ogni richiesta, ma è registrato nel Registro di sistema compito come istanza globale.

Ciò significa che il costruttore __init__ verrà chiamato una sola volta per processo e che la classe di attività è semanticamente più vicina a un attore.

Questo può anche essere utile per memorizzare le risorse in cache ...

import requests 
from celery import Task 

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    def __init__(self): 
     # since this class is instantiated once, use this method 
     # to initialize and cache resources like a requests.session 
     # or use a property like the example below which will create 
     # a requests.session only the first time it's accessed 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      session.auth = ('user', 'pass') 

      self._session = session 

     return self._session 

Ora, quando si creano le attività che renderanno le richieste API:

@app.task(base=APITask, bind=True) 
def call_api(self, url): 
    # self will refer to the task instance (because we're using bind=True) 
    self.session.get(url) 

Inoltre è possibile passare le opzioni di autenticazione API utilizzando il app.task decoratore come un argomento in più che sarà ospitato al __dict__ del compito, per esempio:

# pass a custom auth argument 
@app.task(base=APITask, bind=True, auth=('user', 'pass')) 
def call_api(self, url): 
    pass 

e rendere la classe base usa la passata una opzioni, ossia l'autenticazione:

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    # the API authentication 
    auth =() 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      # use the authentication that was passed to the task 
      session.auth = self.auth 

      self._session = session 

     return self._session 

Si può leggere di più sul sito docs Sedano:

Ora torniamo alla tua domanda iniziale che sta passando argomenti extra al worker dalla riga di comando:

C'è una sezione su questo nei documenti di sedano Adding new command-line options, ecco un esempio di passaggio di un nome utente e una password per il lavoratore dalla riga di comando:

$ celery worker -A appname --username user --password pass 

Il codice:

from celery import bootsteps 
from celery.bin import Option 


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.') 
) 

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.') 
) 


class CustomArgs(bootsteps.Step): 

    def __init__(self, worker, api_username, api_password, **options): 
     # store the api authentication 
     APITask.auth = (api_username, api_password) 


app.steps['worker'].add(CustomArgs) 
+0

Eccellente, stavo avendo difficoltà a decifrare tutto ciò dalla documentazione. Grazie per averlo esposto così bene. –

+0

Mi dispiace di approfondire ancora una volta, saresti in grado di chiarire come passare gli argomenti della riga di comando da Boostep all'inizializzazione dell'attività (in modo da poter inizializzare l'oggetto della sessione Attività con il nome utente e la password forniti dal comando -linea). L'obiettivo è non memorizzare le credenziali API in testo semplice. –

+0

@JoshuaGilman scusa per il ritardo, ho aggiornato la risposta con un esempio. – Pierre

0

Penso che potresti chiamare lo script che hai scritto usando gli argomenti della riga di comando. Qualcosa di simile a quanto segue:

my_script.py username password 

All'interno dello script, si può avere la funzione principale avvolto in un decoratore @celery.task o @app.task.

import sys 

from celery import Celery 

cel = Celery() # put whatever config info you need in here 

@celery.task 
def main(): 
    username, password = sys.argv[1], sys.argv[2] 

Qualcosa del genere dovrebbe iniziare. Assicurati di controllare anche Python argparse per l'analisi degli argomenti più sofisticata.

+0

Grazie, ma non è possibile avviare un processo di lavoro chiamando lo script python. Devi invocare il sedano in questo modo: 'celery -A proj worker -l info' –

+0

Dobbiamo avere un setup davvero strano qui allora ... perché è così che ha funzionato. Dovrò studiare quello che stiamo facendo ancora. –