Introduction
The in-memory data store Redis is used extensively by developers as a database, cache layer, job queue manager, and more.
It comes in handy when you are building APIs with a job queue mechanism to handle tasks like running memory-intensive jobs in the background, counting page visits, or sending bulk email campaigns.
If you are building an API in Python, the Redis Queue (RQ) module gives you functions to queue, schedule, and process these jobs by deploying workers.
In this article, you will be exploring the RQ module to set up job queues and workers, as well as the RQ-Dashboard module to visualize them.
Setting Up Redis
If you don't already have an API, or any code you can apply Redis and RQ to, you can clone our GitHub repository with sample code.
Let's clone the repo and install its dependencies:
$ git clone [email protected]:StackAbuse/redis-queues-redis-queue-dashboards.git
$ cd redis-queues-redis-queue-dashboards
$ python -m venv env
$ . env/bin/activate
$ pip install -r requirements.txt
We also have to install Redis, which can actually be a bit tricky if you're not using a Linux-based OS. The easiest way to install it on non-Linux operating systems is via Docker Compose:
$ docker-compose up -d
Our docker-compose file is configured to download a Redis image, and that command runs it in the background. Alternatively, you can install Redis locally.
For Ubuntu, that installation looks like this:
$ sudo apt-get install redis
$ sudo service redis-server start
Starting redis-server: redis-server.
$ redis-cli -v
redis-cli 4.0.9
Now that our environment is created, let's take a look at Redis Queues.
Redis Queue (RQ)
Redis Queue (RQ) is a queuing module that runs on top of Redis. The queue acts as the producer, to which you submit jobs, and the module also ships with workers that act as consumers, processing the submitted jobs asynchronously. But what is a job, anyway?
Jobs are references to Python functions that are pushed to the queue.
Multiple queues to process jobs can exist, and these queues can be named in any way you'd like. The jobs submitted to the queues can be monitored using their job IDs.
Let's write a simple script to enqueue a job in a Redis Queue, in, say, test.py:
# Imported to assign redis as the backend to rq
from redis import Redis
# Imported to initialize the queue object
from rq import Queue
# Functions from the __main__ module can't be processed by workers
# Hence, we have a separate Python file containing the function
from test_job import i_am_a_job
# Create the queue object by passing in the redis object
q = Queue(connection=Redis())
# Run the job asynchronously
job = q.enqueue(i_am_a_job, 1)
# The job is now queued; a worker will process it asynchronously
The function that you want to enqueue has to be imported from a separate Python file. We've imported it from test_job.py:
# A function (or) a job
def i_am_a_job(arg1):
    # Perform some work
    return arg1
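Since jobs are plain Python functions, they can be called directly, with no Redis server or worker involved. The function is redefined here to keep the snippet self-contained:

```python
# Jobs are ordinary Python functions, so they can be unit-tested
# directly, without Redis or a worker.
def i_am_a_job(arg1):
    # Perform some work
    return arg1

# Calling the function directly bypasses the queue entirely
print(i_am_a_job(1))  # 1
```

This makes it easy to verify a job's logic before wiring it into a queue.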
Now that the Redis Queue and the function are set up, let's execute the Python script:
$ python test.py
Running this command should create a job but return no output. If you get an error, review the setup steps again before continuing.
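Besides the dashboard we'll use next, a job can also be inspected from Python by its ID, using RQ's Job class. The calls below need a running Redis server and a real job ID, so they are shown commented out, and the ID used here is only a placeholder:

```python
# A placeholder job ID -- in practice, use the ID of a job you enqueued
# (available as job.id on the object returned by q.enqueue)
job_id = "00000000-0000-0000-0000-000000000000"

# These lines need a running Redis server:
# from redis import Redis
# from rq.job import Job
#
# job = Job.fetch(job_id, connection=Redis())
# print(job.get_status())  # e.g. 'queued', 'started' or 'finished'
# print(job.result)        # the return value, once the job has run
```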
If everything worked, let's use RQ-Dashboard to manage our job.
Managing Redis Queue Jobs with RQ-Dashboard
You can inspect the state of your jobs in a Redis Queue by using RQ-Dashboard, a lightweight Flask app used to monitor Redis Queues. Let's run RQ-Dashboard to monitor the job we've just created.
In a separate Terminal, navigate to the folder where you cloned the repo. There, we'll spin up RQ-Dashboard:
$ . env/bin/activate
$ rq-dashboard
RQ Dashboard version 0.5.2
* Serving Flask app 'rq_dashboard.cli' (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
* Running on all addresses.
WARNING: This is a development server. Do not use it in a production deployment.
* Running on http://192.168.1.10:9181/ (Press CTRL+C to quit)
192.168.1.10 - - [11/Jun/2021 15:30:12] "GET / HTTP/1.1" 200 -
You can access RQ-Dashboard at http://localhost:9181. When you open the link, you'll notice that your job is still in the queue and no workers have been assigned yet:
Redis Queue Workers
Workers pick up jobs from the queue and execute them. In another Terminal (you can use the first one as well), let's create a worker:
$ . env/bin/activate # For new Terminals when you don't install the dependencies locally
$ rq worker --with-scheduler
15:42:38 Worker rq:worker:a33eb6277eda4969921cc8e3f1e857c0: started, version 1.8.1
15:42:38 Subscribing to channel rq:pubsub:a33eb6277eda4969921cc8e3f1e857c0
15:42:38 *** Listening on default...
15:42:38 Trying to acquire locks for default
15:42:38 Cleaning registries for queue: default
15:42:38 Scheduler for default started with PID 1093
15:42:38 default: test_job.i_am_a_job(1) (b92bf928-48dd-4fb9-a551-427866c46a38)
15:42:38 default: Job OK (b92bf928-48dd-4fb9-a551-427866c46a38)
15:42:38 Result is kept for 500 seconds
The job that you submitted has been executed, and the result is kept in Redis for 500 seconds. In addition to immediate execution, jobs can also be scheduled to run at a future time, similar to a CRON job. Using a datetime object (from datetime import datetime), the enqueue statement can be written as a scheduled one:
job = q.enqueue_at(datetime(2021, 7, 7, 13, 15), i_am_a_job, 1)
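RQ also offers enqueue_in for relative delays, which takes a timedelta instead of an absolute datetime. Here is a sketch of both styles; the actual enqueue calls are commented out since they need a running Redis server and the q and i_am_a_job objects from earlier:

```python
from datetime import datetime, timedelta

# An absolute point in time, for enqueue_at
run_at = datetime(2021, 7, 7, 13, 15)

# A relative delay, for enqueue_in
delay = timedelta(minutes=10)

# With the queue object and job function from earlier:
# job = q.enqueue_at(run_at, i_am_a_job, 1)
# job = q.enqueue_in(delay, i_am_a_job, 1)
```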
These operations are the basis of using Redis Queues, monitoring them and assigning workers. Now, let's write up a small, practical application that counts the number of page visits.
Redis Queue Demo Application - Counting Site Visits
The code in the repository that you downloaded earlier includes a Flask application:
from flask import Flask
from redis import Redis
from rq import Queue

from counter import visit

app = Flask(__name__)
q = Queue(connection=Redis())

@app.route('/visit')
def count_visit():
    # Queue the increment as a job instead of doing it in the request
    q.enqueue(visit)
    return "Visit has been registered"

@app.route('/')
def return_visit_count():
    count = Redis().get('count').decode('utf-8') if Redis().get('count') else '0'
    return f'<h1> Congrats! You are visitor no.: {count} </h1>'
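The counter.visit job imported above lives in counter.py in the cloned repo and isn't shown here. A minimal version consistent with the app's behavior would use Redis's atomic INCR command; this is a sketch, with the import done lazily inside the function so the snippet parses even without redis-py installed:

```python
# counter.py -- a minimal sketch of the job the app enqueues
def visit():
    # Lazy import; assumes the redis-py package is installed
    from redis import Redis
    # INCR creates the 'count' key at 0 on first use and then
    # increments it atomically, so concurrent workers can't
    # lose an update
    return Redis().incr('count')
```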
In your Terminal, let's run this Flask app:
$ . env/bin/activate # Unless it's already running
$ flask run
This launches the Flask app in app.py. The app contains two routes: / and /visit.
Every time the http://localhost:5000/visit endpoint is hit, the count key in Redis is incremented by 1. The incrementing function is queued as a job, and the current number of visits is visible at the http://localhost:5000 endpoint.
Let's try visiting the http://localhost:5000/visit endpoint three times, which submits our job thrice. Then, check the status of the jobs on RQ-Dashboard at http://localhost:9181: you'll see that the jobs have been successfully submitted, but no workers are processing them yet.
To start the Redis Queue worker and scheduler, open another Terminal, and key in the command to start a worker. Observe that the submitted jobs are getting executed one after the other:
$ . env/bin/activate # For new Terminals when you don't install the dependencies locally
$ rq worker --with-scheduler
23:40:06 Worker rq:worker:f5a178b0931b42859699ce57696ed402: started, version 1.8.1
23:40:06 Subscribing to channel rq:pubsub:f5a178b0931b42859699ce57696ed402
23:40:06 *** Listening on default...
23:40:06 Trying to acquire locks for default
23:40:06 Cleaning registries for queue: default
23:40:06 Scheduler for default started with PID 2889
23:40:06 default: counter.visit() (d23c4df8-d638-476b-b70a-dbb4b6f091f2)
23:40:06 default: Job OK (d23c4df8-d638-476b-b70a-dbb4b6f091f2)
23:40:06 Result is kept for 500 seconds
23:40:06 default: counter.visit() (f4ca10c4-16f2-4578-b1b7-67dfce3cee5a)
23:40:06 default: Job OK (f4ca10c4-16f2-4578-b1b7-67dfce3cee5a)
23:40:06 Result is kept for 500 seconds
23:40:06 default: counter.visit() (956b7b39-0b82-4ac6-b29e-fe3f0706431e)
23:40:06 default: Job OK (956b7b39-0b82-4ac6-b29e-fe3f0706431e)
23:40:06 Result is kept for 500 seconds
Check the dashboard again at http://localhost:9181, and you'll find that the jobs have been executed. Notice that the worker is now up and running, and the jobs have been processed successfully.
Let's check the number of visits by opening or refreshing the app on http://localhost:5000. Voilà! The page visit counter has been incremented by 3.
Think of a high-traffic website where you want to track site and page visits. In that case, multiple instances of this API would be served behind a load balancer, and the counting would happen asynchronously, based on the jobs submitted to the queue.
Conclusion
In this article, we explored the importance of job queues and how RQ and RQ-Dashboard can serve as a minimalistic job queue stack for your web apps. The practical example can be expanded to many other real-world applications.