Overview of Async IO in Python 3.7

Python 3's asyncio module provides fundamental tools for implementing asynchronous I/O in Python. It was introduced in Python 3.4, and with each subsequent minor release, the module has evolved significantly.

This tutorial contains a general overview of the asynchronous paradigm, and how it's implemented in Python 3.7.

Blocking vs Non-Blocking I/O

The problem that asynchrony seeks to resolve is blocking I/O.

By default, when your program accesses data from an I/O source, it waits for that operation to complete before continuing to execute the program.

with open('myfile.txt', 'r') as file:
    data = file.read()
    # Until the data is read into memory, the program waits here
print(data)

The program is blocked from continuing its flow of execution while a physical device is accessed, and data is transferred.

Network operations are another common source of blocking:

# pip install --user requests
import requests

req = requests.get('https://www.stackabuse.com/')

#
# Blocking occurs here, waiting for completion of an HTTPS request
#

print(req.text)

In many cases, the delay caused by blocking is negligible. However, blocking I/O scales very poorly. If you need to wait for 10^10 file reads or network transactions, performance will suffer.
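
To put a rough number on how blocking calls add up, here is a minimal sketch that simulates blocking operations with time.sleep(). The fake_read() and read_all() helpers are hypothetical names made up for this illustration, and the delay stands in for real file or network latency.

```python
import time

def fake_read(delay):
    """Hypothetical stand-in for a blocking file read or network call."""
    time.sleep(delay)  # the calling thread does nothing else while it waits
    return delay

def read_all(count, delay):
    """Run `count` blocking operations one after another; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(count):
        fake_read(delay)
    return time.perf_counter() - start

elapsed = read_all(count=5, delay=0.1)
print(f"5 sequential operations took {elapsed:.2f} seconds")
```

Because each call must finish before the next one starts, the total wait grows linearly with the number of operations.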

Multiprocessing, Threading, and Asynchrony

Strategies for minimizing the delays of blocking I/O fall into three major categories: multiprocessing, threading, and asynchrony.

Multiprocessing

Multiprocessing is a form of parallel computing: instructions are executed in an overlapping time frame on multiple physical processors or cores. Each process spawned by the kernel incurs an overhead cost, including an independently-allocated chunk of memory (heap).

Python implements parallelism with the multiprocessing module.

The following is an example of a Python 3 program that spawns four child processes, each of which exhibits a random, independent delay. The output shows the process ID of each child, the system time before and after each delay, and the current and peak memory allocation at each step.

from multiprocessing import Process
import os, time, datetime, random, tracemalloc

tracemalloc.start()
children = 4    # number of child processes to spawn
maxdelay = 6    # maximum delay in seconds

def status():
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        proc = Process(target=child, args=(i,))
        proc.start()

Output:

Parent PID: 16048
Time: 09:52:47.014906    Malloc, Peak: (228400, 240036)     Process 0, PID: 16051, Delay: 1 seconds...
Time: 09:52:47.016517    Malloc, Peak: (231240, 240036)     Process 1, PID: 16052, Delay: 4 seconds...
Time: 09:52:47.018786    Malloc, Peak: (231616, 240036)     Process 2, PID: 16053, Delay: 3 seconds...
Time: 09:52:47.019398    Malloc, Peak: (232264, 240036)     Process 3, PID: 16054, Delay: 2 seconds...
Time: 09:52:48.017104    Malloc, Peak: (228434, 240036)     Process 0: Done.
Time: 09:52:49.021636    Malloc, Peak: (232298, 240036)     Process 3: Done.
Time: 09:52:50.022087    Malloc, Peak: (231650, 240036)     Process 2: Done.
Time: 09:52:51.020856    Malloc, Peak: (231274, 240036)     Process 1: Done.

Threading

Threading is an alternative to multiprocessing, with benefits and downsides.

Threads are independently scheduled, and their execution may occur within an overlapping time period. Unlike multiprocessing, however, threads exist entirely in a single kernel process, and share a single allocated heap.

Python threads are concurrent — multiple sequences of machine code are executed in overlapping time frames. But they are not parallel — execution does not occur simultaneously on multiple physical cores.

The primary downsides to Python threading are memory safety and race conditions. All child threads of a parent process operate in the same shared memory space. Without additional protections, one thread may overwrite a shared value in memory without other threads being aware of it. Such data corruption would be disastrous.

To enforce thread safety, CPython implementations use a global interpreter lock (GIL). The GIL is a mutex mechanism that prevents multiple threads from executing simultaneously on Python objects. Effectively, this means that only one thread executes Python bytecode at any given time.
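
The GIL protects the interpreter itself, but it does not, in general, make a compound operation such as counter += 1 atomic, so shared data still needs its own protection. The following is a minimal sketch of one common approach, a threading.Lock around the shared value. The counter, add_many(), and run_threads() names are illustrative and not part of the original examples.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def add_many(times):
    """Increment the shared counter `times` times, holding the lock for each update."""
    global counter
    for _ in range(times):
        with counter_lock:  # only one thread at a time may update the counter
            counter += 1

def run_threads(num_threads, times):
    """Start the worker threads, wait for all of them, and return the final count."""
    global counter
    counter = 0
    threads = [threading.Thread(target=add_many, args=(times,))
               for _ in range(num_threads)]
    for thr in threads:
        thr.start()
    for thr in threads:
        thr.join()  # wait for every thread to finish before reading the result
    return counter

print(run_threads(4, 10_000))  # prints 40000
```

The lock serializes the read-modify-write step, so no increment is lost regardless of how the threads are scheduled.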

Here's the threaded version of the multiprocessing example from the previous section. Notice that very little has changed: multiprocessing.Process is replaced with threading.Thread. As indicated in the output, everything happens in a single process, and the memory footprint is significantly smaller.

from threading import Thread
import os, time, datetime, random, tracemalloc

tracemalloc.start()
children = 4    # number of child threads to spawn
maxdelay = 6    # maximum delay in seconds

def status():
    return ('Time: ' + 
        str(datetime.datetime.now().time()) +
        '\t Malloc, Peak: ' +
        str(tracemalloc.get_traced_memory()))

def child(num):
    delay = random.randrange(maxdelay)
    print(f"{status()}\t\tProcess {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
    time.sleep(delay)
    print(f"{status()}\t\tProcess {num}: Done.")

if __name__ == '__main__':
    print(f"Parent PID: {os.getpid()}")
    for i in range(children):
        thr = Thread(target=child, args=(i,))
        thr.start()

Output:

Parent PID: 19770
Time: 10:44:40.942558    Malloc, Peak: (9150, 9264)     Process 0, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.942937    Malloc, Peak: (13989, 14103)       Process 1, PID: 19770, Delay: 5 seconds...
Time: 10:44:40.943298    Malloc, Peak: (18734, 18848)       Process 2, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.943746    Malloc, Peak: (23959, 24073)       Process 3, PID: 19770, Delay: 2 seconds...
Time: 10:44:42.945896    Malloc, Peak: (26599, 26713)       Process 3: Done.
Time: 10:44:43.945739    Malloc, Peak: (26741, 27223)       Process 0: Done.
Time: 10:44:43.945942    Malloc, Peak: (26851, 27333)       Process 2: Done.
Time: 10:44:45.948107    Malloc, Peak: (24639, 27475)       Process 1: Done.

Asynchrony

Asynchrony is an alternative to threading for writing concurrent applications. Asynchronous events occur on independent schedules, "out of sync" with one another, entirely within a single thread.

Unlike threading, in asynchronous programs the programmer controls when and how voluntary preemption occurs, making it easier to isolate and avoid race conditions.

Introduction to the Python 3.7 asyncio Module

In Python 3.7, asynchronous operations are provided by the asyncio module.

High-Level vs Low-Level asyncio API

Asyncio components are divided into high-level APIs (for writing programs), and low-level APIs (for writing libraries or frameworks based on asyncio).

Every asyncio program can be written using only the high-level APIs. If you're not writing a framework or library, you never need to touch the low-level stuff.

With that said, let's look at the core high-level APIs, and discuss the core concepts.

Coroutines

In general, a coroutine (short for cooperative subroutine) is a function designed for voluntary preemptive multitasking: it proactively yields to other routines and processes, rather than being forcefully preempted by the kernel. The term "coroutine" was coined in 1958 by Melvin Conway (of "Conway's Law" fame), to describe code that actively facilitates the needs of other parts of a system.

In asyncio, this voluntary preemption is called awaiting.

Awaitables, Async, and Await

Any object which can be awaited (voluntarily preempted by a coroutine) is called an awaitable.

The await keyword suspends the execution of the current coroutine until the specified awaitable completes.

In Python 3.7, the three awaitable objects are coroutine, task, and future.

An asyncio coroutine is any Python function whose definition is prefixed with the async keyword.

async def my_coro():
    pass
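
To make the role of await more concrete, the sketch below runs two coroutines that each record a step and then yield with await asyncio.sleep(0). The worker() coroutine and the shared log list are hypothetical names used only for this illustration.

```python
import asyncio

async def worker(name, steps, log):
    """Record each step, then hand control back to the event loop."""
    for step in range(steps):
        log.append(f"{name}:{step}")
        await asyncio.sleep(0)  # explicit suspension point: other tasks may run here

async def main():
    log = []
    # Run both workers concurrently in a single thread.
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log

print(asyncio.run(main()))
```

The entries from the two workers appear interleaved in the log, because each worker only gives up control at its own await.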

An asyncio task is an object that wraps a coroutine, providing methods to control its execution, and query its status. A task may be created with asyncio.create_task(), or asyncio.gather().
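
As a small sketch of querying a task's status, the example below creates a task, checks done() before and after awaiting it, and reads the value with result(). The compute() coroutine is a made-up placeholder for real work.

```python
import asyncio

async def compute(n):
    """Hypothetical coroutine standing in for real asynchronous work."""
    await asyncio.sleep(0.1)
    return n * 2

async def main():
    task = asyncio.create_task(compute(21))
    before = task.done()   # False: the task is scheduled but has not run yet
    value = await task     # suspend main() until the task completes
    after = task.done()    # True: the task has finished
    return before, after, value, task.result()

print(asyncio.run(main()))  # prints (False, True, 42, 42)
```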

An asyncio future is a low-level object that acts as a placeholder for data that hasn't yet been calculated or fetched. It can provide an empty structure to be filled with data later, and a callback mechanism that is triggered when the data is ready.

A task inherits all but two of the methods available to a future, so in Python 3.7 you never need to create a future object directly.
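
Purely to illustrate the placeholder and callback ideas described above, here is a sketch that creates a future through the low-level loop.create_future() method and fills it in later with loop.call_later(). Application code would normally use tasks instead. The seen list and the "data is ready" string are arbitrary illustrative values.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # an empty placeholder with no result yet

    seen = []
    # The callback fires once the future has been given a result.
    fut.add_done_callback(lambda f: seen.append(f.result()))

    # Ask the loop to fill in the future about 0.1 seconds from now.
    loop.call_later(0.1, fut.set_result, "data is ready")

    value = await fut        # suspended here until set_result() is called
    await asyncio.sleep(0)   # yield once more so pending callbacks have run
    return value, seen

print(asyncio.run(main()))  # prints ('data is ready', ['data is ready'])
```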

Event Loops

In asyncio, an event loop controls the scheduling and communication of awaitable objects. An event loop is required to use awaitables. Every asyncio program has at least one event loop. It's possible to have multiple event loops, but multiple event loops are strongly discouraged in Python 3.7.

A reference to the currently-running loop object is obtained by calling asyncio.get_running_loop().

Sleeping

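The asyncio.sleep(delay) coroutine suspends the current coroutine for delay seconds without blocking the event loop, so other tasks can run in the meantime. It's useful for simulating the latency of blocking I/O.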

import asyncio

async def main():
    print("Sleep now.")
    await asyncio.sleep(1.5)
    print("OK, wake up!")

asyncio.run(main())
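
As a rough check that asyncio.sleep() only suspends the coroutine that awaits it, the sketch below starts two naps of 0.2 seconds each as tasks and measures the total elapsed time. The nap() coroutine is a hypothetical helper, and the timing is approximate.

```python
import asyncio
import time

async def nap(name, delay):
    """Sleep without blocking the event loop, then report who woke up."""
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    # Both tasks are scheduled right away, so their sleeps overlap.
    first = asyncio.create_task(nap("first", 0.2))
    second = asyncio.create_task(nap("second", 0.2))
    names = [await first, await second]
    elapsed = time.perf_counter() - start
    return names, elapsed

names, elapsed = asyncio.run(main())
print(names, f"{elapsed:.2f} seconds")
```

The total should be close to 0.2 seconds rather than 0.4, since neither sleep prevents the other from running.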

Initiating the Main Event Loop

The canonical entrance point to an asyncio program is asyncio.run(main()), where main() is a top-level coroutine.

import asyncio

async def my_coro(arg):
    "A coroutine."  
    print(arg)

async def main():
    "The top-level coroutine."
    await my_coro(42)

asyncio.run(main())

Calling asyncio.run() implicitly creates and runs an event loop. The loop object has many useful methods, including loop.time(), which returns a float representing the current time, as measured by the loop's internal clock.

Note: The asyncio.run() function cannot be called from within an existing event loop. You may therefore see errors if you run the program within a supervising environment, such as Anaconda or Jupyter, that is running an event loop of its own. The example programs in this section and the following sections should be run directly from the command line by executing the Python file.
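
The restriction in the note can be reproduced with a short sketch that calls asyncio.run() from inside a coroutine that is already running in a loop. The inner() coroutine is a hypothetical placeholder, and the exact wording of the error message may differ between Python versions.

```python
import asyncio

async def inner():
    return "inner result"

async def main():
    message = None
    coro = inner()
    try:
        # main() is already running inside a loop, so this call is rejected.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # discard the unused coroutine to avoid a "never awaited" warning
        message = str(exc)
    # Inside a running loop, await the coroutine instead.
    return message, await inner()

print(asyncio.run(main()))
```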

The following program prints lines of text, pausing for one second after each line until the last.

import asyncio

async def my_coro(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...")
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break

async def main():
    await my_coro(3.0)

asyncio.run(main())

Output:

Blocking...
Blocking...
Blocking...
Done.

Tasks

A task is an awaitable object that wraps a coroutine. To create and immediately schedule a task, you can call the following:

asyncio.create_task(coro(args...))

This will return a task object. Creating a task tells the loop, "go ahead and run this coroutine as soon as you can."

If you await a task, the current coroutine is suspended until that task is complete.

import asyncio

async def my_coro(n):
    print(f"The answer is {n}.")

async def main():
    # By creating the task, it's scheduled to run 
    # concurrently, at the event loop's discretion.
    mytask = asyncio.create_task(my_coro(42))
    
    # If we later await the task, execution stops there
    # until the task is complete. If the task is already
    # complete before it is awaited, nothing is awaited. 
    await mytask

asyncio.run(main())

Output:

The answer is 42.

Tasks have several useful methods for managing the wrapped coroutine. Notably, you can request that a task be canceled by calling the task's .cancel() method. The task will be scheduled for cancellation on the next cycle of the event loop. Cancellation is not guaranteed: the task may complete before that cycle, in which case the cancellation does not occur.
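
A minimal sketch of requesting cancellation might look like the following. The slow() coroutine is a hypothetical long-running job, and the short sleep before cancel() is only there to let the task start.

```python
import asyncio

async def slow():
    """Hypothetical long-running job that we do not want to wait for."""
    await asyncio.sleep(10)
    return "finished"

async def main():
    task = asyncio.create_task(slow())
    await asyncio.sleep(0.1)  # give the task a chance to start running
    task.cancel()             # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        outcome = "canceled"  # awaiting a canceled task raises CancelledError
    else:
        outcome = "finished"
    return outcome, task.cancelled()

print(asyncio.run(main()))  # prints ('canceled', True)
```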

Gathering Awaitables

Awaitables can be gathered as a group by passing them as positional arguments to asyncio.gather(*awaitables). To gather awaitables stored in a list, unpack the list with the * operator.

asyncio.gather() returns an awaitable representing the gathered awaitables, so it must be awaited to obtain their results.

If any element of awaitables is a coroutine, it is immediately scheduled as a task.

Gathering is a convenient way to schedule multiple coroutines to run concurrently as tasks. It also associates the gathered tasks in some useful ways:

  • When all gathered tasks are complete, their return values are returned as a list, in the same order as the awaitables were passed to asyncio.gather().
  • Any gathered task may be canceled, without canceling the other tasks.
  • The gather itself can be canceled, which cancels all gathered tasks that have not yet completed.
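
The ordering of gathered results can be seen in a small sketch where the second coroutine finishes before the first. The tracked() coroutine, its labels, and its delays are invented for illustration.

```python
import asyncio

async def main():
    finished = []  # records the order in which the coroutines actually complete

    async def tracked(label, delay):
        await asyncio.sleep(delay)
        finished.append(label)
        return label

    # "slow" is passed first but completes last.
    results = await asyncio.gather(tracked("slow", 0.2), tracked("fast", 0.05))
    return results, finished

print(asyncio.run(main()))  # prints (['slow', 'fast'], ['fast', 'slow'])
```

The results list follows the argument order, while the finished list reflects completion order.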

Example: Async Web Requests with aiohttp

The following example illustrates how these high-level asyncio APIs can be used. It is a modified version, updated for Python 3.7, of Scott Robinson's nifty asyncio example. His program leverages the aiohttp module to grab the top posts on Reddit and output them to the console.

Make sure that you have the aiohttp module installed before you run the script below. You can install the module via the following pip command:

$ pip install --user aiohttp

import asyncio
import aiohttp  
import json
import datetime

async def get_json(client, url):  
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_top(subreddit, client, numposts):  
    data = await get_json(client, 'https://www.reddit.com/r/' + 
        subreddit + '/top.json?sort=top&t=day&limit=' +
        str(numposts))

    print(f'\n/r/{subreddit}:')

    j = json.loads(data.decode('utf-8'))
    for i in j['data']['children']:
        score = i['data']['score']
        title = i['data']['title']
        link = i['data']['url']
        print('\t' + str(score) + ': ' + title + '\n\t\t(' + link + ')')

async def main():
    print(datetime.datetime.now().strftime("%A, %B %d, %I:%M %p"))
    print('---------------------------')
    # ClientSession uses the running event loop; passing loop= is deprecated in aiohttp.
    async with aiohttp.ClientSession() as client:
        await asyncio.gather(
            get_reddit_top('python', client, 3),
            get_reddit_top('programming', client, 4),
            get_reddit_top('asyncio', client, 2),
            get_reddit_top('dailyprogrammer', client, 1)
            )

asyncio.run(main())

If you run the program multiple times, you'll see that the order of the output changes. That's because the JSON responses are printed as they're received, which depends on the server's response time and the intermediate network latency. On a Linux system, you can observe this in action by running the script prefixed with (e.g.) watch -n 5, which will refresh the output every 5 seconds.

Other High-level APIs

Other high-level asyncio APIs, not covered here, include:

  • stream, a set of high-level networking primitives for managing asynchronous TCP events.
  • lock, event, condition, async analogs of the synchronization primitives provided in the threading module.
  • subprocess, a set of tools for running async subprocesses, such as shell commands.
  • queue, an asynchronous analog of the queue module.
  • exception, for handling exceptions in async code.
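
As a brief taste of one of these, the sketch below passes items from a producer coroutine to a consumer coroutine through an asyncio.Queue. The producer() and consumer() names, and the use of None as an end-of-stream marker, are assumptions made for this illustration rather than anything prescribed by asyncio.

```python
import asyncio

async def producer(queue, items):
    """Put each item on the queue, then a None marker to signal the end."""
    for item in items:
        await queue.put(item)  # suspends while the queue is full
    await queue.put(None)

async def consumer(queue):
    """Collect items from the queue until the end marker arrives."""
    received = []
    while True:
        item = await queue.get()  # suspends while the queue is empty
        if item is None:
            break
        received.append(item)
    return received

async def main():
    queue = asyncio.Queue(maxsize=2)  # small bound so the producer has to wait
    _, received = await asyncio.gather(
        producer(queue, [1, 2, 3, 4]),
        consumer(queue),
    )
    return received

print(asyncio.run(main()))  # prints [1, 2, 3, 4]
```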

Conclusion

Keep in mind that even if your program doesn't require asynchrony for performance reasons, you can still use asyncio if you prefer writing within the asynchronous paradigm. I hope this overview gives you a solid understanding of how, when, and why to begin using asyncio.