Codice:flusso d'aria non si pianifica correttamente Python
Python versione 2.7.x e il flusso d'aria versione 1.5.1
mio script dag è questo
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Vignesh',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
Da che si poteva vedere che sto creando un DAG con 6 attività, la prima attività (Start1) inizia prima dopo la quale iniziano tutte le altre cinque attività
Attualmente mi hanno dato 5 minuti di ritardo tra l'inizio del DAG
Ha funzionato perfettamente per tutti i sei compiti del primo tipo, ma dopo cinque minuti il DAG non è ri-avviato
E 'stato più di 1 Ora il DAG non è ancora riavviato. Davvero non so se mi sbaglio.
Sarebbe davvero bello se qualcuno potesse indicarmi cosa c'è che non va. Ho provato a pulire usando airflow testing clear
poi alla stessa cosa. Ha funzionato in prima istanza e poi si è fermato lì.
L'unica cosa che la linea di comando mostra è Getting all instance for DAG testing
Quando cambio della posizione del schedule_interval appena eseguito con qualsiasi orario intervallo parallel.That è con in 5 minuti 300 o più esempio attività è stata completata. Non v'è alcun 5 minuti intervallo di pianificazione
Codice 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Vignesh',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
Grazie Vignesh,
quindi stai dicendo che verrà eseguito ogni cinque secondi finché la data di esecuzione non assorbe la data corrente ti dopo di che seguirà l'intervallo di tempo pianificato – The6thSense
Sì, è quello che intendo. – Yongyiw
grazie amico, ma ho due dubbi. Come posso programmare un compito a partire da questo secondo con un intervallo di programmazione di un'ora. Posso programmare un lavoro per il futuro – The6thSense