Fix: Python asyncio Blocking the Event Loop — Mixing Sync and Async Code
Quick Answer
How to fix Python asyncio event loop blocking — using run_in_executor for sync calls, asyncio.to_thread, avoiding blocking I/O in coroutines, and detecting event loop stalls.
The Problem
An async Python application becomes unresponsive under load:
async def handle_request(request):
    # This blocks the entire event loop for every request
    data = requests.get('https://api.example.com/data')  # Sync HTTP call
    return process(data.json())
Or an async route is slow despite using async def:
@app.get("/report")
async def generate_report():
    # Looks async, but calls a blocking CPU-bound function
    report = generate_pdf(data)  # Synchronous: blocks all other requests
    return {"report": report}
Or asyncio.run() is called inside an already-running event loop:
RuntimeError: asyncio.run() cannot be called from a running event loop
Or time.sleep() is mixed with async code:
async def poll():
    while True:
        await check_status()
        time.sleep(5)  # Blocks the event loop: no other coroutines run during the sleep
Why This Happens
Python’s asyncio runs coroutines on a single-threaded event loop. The loop does one thing at a time and switches between coroutines only at await points. Any synchronous (blocking) code that runs inside a coroutine holds the loop for its entire duration:
- Blocking I/O: requests.get(), open() with standard file I/O, and psycopg2 queries all run synchronously. The event loop can’t switch to other coroutines while they wait.
- CPU-bound operations: image processing, PDF generation, data transformation. A coroutine doing CPU-bound work never reaches an await, so every other coroutine on the loop waits until it finishes. Because of Python’s GIL, moving that work to a thread does not make it run in parallel either.
- time.sleep(): blocks the thread, unlike await asyncio.sleep(), which yields control back to the event loop.
- Synchronous ORM calls: SQLAlchemy’s standard session is synchronous. Using it in an async route blocks the event loop.
- asyncio.run() inside async code: asyncio.run() creates a new event loop. Calling it inside an already-running loop raises RuntimeError.
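The effect described in the list above can be measured without any network access. The following is a minimal sketch, not taken from the article: the names heartbeat and worst_gap are hypothetical, and the 0.2 second pause is an arbitrary choice. A background coroutine ticks every 10 ms and records how long each tick really took, so a stalled loop shows up as one unusually long gap.

```python
import asyncio
import time


async def heartbeat(gaps: list[float], interval: float = 0.01) -> None:
    """Record how long each short sleep actually took."""
    while True:
        start = time.perf_counter()
        await asyncio.sleep(interval)
        gaps.append(time.perf_counter() - start)


async def worst_gap(blocking: bool, pause: float = 0.2) -> float:
    """Run a heartbeat next to one pause and return the longest gap seen."""
    gaps: list[float] = []
    task = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.05)  # let the heartbeat tick a few times
    if blocking:
        time.sleep(pause)           # holds the event loop thread
    else:
        await asyncio.sleep(pause)  # yields to the event loop
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max(gaps)


if __name__ == "__main__":
    print(f"time.sleep:    {asyncio.run(worst_gap(True)):.3f}s")
    print(f"asyncio.sleep: {asyncio.run(worst_gap(False)):.3f}s")
```

With the blocking pause, the longest heartbeat gap should be at least as long as the pause itself. With the awaited pause it should stay close to the 10 ms interval, although the exact figures depend on the machine.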
Fix 1: Use asyncio.to_thread for Blocking I/O
asyncio.to_thread() (Python 3.9+) runs a synchronous function in a separate thread, freeing the event loop:
import asyncio
import requests  # Synchronous HTTP library
# WRONG: blocks the event loop
async def fetch_data_wrong():
    response = requests.get('https://api.example.com/data')
    return response.json()
# CORRECT: run in a thread pool
async def fetch_data():
    response = await asyncio.to_thread(requests.get, 'https://api.example.com/data')
    return response.json()
# With keyword arguments
async def fetch_with_params():
    response = await asyncio.to_thread(
        requests.get,
        'https://api.example.com/data',
        timeout=30,
        headers={'Authorization': 'Bearer token'}
    )
    return response.json()
Python 3.8 and earlier: use loop.run_in_executor():
import asyncio
import requests
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
async def fetch_data():
    # get_running_loop() (Python 3.7+) is the preferred call inside a coroutine
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        executor,
        lambda: requests.get('https://api.example.com/data')
    )
    return response.json()
Better solution: Use an async HTTP library instead of running sync libraries in threads. httpx with async support or aiohttp are purpose-built for async code and don’t need thread pools.
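To see the thread offloading from this section work without a real HTTP endpoint, here is a minimal sketch that swaps the network call for a stand-in. The function slow_sync_call is hypothetical and only sleeps for 0.2 seconds; it represents any blocking library call. Python 3.9 or later is assumed because of asyncio.to_thread().

```python
import asyncio
import time


def slow_sync_call(n: int) -> int:
    """Hypothetical stand-in for a blocking library call."""
    time.sleep(0.2)
    return n * 2


async def run_all() -> tuple[list[int], float]:
    """Run three blocking calls in worker threads and time the batch."""
    start = time.perf_counter()
    results = await asyncio.gather(
        *(asyncio.to_thread(slow_sync_call, n) for n in range(3))
    )
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_all())
    print(results, f"{elapsed:.2f}s")
```

The three calls overlap in the default thread pool, so the batch should take roughly as long as one call rather than three. The results come back in argument order: [0, 2, 4].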
Fix 2: Use Async Libraries Instead of Sync Ones
The best fix for blocking I/O is to use async-native libraries:
# SYNC (blocks event loop)   →  ASYNC alternative
# requests                   →  httpx (async) or aiohttp
# psycopg2 (PostgreSQL)      →  asyncpg or psycopg3 (async)
# pymysql (MySQL)            →  aiomysql
# redis-py (sync)            →  redis.asyncio (included in redis-py v4+)
# pymongo (sync)             →  motor (async MongoDB)
# boto3 (sync)               →  aioboto3, or boto3 via run_in_executor
# SQLAlchemy sync            →  SQLAlchemy async (1.4+) with asyncpg
# smtplib                    →  aiosmtplib
# time.sleep()               →  await asyncio.sleep()
HTTP requests with httpx:
import httpx
import requests  # Only needed for the blocking example below
# WRONG: sync requests in async function
async def get_user_wrong(user_id: int):
    response = requests.get(f'https://api.example.com/users/{user_id}')
    return response.json()
# CORRECT: async httpx client
async def get_user(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f'https://api.example.com/users/{user_id}')
        response.raise_for_status()
        return response.json()
# Reuse client across requests (more efficient)
client = httpx.AsyncClient(timeout=30.0)
async def get_user_efficient(user_id: int):
    response = await client.get(f'https://api.example.com/users/{user_id}')
    return response.json()
Database with SQLAlchemy async:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# User (ORM model) and SessionLocal (sync session factory) are assumed to be defined elsewhere
# WRONG: sync SQLAlchemy in async function
async def get_users_wrong():
    db = SessionLocal()  # Sync session
    users = db.query(User).all()  # Blocks event loop
    return users
# CORRECT: async SQLAlchemy
async def get_users():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(User))
        return result.scalars().all()
Fix 3: Offload CPU-Bound Work to ProcessPoolExecutor
Threading doesn’t help for CPU-bound work (Python’s GIL limits parallelism). Use ProcessPoolExecutor for CPU-intensive tasks:
import asyncio
from concurrent.futures import ProcessPoolExecutor
# CPU-bound function: runs in a separate process (no GIL restriction)
def generate_pdf_sync(data: dict) -> bytes:
    # Expensive CPU-bound work (pdf_library stands for whatever PDF library you use)
    return pdf_library.generate(data)
# Process pool: creates separate Python processes
process_pool = ProcessPoolExecutor(max_workers=4)
async def generate_report(data: dict) -> bytes:
    loop = asyncio.get_running_loop()
    # Run in a separate process so the event loop is not blocked
    pdf_bytes = await loop.run_in_executor(process_pool, generate_pdf_sync, data)
    return pdf_bytes
FastAPI with CPU-bound tasks:
from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ProcessPoolExecutor
import asyncio
app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)
@app.post("/reports")
async def create_report(data: ReportData, background_tasks: BackgroundTasks):
    # Don't block the request: generate the report in the background
    background_tasks.add_task(generate_and_store_report, data)
    # new_id is a placeholder for a report ID generated elsewhere
    return {"status": "Report generation started", "report_id": new_id}
async def generate_and_store_report(data: ReportData):
    loop = asyncio.get_running_loop()
    pdf = await loop.run_in_executor(executor, generate_pdf_sync, data.dict())
    await store_report(pdf)
Fix 4: Fix time.sleep() in Async Code
Replace all time.sleep() calls in coroutines with await asyncio.sleep():
import time
import asyncio
# WRONG: blocks the entire event loop
async def poll_status():
    while True:
        status = await check_status()
        time.sleep(5)  # All other coroutines paused for 5 seconds
# CORRECT: yields control back to event loop during sleep
async def poll_status():
    while True:
        status = await check_status()
        await asyncio.sleep(5)  # Other coroutines run during this wait
# WRONG in unit tests too
def test_async_function():
    asyncio.run(async_function())
    time.sleep(1)  # Testing with real time is slow and unreliable
    # Use pytest-asyncio and monkeypatching instead
Find all time.sleep() calls in your codebase:
grep -rn "time\.sleep(" --include="*.py" .
# Review each one: any call inside an async function is a bug
Fix 5: Detect Event Loop Blocking
Use asyncio’s debug mode and slow callback monitoring to find blocking calls:
import asyncio
import logging
# asyncio reports slow callbacks through the "asyncio" logger
logging.basicConfig(level=logging.DEBUG)
async def main():
    loop = asyncio.get_running_loop()
    # Warn when a callback takes longer than 100ms (blocks the loop)
    loop.slow_callback_duration = 0.1  # seconds
    await your_application()
# debug=True enables asyncio debug mode, which logs slow callbacks
asyncio.run(main(), debug=True)
Or set via environment variable:
PYTHONASYNCIODEBUG=1 python server.py
# Logs:
# Executing <Task finished name='Task-1' coro=<slow_task() done>
# took 0.523 seconds ← Blocked event loop for 523ms
Use aiomonitor for runtime profiling:
pip install aiomonitor
import asyncio
import aiomonitor
async def main():
    loop = asyncio.get_running_loop()
    # Starts an aiomonitor server (telnet localhost 50101)
    # start_monitor() returns a regular context manager, so use "with", not "async with"
    with aiomonitor.start_monitor(loop):
        await your_app()
Fix 6: Fix asyncio.run() Called Inside Running Loop
asyncio.run() creates a new event loop — calling it from inside a running loop raises RuntimeError:
# WRONG: asyncio.run() inside async code
async def outer():
    result = asyncio.run(inner())  # RuntimeError: asyncio.run() cannot be called from a running event loop
# WRONG: in Jupyter notebooks (the kernel runs its own event loop)
asyncio.run(some_coroutine())  # RuntimeError in Jupyter
# CORRECT: use await inside async functions
async def outer():
    result = await inner()  # await the coroutine directly
# CORRECT: calling async code from sync code (e.g., a Django sync view)
import asyncio
def sync_function_needing_async():
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one
        return asyncio.run(async_function())
    # A loop is running in this thread. Blocking on the result here would
    # deadlock it, so the caller has to await async_function() instead.
    raise RuntimeError("called from a running event loop; await async_function() instead")
# CORRECT: sync code in a worker thread, with the loop running in another thread
def call_from_worker_thread(loop):
    # run_coroutine_threadsafe must not be called from the loop's own thread
    future = asyncio.run_coroutine_threadsafe(async_function(), loop)
    return future.result(timeout=30)
# CORRECT — in Jupyter notebooks, use await directly (Jupyter supports it)
result = await some_coroutine()
# Or install nest_asyncio for Jupyter
import nest_asyncio
nest_asyncio.apply()
asyncio.run(some_coroutine())  # Now works in Jupyter
Fix 7: Async Database Sessions in FastAPI
A complete pattern for async SQLAlchemy with FastAPI:
from typing import AsyncGenerator
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import select
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"
engine = create_async_engine(DATABASE_URL, echo=True, pool_size=20)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
    pass
# User (ORM model) and UserCreate (Pydantic schema) are assumed to be defined elsewhere
app = FastAPI()
# Async database session dependency
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
# Routes use async session, so the event loop is not blocked
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users
@app.post("/users")
async def create_user(data: UserCreate, db: AsyncSession = Depends(get_db)):
    user = User(**data.model_dump())
    db.add(user)
    await db.flush()  # Get the generated ID without committing
    return user
Still Not Working?
Check if a third-party library is synchronous — many popular libraries (Stripe, Twilio SDKs, some ORMs) are synchronous. Check the library’s docs for async support or use asyncio.to_thread().
Async function doesn’t mean concurrent — declaring async def doesn’t make a function non-blocking. It only means the function can yield control at await points. A function with no await statements runs synchronously even if declared async.
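The point about async def without await can be checked directly. This is a small sketch under assumed names (no_await, with_await, and run are hypothetical): two copies of the same worker are started together, and the order in which they append to a shared list shows whether they ever gave the other one a turn.

```python
import asyncio


async def no_await(name: str, order: list[str]) -> None:
    """Declared async, but never yields to the event loop."""
    for i in range(3):
        order.append(f"{name}{i}")


async def with_await(name: str, order: list[str]) -> None:
    """Yields to the event loop after every step."""
    for i in range(3):
        order.append(f"{name}{i}")
        await asyncio.sleep(0)


async def run(worker) -> list[str]:
    """Start two workers together and return the order of their steps."""
    order: list[str] = []
    await asyncio.gather(worker("a", order), worker("b", order))
    return order


if __name__ == "__main__":
    print(asyncio.run(run(no_await)))    # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
    print(asyncio.run(run(with_await)))  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

Without an await, worker "a" finishes all of its steps before "b" starts, even though both were scheduled by asyncio.gather(). With await asyncio.sleep(0) in the loop, the two workers alternate.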
Connection pool exhaustion — if every request opens a new async database connection, the pool fills up and new connections wait. Reuse connection pools across requests:
# WRONG: new engine per request
@app.get("/data")
async def get_data():
    engine = create_async_engine(DATABASE_URL)  # Don't create per request
    ...
# CORRECT: shared engine at module level
engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)
asyncio.gather() for concurrent tasks: if you need to make multiple independent async calls, run them concurrently:
# Sequential: each await waits for the previous one
user = await get_user(user_id)
orders = await get_orders(user_id)
permissions = await get_permissions(user_id)
# Total time: sum of all three
# Concurrent: all three are in flight at the same time
user, orders, permissions = await asyncio.gather(
    get_user(user_id),
    get_orders(user_id),
    get_permissions(user_id),
)
# Total time: roughly the slowest of the three
For related issues, see Fix: FastAPI Dependency Injection Error and Fix: Celery Task Not Executing.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.