
Fix: Python asyncio.gather Not Handling Errors — Exceptions Swallowed or All Tasks Cancelled

FixDevs

Quick Answer

Pass return_exceptions=True to asyncio.gather() and check each result with isinstance(result, BaseException), or switch to asyncio.TaskGroup (Python 3.11+) for guaranteed fail-fast cancellation. This guide covers the return_exceptions parameter, partial failures, task cancellation, TaskGroup alternatives, and exception isolation patterns.
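The short version as a runnable sketch (ok and bad are stand-in coroutines for your own tasks):

```python
import asyncio

async def ok():
    return 42

async def bad():
    raise ValueError("boom")

async def main():
    # return_exceptions=True returns exceptions as values instead of raising
    results = await asyncio.gather(ok(), bad(), return_exceptions=True)
    for r in results:
        if isinstance(r, BaseException):   # BaseException also catches CancelledError
            print("failed:", r)
        else:
            print("value:", r)

asyncio.run(main())   # prints "value: 42" then "failed: boom"
```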

The Problem

By default, asyncio.gather() raises as soon as one task fails and discards the other results:

import asyncio

async def fetch_user(user_id: int):
    if user_id == 2:
        raise ValueError(f"User {user_id} not found")
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),   # Raises ValueError
        fetch_user(3),
    )
    # The ValueError propagates immediately — the other tasks keep
    # running in the background, but their results are discarded

Or exceptions are silently ignored with return_exceptions=True but you don’t check the results:

results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),   # Raises ValueError
    fetch_user(3),
    return_exceptions=True
)

# results = [{"id": 1, ...}, ValueError("User 2 not found"), {"id": 3, ...}]
for result in results:
    process(result)   # process() called with a ValueError object — unexpected behavior

Or tasks started with asyncio.gather() keep running after an exception:

# With return_exceptions=True — all tasks run to completion even on failure
# Without it — the first exception propagates, but the remaining tasks
# keep running in the background and their results are discarded
# The behavior surprises developers either way

Why This Happens

asyncio.gather() has two distinct behaviors controlled by return_exceptions:

  • return_exceptions=False (default) — the first exception propagates immediately to the gather() call. The other tasks are NOT cancelled — they keep running in the background, but their results are discarded.
  • return_exceptions=True — all tasks run to completion regardless of exceptions. Results (including exceptions as values) are returned in a list in the same order as the input tasks. No automatic cancellation.

Common errors:

  • Not checking whether results are exceptions when using return_exceptions=True
  • Assuming other tasks stop when one fails with return_exceptions=False — they don’t
  • Using asyncio.gather() when asyncio.TaskGroup (Python 3.11+) would be safer and clearer
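The second bullet is easy to verify. A minimal sketch (slow_ok and fast_fail are hypothetical stand-ins) showing that a sibling task keeps running after gather() has already raised:

```python
import asyncio

async def slow_ok():
    await asyncio.sleep(0.2)
    return "ok"

async def fast_fail():
    raise ValueError("boom")

async def main():
    t1 = asyncio.create_task(slow_ok())
    t2 = asyncio.create_task(fast_fail())
    try:
        await asyncio.gather(t1, t2)   # default return_exceptions=False
    except ValueError:
        # gather() raised, but t1 was NOT cancelled — it is still running
        print("t1 done?", t1.done())   # t1 done? False
    print("t1 result:", await t1)      # t1 result: ok

asyncio.run(main())
```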

Fix 1: Use return_exceptions=True and Filter Results

Handle mixed success/failure results correctly:

import asyncio
from typing import TypeVar, Union

T = TypeVar('T')

async def fetch_user(user_id: int) -> dict:
    if user_id == 2:
        raise ValueError(f"User {user_id} not found")
    await asyncio.sleep(0.1)   # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    user_ids = [1, 2, 3, 4]

    results = await asyncio.gather(
        *[fetch_user(uid) for uid in user_ids],
        return_exceptions=True
    )

    # Separate successes from failures
    successes = []
    failures = []

    for user_id, result in zip(user_ids, results):
        if isinstance(result, BaseException):   # BaseException also catches CancelledError
            failures.append((user_id, result))
            print(f"Failed to fetch user {user_id}: {result}")
        else:
            successes.append(result)

    print(f"Fetched {len(successes)} users, {len(failures)} failures")
    return successes

asyncio.run(main())

Generic helper for gather with error handling:

async def gather_with_errors(*coros, logger=None):
    """
    Run coroutines concurrently. Returns (results, errors) tuple.
    results: list of successful return values
    errors: list of (index, exception) tuples
    """
    raw_results = await asyncio.gather(*coros, return_exceptions=True)

    results = []
    errors = []

    for i, result in enumerate(raw_results):
        if isinstance(result, BaseException):
            errors.append((i, result))
            if logger:
                logger.error(f"Task {i} failed: {result}")
        else:
            results.append(result)

    return results, errors

# Usage
async def main():
    results, errors = await gather_with_errors(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    # results = [{"id": 1}, {"id": 3}]
    # errors = [(1, ValueError("User 2 not found"))]

Fix 2: Cancel Remaining Tasks on First Failure

With return_exceptions=False, other tasks continue silently. Cancel them explicitly:

import asyncio

async def gather_cancel_on_first_error(*coros):
    """
    Like gather(), but cancels all remaining tasks when one fails.
    Returns results or raises the first exception.
    """
    tasks = [asyncio.create_task(coro) for coro in coros]

    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        # BaseException so that CancelledError triggers cleanup too
        # Cancel all remaining tasks
        for task in tasks:
            if not task.done():
                task.cancel()

        # Wait for cancellations to complete
        await asyncio.gather(*tasks, return_exceptions=True)
        raise   # Re-raise the original exception

async def main():
    try:
        results = await gather_cancel_on_first_error(
            fetch_data(1),
            fetch_data(2),   # Fails
            fetch_data(3),
        )
    except ValueError as e:
        print(f"One task failed: {e}")
        print("All other tasks were cancelled")

Fix 3: Use asyncio.TaskGroup (Python 3.11+)

asyncio.TaskGroup is the modern replacement for many gather() patterns. It guarantees all tasks are cancelled when any task fails:

import asyncio

async def main():
    results = []

    try:
        async with asyncio.TaskGroup() as tg:
            # All tasks start concurrently
            task1 = tg.create_task(fetch_user(1))
            task2 = tg.create_task(fetch_user(2))   # Will fail
            task3 = tg.create_task(fetch_user(3))

        # This line only reached if ALL tasks succeed
        # tg waits for all tasks; if any fails, remaining are cancelled
        results = [task1.result(), task2.result(), task3.result()]

    except* ValueError as eg:
        # Python 3.11+ ExceptionGroup — collect all failures
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")

    return results

TaskGroup vs gather() key differences:

| Feature | asyncio.gather() | asyncio.TaskGroup |
|---|---|---|
| Cancel others on failure | No (with return_exceptions=False) | Yes, always |
| Return mixed results | Yes (with return_exceptions=True) | No, raises ExceptionGroup |
| Exception type | Single exception, or list of values | ExceptionGroup |
| Python version | 3.7+ | 3.11+ |
| Task tracking | Must save tasks manually | tg.create_task() returns a Task |

Fix 4: Set Timeouts on Concurrent Tasks

Individual tasks should have timeouts to prevent one slow task from blocking the group:

import asyncio

async def fetch_with_timeout(coro, timeout: float):
    """Wrap a coroutine with a timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        raise asyncio.TimeoutError(f"Task timed out after {timeout}s")

async def main():
    results = await asyncio.gather(
        fetch_with_timeout(fetch_user(1), timeout=5.0),
        fetch_with_timeout(fetch_user(2), timeout=5.0),
        fetch_with_timeout(fetch_user(3), timeout=5.0),
        return_exceptions=True,
    )

    for i, result in enumerate(results):
        if isinstance(result, asyncio.TimeoutError):
            print(f"Task {i} timed out")
        elif isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

Global timeout with asyncio.wait_for:

async def main():
    try:
        # All tasks must complete within 10 seconds
        results = await asyncio.wait_for(
            asyncio.gather(
                fetch_user(1),
                fetch_user(2),
                fetch_user(3),
                return_exceptions=True,
            ),
            timeout=10.0
        )
    except asyncio.TimeoutError:
        print("Overall operation timed out")

Fix 5: asyncio.wait for More Control

asyncio.wait() gives more control than gather() — process tasks as they complete:

import asyncio

async def main():
    tasks = {
        asyncio.create_task(fetch_user(uid), name=f"fetch-{uid}")
        for uid in [1, 2, 3, 4, 5]
    }

    # Process tasks as they complete (not in original order)
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION,   # Stop after first error
        # Options: FIRST_EXCEPTION, FIRST_COMPLETED, ALL_COMPLETED
    )

    # Process completed tasks
    for task in done:
        if task.exception():
            print(f"Task {task.get_name()} failed: {task.exception()}")
        else:
            print(f"Task {task.get_name()} result: {task.result()}")

    # Cancel remaining tasks
    for task in pending:
        task.cancel()

    # Wait for cancellations
    if pending:
        await asyncio.wait(pending)

Process results as they arrive:

async def main():
    tasks = [
        asyncio.create_task(fetch_user(uid))
        for uid in range(1, 11)  # 10 users
    ]

    # Process each task as it completes
    for coro in asyncio.as_completed(tasks):
        try:
            result = await coro
            print(f"Got result: {result}")
        except Exception as e:
            print(f"Task failed: {e}")
            # Other tasks continue running

Fix 6: Common gather() Patterns

Batch processing — limit concurrent tasks:

import asyncio

async def process_in_batches(items, batch_size: int, processor):
    """Process items in batches to limit concurrency."""
    results = []

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[processor(item) for item in batch],
            return_exceptions=True,
        )
        results.extend(batch_results)

    return results

# Or use a semaphore for fine-grained control
async def limited_gather(coros, max_concurrent: int):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def with_semaphore(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *[with_semaphore(coro) for coro in coros],
        return_exceptions=True,
    )

# Usage — max 5 concurrent HTTP requests
results = await limited_gather(
    [fetch_url(url) for url in urls],
    max_concurrent=5,
)

Retry individual failed tasks:

import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')

async def retry(
    coro_factory: Callable[[], Awaitable[T]],
    retries: int = 3,
    delay: float = 1.0,
) -> T:
    last_error = None
    for attempt in range(retries):
        try:
            return await coro_factory()
        except asyncio.CancelledError:
            raise   # Never retry cancellation
        except Exception as e:
            last_error = e
            if attempt < retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))   # Exponential backoff
    raise last_error

# Usage with gather
results = await asyncio.gather(
    retry(lambda: fetch_user(1)),
    retry(lambda: fetch_user(2), retries=5),
    retry(lambda: fetch_user(3)),
    return_exceptions=True,
)

Fix 7: Debug asyncio.gather Issues

Identify which tasks failed and why:

import asyncio
import traceback

async def debug_gather(*coros):
    """gather() with detailed error reporting."""
    tasks = [asyncio.create_task(coro) for coro in coros]

    # Add names to tasks for easier debugging
    for i, task in enumerate(tasks):
        task.set_name(f"task-{i}")

    results = await asyncio.gather(*tasks, return_exceptions=True)

    for task, result in zip(tasks, results):
        if isinstance(result, Exception):
            print(f"\nTask '{task.get_name()}' FAILED:")
            traceback.print_exception(type(result), result, result.__traceback__)
        else:
            print(f"Task '{task.get_name()}' succeeded: {result}")

    return results

# Enable asyncio debug mode for more verbose output
asyncio.run(debug_gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(3),
), debug=True)

Asyncio debug mode catches common mistakes:

# Enable via environment variable
PYTHONASYNCIODEBUG=1 python main.py

# Or in code — pass debug=True to asyncio.run()
asyncio.run(main(), debug=True)

# Debug mode warns about:
# - Coroutines that were never awaited
# - Slow callbacks (>100ms blocking the event loop)
# - Misuse of thread-unsafe operations

Still Not Working?

BaseException vs Exception — asyncio.CancelledError is a BaseException, not an Exception, in Python 3.8+. Using isinstance(result, Exception) to check for failures won't catch CancelledError. Use isinstance(result, BaseException) or check for CancelledError separately.

Tasks created before gather — tasks created with asyncio.create_task() start immediately, even before gather() is called. If you create_task() and then never await the result (and never call gather()), the task runs independently and exceptions are silently logged as “unhandled exception in task.”
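A common remedy is to hold references to background tasks and drain them before shutdown, so every exception is observed. A minimal sketch (worker and background_tasks are illustrative names):

```python
import asyncio

background_tasks = set()

async def worker(n: int) -> int:
    if n == 2:
        raise ValueError(f"worker {n} failed")
    return n

async def main():
    for n in (1, 2, 3):
        task = asyncio.create_task(worker(n))
        background_tasks.add(task)
        # Drop the reference once the task finishes so the set doesn't grow
        task.add_done_callback(background_tasks.discard)

    # Drain at shutdown so no exception is silently logged
    results = await asyncio.gather(*background_tasks, return_exceptions=True)
    errors = [r for r in results if isinstance(r, BaseException)]
    print(f"{len(errors)} background task(s) failed")   # 1 background task(s) failed

asyncio.run(main())
```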

return_exceptions and exception chaining — with return_exceptions=True, exceptions are returned as values instead of being raised, so they never flow through your except blocks and implicit chaining (raise ... from ...) does not happen automatically. The returned exception objects do keep their __traceback__, which you can log or re-raise later.

For related Python issues, see Fix: Python asyncio Blocking the Event Loop and Fix: Python Decorator Not Working.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
