2016-06-14 34 views
5

Quando si tenta di utilizzare le funzionalità di template di Airflow (tramite Jinja2) con PostgresOperator, non sono riuscito a ottenere le cose da eseguire. È possibile che stia facendo qualcosa di sbagliato, ma sono piuttosto dispiaciuto su quale potrebbe essere il problema. Ecco un esempio per riprodurre l'errore TemplateNotFound Sono stato sempre:TemplateNotFound quando si utilizza PostgresOperator di Airflow con il templating di Jinja e SQL

airflow.cfg

airflow_home = /home/gregreda/airflow 
dags_folder = /home/gregreda/airflow/dags 

rilevanti DAG e le variabili

default_args = { 
    'owner': 'gregreda', 
    'start_date': datetime(2016, 6, 1), 
    'schedule_interval': None, 
    'depends_on_past': False, 
    'retries': 3, 
    'retry_delay': timedelta(minutes=5) 
} 

this_dag_path = '/home/gregreda/airflow/dags/example_csv_to_redshift' 
dag = DAG(
    dag_id='example_csv_to_redshift', 
    schedule_interval=None, 
    default_args=default_args 
) 

/example_csv_to_redshift/csv_to_redshift.py

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table', 
    sql=this_dag_path + '/copy_to_redshift.sql', 
    params=dict(
     AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'), 
     AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY') 
    ), 
    postgres_conn_id='postgres_redshift', 
    autocommit=False, 
    dag=dag 
) 

/example_csv_to_redshift/copy_to_redshift.sql

COPY public.table_foobar FROM 's3://mybucket/test-data/import/foobar.csv' 
CREDENTIALS 'aws_access_key_id={{ AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ AWS_SECRET_ACCESS_KEY }}' 
CSV 
NULL as 'null' 
IGNOREHEADER as 1; 

Calling airflow render example_csv_to_redshift load_table 2016-06-14 genera l'eccezione di seguito. Nota Sto correndo questo problema anche per un altro DAG, motivo per cui viene visualizzato il percorso con example_redshift_query_to_csv menzionato.

[2016-06-14 21:24:57,484] {__init__.py:36} INFO - Using executor SequentialExecutor 
[2016-06-14 21:24:57,565] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt 
[2016-06-14 21:24:57,596] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt 
[2016-06-14 21:24:57,763] {models.py:154} INFO - Filling up the DagBag from /home/gregreda/airflow/dags 
[2016-06-14 21:24:57,828] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files 
    setattr(self, attr, env.loader.get_source(env, content)[0]) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source 
    raise TemplateNotFound(template) 
TemplateNotFound: /home/gregreda/airflow/dags/example_redshift_query_to_csv/export_query_to_s3.sql 
[2016-06-14 21:24:57,834] {models.py:2040} ERROR - /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2038, in resolve_template_files 
    setattr(self, attr, env.loader.get_source(env, content)[0]) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source 
    raise TemplateNotFound(template) 
TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql 
Traceback (most recent call last): 
    File "/usr/local/bin/airflow", line 15, in <module> 
    args.func(args) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 359, in render 
    ti.render_templates() 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1409, in render_templates 
    rendered_content = rt(attr, content, jinja_context) 
    File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2017, in render_template 
    return jinja_env.get_template(content).render(**context) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 812, in get_template 
    return self._load_template(name, self.make_globals(globals)) 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/environment.py", line 774, in _load_template 
    cache_key = self.loader.get_source(self, name)[1] 
    File "/usr/local/lib/python2.7/dist-packages/jinja2/loaders.py", line 187, in get_source 
    raise TemplateNotFound(template) 
jinja2.exceptions.TemplateNotFound: /home/gregreda/airflow/dags/example_csv_to_redshift/copy_to_redshift.sql 

Tutte le idee verso una soluzione sono molto apprezzate.

risposta

3

Standard PEBCAK error.

Si è verificato un problema che specifica il percorso del modello SQL all'interno della specifica attività Airflow, che doveva essere relativa.

copy_s3_to_redshift = PostgresOperator(
    task_id='load_table', 
    sql='/copy_to_redshift.sql', 
    params=dict(
     AWS_ACCESS_KEY_ID=Variable.get('AWS_ACCESS_KEY_ID'), 
     AWS_SECRET_ACCESS_KEY=Variable.get('AWS_SECRET_ACCESS_KEY') 
    ), 
    postgres_conn_id='postgres_redshift', 
    autocommit=False, 
    dag=dag 
) 

Inoltre, il modello SQL aveva bisogno di essere cambiato un po '(si noti la params. ... questa volta):

COPY public.pitches FROM 's3://mybucket/test-data/import/heyward.csv' 
CREDENTIALS 'aws_access_key_id={{ params.AWS_ACCESS_KEY_ID }};aws_secret_access_key={{ params.AWS_SECRET_ACCESS_KEY }}' 
CSV 
NULL as 'null' 
IGNOREHEADER as 1; 
+1

Sono stato in grado di imparare come utilizzare Variables, PostgresOperator e file di riferimento .sql da questo. Grazie! – trench

0

Per un po' più di controllo, init vostro DAG con il template_searchpath param, poi basta usare il nome file nell'operatore.

:param template_searchpath: This list of folders (non relative) 
    defines where jinja will look for your templates. Order matters. 
    Note that jinja/airflow includes the path of your DAG file by 
    default 
:type template_searchpath: string or list of stings