

Mastering ExternalTaskSensor in Apache Airflow: How to Calculate Execution Delta
Why do we need the External Task Sensor?
What does the External Task Sensor do?
How do we create an External Task Sensor?
What are Execution Delta and Execution Date Function?
How do we fit the External Task Sensor into our DAGs?
Bonus: Concept of Dates in Airflow
Concluding Remarks

External Task Sensors are like gatekeepers — they stop bad data from trickling downstream. Image by Freepik.

Orchestrating a data pipeline is a delicate endeavor. A data pipeline can have hundreds of tasks running concurrently, and they often depend on one another. If we're not careful, a single point of failure can have a domino effect that trickles downstream and messes up the entire pipeline.

Apache Airflow introduced the External Task Sensor to put an end to these issues. While it is an extremely powerful feature, it also comes with a degree of complexity.

In this introductory piece, I hope to untangle some of the confusion surrounding the External Task Sensor and show how we can use it to strengthen the reliability of our data pipelines. Let's make sense of sensors!

Meet Jamie, a rookie chef at Airflow Bakery. She's new, and her only responsibility is to make a fresh batch of cookie dough every hour.

Jamie’s responsibilities as shown in a “DAG” format. Chef (F) icon by Freepik.

And then we have Gordon Damnsie, the cookie master. Gordon takes the dough from Jamie and turns it into award-winning cookies.

Gordon’s responsibilities as shown in a “DAG” format. Chef (M) icon by Freepik.

One fine day, Gordon swoops in to grab the freshest dough he can find and bakes cookies. But when he takes a bite, yuck! "Bad" would have been an understatement. Gordon quickly discovers the root cause: stale dough left over from a week ago.

Gordon, visibly frustrated, tosses the cookies into the bin. After he composes himself, he slowly turns to Jamie and asks, “Why is the dough not fresh?”

"I had to stop making them, Chef. There was a problem with the raw ingredients," Jamie replies, trying to stay calm in the face of Gordon's anger. Unfortunately, the bad cookies had already been served to customers, and they no longer trust the bakery's food quality.

This slight detour is a cautionary tale about the importance of validating the freshness of data sources. In the story, Gordon's success depends on Jamie, but they work independently without communicating with each other. They "trust" that the other person will do their job flawlessly. But as any data practitioner knows, everything that can go wrong will go wrong in a data pipeline.

Ideally, Gordon should check with Jamie whether she made dough recently. Once he has confirmed it, he knows the dough is fresh and can proceed to bake his cookies. Otherwise, he should stop baking and figure out what went wrong.

You see, what Gordon needs… is an external task sensor.

An external task sensor checks whether another party has completed its assigned task. It senses the completion of an external task, hence the name.

In the context of Airflow, Jamie and Gordon are DAGs. They have specific tasks that they need to complete.

When we add an External Task Sensor, it becomes the middleman that coordinates the two independent DAGs. The sensor will check on Jamie at a specific time to see if she has completed her task.

If Jamie successfully completes her task, the sensor informs Gordon so that he can carry on with his downstream tasks.

The external task sensor, check_dough(), returns a success after verifying that make_dough() ran successfully. Chef (F) and Chef (M) icons by Freepik.

If Jamie fails to complete her task, the sensor stops Gordon from running any tasks that depend on the failed task.

The external task sensor, check_dough(), returns a failure after verifying that make_dough() did not run successfully. Chef (F) and Chef (M) icons by Freepik.

This extra layer of validation stops stale data from trickling further downstream and polluting the rest of our pipeline with dirty, inaccurate data.

Airflow makes it very easy to create an External Task Sensor: just import it. The syntax looks something like this:

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),
    # execution_date_fn=my_function,  # mutually exclusive with execution_delta
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Here’s what they mean:

  1. dag is the current DAG object. Since Gordon is the one who wants to check whether Jamie made dough, this should point to Gordon's DAG.
  2. task_id is the unique name for this External Task Sensor.
  3. external_dag_id is the name of the DAG you want to check. In this case, Jamie's DAG.
  4. external_task_id is the name of the specific task you want to check. Ideally, we should always specify this. Otherwise, the sensor will check for the completion of the entire DAG instead of just one specific task. In other words, Gordon will do nothing until Jamie finishes chopping onions, washing dishes, and restocking the pantry, even though we only want to know whether she made dough. Or worse, if any one of these irrelevant tasks fails, the sensor will unnecessarily pause the entire pipeline.
  5. email is the list of people you want Airflow to notify when the External Task Sensor fails. Keep in mind that for this to work, you need the SMTP settings properly configured in the Airflow configuration file.
  6. execution_delta is arguably the most confusing part of External Task Sensors, but also the most important. I'm dedicating a whole section to it below. Keep scrolling!
  7. execution_date_fn and execution_delta are very similar. We can only use one of them at a time. Sometimes it is easier to use this rather than execution_delta. I'm also giving this its own section below.
  8. timeout limits how long a sensor can stay alive. When we create a sensor, it consumes resources by occupying one worker slot. If the target task never completes, the sensor will keep checking indefinitely while hogging that slot. Over time, we can run into a sensor deadlock, where all worker slots become occupied by useless sensors and no tasks can run anymore. Therefore, it is best practice to set a maximum time limit for the checks.
  9. poke_interval is how long the sensor waits before checking again if the previous check fails. We don't want the sensor to check excessively, as that adds unnecessary load to the server. On the flip side, checking too infrequently means the sensor waits longer than necessary, delaying the pipeline. The trick is to find the sweet spot based on the expected run time of the external task.
  10. mode is how we want the sensor to behave. It can be set to "poke" or "reschedule".
    When set to "poke", the sensor goes to sleep on failure and wakes up at the next poke interval to try again. It is like being on standby. The sensor is more reactive, but because it stays on standby, the worker slot remains occupied for the whole duration.
    When set to "reschedule", the sensor checks once. If the check fails, the sensor schedules another check at a later time and terminates itself for now, freeing up the worker slot. Airflow recommends using "reschedule" if the poke interval is longer than 60 seconds.
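As a rough rule of thumb, timeout and poke_interval together bound how many checks the sensor gets to make before giving up. Using the illustrative values from the snippet above:

timeout = 1800        # the sensor gives up after 30 minutes
poke_interval = 300   # it checks every 5 minutes
max_checks = timeout // poke_interval  # at most 6 checks before timing out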

Alright, that's nearly every parameter we need to know about the External Task Sensor. Granted, this list isn't exhaustive, but knowing these 10 parameters should be good enough to set up our External Task Sensor properly for virtually all use cases.

For completeness' sake, I'll include Airflow's official documentation for those who are eager to explore it in more detail.

In the section above, I glossed over these two parameters because they are arguably the most notorious, annoying, and confusing part of External Task Sensors. But I think it's time we tackle them.

So what are execution_delta and execution_date_fn?

Building on our analogy, external_task_id tells the sensor to check whether Jamie completed the make_dough() task. But she makes a lot of dough, once every hour. Are we checking whether she baked in the past hour, yesterday, or last week?

This ambiguity confuses External Task Sensors, which is why Airflow came up with two ways for us to communicate this information. Both execution_delta and execution_date_fn are meant to tell the sensor the exact time of the task.

  1. execution_delta expresses time on a relative basis, e.g., "Did Jamie bake 30 minutes ago?" It accepts a datetime.timedelta object as its argument, e.g., datetime.timedelta(minutes=30).
  2. execution_date_fn expresses time on an absolute basis, e.g., "Did Jamie bake on the 3rd of May 2023 at 4:30 pm?" It accepts a callable Python function as its argument. This function should return the execution date of the task that we want to check on, e.g., datetime.datetime(year=2023, month=5, day=3, hour=16, minute=30). A short sketch of both options follows this list.
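To make the contrast concrete, here is a minimal sketch of the two options. The names execution_delta and my_function mirror the sensor snippet above, and the logic inside my_function is purely illustrative:

from datetime import timedelta

# Option 1: relative - the external task ran 30 minutes before my own execution date
execution_delta = timedelta(minutes=30)

# Option 2: absolute - a callable that receives the current DAG's execution date
# and returns the external DAG's execution date (illustrative logic only)
def my_function(own_execution_date):
    return own_execution_date - timedelta(minutes=30)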

Since both of them convey the same information, Airflow only allows us to use one or the other, not both at the same time.

I generally use execution_delta as the default choice. But there are scenarios where it is too complicated to calculate the execution_delta. In that case, I would use execution_date_fn instead.

How to calculate execution_delta?

The word execution_delta is short for the delta (a.k.a. difference) of execution dates (a.k.a. the previous runtimes of our tasks).

The formula for execution_delta: the current DAG's execution date minus the external DAG's execution date. Image by creator.

I'd like to highlight the keyword here: "previous".

Some of you may be wondering: why does Airflow want the time difference of the previous runs, not the current runs? This used to confuse the heck out of me when I first started using Airflow.

It turns out there is a perfectly good reason. However, I don't want to derail from the topic at hand, so I'll cover it in a later section (here). For now, let's just accept the formula as-is and see how we would apply it.

Suppose that Jamie makes dough every hour (e.g., 13:00, 14:00, 15:00, …). Gordon also makes cookies every hour, but he makes them on the 30th minute of every hour (e.g., 13:30, 14:30, 15:30, …).

At 14:30 sharp, Gordon gets ready to bake his cookies. Before he starts, he wants to check whether Jamie made fresh dough recently. The latest run of make_dough() would be the 14:00 one.

This time series shows the task dependencies between Jamie and Gordon. Gordon always checks whether Jamie completed her task half an hour ago. Chef (F) and Chef (M) icons by Freepik.

Given that both Gordon's and Jamie's tasks are scheduled hourly, their execution dates (a.k.a. previous runs) for the 14:30 run would be…

  • Gordon's execution date = 14:30 - 1 hour = 13:30
  • Jamie's execution date = 14:00 - 1 hour = 13:00

We can plug these values into the formula, and voilà!

The execution_delta comes out to be datetime.timedelta(minutes=30) for this specific run. Image by creator.

You can do the same calculation for different runs of the tasks to get their respective execution_deltas.
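As a quick sanity check, here is the same arithmetic in plain Python. The timestamps are illustrative; only the gap between the two execution dates matters:

from datetime import datetime, timedelta

gordon_execution_date = datetime(2023, 5, 3, 13, 30)  # Gordon's previous run at 13:30
jamie_execution_date = datetime(2023, 5, 3, 13, 0)    # Jamie's previous run at 13:00

execution_delta = gordon_execution_date - jamie_execution_date
print(execution_delta == timedelta(minutes=30))  # True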

When calculating execution_delta, it is helpful to lay the runs out in a format like this. We want to calculate the execution deltas for multiple runs, not just one, to make sure they are all the same! Image by creator.

In this (cherry-picked) example, all of the execution_deltas turn out to be exactly the same. We can pass this to our External Task Sensor and everything will work.

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),  # pass the execution delta here
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

But-!

The execution_delta can sometimes differ from run to run. This usually happens when the schedule intervals of the two DAGs are different (e.g., daily vs. weekly, daily vs. monthly, …).

For instance, let's say that Jamie makes her dough weekly on Sunday at 14:00, but Gordon makes his cookies daily at 14:30.

The arrow between Jamie’s task and Gordon’s sensor represents the execution delta. The execution delta gets longer over the week until it resets again on Sunday. Chef (F) and Chef (M) by Freepik.

If we do the same calculations, we will see that the execution deltas differ for every run.

Note that execution deltas can vary across runs. Image by creator.

This becomes a problem because execution_delta only accepts a single timedelta object as its argument. We can't pass a different value of execution_delta for every run.

In cases like this, we need execution_date_fn.

How to calculate the Execution Date Function?

The execution_date_fn is just a regular Python function. As with all Python functions, it takes some argument(s) and returns some output(s). But the beauty of using a function is the ability to return a different output based on the function's inputs and logic.

In the case of execution_date_fn, Airflow passes the current task's execution date as an argument and expects the function to return the external task's execution date. Note that these execution dates are expressed in UTC time.

def my_exec_date_fn(gordon_exec_date):
    # Add your logic here.
    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,  # pass the function here
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Based on our earlier case study, our execution_date_fn would need to do the following…

My Airflow is configured to local time (GMT+8), so I need to subtract 8 hours to get UTC time. Image by creator.

One naive way would be to hardcode every run, until the end of time.

# The naive way (this is bad practice; don't do this)
def my_exec_date_fn(gordon_exec_date):
    if gordon_exec_date == datetime(year=2023, month=3, day=14, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=15, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=16, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=17, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    ...

    return jamie_exec_date

This works, but it is definitely not the most efficient way.

A better approach is to look for consistent patterns and use them to programmatically derive the outputs. Usually, a good place to look for patterns is the execution_delta, since it captures the relationship between the execution dates (we talked about this here).

Additionally, we can also look at datetime attributes, such as the day of the week. If we really think about it, our External Task Sensor will always point to a Sunday because Jamie only makes dough on Sundays. As we move through the week, Gordon's task date gets further and further away from that Sunday until it resets again the following Sunday. Then it repeats.

This shows the time difference between the current runs for simplicity's sake. execution_date_fn looks at previous runs, but we will see the same patterns there too. Chef (F) and Chef (M) icons by Freepik.

This suggests that the day of the week can also be helpful in coming up with our execution_date_fn. So let's add the day of the week to our table. I'll label Monday as 1 and Sunday as 7, as per the ISO 8601 standard.

The numbers in brackets are the day of the week, where Monday is 1 and Sunday is 7. Image by creator.

By labeling them, it becomes immediately clear that…

  • The execution_delta starts at 6 days when Gordon's execution date falls on a Saturday.
  • The execution_delta increases by 1 day each day, up to a maximum of 12 days on a Friday.
  • The execution_delta then resets back to 6 days on the next Saturday.

We can re-create that relationship in a Python function and assign it as the execution_date_fn of our External Task Sensor.

from datetime import timedelta

def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()

    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

There we have it: our very own execution_date_fn. With a bit of creativity, execution_date_fn can cater to any scenario.
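Before wiring the function into the sensor, it is worth sanity-checking it against a few dates. A quick check under our assumed schedules (Jamie weekly on Sunday at 06:00 UTC, Gordon daily at 06:30 UTC) might look like this:

from datetime import datetime

# 2023-03-18 is a Saturday; the function should point back to Sunday 2023-03-12 at 06:00 UTC
print(my_exec_date_fn(datetime(2023, 3, 18, 6, 30)))  # 2023-03-12 06:00:00

# 2023-03-20 is a Monday; it should still point back to that same Sunday
print(my_exec_date_fn(datetime(2023, 3, 20, 6, 30)))  # 2023-03-12 06:00:00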

Up until this point, we have covered everything you need to know to get started with the External Task Sensor. In this section, I thought it would be nice to collate everything we have learned and see how the pieces fit together in our data pipelines.

First of all, we'll create Jamie's DAG in a file called jamie_dag.py.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def make_dough():
    # include your secret recipe here!
    return dough

# Create DAG
jamie_tasks = DAG(
    dag_id='jamie_tasks',
    description='Jamie to-do list (a.k.a. making dough only).',
    schedule_interval='0 6 * * 0',  # every Sunday at 06:00 UTC (14:00 local time, GMT+8)
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=jamie_tasks,
    task_id='start'
)

# Include task 1 in DAG
make_new_dough = PythonOperator(
    dag=jamie_tasks,
    task_id='make_new_dough',  # the task id that the sensor in gordon_dag.py looks for
    python_callable=make_dough,
    ...
)

# Create dependencies (deciding the sequence of tasks to run)
start >> make_new_dough

Then, we'll create Gordon's DAG in another file called gordon_dag.py.

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def bake_cookies():
    # include your secret recipe here!
    return cookies

# Define task 2
def make_money():
    # include your money-making technique step-by-step here.
    return money

# Define execution_date_fn for sensor 1
def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()

    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

# Create DAG
gordon_tasks = DAG(
    dag_id='gordon_tasks',
    description='List of things that Gordon needs to do.',
    schedule_interval='30 6 * * *',  # every day at 06:30 UTC (14:30 local time, GMT+8)
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=gordon_tasks,
    task_id='start'
)

# Include task 1 in DAG
bake_cookies = PythonOperator(
    dag=gordon_tasks,
    task_id='bake_cookies',
    python_callable=bake_cookies,
    ...
)

# Include task 2 in DAG
make_money = PythonOperator(
    dag=gordon_tasks,
    task_id='make_money',
    python_callable=make_money,
    ...
)

# Create sensor 1
check_dough_freshness = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

# Create dependencies (deciding the sequence of tasks to run)
(start
 >> check_dough_freshness
 >> bake_cookies
 >> make_money)

Note that the External Task Sensor is in gordon_dag.py and not jamie_dag.py, since we want Gordon to be checking on Jamie, not the other way around. Gordon's DAG is the current DAG and Jamie's is the external DAG.

And… there we have it!

We have created our very first External Task Sensor, check_dough_freshness. This sensor will poke Jamie's make_new_dough() until it returns either a success or a failure. If it fails, bake_cookies() and make_money() will not run.

Dates in Apache Airflow are confusing because there are so many date-related terminologies, such as start_date, end_date, schedule_interval, execution_date, etc. It's a mess, really. But let's try to figure it out with a story.

Suppose that our boss wants to know the sales performance of his company. He wants this data to be refreshed every day at 12 midnight for the next 6 months.

First, we write a complicated SQL query that generates the sales performance data. It takes 6 hours to run the query.

  • task_start is the start time of a task.
  • task_end is the end time of a task.
  • task_duration is the time it takes to run the task.
A single task. Image by creator.

Every day, we'll need to run this task at 12 midnight.

A single task, scheduled at 12 am, that runs for 6 hours. Image by creator.

To automate this query, we create an Airflow DAG and specify the start_date and end_date. Airflow will execute the DAG as long as today's date falls within this period.

An Airflow DAG. Image by creator.

Then, we put the task into the Airflow DAG.

We want this data refreshed once a day at 12 midnight. So, we set the schedule_interval to "0 0 * * *", which is the cron equivalent of daily at 12 midnight.

The schedule_interval essentially adds a delay between consecutive schedules, telling Airflow to only run the task at a specific time, since we don't want the task to re-run as soon as it finishes.
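Putting these pieces together, a minimal sketch of such a DAG might look like the following; the dag_id and dates are made up for illustration:

from datetime import datetime
from airflow import DAG

sales_report = DAG(
    dag_id='daily_sales_report',       # hypothetical name
    start_date=datetime(2023, 1, 1),   # Airflow starts scheduling from here...
    end_date=datetime(2023, 6, 30),    # ...and stops after this date
    schedule_interval='0 0 * * *',     # daily at 12 midnight
)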

  • interval_start refers to the start time of a particular schedule interval.
  • interval_end refers to the end time of a particular schedule interval.
Note that interval_start and interval_end can overlap. The interval_end of the previous schedule interval will be the same as the interval_start of the next schedule interval. Image by creator.

Here comes the most mind-blowing part: although seemingly counterintuitive, the Airflow Scheduler triggers a DAG run at the end of its schedule interval, rather than at the beginning of it.

This means Airflow will not do anything during the first-ever schedule interval. Our query will run for the first time on 2nd Jan 2023 at 12 am.

The colored bars represent data. All the "yellow" data only gets summarized on 2nd Jan. Image by creator.

This is because Airflow was originally created as an ETL tool. It is built on the idea that data from a period of time gets summarized at the end of that interval.

For example, if we wanted to know the sales of cookies for the 1st of January, we wouldn't create a sales report on the 1st of January at 1 pm, because the day hasn't ended yet and the sales number would be incomplete. Instead, we would only process the data when the clock strikes 12 midnight. Today, we would be processing yesterday's data.

Why is this important?

Since we are summarizing the previous interval's data, the sales report we produce on the 2nd of Jan describes the 1st of Jan sales, not the 2nd of Jan sales.

For that reason, Airflow finds it more meaningful to refer to this run as the 1st of Jan run, even though it is executed on the 2nd. To better differentiate the dates, Airflow gives a special name to the start of a schedule interval: execution_date.

Although we run the "yellow" task on 2nd Jan, its execution date is actually 1st Jan. Image by creator.

This is why we always take the difference of the "previous" runs when we calculate execution_delta: it is the delta of the execution_dates, which essentially refer to the "previous" runs.

External Task Sensors are like gatekeepers. They stop bad data from going downstream by ensuring that tasks are executed in a specific order and that the necessary dependencies are met before proceeding with subsequent tasks.

For those who have never used External Task Sensors before, I hope this article was able to convey their importance and persuade you to start using them. For those who have been using them, I hope some of the insights here help deepen your understanding.

Thanks for your time, and have a great day.
