Write concurrent, non-blocking code using async and await.
The asyncio module is Python’s built-in framework for writing concurrent code using the async/await syntax. It is designed for I/O-bound and high-level structured network code, not for CPU-heavy tasks.
Instead of blocking while waiting for operations like network or disk I/O to finish, asyncio lets your program await them and run other coroutines in the meantime, all within a single OS thread.
Import it like this: import asyncio
Key concepts:
- Coroutine – A function defined with async def that can be awaited.
- async / await – Keywords for defining and pausing coroutines.
- Event loop – The part of asyncio that schedules and runs coroutines.
- asyncio.run() – High-level entry point to start the event loop and run the top-level coroutine.
- asyncio.create_task() – Schedule a coroutine to run concurrently as a task.
- asyncio.gather() – Run multiple coroutines concurrently and collect their results.
- Awaitable – Anything that can be used with await (coroutines, tasks, some futures).
A coroutine is defined with async def. To run it from normal (synchronous) code, use asyncio.run() once at the entry point of your program.
asyncio.run() Syntax
import asyncio

async def main():
    print("Hello ...")
    await asyncio.sleep(1)  # non-blocking delay
    print("... AsyncIO!")

asyncio.run(main())
Use asyncio.create_task() or asyncio.gather() to run multiple coroutines concurrently on the same event loop.
asyncio.gather() Syntax
import asyncio

async def worker(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished after {delay} seconds")
    return delay

async def main():
    task1 = asyncio.create_task(worker("Task A", 2))
    task2 = asyncio.create_task(worker("Task B", 1))
    # Wait for both tasks concurrently
    results = await asyncio.gather(task1, task2)
    print("All done! Results:", results)

asyncio.run(main())
This example shows how asyncio can reduce total time when you have multiple I/O-bound tasks.
import asyncio
import time

async def fetch_data(name, delay):
    print(f"Start fetching {name}")
    await asyncio.sleep(delay)  # simulate I/O
    print(f"Finished fetching {name}")
    return name

async def run_concurrent():
    start = time.perf_counter()
    # Run three coroutines concurrently
    results = await asyncio.gather(
        fetch_data("User", 2),
        fetch_data("Orders", 3),
        fetch_data("Notifications", 1),
    )
    end = time.perf_counter()
    print("Results:", results)
    print(f"Concurrent total time: {end - start:.2f} seconds")

async def run_sequential():
    start = time.perf_counter()
    # Run the same coroutines sequentially
    r1 = await fetch_data("User", 2)
    r2 = await fetch_data("Orders", 3)
    r3 = await fetch_data("Notifications", 1)
    end = time.perf_counter()
    print("Results:", [r1, r2, r3])
    print(f"Sequential total time: {end - start:.2f} seconds")

async def main():
    print("=== Concurrent ===")
    await run_concurrent()
    print("\n=== Sequential ===")
    await run_sequential()

asyncio.run(main())
Imagine these functions perform network requests. Here we just use asyncio.sleep() to simulate latency.
import asyncio

async def get_page(url):
    print(f"Downloading {url} ...")
    await asyncio.sleep(1.5)  # simulate network delay
    print(f"Finished {url}")
    return f"<html>Fake content for {url}</html>"

async def main():
    urls = [
        "https://example.com",
        "https://example.com/blog",
        "https://example.com/about",
    ]
    tasks = [asyncio.create_task(get_page(u)) for u in urls]
    pages = await asyncio.gather(*tasks)
    print("\nDownloaded pages:")
    for u, p in zip(urls, pages):
        print(u, "->", len(p), "chars")

asyncio.run(main())
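If you later replace the simulated delay with real network requests, the overall structure stays the same. Below is a minimal sketch of the same pattern; it assumes the third-party aiohttp package (not part of the standard library, installable with pip install aiohttp), so treat it as an illustration rather than part of this example.

import asyncio
import aiohttp  # third-party HTTP client, assumed installed

async def get_page(session, url):
    # Each request awaits the network; the event loop overlaps the waiting time
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://example.com",
        "https://example.com/blog",
        "https://example.com/about",
    ]
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(get_page(session, u) for u in urls))
    for u, p in zip(urls, pages):
        print(u, "->", len(p), "chars")

asyncio.run(main())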
Blocking (CPU-heavy or long-running) functions will freeze the event loop. You can push them to a thread using asyncio.to_thread() to keep the loop responsive.
import asyncio
import time

def blocking_task(n):
    print(f"[blocking] Starting heavy computation {n}")
    time.sleep(2)  # blocks the thread
    print(f"[blocking] Finished computation {n}")
    return n * n

async def main():
    print("Before blocking call")
    # Offload blocking_task to a thread so the event loop is not blocked
    result = await asyncio.to_thread(blocking_task, 10)
    print("After blocking call, result:", result)

asyncio.run(main())
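The benefit of asyncio.to_thread() is easier to see when something else runs on the event loop at the same time. Here is a small sketch, not part of the example above, that pairs the same kind of blocking function with a made-up heartbeat() coroutine to show the loop staying responsive.

import asyncio
import time

def blocking_task(n):
    time.sleep(2)  # blocks its worker thread, not the event loop
    return n * n

async def heartbeat():
    # Keeps printing while the blocking work runs in a separate thread
    for _ in range(4):
        print("event loop is still responsive ...")
        await asyncio.sleep(0.5)

async def main():
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_task, 10),
        heartbeat(),
    )
    print("Result:", result)

asyncio.run(main())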
In the concurrent part, we start three fetch_data() coroutines at the same time using asyncio.gather(). While one coroutine is waiting in await asyncio.sleep(), the event loop can switch to another coroutine.
Roughly, you will see logs like:
=== Concurrent ===
Start fetching User
Start fetching Orders
Start fetching Notifications
Finished fetching Notifications
Finished fetching User
Finished fetching Orders
Results: ['User', 'Orders', 'Notifications']
Concurrent total time: ~3.00 seconds
=== Sequential ===
Start fetching User
Finished fetching User
Start fetching Orders
Finished fetching Orders
Start fetching Notifications
Finished fetching Notifications
Results: ['User', 'Orders', 'Notifications']
Sequential total time: ~6.00 seconds
Even though each fetch_data waits for a few seconds, concurrency allows the total time to be close to the longest single delay (≈3 seconds), instead of the sum (2 + 3 + 1 = 6 seconds).
Best practices:
- Use asyncio primarily for I/O-bound tasks (network, disk, database) rather than CPU-heavy work.
- Use asyncio.run(main()) as the entry point and keep main() async.
- Use asyncio.create_task() for fire-and-forget or background tasks; always keep a reference if you need to await or cancel them later.
- Use asyncio.gather() when you want to wait for a group of coroutines to finish and collect results.
- Avoid blocking calls inside coroutines; offload them with asyncio.to_thread() or dedicated async libraries.
- Avoid calling asyncio.run() inside already running event loops (e.g., in notebooks). Use await directly instead.

Exercises:
- Write a coroutine countdown(n) that prints numbers from n to 1, awaiting asyncio.sleep(1) between prints.
- Run several countdowns concurrently with asyncio.gather().
- Experiment further with asyncio:
  - Use asyncio.to_thread() to run a blocking function (like computing Fibonacci or writing a large file).
  - Compare asyncio.gather(..., return_exceptions=True) vs the default behavior (see the sketch after this list).
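For the last exercise, here is one possible starting point. The ok_task() and failing_task() coroutines are made-up examples: with the default settings, asyncio.gather() raises the first exception it encounters, while return_exceptions=True returns the exception object in the results list instead.

import asyncio

async def ok_task(name):
    await asyncio.sleep(0.1)
    return f"{name} done"

async def failing_task():
    await asyncio.sleep(0.1)
    raise ValueError("something went wrong")

async def main():
    # Default behavior: the first exception propagates to the awaiting code
    try:
        await asyncio.gather(ok_task("A"), failing_task(), ok_task("B"))
    except ValueError as exc:
        print("Default gather raised:", exc)

    # With return_exceptions=True, exceptions are collected as results
    results = await asyncio.gather(
        ok_task("A"), failing_task(), ok_task("B"),
        return_exceptions=True,
    )
    print("With return_exceptions=True:", results)

asyncio.run(main())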