Airflow supports decorator syntax (`@task`, `@dag`) for defining workflows. Using the decorators is recommended over the legacy Python classes.
Apply with the Grit CLI
```shell
grit apply airflow_decorator_syntax
```
Works as expected on functions that represent DAGs
BEFORE
```python
def do_my_thing() -> DAG:
    dag = DAG(description="My cool DAG")

    def do_thing(**context: T.Any) -> bool:
        return not aws_rds.db_exists(region=get_variable(EnvVarKeys.TARGET))

    def do_thing_two(**context: T.Any) -> bool:
        pass

    other_operator = ShortCircuitOperator(
        dag=dag,
        task_id='do_db_thing',
        python_callable=do_thing,
        provide_context=True,
    )
    operator_two = PythonOperator(python_callable=do_thing_two)
    other_operator >> operator_two
```
AFTER
```python
@dag(description="My cool DAG")
def do_my_thing():
    @task(task_id='do_db_thing', provide_context=True)
    def do_thing(**context: T.Any) -> bool:
        return not aws_rds.db_exists(region=get_variable(EnvVarKeys.TARGET))

    @task()
    def do_thing_two(**context: T.Any) -> bool:
        pass

    chain(do_thing, do_thing_two)
```
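In this example, the operator-to-decorator step can be approximated as follows. Keep the operator's keyword arguments, drop `dag` (the surrounding `@dag` implies it) and `python_callable` (the decorated function replaces it), and move what remains into `@task(...)`. The sketch below uses Python's standard `ast` module. `task_decorator_for` is a hypothetical helper written for illustration. It is not part of Grit or Airflow, and the actual Grit pattern (written in GritQL) may behave differently.

```python
import ast

# Keywords with no place in @task(...): the @dag context supplies the DAG,
# and the decorated function replaces python_callable.
DROPPED = ("dag", "python_callable")


def task_decorator_for(operator_call_src: str) -> str:
    """Build a @task(...) decorator string from an operator call (illustrative)."""
    call = ast.parse(operator_call_src, mode="eval").body
    if not isinstance(call, ast.Call):
        raise ValueError("expected a single call expression")
    kept = [kw for kw in call.keywords if kw.arg not in DROPPED]
    decorator = ast.Call(func=ast.Name(id="task", ctx=ast.Load()), args=[], keywords=kept)
    return "@" + ast.unparse(decorator)  # ast.unparse needs Python 3.9+


print(task_decorator_for(
    "ShortCircuitOperator(dag=dag, task_id='do_db_thing', "
    "python_callable=do_thing, provide_context=True)"
))  # @task(task_id='do_db_thing', provide_context=True)
print(task_decorator_for("PythonOperator(python_callable=do_thing_two)"))  # @task()
```

The sketch reproduces only the decorator arguments shown in the AFTER block. It does not model how the operator class affects behavior; for example, it does not capture any short-circuit semantics of `ShortCircuitOperator`.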
Does not affect functions that do not work with DAGs
```python
def not_a_dag():
    dag = notDAG()
    return dag
```
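One plausible way to decide whether a function "works with DAGs" is to look for an assignment whose value is a call to `DAG(...)`. Matching the exact name means `notDAG()` does not qualify. This rule is an assumption, sketched here with the standard `ast` module; `builds_dag` is a hypothetical helper and not how the Grit pattern is implemented.

```python
import ast


def builds_dag(func_src: str) -> bool:
    """Return True if the source assigns the result of a call to DAG(...)."""
    for node in ast.walk(ast.parse(func_src)):
        if (
            isinstance(node, ast.Assign)
            and isinstance(node.value, ast.Call)
            and isinstance(node.value.func, ast.Name)
            and node.value.func.id == "DAG"  # exact name, so notDAG() does not match
        ):
            return True
    return False


print(builds_dag("def not_a_dag():\n    dag = notDAG()\n    return dag\n"))  # False
print(builds_dag("def my_dag():\n    dag = DAG()\n    return dag\n"))  # True
```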
Removes any references to the `dag` variable.
Removing the `dag` reference from an operator's keyword arguments preserves the intended behavior, because the operator is instantiated inside the `@dag()` decorator context.
BEFORE
```python
def my_dag():
    dag = DAG()
    o1 = EmptyOperator(dag=dag)
    o2 = EmptyOperator(dag=dag,foo=bar)
    return o1 >> o2
```
AFTER
```python
@dag()
def my_dag():
    o1 = EmptyOperator()
    o2 = EmptyOperator(foo=bar)
    return chain(o1, o2)
```
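The keyword removal in this example can be sketched as an `ast.NodeTransformer` that drops `dag=dag` keywords from every call and leaves all other keywords untouched. `DropDagKwarg` and `strip_dag_kwargs` are hypothetical names. The sketch only approximates the Grit pattern and needs Python 3.9+ for `ast.unparse`.

```python
import ast


class DropDagKwarg(ast.NodeTransformer):
    """Remove `dag=<dag_name>` keyword arguments from every call."""

    def __init__(self, dag_name: str = "dag") -> None:
        self.dag_name = dag_name

    def visit_Call(self, node: ast.Call) -> ast.Call:
        self.generic_visit(node)  # rewrite nested calls first
        node.keywords = [
            kw
            for kw in node.keywords
            if not (
                kw.arg == "dag"
                and isinstance(kw.value, ast.Name)
                and kw.value.id == self.dag_name
            )
        ]
        return node


def strip_dag_kwargs(src: str) -> str:
    return ast.unparse(DropDagKwarg().visit(ast.parse(src)))


print(strip_dag_kwargs("o1 = EmptyOperator(dag=dag)\no2 = EmptyOperator(dag=dag,foo=bar)"))
# o1 = EmptyOperator()
# o2 = EmptyOperator(foo=bar)
```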
Preserves the order of operations when rewriting `>>` and `<<` into `chain` calls.
BEFORE
```python
def some_dag():
    dag = DAG()
    first = BashOperator(dag=dag,bash_command="echo hello")
    second = EmptyOperator(dag=dag)
    # second is upstream of first
    first << second
```
AFTER
```python
@dag()
def some_dag():
    first = BashOperator(bash_command="echo hello")
    second = EmptyOperator()
    # second is upstream of first
    chain(second, first)
```
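To illustrate the ordering rule, a uniform `>>` or `<<` expression can be flattened into `chain` arguments listed upstream first. Python's shift operators are left-associative, and Airflow's overloads return the right-hand operand. As a result, `a >> b >> c` runs left to right, while `a << b << c` runs right to left. `chain_order` and `to_chain_call` are hypothetical helpers. They reject mixed operators, because an expression like `a >> b << c` does not describe a single linear chain.

```python
import ast


def chain_order(expr_src: str) -> list[str]:
    """Return task names upstream-first for a uniform `>>` or `<<` expression."""
    node = ast.parse(expr_src, mode="eval").body
    names: list[str] = []  # collected right to left while walking down the left spine
    ops = set()
    while isinstance(node, ast.BinOp) and isinstance(node.op, (ast.RShift, ast.LShift)):
        if not isinstance(node.right, ast.Name):
            raise ValueError("only plain task names are supported")
        names.append(node.right.id)
        ops.add(type(node.op))
        node = node.left
    if not isinstance(node, ast.Name):
        raise ValueError("only plain task names are supported")
    names.append(node.id)
    if len(ops) > 1:
        raise ValueError("mixed >> and << do not form a single linear chain")
    # a << b << c: c is furthest upstream, so the right-to-left order is already correct.
    # a >> b >> c: a is furthest upstream, so reverse.
    return names if ops == {ast.LShift} else names[::-1]


def to_chain_call(expr_src: str) -> str:
    return f"chain({', '.join(chain_order(expr_src))})"


print(to_chain_call("first << second"))  # chain(second, first)
print(to_chain_call("o1 >> o2"))         # chain(o1, o2)
```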
Does not remove the `dag` variable if it is used in places other than task operators
If the `dag` variable is used anywhere other than in an operator's keyword arguments, the `dag = DAG()` assignment is not removed.
BEFORE
```python
def some_dag():
    dag = DAG()

    def print_dag():
        print(dag)

    t1 = PythonOperator(dag=dag,python_callable=print_dag)
    t2 = PythonOperator(dag=dag,python_callable=print_dag)
    t1 >> t2
```
AFTER
```python
@dag()
def some_dag():
    dag = DAG()

    @task()
    def print_dag():
        print(dag)

    chain(print_dag, print_dag)
```
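The "used elsewhere" condition described above can be sketched as follows: the `dag` name counts as used elsewhere if it is read anywhere other than as the value of a `dag=` keyword. `dag_used_outside_kwargs` is a hypothetical helper built on the standard `ast` module. It illustrates the rule and is not the check Grit actually performs.

```python
import ast


def dag_used_outside_kwargs(func_src: str, name: str = "dag") -> bool:
    """True if `name` is read anywhere other than as a `dag=` keyword value."""
    tree = ast.parse(func_src)
    # Identity of every Name node that is the value of a dag= keyword.
    kwarg_values = {
        id(kw.value)
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        for kw in node.keywords
        if kw.arg == "dag" and isinstance(kw.value, ast.Name)
    }
    return any(
        isinstance(node, ast.Name)
        and node.id == name
        and isinstance(node.ctx, ast.Load)  # ignore the `dag = DAG()` assignment itself
        and id(node) not in kwarg_values
        for node in ast.walk(tree)
    )


ONLY_KWARGS = (
    "def my_dag():\n"
    "    dag = DAG()\n"
    "    o1 = EmptyOperator(dag=dag)\n"
    "    return o1\n"
)
ALSO_PRINTED = (
    "def some_dag():\n"
    "    dag = DAG()\n"
    "    def print_dag():\n"
    "        print(dag)\n"
    "    t1 = PythonOperator(dag=dag, python_callable=print_dag)\n"
)
print(dag_used_outside_kwargs(ONLY_KWARGS))   # False: dag = DAG() can be dropped
print(dag_used_outside_kwargs(ALSO_PRINTED))  # True: dag = DAG() must stay
```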
Works when a task without a Python callable (such as `BashOperator`) is chained with a Python-callable task
BEFORE
```python
def some_dag():
    dag = DAG()

    def print_dag():
        print(dag)

    t1 = PythonOperator(dag=dag,python_callable=print_dag)
    t2 = BashOperator(dag=dag,bash_command="echo 1")
    t1 >> t2
```
AFTER
```python
@dag()
def some_dag():
    dag = DAG()

    @task()
    def print_dag():
        print(dag)

    t2 = BashOperator(bash_command="echo 1")
    chain(print_dag, t2)
```
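In these examples, a task built from a `python_callable` is referenced in `chain(...)` by its function name, and any other operator keeps its variable name. The sketch below derives that mapping with `ast`. `chain_arguments` is a hypothetical helper, and treating any class name ending in `Operator` as an operator is an assumption made for illustration.

```python
import ast


def chain_arguments(func_src: str) -> dict[str, str]:
    """Map each operator variable to the name it would have inside chain(...)."""
    mapping: dict[str, str] = {}
    for node in ast.walk(ast.parse(func_src)):
        # Only look at simple `name = SomeOperator(...)` assignments.
        if not (
            isinstance(node, ast.Assign)
            and len(node.targets) == 1
            and isinstance(node.targets[0], ast.Name)
            and isinstance(node.value, ast.Call)
            and isinstance(node.value.func, ast.Name)
            and node.value.func.id.endswith("Operator")  # assumption for illustration
        ):
            continue
        var = node.targets[0].id
        callable_kw = next(
            (kw.value for kw in node.value.keywords if kw.arg == "python_callable"), None
        )
        # A python_callable becomes a @task function referenced by its own name;
        # any other operator keeps its variable.
        mapping[var] = callable_kw.id if isinstance(callable_kw, ast.Name) else var
    return mapping


SRC = (
    "def some_dag():\n"
    "    dag = DAG()\n"
    "    def print_dag():\n"
    "        print(dag)\n"
    "    t1 = PythonOperator(dag=dag,python_callable=print_dag)\n"
    '    t2 = BashOperator(dag=dag,bash_command="echo 1")\n'
    "    t1 >> t2\n"
)
print(chain_arguments(SRC))  # {'t1': 'print_dag', 't2': 't2'}
```

Applied to the first example on this page, the same mapping gives `do_thing` and `do_thing_two`, which matches `chain(do_thing, do_thing_two)` there.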