Use modern decorator syntax to define Airflow DAGs.

Python pattern

Airflow supports decorator syntax (@task, @dag) for defining workflows. It is recommended to use these decorators instead of the legacy Python classes.

Works as expected on functions that represent DAGs

BEFORE
def do_my_thing() -> DAG:
    dag = DAG(description="My cool DAG")
    def do_thing(**context: T.Any) -> bool:
        return not aws_rds.db_exists(region=get_variable(EnvVarKeys.TARGET))
    def do_thing_two(**context: T.Any) -> bool:
        pass
    other_operator = ShortCircuitOperator(
        dag=dag,
        task_id='do_db_thing',
        python_callable=do_thing,
        provide_context=True,
    )
    operator_two = PythonOperator(python_callable=do_thing_two)
    other_operator >> operator_two
AFTER
@dag(description="My cool DAG")
def do_my_thing():

    @task(task_id='do_db_thing', provide_context=True)
    def do_thing(**context: T.Any) -> bool:
        return not aws_rds.db_exists(region=get_variable(EnvVarKeys.TARGET))
    @task()
    def do_thing_two(**context: T.Any) -> bool:
        pass


    chain(do_thing, do_thing_two)

Does not affect functions that do not work with DAGs

PYTHON
def not_a_dag():
  dag = notDAG()
  return dag

Removes any references to the dag variable.

Removing the dag keyword argument from an operator call retains the intended behavior, because the operator is instantiated inside a function decorated with @dag().

BEFORE
def my_dag():
  dag = DAG()
  o1 = EmptyOperator(dag=dag)
  o2 = EmptyOperator(dag=dag,foo=bar)
  return o1 >> o2
AFTER
@dag()
def my_dag():

    o1 = EmptyOperator()
    o2 = EmptyOperator(foo=bar)
    return chain(o1, o2)

Preserves the order of operations when re-writing >> and << to chain calls.

BEFORE
def some_dag():
    dag = DAG()
    first = BashOperator(dag=dag,bash_command="echo hello")
    second = EmptyOperator(dag=dag)
    # second is upstream of first
    first << second
AFTER
@dag()
def some_dag():

    first = BashOperator(bash_command="echo hello")
    second = EmptyOperator()
    # second is upstream of first
    chain(second, first)

Does not remove the dag variable if it is used in places other than task operators

If the dag variable is used anywhere other than the keyword arguments of an operator, then the assignment dag = DAG() should not be removed.

BEFORE
def some_dag():
    dag = DAG()
    def print_dag():
        print(dag)
    t1 = PythonOperator(dag=dag,python_callable=print_dag)
    t2 = PythonOperator(dag=dag,python_callable=print_dag)
    t1 >> t2
AFTER
@dag()
def some_dag():
    dag = DAG()
    @task()
    def print_dag():
        print(dag)


    chain(print_dag, print_dag)

Works when a non-python-callable task is chained with a python-callable task

BEFORE
def some_dag():
    dag = DAG()
    def print_dag():
        print(dag)
    t1 = PythonOperator(dag=dag,python_callable=print_dag)
    t2 = BashOperator(dag=dag,bash_command="echo 1")
    t1 >> t2
AFTER
@dag()
def some_dag():
    dag = DAG()
    @task()
    def print_dag():
        print(dag)

    t2 = BashOperator(bash_command="echo 1")
    chain(print_dag, t2)