Airflow

Posted by neverset on September 5, 2020

airflow is a workflow scheduling tool written in Python; it uses DAGs to define whole workflows

  • DAG (Directed Acyclic Graph): all tasks in the same DAG share the same schedule
  • DAG run: when a DAG is triggered, by its schedule or by an external trigger, an instance of the DAG is created and run

components

in a Celery-based deployment, each component is started with its own CLI command (airflow webserver, airflow scheduler, airflow worker, airflow flower)

WebServer

the webserver is the visual web interface. it can be used to check and change the running state of defined DAGs and to change the configuration of the web interface

worker

workers can be deployed on multiple machines; tasks scheduled to a worker are processed from its queue. there is also a serve_logs service on each worker, which exposes the worker's task logs to the webserver

scheduler

the scheduler periodically checks the defined DAGs and the tasks inside them. if a task meets its run conditions, the scheduler assigns it to a worker

Flower

flower is a web UI for monitoring the running state of Celery workers

Operator

airflow has many built-in operators: BashOperator executes a bash command, PythonOperator calls a Python function, EmailOperator sends an email, SimpleHttpOperator sends an HTTP request, and the SQL operators (MySqlOperator, PostgresOperator, etc.) execute SQL statements. Airflow also supports user-defined operators
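As a quick illustration of the two most common ones, here is a minimal sketch of a BashOperator and a PythonOperator in one DAG (dag_id and task names are made up; import paths are the 1.10-era ones used elsewhere in this post):

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(1)}
dag = DAG(dag_id='operator_examples', default_args=args,
          schedule_interval='@daily')

def greet(name, **kwargs):
    # called by PythonOperator on the worker
    print('hello %s' % name)

hello = PythonOperator(
    task_id='greet',
    python_callable=greet,          # the function the operator calls
    op_kwargs={'name': 'airflow'},  # keyword args passed to the callable
    provide_context=True,           # 1.10 flag: pass template context as kwargs
    dag=dag)

list_files = BashOperator(
    task_id='list_files',
    bash_command='ls -l',
    dag=dag)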

task

a task is an instantiation of an operator within a DAG

task instance

a task can be run repeatedly, and each run generates a new task instance. each task instance has its own state (running, success, failed, skipped, up_for_retry)
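The up_for_retry state, for example, is driven by retry arguments on BaseOperator. A minimal sketch, assuming a dag object like the one in the configuration section below:

from datetime import timedelta
from airflow.operators.bash_operator import BashOperator

# each failed try puts the task instance into up_for_retry
# until retries are exhausted, then it becomes failed
flaky = BashOperator(
    task_id='flaky',
    bash_command='exit 1',               # always fails, for illustration
    retries=3,                           # number of retries before giving up
    retry_delay=timedelta(minutes=5),    # wait between tries
    dag=dag)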

task relationship

tasks in the same DAG are connected by dependency relationships

configuration

a DAG is configured with a Python script. tasks in the same DAG do not share in-process state, since each task instance may run in a different process or on a different machine; small pieces of metadata can be exchanged through XComs (see the sketch after the example below)

# -*- coding: utf-8 -*-
import airflow
# Operators
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# define default args for task constructor
# meaning of each arg can be found in documents :py:class:airflow.models.BaseOperator
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}
# instantiate a DAG instance with args
dag = DAG(
    dag_id='example_bash_operator', default_args=args,
    schedule_interval='0 0 * * *')

# create tasks by instantiating operators
# task_id is the unique identifier of a task within its DAG
# operator-specific parameters (e.g. bash_command) and common
# BaseOperator parameters are combined; values inherited from
# default_args can be overridden per task
# task_id and owner are mandatory
run_this_last = DummyOperator(
    task_id='run_this_last', 
    dag=dag)

run_this = BashOperator(
    task_id='run_after_loop', 
    bash_command='echo 1', 
    dag=dag)
run_this.set_downstream(run_this_last)

for i in range(3):
    i = str(i)
    task = BashOperator(
        task_id='runme_'+i,
        bash_command='echo "" && sleep 1',
        dag=dag)
    task.set_downstream(run_this)

task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)
task.set_downstream(run_this_last)
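
Where tasks do need to exchange small values, Airflow provides XComs. A minimal sketch with PythonOperator (the task names push_value/pull_value are made up), attached to the dag defined above:

from airflow.operators.python_operator import PythonOperator

def push(**context):
    # a value returned from the callable is automatically pushed as an XCom
    return 42

def pull(**context):
    value = context['ti'].xcom_pull(task_ids='push_value')
    print('pulled %s' % value)

push_task = PythonOperator(task_id='push_value', python_callable=push,
                           provide_context=True, dag=dag)
pull_task = PythonOperator(task_id='pull_value', python_callable=pull,
                           provide_context=True, dag=dag)
push_task.set_downstream(pull_task)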

DAG

  • dag_id
    unique name of the DAG instance
  • default_args
    default arguments passed to each task's constructor
  • schedule_interval
    execution schedule (cron expression, preset string, or datetime.timedelta)
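
schedule_interval accepts a cron expression, a preset string, or a datetime.timedelta. A small sketch (the dag_ids here are made up):

from datetime import timedelta
import airflow
from airflow.models import DAG

args = {'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2)}

# '@daily' is the preset equivalent of the cron expression '0 0 * * *'
dag_cron = DAG('daily_cron', default_args=args, schedule_interval='0 0 * * *')
dag_preset = DAG('daily_preset', default_args=args, schedule_interval='@daily')
# a timedelta schedules runs relative to start_date instead of a cron grid
dag_delta = DAG('every_6h', default_args=args, schedule_interval=timedelta(hours=6))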

task

there are many operators available in airflow, and we can also define new operators ourselves (see the sketch after the list below)

  • DummyOperator
    a no-op placeholder task
  • BashOperator
    runs the bash command specified in the bash_command argument; once an operator is instantiated, it is called a task
  • dag
    the DAG this task belongs to
  • task_id
    task name
  • owner
    owner of the task
  • start_date
    time from which the task is scheduled
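
A user-defined operator subclasses BaseOperator and implements execute(). A minimal sketch (the class name HelloOperator is made up for illustration; apply_defaults is the 1.10-era decorator):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):
    """Toy operator that logs a greeting and returns it."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is what the worker runs for each task instance;
        # the return value is pushed to XCom automatically
        message = 'hello %s' % self.name
        self.log.info(message)
        return message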

jinja template

jinja provides a set of built-in parameters and macros for workflow definition. the template below echoes the execution date (ds), a date computed with a macro, and a user-supplied parameter:

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

# the BashOperator params argument allows passing a dictionary of
# parameters and/or objects to the template
# bash_command also accepts a path to a bash script file as input
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

dependency

define dependencies between tasks using set_upstream and set_downstream.
it is also possible to define a dependency on the DAG object:

dag.set_dependency('task_id_1', 'task_id_2')
# this makes the task with id 'task_id_2' depend on the task with
# id 'task_id_1' running successfully
# it is equivalent to:
# t1.set_downstream(t2)
# t2.set_upstream(t1)
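
Airflow also supports bitshift composition for the same relationships, which reads closer to the graph (assuming t1, t2a, t2b are tasks in the same DAG):

t1 >> t2a            # t2a runs after t1
t2a << t1            # the same relationship, written the other way
t1 >> [t2a, t2b]     # fan out: both t2a and t2b depend on t1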

verification

metadata verification

# print the list of active DAGs
airflow list_dags
# prints the list of tasks in the given DAG
airflow list_tasks dag_id
# prints the hierarchy of tasks in the given DAG
airflow list_tasks dag_id --tree

test

the airflow test command runs a single task instance locally, prints its log to stdout, and does not record state in the database

# command layout: command subcommand dag_id task_id date
# testing print_date
airflow test tutorial print_date 2015-06-01
# testing sleep
airflow test tutorial sleep 2015-06-01
# testing templated
airflow test tutorial templated 2015-06-01

backfill

re-execute operations over a past date range; unlike airflow test, backfill respects dependencies, emits logs into files, and records state in the database

# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07