

⚡ Python AsyncIO

Write concurrent, non-blocking code using async and await.

Quick Overview

The asyncio module is Python’s built-in framework for writing concurrent code using the async/await syntax. It is designed for I/O-bound and high-level structured network code, not for CPU-heavy tasks.

Instead of blocking while waiting for operations like network or disk I/O to finish, asyncio lets your program await them and run other coroutines in the meantime, all within a single OS thread.

Import it like this: import asyncio

Key Concepts

  • Coroutine – A special function declared with async def that can be awaited.
  • async / await – Keywords for defining and pausing coroutines.
  • Event loop – The core of asyncio that schedules and runs coroutines.
  • Task – A wrapper around a coroutine so that it runs concurrently on the event loop.
  • asyncio.run() – High-level entry point to start the event loop and run the top-level coroutine.
  • asyncio.create_task() – Schedule a coroutine to run concurrently as a task.
  • asyncio.gather() – Run multiple coroutines concurrently and collect their results.
  • Awaitable – Any object you can use with await (coroutines, tasks, some futures).
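The concepts above can be seen in a few lines: calling an async def function does not run it, it only creates a coroutine object, which must be awaited (or wrapped in a task) to actually execute. This is a minimal sketch; the names greet and main are illustrative:

```python
import asyncio

async def greet(name):
    await asyncio.sleep(0)      # yield control to the event loop briefly
    return f"Hello, {name}!"

async def main():
    coro = greet("AsyncIO")     # creates a coroutine object; nothing runs yet
    print(type(coro).__name__)  # "coroutine"
    print(await coro)           # awaiting actually runs it

    task = asyncio.create_task(greet("Task"))  # a Task is also awaitable
    print(await task)

asyncio.run(main())
```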

Syntax and Core APIs

Defining and Running a Coroutine

A coroutine is defined with async def. To run it from normal (synchronous) code, use asyncio.run() once at the entry point of your program.

Basic asyncio.run() Syntax
import asyncio

async def main():
    print("Hello ...")
    await asyncio.sleep(1)  # non-blocking delay
    print("... AsyncIO!")

asyncio.run(main())

Running Coroutines Concurrently with Tasks

Use asyncio.create_task() or asyncio.gather() to run multiple coroutines concurrently on the same event loop.

Tasks and asyncio.gather() Syntax
import asyncio

async def worker(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished after {delay} seconds")
    return delay

async def main():
    task1 = asyncio.create_task(worker("Task A", 2))
    task2 = asyncio.create_task(worker("Task B", 1))

    # Wait for both tasks concurrently
    results = await asyncio.gather(task1, task2)
    print("All done! Results:", results)

asyncio.run(main())

Code Examples

⏱️ Sequential vs Concurrent AsyncIO

This example shows how asyncio can reduce total time when you have multiple I/O-bound tasks.

import asyncio
import time

async def fetch_data(name, delay):
    print(f"Start fetching {name}")
    await asyncio.sleep(delay)  # simulate I/O
    print(f"Finished fetching {name}")
    return name

async def run_concurrent():
    start = time.perf_counter()

    # Run three coroutines concurrently
    results = await asyncio.gather(
        fetch_data("User", 2),
        fetch_data("Orders", 3),
        fetch_data("Notifications", 1),
    )

    end = time.perf_counter()
    print("Results:", results)
    print(f"Concurrent total time: {end - start:.2f} seconds")

async def run_sequential():
    start = time.perf_counter()

    # Run the same coroutines sequentially
    r1 = await fetch_data("User", 2)
    r2 = await fetch_data("Orders", 3)
    r3 = await fetch_data("Notifications", 1)

    end = time.perf_counter()
    print("Results:", [r1, r2, r3])
    print(f"Sequential total time: {end - start:.2f} seconds")

async def main():
    print("=== Concurrent ===")
    await run_concurrent()

    print("\n=== Sequential ===")
    await run_sequential()

asyncio.run(main())

Simple Async HTTP-Like Example (Simulated)

Imagine these functions perform network requests. Here we just use asyncio.sleep() to simulate latency.

import asyncio

async def get_page(url):
    print(f"Downloading {url} ...")
    await asyncio.sleep(1.5)  # simulate network delay
    print(f"Finished {url}")
    return f"<html>Fake content for {url}</html>"

async def main():
    urls = [
        "https://example.com",
        "https://example.com/blog",
        "https://example.com/about",
    ]

    tasks = [asyncio.create_task(get_page(u)) for u in urls]
    pages = await asyncio.gather(*tasks)

    print("\nDownloaded pages:")
    for u, p in zip(urls, pages):
        print(u, "->", len(p), "chars")

asyncio.run(main())

⚠️ Mixing Blocking Code with AsyncIO

Blocking (CPU-heavy or long-running) functions will freeze the event loop. You can push them to a thread using asyncio.to_thread() to keep the loop responsive.

import asyncio
import time

def blocking_task(n):
    print(f"[blocking] Starting heavy computation {n}")
    time.sleep(2)  # blocks the thread
    print(f"[blocking] Finished computation {n}")
    return n * n

async def main():
    print("Before blocking call")

    # Offload blocking_task to a thread so the event loop is not blocked
    result = await asyncio.to_thread(blocking_task, 10)

    print("After blocking call, result:", result)

asyncio.run(main())
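The example above awaits asyncio.to_thread() directly, so there is nothing else for the loop to do in the meantime. To actually see the loop staying responsive, you can run a "heartbeat" coroutine alongside the threaded work. This is a sketch; heavy_work and heartbeat are illustrative names:

```python
import asyncio
import time

def heavy_work():
    time.sleep(2)  # blocking call, but it runs in a worker thread
    return "done"

async def heartbeat():
    # Keeps printing while the blocking work runs in a separate thread
    for _ in range(4):
        print("event loop still responsive")
        await asyncio.sleep(0.5)

async def main():
    result, _ = await asyncio.gather(
        asyncio.to_thread(heavy_work),
        heartbeat(),
    )
    print("blocking result:", result)

asyncio.run(main())
```

While heavy_work() sleeps in its thread, the event loop keeps scheduling heartbeat(), so the "still responsive" messages interleave with the blocking work.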

Live Output and Explanation

Understanding the Concurrent vs Sequential Example

In the concurrent part, we start three fetch_data() coroutines at the same time using asyncio.gather(). While one coroutine is waiting in await asyncio.sleep(), the event loop can switch to another coroutine.

Roughly, you will see logs like:

=== Concurrent ===
Start fetching User
Start fetching Orders
Start fetching Notifications
Finished fetching Notifications
Finished fetching User
Finished fetching Orders
Results: ['User', 'Orders', 'Notifications']
Concurrent total time: ~3.00 seconds

=== Sequential ===
Start fetching User
Finished fetching User
Start fetching Orders
Finished fetching Orders
Start fetching Notifications
Finished fetching Notifications
Results: ['User', 'Orders', 'Notifications']
Sequential total time: ~6.00 seconds

Even though each fetch_data waits for a few seconds, concurrency allows the total time to be close to the longest single delay (≈3 seconds), instead of the sum (2 + 3 + 1 = 6 seconds).

Tips and Best Practices

  • Use asyncio primarily for I/O-bound tasks (network, disk, database) rather than CPU-heavy work.
  • Structure your application with asyncio.run(main()) as the entry point and keep main() async.
  • Prefer asyncio.create_task() for fire-and-forget or background tasks; always keep a reference if you need to await or cancel them later.
  • Use asyncio.gather() when you want to wait for a group of coroutines to finish and collect results.
  • Avoid calling blocking functions directly inside async code; use asyncio.to_thread() or dedicated async libraries.
  • Be careful with exceptions: tasks can fail in the background. Await them or add callbacks to handle errors.
  • Do not call asyncio.run() from inside an already running event loop (e.g., in Jupyter notebooks); use await directly instead.
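The points about keeping task references and handling background failures can be sketched together. This is a minimal example, not a fixed recipe; flaky and the task names are illustrative:

```python
import asyncio

async def flaky(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return name

async def main():
    # Keep a reference to a background task so it can be awaited or cancelled later
    background = asyncio.create_task(flaky("background", 10))

    # return_exceptions=True collects errors in the results list
    # instead of raising the first one out of gather()
    results = await asyncio.gather(
        flaky("ok", 0.1),
        flaky("bad", 0.1, fail=True),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print("caught:", r)
        else:
            print("result:", r)

    background.cancel()  # cancel the long-running background task
    try:
        await background
    except asyncio.CancelledError:
        print("background task cancelled")

asyncio.run(main())
```

Without return_exceptions=True, the RuntimeError would propagate out of asyncio.gather() as soon as it occurs, while the other coroutines continue running in the background.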

Common Use Cases

  • High-throughput web servers and APIs (e.g., frameworks like FastAPI, aiohttp, Sanic).
  • Network clients that open many simultaneous connections (HTTP, websockets, chat apps).
  • Microservices that perform lots of parallel I/O to databases or other services.
  • Task schedulers and background workers that manage many small I/O-bound jobs.
  • Bots and automation tools that talk to multiple external APIs concurrently.

Try It Yourself – Practice Tasks

  1. Write a small script that:
    • Defines an async function countdown(n) that prints numbers from n to 1, awaiting asyncio.sleep(1) between prints.
    • Starts two countdowns concurrently using asyncio.gather().
  2. Simulate downloading 5 files using asyncio:
    • Each “download” function should accept a filename and delay.
    • Run them all concurrently and print total elapsed time.
  3. Create an example that:
    • Uses asyncio.to_thread() to run a blocking function (like computing Fibonacci or writing a large file).
    • Shows that your async loop can keep printing a “still responsive” message while the blocking work runs in the thread.
  4. Explore error handling:
    • Write three coroutines where one raises an exception.
    • Experiment with asyncio.gather(..., return_exceptions=True) vs the default behavior.