When building applications within the confines of a single-threaded, synchronous language, the limitations become very obvious very quickly. The first thing that comes to mind is writes: the very definition of an I/O-bound task. When writing data to files (or databases), each "write" action intentionally occupies a thread until the write is complete. This makes a lot of sense for ensuring data integrity in most systems. For example, if two operations simultaneously attempt to update a database record, which one is correct? Alternatively, if a script requires an HTTP request to succeed before continuing, how could we move on until we know the request succeeded?

HTTP requests are among the most common thread-blocking operations. When we write scripts that expect data from an external third party, we introduce a myriad of uncertainties that only the request itself can resolve, such as response latency, the nature of the data we expect to receive, or whether the request will succeed at all. Even when working with APIs we're confident in, no operation is sure to succeed until it's complete. Hence, we're "blocked."

As applications increase in complexity to support more simultaneous user interactions, software is moving away from the paradigm of being executed linearly. So while we might not be sure that a specific request succeeds or a database write is completed, this can be acceptable as long as we have ways to handle and mitigate these issues gracefully.

A Problem Worthy of Asynchronous Execution

How long do you suppose it would take a Python script to execute a few hundred HTTP requests, parse each response, and write the output to a single file? If you were to use requests in a simple for loop, you'd need to wait a fair amount of time for Python to execute each request, open a file, write to it, close it, and move on to the next.
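To make that cost concrete before touching the network, here's a rough sketch that fakes each request with asyncio.sleep(). The fake_request() helper and its 0.05 second delay are made up for illustration (they aren't part of this tutorial's repo); the point is only to compare awaiting requests one at a time against starting them all together:

```python
import asyncio
import time
from typing import List


async def fake_request(url: str, delay: float = 0.05) -> str:
    """Pretend to fetch a URL by sleeping (hypothetical stand-in for real I/O)."""
    await asyncio.sleep(delay)
    return f"response from {url}"


async def run_sequentially(urls: List[str]) -> float:
    """Await each request before starting the next, like a plain for loop."""
    start = time.perf_counter()
    for url in urls:
        await fake_request(url)
    return time.perf_counter() - start


async def run_concurrently(urls: List[str]) -> float:
    """Start every request at once and wait for all of them together."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(url) for url in urls))
    return time.perf_counter() - start


if __name__ == "__main__":
    urls = [f"https://example.com/{n}" for n in range(10)]
    print(f"sequential: {asyncio.run(run_sequentially(urls)):.2f}s")
    print(f"concurrent: {asyncio.run(run_concurrently(urls)):.2f}s")
```

The sequential version pays for every delay in turn, while the concurrent version pays roughly for the slowest one. Real requests are messier than a sleep, but the shape of the difference is the same.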

Let's put asyncio's ability to improve script efficiency to an actual test. We'll execute two I/O-blocking actions per task for a few hundred URLs: executing and parsing an HTTP request and writing the desired result to a single file. The input for our experiment will be a ton of URLs, with the expected output to be metadata parsed from those URLs. Let's see how long it takes to do this for hundreds of URLs.

This site has roughly two hundred published posts of its own, which makes it a great guinea pig for this little experiment. I've created a CSV that contains the URLs to these posts, which will be our input. Here's a sneak peek below:

Sample Input

url
https://hackersandslackers.com/intro-to-asyncio-concurrency/
https://hackersandslackers.com/multiple-python-versions-ubuntu-20-04/
https://hackersandslackers.com/google-bigquery-python/
https://hackersandslackers.com/plotly-chart-studio/
https://hackersandslackers.com/deploy-serverless-golang-functions-with-netlify/
https://hackersandslackers.com/scrape-metadata-json-ld/
https://hackersandslackers.com/terraform-with-google-cloud/
https://hackersandslackers.com/deploy-golang-app-nginx/
https://hackersandslackers.com/4-ways-to-improve-your-plotly-graphs/
https://hackersandslackers.com/create-your-first-golang-app/
...etc
Input CSV

Sample Output

For each URL found in our input CSV, our script will fetch the URL, parse the page, and write some choice data to a single CSV. The result will resemble the below example:

title | description | primary_tag | url | published_at
Intro to Asynchronous Python with Asyncio | Execute multiple tasks concurrently in Python with Asyncio: Python`s built-in async library. | Python | https://hackersandslackers.com/intro-to-asyncio-concurrency/ | 2022-01-04
Deploy Serverless Golang Functions with Netlify | Write and deploy Golang Lambda Functions to your GatsbyJS site on Netlify. | JAMStack | https://hackersandslackers.com/deploy-serverless-golang-functions-with-netlify/ | 2020-08-02
SSH & SCP in Python with Paramiko | Automate remote server tasks by using the Paramiko & SCP Python libraries. Use Python to SSH into hosts; execute tasks; transfer files; etc. | Python | https://hackersandslackers.com/automate-ssh-scp-python-paramiko/ | 2020-01-03
Create Cloud-hosted Charts with Plotly Chart Studio | Use Pandas and Plotly to create cloud-hosted data visualizations on-demand in Python. | Plotly | https://hackersandslackers.com/plotly-chart-studio/ | 2020-09-03
Create Your First Golang App | Set up a local Golang environment and learn the basics to create and publish your first `Hello world` app. | Golang | https://hackersandslackers.com/create-your-first-golang-app/ | 2020-05-25
Creating Interactive Views in Django | Create interactive user experiences by writing Django views to handle dynamic content; submitting forms; and interacting with data. | Django | https://hackersandslackers.com/creating-django-views/ | 2020-04-23
Define Relationships Between SQLAlchemy Data Models | SQLAlchemy`s ORM easily defines data models with relationships such as one-to-one; one-to-many; and many-to-many relationships. | SQLAlchemy | https://hackersandslackers.com/sqlalchemy-data-models/ | 2019-07-11
...etc
Example of what our script will output

Tools For The Job

We're going to need three core Python libraries to pull this off:

  • Asyncio: Python's bread-and-butter library for running asynchronous IO-bound tasks. It ships with Python's standard library (so there's nothing to install) and is backed by the async/await keywords, which denote when a function runs asynchronously and when to wait on such a function (respectively).
  • Aiohttp: When used on the client side, it fills a role similar to Python's requests library, except that its requests are made asynchronously. Alternatively, aiohttp can be used inversely: as an application webserver that handles incoming requests and serves responses, but that's a tale for another time.
  • Aiofiles: Makes writing to disk (such as creating and writing bytes to files) a non-blocking task, such that multiple writes can happen on the same thread without blocking one another, even when multiple tasks are bound to the same file.
$ pip install aiohttp aiofiles

Install the necessary libraries

BONUS: Dependencies to Optimize Speed

aiohttp can execute requests even faster by simply installing a few supplemental libraries. These libraries are cchardet (character encoding detection), aiodns (asynchronous DNS resolution), and brotlipy (lossless compression). I'd highly recommend installing these using the conveniently provided shortcut below (take it from me, I'm a stranger on the internet):

$ pip install "aiohttp[speedups]"

Install supplemental dependencies

Preparing an Asynchronous Script/Application

We're going to structure this script like any other Python script. Our main module, aiohttp_aiofiles_tutorial, will handle all of our logic. config.py and main.py both live outside the main module and offer our script some basic configuration and an entry point, respectively:

/aiohttp-aiofiles-tutorial
├── /aiohttp_aiofiles_tutorial
│   ├── __init__.py
│   ├── fetcher.py
│   ├── loops.py
│   ├── tasks.py
│   ├── parser.py
│   └── /data  # Source data
│       ├── __init__.py
│       ├── parser.py
│       ├── tests
│       └── urls.csv
├── /export  # Destination for exported data
├── config.py
├── logger.py
├── main.py
├── pyproject.toml
├── Makefile
├── README.md
└── requirements.txt

Project structure of our async fetcher/writer

/export is an empty directory in which we'll write our output file.

The /data submodule contains the input CSV mentioned above and some basic logic to parse it. It's not much to write home about, but if you're curious, the source is available on the GitHub repo.
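If you'd rather not dig through the repo, the parsing step can be approximated in a few lines with the standard library's csv module. This is a guess at the general shape rather than the repo's actual code: parse_urls() is a hypothetical name, and the only thing assumed about the input is the single url column shown in the sample above.

```python
import csv
import io
from typing import List, TextIO


def parse_urls(csv_file: TextIO) -> List[str]:
    """Return the non-empty values of the `url` column as a list of strings."""
    reader = csv.DictReader(csv_file)
    return [row["url"].strip() for row in reader if row.get("url")]


# Usage with an in-memory file that mimics the sample input shown earlier.
sample = io.StringIO(
    "url\n"
    "https://hackersandslackers.com/intro-to-asyncio-concurrency/\n"
    "https://hackersandslackers.com/google-bigquery-python/\n"
)
urls_to_fetch = parse_urls(sample)
print(urls_to_fetch)
```

In a real project you'd pass an open file handle for urls.csv instead of the in-memory sample.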

Kicking Things Off

With sleeves rolled high, we start with the obligatory script "entry point," main.py. This initiates the core function in /aiohttp_aiofiles_tutorial called init_script():

"""Script entry point."""
import asyncio

from aiohttp_aiofiles_tutorial import init_script


if __name__ == "__main__":
    asyncio.run(init_script())

main.py

This seems like we're running a single function/coroutine init_script() via asyncio.run(), which seems counter-intuitive at first glance. Isn't the point of asyncio to run multiple coroutines concurrently, you ask?

Indeed it is! init_script() is a coroutine that calls other coroutines. Some of these coroutines create tasks out of other coroutines, others execute them, etc. asyncio.run() creates an event loop that doesn't stop running until the target coroutine is done, including all the coroutines that the parent coroutine calls. So, if we keep things clean, asyncio.run() is a one-time call to initialize a script.
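Here's a stripped-down sketch of that idea, separate from our actual script. The parent() and child() coroutines are invented for illustration: the parent awaits one child directly, wraps two more in tasks, and a single asyncio.run() call drives the whole thing.

```python
import asyncio
from typing import List


async def child(name: str) -> str:
    """A coroutine awaited by the parent (hypothetical example)."""
    await asyncio.sleep(0.01)
    return f"{name} done"


async def parent() -> List[str]:
    """Await one child directly, then run two more as concurrent tasks."""
    first = await child("first")
    tasks = [asyncio.create_task(child(name)) for name in ("second", "third")]
    rest = await asyncio.gather(*tasks)
    return [first, *rest]


if __name__ == "__main__":
    # One call starts the event loop; it returns once parent() has finished.
    print(asyncio.run(parent()))
```

asyncio.run() hands back whatever the top-level coroutine returns, so here it produces the three "done" strings in the order they were requested.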

Initializing Our Script

Here's where the fun begins. We've established that the purpose of our script is to output a single CSV file, and that's where we'll start: by creating and opening an output file within the context in which our entire script will operate:

"""Make hundreds of requests concurrently and save responses to disk."""
import aiofiles
from config import EXPORT_FILEPATH


async def init_script():
    """Prepare output file & kickoff task creation/execution."""
    async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile:
        await outfile.write(
            "title,description,primary_tag,url,published_at\n"
        )
        # (The rest of our script logic will be executed here).
        # ...

aiohttp_aiofiles_tutorial/__init__.py

Our script begins by opening a file context with aiofiles. As long as our script operates inside the context of an open async file via async with aiofiles.open() as outfile:, we can write to this file constantly without worrying about opening and closing the file.

Compare this to the synchronous (default) implementation of handling file I/O in Python, with open() as outfile:. By using aiofiles, we can write data to the same file from multiple sources at virtually the same time.
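To picture what "multiple sources, one file" looks like, here's a small sketch that opens a file once and lets several coroutines write rows to the same handle. To keep it dependency-free, it uses a plain open() handle as a stand-in for aiofiles (so the write itself is an ordinary blocking call here), and write_row(), write_all(), and demo() are all hypothetical names:

```python
import asyncio
import os
import tempfile
from typing import List


async def write_row(outfile, row_number: int) -> None:
    """Simulate some I/O wait, then append one complete row."""
    await asyncio.sleep(0.01 * (row_number % 3))
    # A plain blocking write stands in for `await outfile.write(...)` here.
    outfile.write(f"row {row_number}\n")


async def write_all(path: str, count: int) -> None:
    """Open the file once and let every coroutine write to the same handle."""
    with open(path, mode="w", encoding="utf-8") as outfile:
        outfile.write("header\n")
        await asyncio.gather(*(write_row(outfile, n) for n in range(count)))


def demo(count: int = 6) -> List[str]:
    """Run the sketch against a temporary file and return its lines."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "out.csv")
        asyncio.run(write_all(path, count))
        with open(path, encoding="utf-8") as result:
            return result.read().splitlines()


if __name__ == "__main__":
    print(demo())
```

Every row arrives intact because each one is written in a single call, but the order of the rows depends on which coroutine finishes waiting first. The same holds for our real script: rows land in completion order, not input order.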

EXPORT_FILEPATH happens to target a CSV (/export/hackers_pages_metadata.csv). Every CSV needs a row of headers; hence our one-off usage of await outfile.write() writing headers immediately after opening our CSV:

...
await outfile.write(
    "title,description,primary_tag,url,published_at\n"
)

Write a single row to a CSV

Moving Along

Below is the fully fleshed-out version of __init__.py that will ultimately put our script into action. The most notable addition is the introduction of the execute_fetcher_tasks() coroutine; we'll dissect this one piece at a time:

"""Make hundreds of requests concurrently and save responses to disk."""
import asyncio

import aiofiles
from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile
from aiohttp import ClientSession
from config import EXPORT_FILEPATH, HTTP_HEADERS

from .data import urls_to_fetch  # URLs parsed from a CSV
from .tasks import create_tasks  # Creates one task per URL


async def init_script():
    """Prepare output file & kickoff task creation/execution."""
    async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile:
        await outfile.write(
            "title,description,primary_tag,url,published_at\n"
        )
        await execute_fetcher_tasks(outfile)
        # No explicit close needed: the `async with` block closes outfile on exit.


async def execute_fetcher_tasks(outfile: AsyncIOFile):
    """
    Open async HTTP session & execute created tasks.

    :param AsyncIOFile outfile: Open async file handle to write to.
    """
    async with ClientSession(headers=HTTP_HEADERS) as session:
        task_list = await create_tasks(session, urls_to_fetch, outfile)
        await asyncio.gather(*task_list)

aiohttp_aiofiles_tutorial/__init__.py

execute_fetcher_tasks() is broken out mainly to organize our code. This coroutine accepts outfile as a parameter, which will serve as the destination for data we end up parsing. Taking this line-by-line:

  • async with ClientSession(headers=HTTP_HEADERS) as session: Much like a Session in the requests library, aiohttp's ClientSession keeps a pool of connections that are reused across requests; the difference is that those requests don't block one another. By default, the pool allows up to 100 active connections simultaneously. Because we will make under 200 requests, the time it will take to fetch all these URLs will be comparable to the time it takes Python to fetch two of them back-to-back under normal circumstances.
  • create_tasks(): This function we're about to define accepts three parameters. The first is the async ClientSession we just opened a line earlier. Next, we have the urls_to_fetch variable (imported earlier in our script). This is a simple Python list of strings, where each string is a URL parsed from our earlier "input" CSV. That logic is handled elsewhere via a simple function (and not important for this tutorial). Lastly, outfile is passed along, as we'll be writing to this file later. With these parameters, create_tasks() will create a task for each of our 174 URLs. Each task fetches its URL, parses the page, and writes one row of metadata to outfile. The tasks are scheduled on the event loop as soon as they're created, but none of them can make progress until our coroutine hands control back to the loop, which happens via...
  • asyncio.gather(*task_list): Asyncio's gather() function waits inside the currently running event loop until every task in the collection has finished, returning their results in the order the tasks were passed in. Once this kicks off, the speed benefits of asynchronous I/O will become immediately apparent.
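The create-then-gather pattern described above can be tried in isolation with a fake fetcher. Everything here is illustrative rather than lifted from the repo: fake_fetch() just sleeps, and the return_exceptions=True flag is an optional extra that our script doesn't use (it catches exceptions inside each task instead).

```python
import asyncio
from typing import List


async def fake_fetch(url: str, delay: float) -> str:
    """Hypothetical stand-in for a request; fails for URLs containing 'bad'."""
    await asyncio.sleep(delay)
    if "bad" in url:
        raise ValueError(f"could not fetch {url}")
    return f"fetched {url}"


async def fetch_all() -> List[object]:
    """Create one task per URL, then wait for all of them with gather()."""
    jobs = [("a", 0.03), ("bad", 0.01), ("c", 0.02)]
    task_list = [asyncio.create_task(fake_fetch(u, d)) for u, d in jobs]
    # return_exceptions=True hands failures back as values instead of raising.
    return await asyncio.gather(*task_list, return_exceptions=True)


if __name__ == "__main__":
    for result in asyncio.run(fetch_all()):
        print(repr(result))
```

Even though the tasks finish in a different order than they were created, gather() returns results in the order the tasks were passed in, with the ValueError sitting in the second slot.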

Creating Asyncio Tasks

If you recall, a Python Task wraps a coroutine and schedules it to run on the event loop. In addition, each task can be temporarily put on hold while other tasks run. To create a task, we call a predefined coroutine function with the proper parameters and pass the resulting coroutine to asyncio.create_task().
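If the timing of "scheduled" versus "running" feels fuzzy, this tiny sketch records the order in which things happen. The worker() and main() coroutines are made up for the demonstration:

```python
import asyncio
from typing import List


async def worker(events: List[str]) -> None:
    """Record when the wrapped coroutine actually runs."""
    events.append("worker started")
    await asyncio.sleep(0)
    events.append("worker finished")


async def main() -> List[str]:
    events: List[str] = []
    task = asyncio.create_task(worker(events))
    events.append("task created")  # worker() has not run a single line yet
    await task  # yielding control lets the event loop run worker()
    events.append("task awaited")
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The recorded order is "task created", "worker started", "worker finished", "task awaited": the task exists as soon as create_task() returns, but its code only runs once the creating coroutine awaits something.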

I separated create_tasks() to return a list of Python Tasks, where each "task" will execute fetching one of our URLs:

"""Prepare tasks to be executed."""
import asyncio
from asyncio import Task
from typing import List

from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile
from aiohttp import ClientSession

from .fetcher import fetch_url_and_save_data


async def create_tasks(
    session: ClientSession, urls: List[str], outfile: AsyncIOFile
) -> List[Task]:
    """
    Create asyncio tasks to parse HTTP request responses.

    :param ClientSession session: Async HTTP requests session.
    :param List[str] urls: Resource URLs to fetch.
    :param AsyncIOFile outfile: Open async file handle to write to.

    :returns: List[Task]
    """
    task_list = []
    for i, url in enumerate(urls):
        task = asyncio.create_task(
            fetch_url_and_save_data(
                session,
                url,
                outfile,
                len(urls),
                i
            )
        )
        task_list.append(task)
    return task_list

aiohttp_aiofiles_tutorial/tasks.py

A few notable things about asyncio Tasks:

  • We're defining the "work to be done" up front. Creating a Task schedules its coroutine on the event loop, but none of its code runs until the current coroutine yields control with an await. Our script will run the same function 174 times concurrently with different parameters, so it makes sense to define these tasks upfront.
  • Defining tasks is quick and straightforward. In an instant, each URL from our CSV will have a corresponding Task created and added to task_list.
  • With our tasks prepared, there's only one thing left to do: let them all run and wait for them to finish. That's where the asyncio.gather(*task_list) line from __init__.py comes into play.
💡
Asyncio's Task object is a class with its own attributes and methods, providing a wrapper with ways to check task status, cancel tasks, and so forth.
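A quick sketch of a few of those methods in action, using two throwaway coroutines (quick_job() and slow_job() are hypothetical and exist only for this demonstration):

```python
import asyncio


async def quick_job() -> str:
    await asyncio.sleep(0.01)
    return "finished"


async def slow_job() -> str:
    await asyncio.sleep(10)
    return "finished"


async def inspect_tasks() -> dict:
    """Run one task to completion, cancel another, and report on both."""
    quick = asyncio.create_task(quick_job(), name="quick")
    slow = asyncio.create_task(slow_job(), name="slow")
    await quick
    slow.cancel()  # Request cancellation of the still-pending task
    try:
        await slow
    except asyncio.CancelledError:
        pass  # Awaiting a cancelled task raises CancelledError
    return {
        "quick_done": quick.done(),
        "quick_result": quick.result(),
        "slow_cancelled": slow.cancelled(),
        "names": [quick.get_name(), slow.get_name()],
    }


if __name__ == "__main__":
    print(asyncio.run(inspect_tasks()))
```

Our script never needs to cancel anything, but these methods come in handy once you start adding timeouts or early exits.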

Executing our Tasks

Back in create_tasks(), we created tasks that each run a coroutine called fetch_url_and_save_data(). This coroutine does three things:

  • Make an async request to the given task's URL via aiohttp's session context (handled by async with session.get(url) as resp:)
  • Read the body of the response as a string.
  • Parse that string by passing html to our last function, parse_html_page_metadata(), then write the resulting row of metadata to our outfile:
"""Fetch URLs, extract their contents, and write parsed data to file."""
from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile
from aiohttp import ClientError, ClientSession, InvalidURL
from logger import LOGGER

from .parser import parse_html_page_metadata


async def fetch_url_and_save_data(
    session: ClientSession,
    url: str,
    outfile: AsyncIOFile,
    total_count: int,
    i: int,
):
    """
    Fetch raw HTML from a URL prior to parsing.

    :param ClientSession session: Async HTTP requests session.
    :param str url: Target URL to be fetched.
    :param AsyncIOFile outfile: Open async file handle to write to.
    :param int total_count: Total number of URLs to be fetched.
    :param int i: Current iteration of URL out of total URLs.
    """
    try:
        async with session.get(url) as resp:
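            if resp.status != 200:
                # Skip pages that did not come back successfully.
                LOGGER.error(f"Received status {resp.status} for URL `{url}`")
                return
            html = await resp.text()
            page_metadata = await parse_html_page_metadata(html, url)
            if page_metadata is None:
                # Parsing failed and was already logged by the parser.
                return
            await outfile.write(f"{page_metadata}\n")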
            LOGGER.info(
                f"Fetched URL {i} of {total_count}: {page_metadata}"
            )
    except InvalidURL as e:
        LOGGER.error(f"Unable to fetch invalid URL `{url}`: {e}")
    except ClientError as e:
        LOGGER.error(f"ClientError while fetching URL `{url}`: {e}")
    except Exception as e:
        LOGGER.error(
            f"Unexpected error while fetching URL `{url}`: {e}"
        )

aiohttp_aiofiles_tutorial/fetcher.py

When fetching a URL via an aiohttp ClientSession, calling the .text() method on the response (await resp.text()) will return the body of the response as a string. This is not to be confused with .read(), which returns a bytes object (useful for pulling media files or anything besides a string).

If you're keeping track, we're now three "contexts" deep:

  1. We started our script by opening an aiofiles.open() context, which will remain open until our script is complete. This allows us to write to our outfile from any task for the duration of our script.
  2. After writing headers to our CSV file, we opened a persistent client request session with async with ClientSession() as session, which allows us to make requests en masse as long as the session is open.
  3. In the snippet above, we've entered a third and final context: the response context for a single URL (via async with session.get(url) as resp). Unlike the other two contexts, we'll enter and leave this context 174 times (once per URL).
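The nesting described in the list above is easier to see with the real work stripped away. This sketch swaps aiofiles and aiohttp for a hypothetical context() helper that only records when it is entered and exited, and it loops over URLs one at a time for readability (our actual script enters the response contexts concurrently):

```python
import asyncio
from contextlib import asynccontextmanager
from typing import List

log: List[str] = []


@asynccontextmanager
async def context(name: str):
    """Hypothetical async context that records when it is entered and exited."""
    log.append(f"enter {name}")
    try:
        yield name
    finally:
        log.append(f"exit {name}")


async def main(urls: List[str]) -> List[str]:
    log.clear()
    async with context("outfile"):  # 1. stays open for the whole script
        async with context("session"):  # 2. stays open for all requests
            for url in urls:
                async with context(f"response {url}"):  # 3. once per URL
                    await asyncio.sleep(0)
    return list(log)


if __name__ == "__main__":
    for line in asyncio.run(main(["a", "b"])):
        print(line)
```

The outer two contexts are entered once and exited last, while the innermost one opens and closes for every URL.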

Inside each URL response context is where we finally start producing some output. This leaves us with our final bit of logic (await parse_html_page_metadata(html, url)) which parses each URL response and returns some scraped metadata from the page before writing said metadata to our outfile on the next line, await outfile.write(f"{page_metadata}\n").

Write Parsed Metadata to CSV

How are we planning to rip metadata out of HTML pages, you ask? With BeautifulSoup, of course! With the HTML of an HTTP response in hand, we use bs4 to parse each URL response and return values for each of the columns in our outfile: title, description, primary_tag, url, and published_at.

These five values are returned as a comma-separated string, then written to our outfile CSV as a single row.

"""Parse metadata from raw HTML."""
from bs4 import BeautifulSoup
from bs4.builder import ParserRejectedMarkup
from logger import LOGGER


async def parse_html_page_metadata(html: str, url: str) -> str:
    """
    Extract page metadata from raw HTML into a CSV row.

    :param str html: Raw HTML source of a given fetched URL.
    :param str url: URL associated with the extracted HTML.

    :returns: str
    """
    try:
        soup = BeautifulSoup(html, "html.parser")
        title = soup.title.string.replace(",", ";")
        description = (
            soup.head.select_one("meta[name=description]")
            .get("content")
            .replace(",", ";")
            .replace('"', "`")
            .replace("'", "`")
        )
        # Not every page has a tag, so guard against a missing element.
        tag_element = soup.head.select_one("meta[property='article:tag']")
        primary_tag = tag_element.get("content", "") if tag_element else ""
        published_at = (
            soup.head
            .select_one("meta[property='article:published_time']")
            .get("content")
            .split("T")[0]
        )
        return f"{title}, {description}, {primary_tag}, {url}, {published_at}"
    except ParserRejectedMarkup as e:
        LOGGER.error(
            f"Failed to parse invalid html for {url}: {e}"
        )
    except ValueError as e:
        LOGGER.error(
            f"ValueError occurred when parsing html for {url}: {e}"
        )
    except Exception as e:
        LOGGER.error(
            f"Parsing failed when parsing html for {url}: {e}"
        )

aiohttp_aiofiles_tutorial/parser.py
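The parser above dodges CSV quoting issues by swapping commas for semicolons and quotes for backticks. If you'd rather keep the original punctuation, one alternative is to let the standard library's csv module do the quoting. This is a sketch of a different approach, not what the repo does, and to_csv_row() is a hypothetical helper:

```python
import csv
import io


def to_csv_row(
    title: str, description: str, primary_tag: str, url: str, published_at: str
) -> str:
    """Build one CSV row, quoting any field that contains commas or quotes."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(
        [title, description, primary_tag, url, published_at]
    )
    # Drop the writer's line terminator; the caller appends its own newline.
    return buffer.getvalue().rstrip("\r\n")


if __name__ == "__main__":
    print(
        to_csv_row(
            "Hello, world",
            'Say "hi"',
            "Python",
            "https://example.com/",
            "2022-01-04",
        )
    )
```

Fields containing commas or quotes get wrapped in double quotes, so any standard CSV reader can recover the original text.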

Run the Jewels, Run the Script

Let's take this bad boy for a spin. I threw a timer into __init__.py to log the number of seconds that elapsed for the duration of the script:

"""Make hundreds of requests concurrently and save responses to disk."""
from time import perf_counter as timer

from logger import LOGGER

...


async def init_script():
    """Prepare output file & kickoff task creation/execution."""
    start_time = timer()  # Add timer to function
    async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile:
        await outfile.write(
            "title,description,primary_tag,url,published_at\n"
        )
        await execute_fetcher_tasks(outfile)
    LOGGER.success(
        f"Executed {__name__} in {timer() - start_time:0.2f} seconds."
    )  # Log time of execution


...

aiohttp_aiofiles_tutorial/__init__.py

Mash that mfing make run command if you're following along in the repo (or just punch in python3 main.py). Strap yourself in:

...
16:12:34 PM | INFO: Fetched URL 165 of 173: Setting up a MySQL Database on Ubuntu, Setting up MySQL the old-fashioned way: on a linux server, DevOps, https://hackersandslackers.com/set-up-mysql-database/, 2018-04-17 

16:12:34 PM | INFO: Fetched URL 164 of 173: Dropping Rows of Data Using Pandas, Square one of cleaning your Pandas Dataframes: dropping empty or problematic data., Data Analysis, https://hackersandslackers.com/pandas-dataframe-drop/, 2018-04-18 

16:12:34 PM | INFO: Fetched URL 167 of 173: Installing Django CMS on Ubuntu, Get the play-by-play on how to install DjangoCMS: the largest of three major CMS products for Python`s Django framework., Software, https://hackersandslackers.com/installing-django-cms/, 2017-11-19 

16:12:34 PM | INFO: Fetched URL 166 of 173: Starting a Python Web App with Flask & Heroku, Pairing Flask with zero-effort container deployments is a deadly path to addiction., Architecture, https://hackersandslackers.com/flask-app-heroku/, 2018-02-13 

16:12:34 PM | INFO: Fetched URL 171 of 173: Another 'Intro to Data Analysis in Python Using Pandas' Post, An introduction to Python`s quintessential data analysis library., Data Analysis, https://hackersandslackers.com/intro-python-pandas/, 2017-11-16 

16:12:34 PM | INFO: Fetched URL 172 of 173: Managing Python Environments With Virtualenv, Embrace core best-practices in Python by managing your Python packages using virtualenv and virtualenvwrapper., Software, https://hackersandslackers.com/python-virtualenv-virtualenvwrapper/, 2017-11-15 

16:12:34 PM | INFO: Fetched URL 170 of 173: Visualize Folder Structures with Python’s Treelib, Using Python`s treelib library to output the contents of local directories as visual tree representations., Data Engineering, https://hackersandslackers.com/python-tree-hierachies-treelib/, 2017-11-17 

16:12:34 PM | INFO: Fetched URL 169 of 173: Merge Sets of Data in Python Using Pandas, Perform SQL-like merges of data using Python`s Pandas., Data Analysis, https://hackersandslackers.com/merge-dataframes-with-pandas/, 2017-11-17 

16:12:34 PM | INFO: Fetched URL 168 of 173: Starting an ExpressJS App, Installation guide for ExpressJS with popular customization options., JavaScript, https://hackersandslackers.com/create-an-expressjs-app/, 2017-11-18 

16:12:34 PM | SUCCESS: Executed aiohttp_aiofiles_tutorial in 2.96 seconds. 

The tail end of our log after fetching 174 pages in ~3 seconds

The higher end of our script's execution time is 3 seconds. Assuming a typical Python request takes 1-2 seconds to complete, fetching 174 pages one at a time would take roughly 3 to 6 minutes, which puts our speedup somewhere between 60 and 120 times faster for a sample size of data like this.

Writing async scripts in Python surely takes more effort, but not hundreds or thousands of times more effort. Even if it isn't speed you're after, handling the volume of larger-scale applications renders asyncio critical. For example, if your chatbot or webserver is in the middle of handling a user's request, what happens when a second user attempts to interact with your app in the meantime? Often the answer is nothing: User 1 gets what they want, and User 2 is stuck talking to a blocked thread.
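That blocked-thread scenario is easy to reproduce. In this sketch, two hypothetical "request handlers" each take 0.1 seconds: one calls the blocking time.sleep(), the other awaits asyncio.sleep(). Both are served to two users at once (the handler names and timings are invented for illustration):

```python
import asyncio
import time
from typing import Awaitable, Callable


async def blocking_handler(user: str) -> str:
    """Hogs the thread: nobody else is served until this returns."""
    time.sleep(0.1)
    return f"served {user}"


async def cooperative_handler(user: str) -> str:
    """Yields while waiting, so other users can be served in the meantime."""
    await asyncio.sleep(0.1)
    return f"served {user}"


async def serve(handler: Callable[[str], Awaitable[str]]) -> float:
    """Handle two users 'at once' and return the total time taken."""
    start = time.perf_counter()
    await asyncio.gather(handler("user 1"), handler("user 2"))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:    {asyncio.run(serve(blocking_handler)):.2f}s")
    print(f"cooperative: {asyncio.run(serve(cooperative_handler)):.2f}s")
```

With the blocking handler, User 2 waits for User 1 to finish, so the total is about double. With the cooperative handler, both waits overlap.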

Anyway, seeing is believing. Here's the source code for this tutorial:

GitHub - hackersandslackers/aiohttp-aiofiles-tutorial: 🔄 🌐 Handle thousands of HTTP requests, disk writes, and other I/O-bound tasks simultaneously with Python’s quintessential async libraries.

Source code for this tutorial