External Task Sensors stop bad data from trickling downstream in a data pipeline. Leverage them to build a reliable data infrastructure.
Orchestrating a data pipeline is a fragile endeavor. A pipeline can have hundreds of tasks running concurrently, and they often depend on one another. If we're not careful, a single point of failure can have a domino effect that trickles downstream and messes up the entire pipeline.
Apache Airflow introduced the External Task Sensor to put an end to these issues. While it's an extremely powerful feature, it also comes with a degree of complexity.
In this introductory piece, I hope to untangle some of the confusion surrounding the External Task Sensor and show how we can use it to improve the reliability of our data pipelines: making sense of sensors!
Meet Jamie, a rookie chef at Airflow Bakery. She's new. Her only responsibility is to make a fresh batch of cookie dough every hour.
And then we have Gordon Damnsie, the cookie master. Gordon takes the dough from Jamie and turns it into award-winning cookies.
One fine day, Gordon swoops in to grab the freshest dough he can find and bakes cookies. But when he takes a bite, yuck! "Bad" would've been an understatement. Gordon quickly discovers the root cause: stale dough, left over from a week ago.
Gordon, visibly frustrated, tosses the cookies into the bin. After he composes himself, he slowly turns to Jamie and asks, “Why is the dough not fresh?”
"I had to stop making them, Chef. There was an issue with the raw ingredients," Jamie replies, trying to stay calm in the face of Gordon's anger. Unfortunately, the bad cookies had already been served to customers, and they no longer trust the bakery's food quality.
This slight detour is a cautionary tale about the importance of validating the freshness of data sources. In the story, Gordon's success relies on Jamie, but they're working independently without communicating with each other. They "trust" that the other person will do their job flawlessly. But as any data practitioner knows, everything that can go wrong will go wrong in a data pipeline.
Ideally, Gordon should check with Jamie whether she made dough recently. Once he has confirmed it, he knows the dough is fresh and can proceed to bake his cookies. Otherwise, he should stop baking and figure out what went wrong.
You see, what Gordon needs… is an external task sensor.
An external task sensor checks whether other people have completed their assigned task. It senses the completion of an external task, hence the name.
In the context of Airflow, Jamie and Gordon are DAGs. They have specific tasks that they need to complete.
When we add an External Task Sensor, it becomes the middleman that coordinates between the two independent DAGs. The sensor will check on Jamie at a specific time to see whether she has completed her task.
If Jamie successfully completes her task, the sensor will inform Gordon so that he can carry on with his downstream tasks.
If Jamie fails to complete her task, the sensor stops Gordon from running any tasks that depend on the failed task.
Having this extra layer of validation essentially stops stale data from trickling further downstream and polluting the rest of our pipeline with dirty, inaccurate data.
Airflow makes it very easy to create an External Task Sensor: just import it. The syntax looks something like this:
from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
dag=gordon_tasks,
task_id='check_dough_freshness',
external_dag_id='jamie_tasks',
external_task_id='make_new_dough',
email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
execution_delta=timedelta(minutes=30),
# execution_date_fn=my_function,
timeout=1800,
poke_interval=300,
mode='reschedule'
)
Here’s what they mean:
- dag is the current DAG object. Since Gordon is the one who wants to check whether Jamie made dough, this should point to Gordon's DAG.
- task_id is the unique name for this External Task Sensor.
- external_dag_id is the name of the DAG you want to check. In this case, Jamie's DAG.
- external_task_id is the name of the specific task you want to check. Ideally, we should always specify this. Otherwise, the sensor checks for the completion of the entire DAG instead of just one specific task. In other words, Gordon will do nothing until Jamie finishes chopping onions, washing dishes, and restocking the pantry, even though we only want to know whether she made dough. Or worse, if any one of these irrelevant tasks fails, the sensor will unnecessarily pause the entire pipeline.
- email is the list of people you want Airflow to notify when the External Task Sensor fails. Keep in mind that for this to work, you need to have the SMTP settings properly configured in the Airflow configuration file.
- execution_delta is arguably the most confusing part of External Task Sensors, but also the most important. I'm dedicating an entire section to it below. Keep scrolling!
- execution_date_fn and execution_delta are very similar. We can only use one of them at a time. Sometimes it's easier to use this rather than execution_delta. I'm also giving this its own section below.
- timeout limits how long a sensor can stay alive. When we create a sensor, it consumes resources by occupying one worker slot. If the target task never completes, the sensor will keep checking indefinitely while hogging that worker slot. Over time, we can run into a sensor deadlock, where all worker slots are occupied by useless sensors and no tasks can run anymore. Therefore, it's best practice to set a maximum time limit for the checks.
- poke_interval is how long the sensor waits before checking again if the previous check fails. We don't want the sensor to check excessively like a madman, as it adds unnecessary load to the server. On the flip side, checking too infrequently means the sensor waits longer than necessary, delaying the pipeline. The trick is to find the sweet spot based on the expected run time of the external task.
- mode is how we want the sensor to behave. It can be set to "poke" or "reschedule".
When set to "poke", the sensor goes to sleep on failure and wakes up at the next poke interval to try again. It's like being on standby. The sensor will be more reactive, but since it's on standby, the worker slot stays occupied throughout the whole process.
When set to "reschedule", the sensor checks once. If the check fails, the sensor schedules another check at a later time but terminates itself for now, freeing up the worker slot. Airflow recommends using "reschedule" if the poke interval is greater than 60 seconds.
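To make the distinction concrete, here's a minimal sketch of the two modes side by side. The task names here are hypothetical, and gordon_tasks is assumed to be the current DAG object from the example above.

from airflow.sensors.external_task import ExternalTaskSensor

# "poke" suits short waits: the sensor naps between checks but keeps its worker slot.
impatient_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_poke',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    poke_interval=30,      # check every 30 seconds
    timeout=300,           # give up after 5 minutes
    mode='poke'
)

# "reschedule" suits long waits: the sensor exits between checks and frees the slot.
patient_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_reschedule',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    poke_interval=600,     # check every 10 minutes
    timeout=3600,          # give up after an hour
    mode='reschedule'
)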
Alright, that's nearly every parameter we need to know about the External Task Sensor. Granted, this list isn't exhaustive, but knowing these 10 parameters should be enough to set up an External Task Sensor properly for virtually all use cases.
For completeness' sake, I'll include Airflow's official documentation for those who are eager to explore it in more detail.
In the section above, I glossed over these two parameters because they're arguably the most notorious, annoying, and confusing part of External Task Sensors. But I think it's time we tackle them.
So what are execution_delta and execution_date_fn?
Building on our analogy, external_task_id tells the sensor to check whether Jamie completed the make_dough() task. But she makes a lot of dough, one batch every hour. Are we checking whether she baked in the past hour, yesterday, or last week?
This ambiguity confuses External Task Sensors, and that's why Airflow came up with two ways for us to communicate this information. Both execution_delta and execution_date_fn are meant to tell the sensor the exact time of the external task.
- execution_delta expresses time on a relative basis, e.g.: "Did Jamie bake 30 minutes ago?" It accepts a datetime.timedelta object as its argument, e.g.: datetime.timedelta(minutes=30).
- execution_date_fn expresses time on an absolute basis, e.g.: "Did Jamie bake on the 3rd of May 2023 at 4:30 pm?" It accepts a callable Python function as its argument. This function should return the execution date of the task we want to check on, e.g.: datetime.datetime(year=2023, month=5, day=3, hour=16, minute=30).
Since both of them convey the same information, Airflow only allows us to use one or the other, but not both at the same time.
I generally use execution_delta as the de facto choice. But there are scenarios where it's too complicated to calculate the execution_delta. In those cases, I'd use execution_date_fn instead.
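Side by side, the two alternatives look like this. This is a rough sketch using the 30-minute offset from our bakery example; the task_ids are made up, and Airflow raises an error if you pass both parameters at once.

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

# Relative: "Jamie's execution date is 30 minutes before mine."
sensor_with_delta = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_via_delta',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    execution_delta=timedelta(minutes=30),
)

# Absolute: a callable receives Gordon's execution date and returns Jamie's.
sensor_with_fn = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_via_fn',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    execution_date_fn=lambda dt: dt - timedelta(minutes=30),
)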
How to calculate execution_delta?
The term execution_delta is short for the delta (a.k.a. difference) of execution dates (a.k.a. the previous runtimes of our tasks).
I'd like to highlight the keyword here: "previous".
Some of you may be wondering… why does Airflow want the time difference of the previous runs, and not the current runs? This used to confuse the crap out of me when I first started using Airflow.
Turns out there's a perfectly good reason. However, I don't want to derail from the topic at hand, so I'll cover it in a later section below. For now, let's just accept the formula as-is (execution_delta = Gordon's execution date minus Jamie's execution date) and see how we can apply it.
Suppose that Jamie makes dough every hour (e.g., 13:00, 14:00, 15:00, …). Gordon also makes cookies every hour, but he makes them on the 30th minute of every hour (e.g., 13:30, 14:30, 15:30, …).
At 14:30 sharp, Gordon gets ready to bake his cookies. Before he starts, he wants to check whether Jamie made fresh dough recently. The latest run of make_dough() would be the 14:00 run.
Given that both Gordon's and Jamie's tasks are scheduled hourly, their execution dates (a.k.a. previous runs) for the 14:30 run would be…
- Gordon's execution date = 14:30 - 1 hour = 13:30
- Jamie's execution date = 14:00 - 1 hour = 13:00
We can plug these values into the formula, and voilà: execution_delta = 13:30 - 13:00 = 30 minutes!
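If you prefer to see it in code, here's a quick sanity check in plain Python. The calendar date is arbitrary; only the times matter.

from datetime import datetime

# Execution dates for the 14:30 run, as worked out above (arbitrary calendar date)
gordon_execution_date = datetime(2023, 5, 3, 13, 30)
jamie_execution_date = datetime(2023, 5, 3, 13, 0)

execution_delta = gordon_execution_date - jamie_execution_date
print(execution_delta)  # 0:30:00, i.e. timedelta(minutes=30)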
You can do the same calculation for different runs of the tasks to get their respective execution_delta.
In this (cherry-picked) example, every execution_delta turns out to be exactly the same. We can pass this to our External Task Sensor and everything will work.
from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
dag=gordon_tasks,
task_id='check_dough_freshness',
external_dag_id='jamie_tasks',
external_task_id='make_new_dough',
email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
execution_delta=timedelta(minutes=30), # Pass the execution delta here
timeout=1800,
poke_interval=300,
mode='reschedule'
)
But-!
The execution_delta can be different for different runs. This usually happens when the schedule intervals of the two DAGs are different (e.g., daily vs. weekly, daily vs. monthly, …).
For example, let's say that Jamie now makes her dough weekly on Sunday at 14:00, but Gordon makes his cookies daily at 14:30.
If we do the same calculations, you will see that the execution deltas differ for every run.
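Here's a small sketch of that calculation for one week of Gordon's daily runs. The dates are illustrative and follow the weekly-Sunday and daily schedules above.

from datetime import datetime

# Jamie's weekly run on Sunday 12 Mar 2023 carries the execution date of the
# previous Sunday, 5 Mar 14:00, and it stays that way for the whole week.
jamie_execution_date = datetime(2023, 3, 5, 14, 0)

# Gordon runs daily at 14:30, so his execution date is one day behind each run.
for day in range(11, 18):  # Gordon's execution dates: 11-17 Mar 2023
    gordon_execution_date = datetime(2023, 3, day, 14, 30)
    print(gordon_execution_date.date(), "->", gordon_execution_date - jamie_execution_date)

# 2023-03-11 -> 6 days, 0:30:00
# 2023-03-12 -> 7 days, 0:30:00
# ...
# 2023-03-17 -> 12 days, 0:30:00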
This becomes a problem because execution_delta only accepts a single timedelta object as its argument. We can't supply a different value of execution_delta for every run.
In cases like this, we need execution_date_fn.
How to calculate execution_date_fn?
The execution_date_fn is just a regular Python function. As with all Python functions, it takes some argument(s) and returns some output(s). But the beauty of using a function is the ability to return a different output based on the function's inputs and logic.
In the case of execution_date_fn, Airflow passes the current DAG run's execution date as an argument and expects the function to return the external task's execution date. Note that these execution dates must be expressed in UTC.
def my_exec_date_fn(gordon_exec_date):
    # Add your logic here.
    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
dag=gordon_tasks,
task_id='check_dough_freshness',
external_dag_id='jamie_tasks',
external_task_id='make_new_dough',
email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
execution_date_fn=my_exec_date_fn, # Pass the function here.
timeout=1800,
poke_interval=300,
mode='reschedule'
)
Based on our earlier case study, our execution_date_fn would need to do the following…
One naive way could be to hardcode every run, until the end of time.
# The naive way (this is bad practice, don't do this)
def my_exec_date_fn(gordon_exec_date):
    if gordon_exec_date == datetime(year=2023, month=3, day=14, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=15, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=16, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=17, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    ...
    return jamie_exec_date
This works, but it's definitely not the most efficient way.
A better approach is to look for consistent patterns and use them to programmatically derive the outputs. Usually, a good place to look for patterns is the execution_delta, since it captures the relationship between the execution dates (we talked about this earlier).
Additionally, we can also look at datetime attributes, such as the day of the week. If we think about it, our External Task Sensor will always be pointing at a Sunday, because Jamie only makes dough on Sundays. As we move through the week, Gordon's task date gets further and further away from that Sunday, until it resets again the following Sunday. Then it repeats.
This suggests that the day of the week can also help us come up with our execution_date_fn. So let's add the day of the week to our table. I'll label Monday as 1 and Sunday as 7, as per the ISO 8601 standard.
By labeling them, it becomes immediately clear that…

- The execution_delta starts at 6 on a Saturday.
- The execution_delta increases by 1 every day, up to a maximum of 12 every Friday.
- The execution_delta then resets back to 6 on the following Saturday.
We can re-create that relationship in a Python function and assign this execution_date_fn to our External Task Sensor.
from datetime import timedelta

def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()
    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    return jamie_exec_date
ext_task_sensor = ExternalTaskSensor(
dag=gordon_tasks,
task_id='check_dough_freshness',
external_dag_id='jamie_tasks',
external_task_id='make_new_dough',
email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
execution_date_fn=my_exec_date_fn,
timeout=1800,
poke_interval=300,
mode='reschedule'
)
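A quick way to sanity-check my_exec_date_fn is to feed it a couple of Gordon's execution dates and confirm it lands on the same Sunday execution dates that the hardcoded version produced:

from datetime import datetime

# Tuesday 14 Mar 2023 -> isoweekday() == 2 -> subtract 9 days and 30 minutes
print(my_exec_date_fn(datetime(2023, 3, 14, 6, 30)))   # 2023-03-05 06:00:00

# Saturday 18 Mar 2023 -> isoweekday() == 6 -> subtract 6 days and 30 minutes
print(my_exec_date_fn(datetime(2023, 3, 18, 6, 30)))   # 2023-03-12 06:00:00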
There we have it: our very own execution_date_fn. With a bit of creativity, execution_date_fn can cater to any scenario.
Up until this point, we've covered everything you need to know to get started with the External Task Sensor. In this section, I thought it'd be nice to collate everything we've learned and see how the pieces fit together in our data pipelines.
First, we'll create Jamie's DAG in a file called jamie_dag.py.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def make_dough():
    # include your secret recipe here!
    return dough
# Create DAG
jamie_tasks = DAG(
dag_id='jamie_tasks',
description='Jamie to do list. (a.k.a making dough only)',
schedule_interval='5 3 * * *',
...
)
# Include task 0 in DAG (as a place to begin)
start = DummyOperator(
dag=jamie_tasks,
task_id='start'
)
# Include task 1 in DAG
make_dough = PythonOperator(
dag=jamie_tasks,
task_id='make_new_dough',  # must match the sensor's external_task_id
python_callable=make_dough,
...
)
# Create dependencies (deciding the sequence of tasks to run)
start >> make_dough
Then, we'll create Gordon's DAG in another file called gordon_dag.py.
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def bake_cookies():
    # include your secret recipe here!
    return cookies

# Define task 2
def make_money():
    # include your money-making technique step by step here.
    return money
# Define execution_date_fn for sensor 1
def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()
    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    return jamie_exec_date
# Create DAG
gordon_tasks = DAG(
dag_id='gordon_tasks',
description='List of things that Gordon must do.',
schedule_interval='5 3 * * *',
...
)
# Include task 0 in DAG (as a place to begin)
start = DummyOperator(
dag=gordon_tasks,
task_id='start'
)
# Include task 1 in DAG
bake_cookies = PythonOperator(
dag=gordon_tasks,
task_id='bake_cookies',
python_callable=bake_cookies,
...
)
# Include task 2 in DAG
make_money = PythonOperator(
dag=gordon_tasks,
task_id='make_money',
python_callable=make_money,
...
)
# Create sensor 1
check_dough_freshness = ExternalTaskSensor(
dag=gordon_tasks,
task_id='check_dough_freshness',
external_dag_id='jamie_tasks',
external_task_id='make_new_dough',
email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
execution_date_fn=my_exec_date_fn,
timeout=1800,
poke_interval=300,
mode='reschedule'
)
# Create dependencies (deciding the sequence of tasks to run)
(start
>> check_dough_freshness
>> bake_cookies
>> make_money)
Note that the External Task Sensor lives in gordon_dag.py, not jamie_dag.py, since we want Gordon to check on Jamie and not the other way around. Gordon's DAG is the current DAG, and Jamie's is the external DAG.
And… there we’ve it!
We've created our very first External Task Sensor, check_dough_freshness. This sensor pokes Jamie's make_new_dough() until it returns either Success or Failed. If it fails, bake_cookies() and make_money() will not run.
Dates in Apache Airflow are confusing because there is so much date-related terminology, such as start_date, end_date, schedule_interval, execution_date, etc. It's a mess, really. But let's try to figure it out with a story.
Suppose that our boss wants to know the sales performance of his company. He wants this data refreshed every day at 12 midnight for the next 6 months.
First, we write a complicated SQL query that generates the sales performance data. The query takes 6 hours to run.
- task_start is the start time of a task.
- task_end is the end time of a task.
- task_duration is the time it takes to run the task.
Every day, we'll need to run this task at 12 midnight.
To automate this query, we create an Airflow DAG and specify the start_date and end_date. Airflow will execute the DAG as long as today's date falls within this period.
Then, we put the task into the Airflow DAG.
We want this data refreshed once a day at 12 midnight. So we set the schedule_interval to "0 0 * * *", which is the cron equivalent of daily at 12 midnight.
The schedule_interval essentially adds a delay between consecutive schedules, telling Airflow to run the task only at a specific time, since we don't want the task to re-run as soon as it finishes.
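Put together, the DAG settings from this story would look roughly like the sketch below. The dag_id and the exact dates are my own assumptions; the cron string is the one from the text.

from datetime import datetime

from airflow import DAG

sales_report = DAG(
    dag_id='daily_sales_report',        # hypothetical name
    start_date=datetime(2023, 1, 1),    # the first schedule interval starts here
    end_date=datetime(2023, 6, 30),     # roughly "the next 6 months"
    schedule_interval='0 0 * * *',      # daily at 12 midnight
)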
- interval_start refers to the start time of a specific schedule interval.
- interval_end refers to the end time of a specific schedule interval.
Here comes the most mind-blowing part: although seemingly counterintuitive, the Airflow Scheduler triggers a DAG run at the end of its schedule interval, rather than at the beginning of it.
This means Airflow will not do anything during the first-ever schedule interval. Our query will run for the first time on the 2nd of Jan 2023 at 12 am.
This is because Airflow was originally created as an ETL tool. It's built on the idea that data from a period of time gets summarized at the end of that interval.
For example, if we wanted to know the sales of cookies for the 1st of January, we wouldn't create the sales report on the 1st of January at 1 pm, because the day hasn't ended yet and the sales numbers would be incomplete. Instead, we would only process the data when the clock strikes 12 midnight. At that point, we would be processing yesterday's data.
Why is this important?
Since we're summarizing the previous day's data, the sales report we produce on the 2nd of Jan describes the 1st of Jan's sales, not the 2nd of Jan's.
For that reason, Airflow finds it more meaningful to refer to this run as the 1st of Jan run, even though it's executed on the 2nd. To better differentiate the dates, Airflow gives a special name to the start of a schedule interval: the execution_date.
This is why we always take the difference of the "previous" runs when we calculate execution_delta: it is the delta of the execution_dates, which essentially refer to the "previous" runs.
External Task Sensors are like gatekeepers. They stop bad data from going downstream by making sure that tasks are executed in a specific order and that the necessary dependencies are met before proceeding with subsequent tasks.
For those who have never used External Task Sensors before, I hope this article was able to convey their importance and persuade you to start using them. For those who have been using them, I hope some of the insights here help deepen your understanding.
Thanks for your time, and have a great day.