Integrating Slack Alerts in Airflow

Kaxil Naik is a senior Data Engineer at Data Reply and a PMC Member, committer and contributor to Apache Airflow.

FOCUS ON: Airflow, Notification

You just triggered the Airflow DAG that sends data to your clients. Confident that it will succeed (why wouldn't it? You wrote it, so there is no way it can fail), you go for coffee with your colleagues in the company kitchen, where the awesome coffee machine is waiting to serve you the most delicious coffee ☕. You discuss how to make the company better (of course you don't talk about how awesome the new Avengers trailer is!). Finally you head back to your seat with a smirk, expecting to see the green status on your DAG. But wait... what just happened? Your DAG failed. Of course it was not your fault: the letters “DAG” decided to rearrange themselves into “DGA” while you were having coffee, hence the failure. At that very moment your boss walks over and asks, “How's your work going? Was the data sent to all clients?” ⚡⚡⚡ And you wish there had been some way to receive an alert on your phone while you were having coffee.

Well, I could go on and on with this stupid story, but the fact is that you need alerting when your DAG fails so that you can act as early as possible. Airflow has a built-in capability to send email alerts, but those tend to get lost in a pile of 1,000 other unread emails. It is simply easier to get alerts in the place your entire team keeps an eye on: Slack.

  slack-alerts-airflow.png 0

There are 2 ways in which you can integrate Slack with Airflow.

(1) Using Slack Legacy Tokens:

Legacy tokens are an old method of generating tokens for testing and development. Slack itself does not recommend using them, but they are the simplest method, so you can still use them. Bear in mind that they may be deprecated at any time.

Follow these steps:

  • Create a Slack Token from https://api.slack.com/custom-integrations/legacy-tokens. You will see the list of Slack Workspaces your email is associated with. Click on Create Token next to the workspace where you want to send alerts.
  • Use the SlackAPIPostOperator in your DAG as below (remember to install the Slack dependencies first with pip install apache-airflow[slack]):
    from airflow.operators.slack_operator import SlackAPIPostOperator
    SlackAPIPostOperator(
        task_id='failure',
        token='YOUR_TOKEN',
        text='Hello World !',
        channel='SLACK_CHANNEL',  # Replace with your Slack channel or username
        username='airflow'
    )
    
  • You can try this example in IPython or a Jupyter notebook as follows:

slack-code.png 1

However, this is just an example of sending a message to Slack, not of alerting on task failures. Every task in Airflow accepts a parameter called on_failure_callback, which takes a callable. Airflow calls that function, passing it the task context, when the task fails.

Example:

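from airflow.operators.bash_operator import BashOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

def slack_failed_task(context):
    # Airflow calls this with the task context when the task fails.
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: Task Failed',
        username='airflow',
    )
    return failed_alert.execute(context=context)

# `dag` is assumed to be a DAG object defined elsewhere in the file.
task_with_failed_slack_alerts = BashOperator(
    task_id='fail_task',
    bash_command='exit 1',  # always fails, to trigger the callback
    on_failure_callback=slack_failed_task,
    dag=dag)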
    


Now when you run the DAG with the above task, it sends you an alert like the one shown in the image below:

airflow-task-fail-slack.png 2

This is useful but there are still 2 issues with the above code:

  • The Slack token is stored in plain text
  • The Slack alert message isn’t properly formatted

The first issue can be resolved by storing the Slack token in Airflow Connections in the password field as follows:

airflow-slack-connection.png 3

I also recommend running pip install apache-airflow[crypto], which lets Airflow encrypt connection passwords in the metadata database.

Now let's update our function to read the token and channel name from the connection (the channel from its Login field, the token from its Password field) and to improve the alert format:

from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
SLACK_CONN_ID = 'slack'

def task_fail_slack_alert(context):
    """
    Sends message to a slack channel.
    If you want to send it to a "user" -> use "@user",
        if "public channel" -> use "#channel",
        if "private channel" -> use "channel"
    """
    slack_channel = BaseHook.get_connection(SLACK_CONN_ID).login
    slack_token = BaseHook.get_connection(SLACK_CONN_ID).password
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel=slack_channel,
        token=slack_token,
        text="""
            :red_circle: Task Failed. 
            *Task*: {task}  
            *Dag*: {dag} 
            *Execution Time*: {exec_date}  
            *Log Url*: {log_url} 
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            ti=context.get('task_instance'),
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
        )
    )
    return failed_alert.execute(context=context)
    

Sample alert with this function would be as follows:

slack-alert-airflow-bordered.png 4

As you can see it also gives you a Log URL so that you can directly go to the log associated with the failed task.
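
The message template can also be checked locally, without a running Airflow instance. The following is a minimal sketch, assuming only that the context is a dict whose 'task_instance' entry exposes task_id, dag_id and log_url, as in the function above. The helper name build_failure_message and the fake context values are hypothetical, introduced purely for illustration.

```python
from textwrap import dedent
from types import SimpleNamespace


def build_failure_message(context):
    """Build the Slack failure text from an Airflow-style context dict."""
    ti = context["task_instance"]
    # dedent strips the common indentation so Slack does not show leading spaces.
    return dedent(
        """\
        :red_circle: Task Failed.
        *Task*: {task}
        *Dag*: {dag}
        *Execution Time*: {exec_date}
        *Log Url*: {log_url}"""
    ).format(
        task=ti.task_id,
        dag=ti.dag_id,
        exec_date=context.get("execution_date"),
        log_url=ti.log_url,
    )


# Hypothetical stand-in for a real task instance, for local testing only.
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="fail_task",
        dag_id="example_dag",
        log_url="http://localhost:8080/log?task_id=fail_task",
    ),
    "execution_date": "2019-01-01T00:00:00+00:00",
}
print(build_failure_message(fake_context))
```

Keeping the formatting in its own function means the same text could be passed to either of the two operators described in this post.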

(2) Using Slack Web Hooks:

Slack recommends Incoming Webhooks for posting messages to it from external sources.

Follow the steps below:

  • Create a Slack app if you don't have one already.
    create%20slack%20app.png 5

  • Enable Incoming Webhooks on the next page 

    basic%20info.png 6
    create-incoming-webhook-1.png 7

  • Create an Incoming Webhook by clicking on Add New Webhook to Workspace on the same page

    addnew%20webhook%20to%20workspace.png 8

You will see something similar to the image below:

data%20reply.png 9

So go ahead and pick a channel that the app will post to, and then click to Authorize your app. You’ll be sent back to your app settings, and you should now see a new entry under the Webhook URLs for Your Workspace section, with a Webhook URL that’ll look something like this:

https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
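
Before wiring the webhook into Airflow, it can be useful to check it from plain Python. The sketch below relies on the fact that a Slack incoming webhook accepts a JSON body with a "text" field sent via POST. The function name build_slack_request is hypothetical, and the URL is the placeholder from above, not a working webhook. The request is only built here, not sent.

```python
import json
import urllib.request


def build_slack_request(webhook_url, text):
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    payload = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL; substitute your own webhook URL.
request = build_slack_request(
    "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
    ":white_check_mark: Webhook test from Python",
)
print(request.get_method(), request.full_url)
# To actually send the message: urllib.request.urlopen(request)
```

If the message shows up in the chosen channel after sending, the webhook itself works and any later problem is on the Airflow side.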

From here,

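  • Create an Airflow connection for Slack with Conn Type HTTP, using the same connection ID as the code below (slack). The base URL goes under Host, and the part of the webhook URL after https://hooks.slack.com/services goes under Password: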

    Host: https://hooks.slack.com/services
    Conn Type: HTTP
    Password: /T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX

  • Create a Python function:
    from airflow.hooks.base_hook import BaseHook
    from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
    SLACK_CONN_ID = 'slack'
    def task_fail_slack_alert(context):
        slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
        slack_msg = """
                :red_circle: Task Failed. 
                *Task*: {task}  
                *Dag*: {dag} 
                *Execution Time*: {exec_date}  
                *Log Url*: {log_url} 
                """.format(
                task=context.get('task_instance').task_id,
                dag=context.get('task_instance').dag_id,
                ti=context.get('task_instance'),
                exec_date=context.get('execution_date'),
                log_url=context.get('task_instance').log_url,
            )
        # The operator is executed directly, so it does not need to be attached to a DAG.
        failed_alert = SlackWebhookOperator(
            task_id='slack_test',
            http_conn_id=SLACK_CONN_ID,
            webhook_token=slack_webhook_token,
            message=slack_msg,
            username='airflow')
        return failed_alert.execute(context=context)
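
The Host and Password values for the connection described in the steps above can be derived from the full webhook URL. This is a small sketch of that split, assuming the URL always starts with https://hooks.slack.com/services. The helper name split_webhook_url is hypothetical, and the URL is the placeholder from this post.

```python
SLACK_WEBHOOK_HOST = "https://hooks.slack.com/services"


def split_webhook_url(webhook_url):
    """Return (host, token_path) for the Airflow connection's Host and Password."""
    if not webhook_url.startswith(SLACK_WEBHOOK_HOST + "/"):
        raise ValueError("Not a Slack incoming-webhook URL")
    # Everything after the host, including the leading slash, is the token part.
    token_path = webhook_url[len(SLACK_WEBHOOK_HOST):]
    return SLACK_WEBHOOK_HOST, token_path


host, password = split_webhook_url(
    "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
)
print(host)      # goes under Host
print(password)  # goes under Password
```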
          

Bonus Tip

You can add on_failure_callback to default_args when defining the DAG, as below, so that you get an alert if any task in the DAG fails:

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 0,
    'on_failure_callback': task_fail_slack_alert
}
dag = DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval=schedule_interval,
)
         

You can follow the same steps to integrate Slack with Google Cloud Composer.

