Airflow is a scheduling tool written in Python; it uses DAGs to define whole workflows.
- DAG (Directed Acyclic Graph): all tasks in the same DAG share the same schedule
- DAG run: when a DAG is triggered by its schedule or by an external trigger, an instance of the DAG is created and run
components
WebServer
The webserver is a visual web interface. It can be used to check and change the running state of defined DAGs and to change the configuration of the web interface.
worker
Workers can be deployed on multiple machines; tasks scheduled onto a worker are processed from its queue. There is also a serve_logs service, which serves the logs of the tasks processed on the worker.
scheduler
The scheduler regularly checks the defined DAGs and the tasks inside them. If a task meets its run conditions, the scheduler assigns it to a worker.
Flower
Flower is a visual interface for monitoring the running state of Celery workers.
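Each component is started with its own airflow CLI subcommand. A minimal sketch (assuming the Celery executor, so that worker and flower apply):
# start the web interface (default port 8080)
airflow webserver
# start the scheduler
airflow scheduler
# start a Celery worker on each worker machine
airflow worker
# start Flower to monitor the Celery workers
airflow flower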
Operator
Airflow has many built-in operators: BashOperator executes a bash command, PythonOperator calls a Python function, EmailOperator sends an email, HTTPOperator sends an HTTP request, and SqlOperator executes a SQL command. Airflow also supports user-defined operators.
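As an illustration of PythonOperator, a minimal sketch in the same Airflow 1.x import style as the example further below; the DAG id, function name, and message are made up:
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_python_operator', default_args=args,
    schedule_interval='0 0 * * *')

# the callable and its message are hypothetical, for illustration only
def print_hello():
    print('hello from PythonOperator')

hello_task = PythonOperator(
    task_id='print_hello',
    python_callable=print_hello,  # function called when the task runs
    dag=dag)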
task
a task is an instance of an Operator
task instance
A task can be run repeatedly, and each run generates a new task instance. Each task instance has its own state (running, success, failed, skipped, up for retry).
task relationship
tasks in the same DAG can have dependency relationships between them
configuration
Configuration is done with a Python script. Tasks in the same configuration cannot communicate with each other directly.
# -*- coding: utf-8 -*-
import airflow
# Operators
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# define default args for the task constructors
# the meaning of each arg is documented in :py:class:`airflow.models.BaseOperator`
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}
# instantiate a DAG with the default args
dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    schedule_interval='0 0 * * *')
# creating tasks by instantiating operators
# task_id is the unique identifier of a task
# parameters passed to the operator are combined with the default args
# a parameter from default_args can be overridden per task
# task_id and owner are mandatory
cmd = 'ls -l'
run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag)
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag)
run_this.set_downstream(run_this_last)
for i in range(3):
    i = str(i)
    task = BashOperator(
        task_id='runme_' + i,
        bash_command='echo "" && sleep 1',
        dag=dag)
    task.set_downstream(run_this)
task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id= | dag_run="',
    dag=dag)
task.set_downstream(run_this_last)
DAG
- dag_id: name of the DAG instance
- default_args: default configuration arguments
- schedule_interval: execution schedule
task
There are many operators available in Airflow, and we can also define new operators ourselves (a sketch follows the list below).
- DummyOperator: does nothing, useful as a placeholder
- BashOperator: runs the bash command specified in the bash_command arg. When an operator is instantiated, it becomes a task
- dag: defines which DAG this task belongs to
- task_id: task name
- owner: owner of the task
- start_date: time to start the task
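A minimal sketch of a user-defined operator, assuming Airflow 1.x import paths; the class name, message parameter, and printed text are hypothetical:
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):
    # apply_defaults fills in arguments from the DAG's default_args
    @apply_defaults
    def __init__(self, message, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.message = message

    def execute(self, context):
        # execute() is called when the task instance runs on a worker
        print('HelloOperator says: %s' % self.message)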
jinja template
Airflow leverages Jinja templating and provides a set of built-in parameters and macros for workflow definition.
templated_command = """
"""
# the params argument of BashOperator allows passing a dictionary of
# parameters and/or objects to the template
# bash_command also accepts a path to a bash file as input
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
dependency
Define dependencies between tasks using set_upstream and set_downstream.
It is also possible to define a dependency with dag.set_dependency(task_id_1, task_id_2).
t2.set_upstream(t1)
# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)
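The bitshift operators >> and << provide a shorthand for the same relationships; a small sketch (t1, t2, t3 stand for tasks already defined in one DAG):
# t1 must run successfully before t2, and t2 before t3
t1.set_downstream(t2)
t2.set_downstream(t3)

# the bitshift shorthand expresses the same chain
t1 >> t2 >> t3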
verification
metadata verification
# print the list of active DAGs
airflow list_dags
# prints the list of tasks in the given dag_id (e.g. "tutorial")
airflow list_tasks dag_id
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks dag_id --tree
test
# command layout: command subcommand dag_id task_id date
# testing print_date
airflow test tutorial print_date 2015-06-01
# testing sleep
airflow test tutorial sleep 2015-06-01
# testing templated
airflow test tutorial templated 2015-06-01
backfill
Backfill re-executes operations over a past date range.
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07