
Fix: Python asyncio Blocking the Event Loop — Mixing Sync and Async Code

FixDevs

Quick Answer

To fix Python asyncio event loop blocking: move sync calls off the loop with asyncio.to_thread() or run_in_executor(), replace blocking I/O libraries with async-native ones, never call time.sleep() inside a coroutine, and use asyncio's debug mode to detect event loop stalls.

The Problem

An async Python application becomes unresponsive under load:

async def handle_request(request):
    # This blocks the entire event loop for every request
    data = requests.get('https://api.example.com/data')  # Sync HTTP call
    return process(data.json())

Or an async route is slow despite using async def:

@app.get("/report")
async def generate_report():
    # Looks async, but calls a blocking CPU-bound function
    report = generate_pdf(data)  # Synchronous — blocks all other requests
    return {"report": report}

Or asyncio.run() called inside an already-running event loop:

RuntimeError: asyncio.run() cannot be called from a running event loop

Or mixing time.sleep() with async code:

async def poll():
    while True:
        await check_status()
        time.sleep(5)  # Blocks the event loop — no other coroutines run during sleep

Why This Happens

Python’s asyncio runs coroutines on a single-threaded event loop. The event loop can only do one thing at a time — it switches between coroutines at await points. Any synchronous (blocking) code that runs inside a coroutine holds the event loop hostage for its entire duration:

  • Blocking I/O — requests.get(), open() with standard file I/O, and psycopg2 queries all run synchronously. The event loop can’t switch to other coroutines while waiting.
  • CPU-bound operations — image processing, PDF generation, data transformation. Python’s GIL means only one thread runs Python code at a time — CPU-bound work in a coroutine blocks all others.
  • time.sleep() — blocks the thread, unlike await asyncio.sleep() which yields control back to the event loop.
  • Synchronous ORM calls — SQLAlchemy’s standard session is synchronous. Using it in an async route blocks the event loop.
  • asyncio.run() inside async code — asyncio.run() creates a new event loop. Calling it inside an already-running loop raises RuntimeError.
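The difference is measurable with nothing but the standard library — five coroutines that await asyncio.sleep() overlap their waits, while five that call time.sleep() serialize them:

```python
import asyncio
import time

async def blocking_tick():
    time.sleep(0.2)           # blocks the whole loop for 200ms
    return "blocking"

async def friendly_tick():
    await asyncio.sleep(0.2)  # yields — other coroutines run meanwhile
    return "friendly"

async def timed(factory):
    start = time.perf_counter()
    await asyncio.gather(*[factory() for _ in range(5)])
    return time.perf_counter() - start

async def main():
    concurrent = await timed(friendly_tick)  # sleeps overlap: ~0.2s total
    serial = await timed(blocking_tick)      # sleeps serialize: ~1.0s total
    return concurrent, serial

concurrent, serial = asyncio.run(main())
```

The five awaiting coroutines finish in roughly the time of one sleep; the five blocking ones take the sum of all five.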

Fix 1: Use asyncio.to_thread for Blocking I/O

asyncio.to_thread() (Python 3.9+) runs a synchronous function in a separate thread, freeing the event loop:

import asyncio
import requests  # Synchronous HTTP library

# WRONG — blocks the event loop
async def fetch_data_wrong():
    response = requests.get('https://api.example.com/data')
    return response.json()

# CORRECT — run in a thread pool
async def fetch_data():
    response = await asyncio.to_thread(requests.get, 'https://api.example.com/data')
    return response.json()

# With keyword arguments
async def fetch_with_params():
    response = await asyncio.to_thread(
        requests.get,
        'https://api.example.com/data',
        timeout=30,
        headers={'Authorization': 'Bearer token'}
    )
    return response.json()
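The same pattern works for any blocking function, not just requests. A self-contained sketch with a simulated slow sync call (slow_sync_call is a stand-in for a blocking library), showing five offloaded calls overlapping instead of serializing:

```python
import asyncio
import time

def slow_sync_call(x):
    time.sleep(0.2)  # stands in for a blocking library call
    return x * 2

async def main():
    start = time.perf_counter()
    # Each call runs in the default thread pool; the five waits overlap
    results = await asyncio.gather(
        *[asyncio.to_thread(slow_sync_call, i) for i in range(5)]
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
```

With to_thread the five 200ms calls complete in roughly 200ms total, rather than one full second.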

Python 3.8 and earlier — use loop.run_in_executor():

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

async def fetch_data():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside coroutines
    response = await loop.run_in_executor(
        executor,
        lambda: requests.get('https://api.example.com/data')
    )
    return response.json()

Better solution: Use an async HTTP library instead of running sync libraries in threads. httpx with async support or aiohttp are purpose-built for async code and don’t need thread pools.

Fix 2: Use Async Libraries Instead of Sync Ones

The best fix for blocking I/O is to use async-native libraries:

# SYNC (blocks event loop)       → ASYNC alternative
# requests                       → httpx (async) or aiohttp
# psycopg2 (PostgreSQL)          → asyncpg or psycopg3 (async)
# pymysql (MySQL)                → aiomysql
# redis-py (sync)                → redis.asyncio (included in redis-py v4+)
# pymongo (sync)                 → motor (async MongoDB)
# boto3 (sync)                   → aioboto3, or boto3 via run_in_executor
# SQLAlchemy sync                → SQLAlchemy async (1.4+) with asyncpg
# smtplib                        → aiosmtplib
# time.sleep()                   → await asyncio.sleep()

HTTP requests with httpx:

import httpx

# WRONG — sync requests in async function
async def get_user_wrong(user_id: int):
    response = requests.get(f'https://api.example.com/users/{user_id}')
    return response.json()

# CORRECT — async httpx client
async def get_user(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f'https://api.example.com/users/{user_id}')
        response.raise_for_status()
        return response.json()

# Reuse client across requests (more efficient)
client = httpx.AsyncClient(timeout=30.0)

async def get_user_efficient(user_id: int):
    response = await client.get(f'https://api.example.com/users/{user_id}')
    return response.json()

Database with SQLAlchemy async:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# WRONG — sync SQLAlchemy in async function
async def get_users_wrong():
    db = SessionLocal()          # Sync session
    users = db.query(User).all() # Blocks event loop
    return users

# CORRECT — async SQLAlchemy
async def get_users():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(User))
        return result.scalars().all()

Fix 3: Offload CPU-Bound Work to ProcessPoolExecutor

Threading doesn’t help for CPU-bound work (Python’s GIL limits parallelism). Use ProcessPoolExecutor for CPU-intensive tasks:

import asyncio
from concurrent.futures import ProcessPoolExecutor

# CPU-bound function — runs in a separate process (no GIL restriction)
def generate_pdf_sync(data: dict) -> bytes:
    # Expensive CPU-bound work
    return pdf_library.generate(data)

# Process pool — creates separate Python processes
process_pool = ProcessPoolExecutor(max_workers=4)

async def generate_report(data: dict) -> bytes:
    loop = asyncio.get_running_loop()
    # Run in a separate process — doesn't block the event loop
    pdf_bytes = await loop.run_in_executor(process_pool, generate_pdf_sync, data)
    return pdf_bytes

FastAPI with CPU-bound tasks:

from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ProcessPoolExecutor
from uuid import uuid4
import asyncio

app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)

@app.post("/reports")
async def create_report(data: ReportData, background_tasks: BackgroundTasks):
    report_id = uuid4().hex
    # Don't block the request — generate report in background
    background_tasks.add_task(generate_and_store_report, data)
    return {"status": "Report generation started", "report_id": report_id}

async def generate_and_store_report(data: ReportData):
    loop = asyncio.get_running_loop()
    pdf = await loop.run_in_executor(executor, generate_pdf_sync, data.dict())
    await store_report(pdf)

Fix 4: Fix time.sleep() in Async Code

Replace all time.sleep() calls in coroutines with await asyncio.sleep():

import time
import asyncio

# WRONG — blocks the entire event loop
async def poll_status():
    while True:
        status = await check_status()
        time.sleep(5)  # All other coroutines paused for 5 seconds

# CORRECT — yields control back to event loop during sleep
async def poll_status():
    while True:
        status = await check_status()
        await asyncio.sleep(5)  # Other coroutines run during this wait

# WRONG in unit tests too
def test_async_function():
    asyncio.run(async_function())
    time.sleep(1)   # Testing with real time — slow and unreliable
    # Use pytest-asyncio and monkeypatching instead

Find all time.sleep() calls in your codebase:

grep -rn "time\.sleep\(" --include="*.py" .
# Review each one — any inside an async function is a bug

Fix 5: Detect Event Loop Blocking

Use asyncio’s debug mode and slow callback monitoring to find blocking calls:

import asyncio
import logging

# Debug-mode warnings go through the 'asyncio' logger
logging.basicConfig(level=logging.WARNING)

async def main():
    loop = asyncio.get_running_loop()

    # Warn when a callback takes longer than 100ms (blocks the loop)
    loop.slow_callback_duration = 0.1  # seconds

    await your_application()

# debug=True enables asyncio debug mode — logs slow callbacks
# and never-awaited coroutines
asyncio.run(main(), debug=True)

Or set via environment variable:

PYTHONASYNCIODEBUG=1 python server.py
# Logs:
# Executing <Task finished name='Task-1' coro=<slow_task() done>
# took 0.523 seconds  ← Blocked event loop for 523ms
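Because these warnings go through the standard asyncio logger, they can also be captured programmatically — a self-contained sketch that deliberately blocks the loop and checks for the "took N seconds" warning:

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects log messages so we can inspect them."""
    def __init__(self):
        super().__init__()
        self.messages = []
    def emit(self, record):
        self.messages.append(record.getMessage())

handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio_logger.setLevel(logging.WARNING)

async def main():
    asyncio.get_running_loop().slow_callback_duration = 0.05
    time.sleep(0.2)  # deliberately block the loop

asyncio.run(main(), debug=True)

slow_warnings = [m for m in handler.messages if "took" in m]
```

The 200ms block exceeds the 50ms threshold, so debug mode emits an "Executing <Task ...> took 0.2 seconds" warning.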

Use aiomonitor for runtime profiling:

pip install aiomonitor

import asyncio
import aiomonitor

async def main():
    # Starts an aiomonitor console (telnet localhost 50101)
    # start_monitor is a regular (sync) context manager
    with aiomonitor.start_monitor(asyncio.get_running_loop()):
        await your_app()

Fix 6: Fix asyncio.run() Called Inside Running Loop

asyncio.run() creates a new event loop — calling it from inside a running loop raises RuntimeError:

# WRONG — asyncio.run() inside async code
async def outer():
    result = asyncio.run(inner())  # RuntimeError: asyncio.run() cannot be called from a running event loop

# WRONG — in Jupyter notebooks (the kernel already runs an event loop)
asyncio.run(some_coroutine())  # Same RuntimeError in Jupyter

# CORRECT — use await inside async functions
async def outer():
    result = await inner()  # await the coroutine directly

# CORRECT — calling async code from sync code (e.g., a Django sync view)
import asyncio

def sync_function_needing_async():
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running in this thread — safe to start one
        return asyncio.run(async_function())
    # A loop IS running in this thread. Blocking on a result here would
    # deadlock it. Either restructure the caller to await, or submit the
    # coroutine to a loop running in a DIFFERENT thread:
    #   future = asyncio.run_coroutine_threadsafe(async_function(), other_loop)
    #   return future.result(timeout=30)
    raise RuntimeError("cannot block inside a running event loop — refactor to await")

# CORRECT — in Jupyter notebooks, use await directly (Jupyter supports it)
result = await some_coroutine()

# Or install nest_asyncio for Jupyter
import nest_asyncio
nest_asyncio.apply()
asyncio.run(some_coroutine())  # Now works in Jupyter
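A quick self-contained check of both behaviors — the nested call raising, and a plain await succeeding:

```python
import asyncio

async def inner():
    return 42

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)   # nested asyncio.run() — always raises here
    except RuntimeError as exc:
        nested_error = str(exc)
    finally:
        coro.close()        # silence the "never awaited" warning
    value = await inner()   # the correct way: await it directly
    return nested_error, value

error_message, value = asyncio.run(outer())
```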

Fix 7: Async Database Sessions in FastAPI

A complete pattern for async SQLAlchemy with FastAPI:

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import select

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"

engine = create_async_engine(DATABASE_URL, echo=True, pool_size=20)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

app = FastAPI()

# Async database session dependency
async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Routes use async session — no event loop blocking
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users

@app.post("/users")
async def create_user(data: UserCreate, db: AsyncSession = Depends(get_db)):
    user = User(**data.model_dump())
    db.add(user)
    await db.flush()  # Get the generated ID without committing
    return user

Still Not Working?

Check if a third-party library is synchronous — many popular libraries (Stripe, Twilio SDKs, some ORMs) are synchronous. Check the library’s docs for async support or use asyncio.to_thread().

Async function doesn’t mean concurrent — declaring async def doesn’t make a function non-blocking. It only means the function can yield control at await points. A function with no await statements runs synchronously even if declared async.
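This is easy to demonstrate: an async def with no await points still serializes under asyncio.gather():

```python
import asyncio
import time

async def looks_async():
    # async def, but no await anywhere — runs start to finish synchronously
    time.sleep(0.1)
    return "done"

async def main():
    start = time.perf_counter()
    await asyncio.gather(looks_async(), looks_async())
    return time.perf_counter() - start

elapsed = asyncio.run(main())
```

The two calls take ~0.2s total — no overlap, despite both being coroutines.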

Connection pool exhaustion — if every request opens a new async database connection, the pool fills up and new connections wait. Reuse connection pools across requests:

# WRONG — new engine per request
@app.get("/data")
async def get_data():
    engine = create_async_engine(DATABASE_URL)  # Don't create per request
    ...

# CORRECT — shared engine at module level
engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)

asyncio.gather() for concurrent tasks — if you need to make multiple independent async calls, run them concurrently:

# Sequential — each await waits for the previous
user = await get_user(user_id)
orders = await get_orders(user_id)
permissions = await get_permissions(user_id)
# Total time: sum of all three

# Concurrent — all three in flight at once
user, orders, permissions = await asyncio.gather(
    get_user(user_id),
    get_orders(user_id),
    get_permissions(user_id),
)
# Total time: max of the three
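One related caveat: by default asyncio.gather() propagates the first exception and discards the other results. Pass return_exceptions=True to collect failures in place (a minimal sketch):

```python
import asyncio

async def ok():
    await asyncio.sleep(0.01)
    return "ok"

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # Without return_exceptions=True, the ValueError would propagate
    # out of gather and the "ok" result would be lost
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
```

Exceptions come back as ordinary list items, so you can handle partial failures per-call.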

For related issues, see Fix: FastAPI Dependency Injection Error and Fix: Celery Task Not Executing.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
