Asynchronous Tasks Using Flask, Redis, and Celery

Introduction

As web applications evolve and their usage increases, the use-cases also diversify. We are now building and using websites for more complex tasks than ever before. Some of these tasks can be processed and feedback relayed to the users instantly, while others require further processing and relaying of results later. The increased adoption of Internet access and Internet-enabled devices has led to increased end-user traffic.

To handle increased traffic or more complex functionality, we may sometimes choose to defer the work and relay the results at a later time. This way, we do not keep the user waiting on our web application for an unknown length of time. We can achieve this by using background tasks to process work when traffic is low or to process work in batches.

One of the solutions we can use to achieve this is Celery. It helps us break down complex pieces of work and have them performed by different machines to ease the load on one machine or reduce the time taken to completion.

In this post, we will explore the usage of Celery to schedule background tasks in a Flask application to offload resource-intensive tasks and prioritize responding to end-users.

What is a Task Queue?

A task queue is a mechanism to distribute small units of work or tasks that can be executed without interfering with the request-response cycle of most web-based applications.

Task queues are helpful with delegating work that would otherwise slow down applications while waiting for responses. They can also be used to handle resource-intensive tasks while the main machine or process interacts with the user.

This way, the interaction with the user is consistent, timely, and unaffected by the workload.
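
To make the idea concrete, here is a minimal in-process sketch using only the Python standard library. It is not how Celery works internally; the names task_queue, worker, slow_square, and handle_request are hypothetical and exist only for this illustration. The point is that the "request handler" returns immediately while a separate worker does the slow part.

```python
import queue
import threading
import time

# Hypothetical in-process task queue: one worker thread drains the queue
# while the caller carries on immediately.
task_queue = queue.Queue()
results = []

def worker():
    """Pull tasks off the queue until a None sentinel arrives."""
    while True:
        task = task_queue.get()
        if task is None:
            task_queue.task_done()
            break
        func, args = task
        results.append(func(*args))
        task_queue.task_done()

def slow_square(n):
    """Stand-in for a resource-intensive job."""
    time.sleep(0.05)
    return n * n

def handle_request(n):
    """Enqueue the work and respond right away, without waiting for it."""
    task_queue.put((slow_square, (n,)))
    return f"accepted job for {n}"

worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

responses = [handle_request(n) for n in (2, 3, 4)]
task_queue.join()      # wait only so the demo can inspect the results
task_queue.put(None)   # tell the worker to stop
worker_thread.join()
```

A real task queue moves the worker into a separate process or machine and puts a broker between the two sides, which is what the rest of this post sets up.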

What is Celery?

Celery is an asynchronous task queue based on distributed message passing, used to distribute workload across machines or threads. A Celery system consists of a client, a broker, and several workers.

These workers are responsible for executing the tasks or pieces of work that are placed in the queue and for relaying the results. With Celery, you can have both local and remote workers, meaning that work can be delegated to different and more capable machines over the Internet and the results relayed back to the client.

This way, the load on the main machine is alleviated and more resources are available to handle user requests as they come in.

The client in a Celery setup is responsible for issuing jobs to the workers and communicating with them through a message broker. The broker facilitates the communication between the client and the workers in a Celery installation through a message queue: the client adds a message to the queue and the broker delivers it to a worker.

Examples of such message brokers include Redis and RabbitMQ.
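
The division of labour between client, broker, and worker can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not Celery's actual protocol: the broker is just a deque, and the names registry, task, send_task, and run_worker are hypothetical. It only shows that the client sends a serialized description of the call, and the worker looks the task up by name and runs it.

```python
import json
from collections import deque

# Hypothetical stand-ins for the three roles; real Celery is far richer.
broker = deque()   # plays the broker's message queue (Redis or RabbitMQ in practice)
registry = {}      # task name -> function, known to the worker

def task(func):
    """Register a function under its name so a worker can look it up."""
    registry[func.__name__] = func
    return func

@task
def add(x, y):
    return x + y

def send_task(name, *args):
    """Client side: serialize the call into a message and hand it to the broker."""
    broker.append(json.dumps({"task": name, "args": list(args)}))

def run_worker():
    """Worker side: take messages from the broker and execute the named task."""
    outcomes = []
    while broker:
        message = json.loads(broker.popleft())
        outcomes.append(registry[message["task"]](*message["args"]))
    return outcomes

send_task("add", 2, 3)
send_task("add", 10, 5)
outcomes = run_worker()
```

Because only the task name and its arguments travel through the broker, the arguments need to be serializable, which is one reason to pass plain data such as strings, numbers, and dictionaries to background tasks.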

Why use Celery?

There are various reasons why we should use Celery for our background tasks. First, it is quite scalable, allowing more workers to be added on demand to cater to increased load or traffic. Celery is also still in active development, meaning it is a supported project with concise documentation and an active community of users.

Another advantage is that Celery is easy to integrate into multiple web frameworks, with most having libraries to facilitate integration.

It also provides the functionality to interact with other web applications through webhooks where there is no library to support the interaction.

Celery can also use a variety of message brokers which offers us flexibility. RabbitMQ is recommended but it can also support Redis and Beanstalk.

Demo Application

We'll build a Flask application that allows users to set reminders that will be delivered to their emails at a set time.

We will also provide the functionality to customize the amount of time before the message or reminder is invoked and the message is sent out to the user.

Setup

Like any other project, our work will take place in a virtual environment which we will create and manage using the Pipenv tool:

$ pipenv install --three
$ pipenv shell

For this project, we will need to install the Flask and Celery packages to start:

$ pipenv install flask celery

This is what our Flask application file structure will look like:

.
├── Pipfile                    # manage our environment
├── Pipfile.lock
├── README.md
├── __init__.py
├── app.py                     # main Flask application implementation
├── config.py                  # to host the configuration
├── requirements.txt           # store our requirements
└── templates
    └── index.html             # the landing page

1 directory, 8 files

For our Celery-based project, we will use Redis as the message broker and we can find the instructions to set it up on their homepage.

Implementation

Let's start by creating the Flask application that will render a form that allows users to enter the details of the message to be sent at a future time.

We will add the following to our app.py file:

from flask import Flask, flash, render_template, request, redirect, url_for

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        email = request.form['email']
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        message = request.form['message']
        duration = request.form['duration']
        duration_unit = request.form['duration_unit']

        flash("Message scheduled")
        return redirect(url_for('index'))


if __name__ == '__main__':
    app.run(debug=True)

This is a really simple app with just a single route to handle a GET and POST request for the form. Once the details are submitted, we can hand over the data to a function that will schedule the job.

In order to declutter our main application file, we will put the configuration variables in a separate config.py file and load the config from the file:

app.config.from_object("config")

Our config.py file will be in the same folder as the app.py file and contains some basic configurations:

SECRET_KEY = 'very_very_secure_and_secret'
# more config

For now, let us implement the landing page as index.html:

{% for message in get_flashed_messages() %}
  <p style="color: red;">{{ message }}</p>
{% endfor %}

<form method="POST">
    First Name: <input id="first_name" name="first_name" type="text">
    Last Name: <input id="last_name" name="last_name" type="text">
    Email: <input id="email" name="email" type="email">
    Message: <textarea id="textarea" name="message"></textarea>
    Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text">

   <select name="duration_unit">
      <option value="" disabled selected>Choose the duration</option>
      <option value="minutes">Minutes</option>
      <option value="hours">Hours</option>
      <option value="days">Days</option>
   </select>

   <button type="submit" name="action">Submit </button>
</form>

Styling and formatting have been omitted for brevity. Feel free to format and style your HTML as you'd like.

We can now start our application by running python app.py and opening http://localhost:5000 in a browser to view the form.

Sending Emails Using Flask-Mail

In order to send emails from our Flask application, we will use the Flask-Mail library, which we add to our project as follows:

$ pipenv install flask-mail

With our Flask application and the form in place, we can now integrate Flask-Mail in our app.py:

from flask_mail import Mail, Message

app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']

# set up Flask-Mail Integration
mail = Mail(app)

def send_mail(data):
    """ Function to send emails.
    """
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)

The function send_mail(data) receives the message to be sent and the recipient of the email. It will be invoked after the specified time has passed to send the email to the user.

We will also need to add the following variables to our config.py in order for Flask-Mail to work:

# Flask-Mail
MAIL_SERVER = 'smtp.googlemail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
MAIL_USERNAME = 'mail-username'
MAIL_PASSWORD = 'mail-password'
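
Hardcoding credentials in config.py is fine for a local demo, but one common alternative is to read them from environment variables. The sketch below is one possible way to do that, not part of the original project: the helper name mail_settings is hypothetical, and the environment variable names are assumed to mirror the config keys above.

```python
import os

def mail_settings(env=os.environ):
    """Build the Flask-Mail settings from environment variables.

    The variable names mirror the config keys above; the defaults are
    the same placeholder values and are assumptions, not recommendations.
    """
    return {
        "MAIL_SERVER": env.get("MAIL_SERVER", "smtp.googlemail.com"),
        "MAIL_PORT": int(env.get("MAIL_PORT", "587")),
        "MAIL_USE_TLS": env.get("MAIL_USE_TLS", "true").lower() == "true",
        "MAIL_USERNAME": env.get("MAIL_USERNAME"),
        "MAIL_PASSWORD": env.get("MAIL_PASSWORD"),
    }

# Passing a plain dict instead of os.environ keeps the example reproducible.
settings = mail_settings({"MAIL_USERNAME": "demo-user", "MAIL_PORT": "2525"})
```

Since Flask's config object behaves like a dictionary, the result could be applied with app.config.update(mail_settings()) before Mail(app) is created.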

Celery Integration

With our Flask application ready and equipped with email sending functionality, we can now integrate Celery in order to schedule the emails to be sent out at a later date.

Our app.py will be modified again:

# Existing imports are maintained
from celery import Celery

# Flask app and flask-mail configuration truncated

# Set up celery client
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)

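# Add this decorator to our send_mail function
@client.task
def send_mail(data):
    """ Function to send emails.
    """
    # The body is unchanged from the Flask-Mail section
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)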

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')

    elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']

        if duration_unit == 'minutes':
            duration *= 60
        elif duration_unit == 'hours':
            duration *= 3600
        elif duration_unit == 'days':
            duration *= 86400

        send_mail.apply_async(args=[data], countdown=duration)
        flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")

        return redirect(url_for('index'))

We import celery and use it to initialize the Celery client in our Flask application by attaching the URL for the messaging broker. In our case, we will be using Redis as the broker, thus we add the following to our config.py:

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

In order to have our send_mail() function executed as a background task, we will add the @client.task decorator so that our Celery client will be aware of it.

After setting up the Celery client, the main function which also handles form input is modified.

First, we pack the input data for the send_mail() function in a dictionary. Then, we invoke our mailing function through the Celery Task Calling API using the function apply_async, which takes in the arguments required by our function.

An optional countdown parameter is set, defining a delay between running the code and performing the task.

This duration is in seconds, which is the reason why we convert the duration passed by the user into seconds depending on the unit of time they choose.
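
The view above calls int() on the raw form value and silently leaves the duration unchanged for an unrecognised unit. A small helper can make the conversion explicit and reject bad input. This is a sketch, not the original implementation: the names SECONDS_PER_UNIT and to_seconds are hypothetical, and the unit keys are assumed to be the same 'minutes', 'hours', and 'days' strings the view compares against.

```python
# Seconds per unit; the keys are assumed to match the values submitted by the form.
SECONDS_PER_UNIT = {"minutes": 60, "hours": 3600, "days": 86400}

def to_seconds(duration, unit):
    """Convert a user-supplied duration and unit into a countdown in seconds.

    Raises ValueError for non-numeric, non-positive, or unknown input.
    """
    try:
        amount = int(duration)
    except (TypeError, ValueError):
        raise ValueError(f"duration must be a whole number, got {duration!r}") from None
    if amount <= 0:
        raise ValueError("duration must be positive")
    if unit not in SECONDS_PER_UNIT:
        raise ValueError(f"unknown duration unit: {unit!r}")
    return amount * SECONDS_PER_UNIT[unit]

two_minutes = to_seconds("2", "minutes")
one_day = to_seconds("1", "days")
```

In the view, the ValueError could be caught and turned into a flash message so the user is asked to correct the form instead of seeing a server error.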

After the user has submitted the form, we acknowledge receipt and tell them, through a banner message, when the email will be sent out.

Bringing Everything Together

In order to run our project, we will need two terminals, one to start our Flask application and the other to start the Celery worker that will send messages in the background.

Start the Flask app in the first terminal:

$ python app.py

In the second terminal, start the virtual environment and then start the Celery worker:

# start the virtualenv
$ pipenv shell
$ celery -A app.client worker --loglevel=info

If everything goes well, the terminal running the Celery worker will print the worker's startup banner and report that it is ready to receive tasks.

Now let us navigate to http://localhost:5000 and fill in the details scheduling the email to arrive after 2 minutes of submission.

Above the form, a message will appear indicating the address that will receive the email and the duration after which the email will be sent. In our Celery terminal, we will also be able to see a log entry that signifies that our email has been scheduled:

[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

The ETA section of the entry shows when our send_mail() function will be called and thus when the email will be sent.
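
The relationship between a countdown and the ETA is plain date arithmetic: the ETA is the moment the task was submitted plus the countdown, expressed in UTC. The sketch below reproduces the ETA from the log entry under one assumption, namely that the form was submitted at 13:27:25.170622 UTC with a two-minute delay. The helper name eta_for is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def eta_for(countdown_seconds, now=None):
    """Return the UTC time at which a task with this countdown becomes due."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown_seconds)

# Assumed submission time, chosen to line up with the log entry above.
submitted = datetime(2019, 10, 23, 13, 27, 25, 170622, tzinfo=timezone.utc)
eta = eta_for(120, now=submitted)
print(eta)  # 2019-10-23 13:29:25.170622+00:00
```

The log timestamp at the start of the entry is in the machine's local time while the ETA is in UTC, which is why the two appear to differ by hours rather than minutes. Celery's apply_async also accepts an eta argument, so a precomputed datetime like this could be passed in place of countdown.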

So far, so good. Our emails are being scheduled and sent out at the specified time. However, one thing is missing: we have no visibility of the tasks before or after they are executed, and we have no way of telling whether the email was actually sent.

For this reason, let's implement a monitoring solution for our background tasks so that we can view tasks and also be aware in case something goes wrong and the tasks are not executed as planned.

Monitoring our Celery Cluster Using Flower

Flower is a web-based tool that will provide visibility of our Celery setup and provide the functionality to view task progress, history, details, and statistics, including success or failure rates. We can also monitor all the workers in our cluster and the tasks they are currently handling.

Installing Flower is as easy as:

$ pipenv install flower

Earlier on, we specified the details of our Celery client in our app.py file. We'll need to pass that client to Flower in order to monitor it.

To achieve this we need to open up a third terminal window, jump into our virtual environment, and start our monitoring tool:

$ pipenv shell
$ celery -A app.client flower --port=5555

When starting Flower, we specify the Celery client by passing it through the application (-A) argument, and also specifying the port to be used through the --port argument.

With our monitoring in place, let us schedule another email through the form in our Flask application, and then navigate to http://localhost:5555, where we are welcomed by the Flower dashboard.

On this page, we can see the list of workers in our Celery cluster, which is currently just made up of our machine.

To view the email we have just scheduled, click on the Tasks button on the top left side of the dashboard. This will take us to the page where we can see the tasks that have been scheduled.

In this section, we can see that we had scheduled two emails and one has been successfully sent out at the scheduled time. The emails were scheduled to be sent out after 1 minute and 5 minutes respectively for testing purposes.

We can also see the time the task was received and when it was executed from this section.

In the monitor section, there are graphs displaying the success and failure rates of the background tasks.

We can schedule messages for as long as we wish, but that also means that our worker has to be online and functional at the time the task is supposed to be executed.

Conclusion

We have successfully set up a Celery cluster and integrated it into our Flask application that allows users to schedule emails to be sent out after a certain time in the future.

The email sending functionality has been delegated to a background task and placed in a queue where it will be picked and executed by a worker in our local Celery cluster.

The source code for this project is, as always, available on GitHub.
