2016-04-19 12 views
19

Sono davvero un principiante in questo forum. Ma ho giocato con il flusso d'aria, per qualche tempo, per la nostra azienda. Scusa se questa domanda sembra davvero stupida.execution_date in airflow: necessario accedere come variabile

Sto scrivendo una pipeline usando un gruppo di BashOperators. In sostanza, per ogni attività, voglio chiamare semplicemente un'API REST utilizzando 'ricciolo'

Questo è ciò che la mia condotta assomiglia (versione molto semplificata):

from airflow import DAG 
from airflow.operators import BashOperator, PythonOperator 
from dateutil import tz 
import datetime 

datetime_obj = datetime.datetime 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'email_on_retry': False, 
    'retries': 2, 
    'retry_delay': datetime.timedelta(minutes=5), 
} 


current_datetime = datetime_obj.now(tz=tz.tzlocal()) 

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60)) 

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"' 


t1 = BashOperator(
    task_id='rest-api-1', 
    bash_command=curl_cmd, 
    dag=dag) 

Se si nota che sto facendo current_datetime= datetime_obj.now(tz=tz.tzlocal()) Invece quello che voglio qui è 'execution_date'

Come faccio a usare 'execution_date' direttamente e assegnarlo a una variabile i n il mio file python?

Ho riscontrato questo problema generale di accesso a args. Qualsiasi aiuto sarà sinceramente apprezzato.

Grazie

risposta

13

Il costruttore PythonOperator prende un parametro 'provide_context' (vedi https://pythonhosted.org/airflow/code.html). Se è True, passa un numero di parametri in python_callable tramite kwargs. kwargs ['execution_date'] è ciò che vuoi, credo.

Qualcosa di simile a questo:

def python_method(ds, **kwargs): 
    Variable.set('execution_date', kwargs['execution_date']) 
    return 

doit = PythonOperator(
    task_id='doit', 
    provide_context=True, 
    python_callable=python_method, 
    dag=dag) 

non sono sicuro di come farlo con la BashOperator, ma si potrebbe iniziare con questo problema: https://github.com/airbnb/airflow/issues/775

+2

Grazie. Con questo approccio, avrò un task t1, che sarà un'istanza di PythonOperator con provide_context = true, che mi permette di usare kwargs ['execution_date'] dove imposterò e restituirò current_datetime = 'execution_date'. Quindi creo il mio compito t2: BashOperator: in cui eseguirò il pull (usando XCOM) e utilizzo le mie variabili. Quindi, vedi, devo creare 2 compiti. che non è molto sexy;) Sono sicuro (e spero di aver ragione) che c'è un modo per accedere a 'execution_date' direttamente nel codice python senza usare PythonOperator. Ma non sono in grado di capire come farlo :( – Roger

+0

questo approccio lo ha risolto per me! – Nico

13

Il BashOperator bash_command argomento è un modello. È possibile accedere a execution_date in qualsiasi modello come oggetto datetime utilizzando la variabile execution_date. Nel modello, puoi usare qualsiasi metodo jinja2 per manipolarlo. Utilizzando il seguente come stringa di bash_command BashOperator, ad esempio:

# pass in the first of the current month 
some_command.sh {{ execution_date.replace(day=1) }} 

# last day of previous month 
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }} 

Se si desidera solo l'equivalente di stringa del data di esecuzione, ds restituirà un datestamp (AAAA-MM-DD), ds-nodash rendimenti stesso senza trattini (YYYYMMDD), ecc. Ulteriori informazioni sui macro sono disponibili nello Api Docs.

+0

Questa è la risposta corretta. Vorrei semplicemente modificarlo per mostrare una versione completata dell'attività, ad esempio 't1 = BashOperator ( task_id = 'rest-api-1', bash_command = 'curl -XPOST' '+ hostname +': 8000/run? st = {{execution_date}} "', dag = dag)' – Davos

0

L'execution_date, (datetime.datetime)

{{ execution_date }} 
1

Penso che non è possibile assegnare variabili con valori dal contesto flusso d'aria al di fuori di un'istanza di attività, sono disponibili solo in fase di esecuzione. Fondamentalmente ci sono 2 diversi passi quando un DAG viene caricato ed eseguito nel flusso d'aria:

  • Prima il file dag viene interpretato e analizzato. Deve funzionare e compilare e le definizioni dell'attività devono essere corrette (nessun errore di sintassi o altro). Durante questo passaggio, se si eseguono chiamate di funzione per riempire alcuni valori, queste funzioni non saranno in grado di accedere al contesto del flusso d'aria (ad esempio, la data di esecuzione, ancor più se si sta facendo un rinterro).

  • Il secondo passo è l'esecuzione del dag.È solo durante questo secondo passaggio che le variabili fornite dal flusso d'aria (execution_date, ds, etc...) sono disponibili in quanto correlate a un'esecuzione del dag.

Quindi non è possibile inizializzare le variabili globali utilizzando il contesto di flusso d'aria, tuttavia, flusso d'aria ti dà molteplici meccanismi per ottenere lo stesso effetto:

  1. Utilizzando template Jinja nel comando (può essere in una stringa nel codice o in un file, entrambi saranno elaborati). Hai l'elenco dei modelli disponibili qui: https://airflow.apache.org/code.html#default-variables. Si noti che sono disponibili anche alcune funzioni, in particolare per il calcolo dei giorni di formattazione delta e della data.

  2. Utilizzo di un PythonOperator in cui si passa il contesto (con l'argomento provide_context). Ciò ti consentirà di accedere allo stesso modello con la sintassi kwargs['<variable_name']. Se è necessario, è possibile restituire un valore da un PythonOperator, questo verrà memorizzato in una variabile XCOM che è possibile utilizzare in un secondo momento in qualsiasi modello. L'accesso alle variabili XCOM utilizza questa sintassi: https://airflow.apache.org/concepts.html#xcoms

  3. Se si scrive il proprio operatore, è possibile accedere alle variabili del flusso d'aria con il dict context.

+1

C'è tecnicamente 3 modi per fare come indicato in altre domande sopra.Usando il template jinja, usando kwargs in un python_callable, o usando context ['execution_date'] in un operatore Probabilmente è meglio rimuovere completamente questa risposta, o almeno cancellarne la maggior parte .. – Davos

+1

Grazie per a testa alta, ho imparato molto sul flusso d'aria da quando ho scritto questa risposta, l'ho modificato per renderlo più giusto e preciso! – Babcool

+0

Ho fatto alcune modifiche minori per fare il tuo primo statem di riepilogo ent coerente con i 2 punti seguenti. Penso che questa risposta sia corretta, anche se potresti aggiungere altri esempi di codice per i punti extra. – Davos

1
def execute(self, context): 
    execution_date = context.get("execution_date") 

Questo dovrebbe essere all'interno del metodo execute() di operatore

+0

Questo è probabilmente ciò che si desidera se si sta costruendo un operatore personalizzato. –