# Fix: Python asyncio.gather Not Handling Errors — Exceptions Swallowed or All Tasks Cancelled
## Quick Answer
How to fix asyncio.gather error handling — return_exceptions parameter, partial failures, task cancellation propagation, TaskGroup alternatives, and exception isolation patterns.
## The Problem

asyncio.gather() raises as soon as one task fails, discarding the other results:
```python
import asyncio

async def fetch_user(user_id: int):
    if user_id == 2:
        raise ValueError(f"User {user_id} not found")
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),  # Raises ValueError
        fetch_user(3),
    )
    # Only the ValueError propagates — the results of
    # fetch_user(1) and fetch_user(3) are lost
```

Or exceptions are silently ignored with return_exceptions=True but you don’t check the results:
```python
results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),  # Raises ValueError
    fetch_user(3),
    return_exceptions=True,
)
# results = [{"id": 1, ...}, ValueError("User 2 not found"), {"id": 3, ...}]
for result in results:
    process(result)  # process() called with a ValueError object — unexpected behavior
```

Or tasks started with asyncio.gather() keep running after an exception:
```python
# With return_exceptions=True — all tasks complete even on failure
# Without it — the first exception propagates, but sibling tasks keep running
# The behavior surprises developers either way
```

## Why This Happens
asyncio.gather() has two distinct behaviors controlled by return_exceptions:

- `return_exceptions=False` (default) — the first exception immediately propagates to the `gather()` call. The other tasks are NOT automatically cancelled — they continue running in the background, but their results are discarded. The exception from the failed task is raised.
- `return_exceptions=True` — all tasks run to completion regardless of exceptions. Results (including exceptions as values) are returned in a list in the same order as the input tasks. No automatic cancellation.
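You can see the default behavior for yourself. This is a minimal sketch, assuming a hypothetical `worker` coroutine that records completions in a module-level list — the failure at 20 ms raises out of gather(), yet the slowest sibling still finishes afterwards:

```python
import asyncio

completed = []  # records which workers ran to completion

async def worker(n: int, fail: bool = False):
    await asyncio.sleep(0.01 * n)
    if fail:
        raise RuntimeError(f"worker {n} failed")
    completed.append(n)
    return n

async def main():
    t1 = asyncio.create_task(worker(1))
    t2 = asyncio.create_task(worker(2, fail=True))
    t3 = asyncio.create_task(worker(3))
    try:
        await asyncio.gather(t1, t2, t3)
    except RuntimeError as e:
        print(f"gather raised: {e}")
    # worker(1) finished before the failure; worker(3) is still running
    await asyncio.sleep(0.1)  # give worker(3) time to finish
    print(f"completed: {sorted(completed)}")  # worker 3 completed despite the failure

asyncio.run(main())
```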
Common errors:

- Not checking whether results are exceptions when using `return_exceptions=True`
- Assuming other tasks stop when one fails with `return_exceptions=False` — they don’t
- Using `asyncio.gather()` when `asyncio.TaskGroup` (Python 3.11+) would be safer and clearer
## Fix 1: Use return_exceptions=True and Filter Results
Handle mixed success/failure results correctly:
```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    if user_id == 2:
        raise ValueError(f"User {user_id} not found")
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    user_ids = [1, 2, 3, 4]
    results = await asyncio.gather(
        *[fetch_user(uid) for uid in user_ids],
        return_exceptions=True,
    )
    # Separate successes from failures
    successes = []
    failures = []
    for user_id, result in zip(user_ids, results):
        if isinstance(result, Exception):
            failures.append((user_id, result))
            print(f"Failed to fetch user {user_id}: {result}")
        else:
            successes.append(result)
    print(f"Fetched {len(successes)} users, {len(failures)} failures")
    return successes

asyncio.run(main())
```

Generic helper for gather with error handling:
```python
async def gather_with_errors(*coros, logger=None):
    """
    Run coroutines concurrently. Returns (results, errors) tuple.

    results: list of successful return values
    errors: list of (index, exception) tuples
    """
    raw_results = await asyncio.gather(*coros, return_exceptions=True)
    results = []
    errors = []
    for i, result in enumerate(raw_results):
        if isinstance(result, BaseException):
            errors.append((i, result))
            if logger:
                logger.error(f"Task {i} failed: {result}")
        else:
            results.append(result)
    return results, errors

# Usage
async def main():
    results, errors = await gather_with_errors(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    # results = [{"id": 1, ...}, {"id": 3, ...}]
    # errors = [(1, ValueError("User 2 not found"))]
```

## Fix 2: Cancel Remaining Tasks on First Failure
With return_exceptions=False, other tasks continue silently. Cancel them explicitly:
```python
import asyncio

async def gather_cancel_on_first_error(*coros):
    """
    Like gather(), but cancels all remaining tasks when one fails.
    Returns results or raises the first exception.
    """
    tasks = [asyncio.create_task(coro) for coro in coros]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        # Cancel all remaining tasks
        for task in tasks:
            if not task.done():
                task.cancel()
        # Wait for cancellations to complete
        await asyncio.gather(*tasks, return_exceptions=True)
        raise  # Re-raise the original exception

async def main():
    try:
        results = await gather_cancel_on_first_error(
            fetch_data(1),
            fetch_data(2),  # Fails
            fetch_data(3),
        )
    except ValueError as e:
        print(f"One task failed: {e}")
        print("All other tasks were cancelled")
```

## Fix 3: Use asyncio.TaskGroup (Python 3.11+)
asyncio.TaskGroup is the modern replacement for many gather() patterns. It guarantees all tasks are cancelled when any task fails:
```python
import asyncio

async def main():
    results = []
    try:
        async with asyncio.TaskGroup() as tg:
            # All tasks start concurrently
            task1 = tg.create_task(fetch_user(1))
            task2 = tg.create_task(fetch_user(2))  # Will fail
            task3 = tg.create_task(fetch_user(3))
        # This line is only reached if ALL tasks succeed —
        # tg waits for all tasks; if any fails, remaining are cancelled
        results = [task1.result(), task2.result(), task3.result()]
    except* ValueError as eg:
        # Python 3.11+ ExceptionGroup — collect all failures
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")
    return results
```

TaskGroup vs gather() key differences:
| Feature | asyncio.gather() | asyncio.TaskGroup |
|---|---|---|
| Cancel others on failure | No (with return_exceptions=False) | Yes — always |
| Return mixed results | Yes (with return_exceptions=True) | No — raises ExceptionGroup |
| Exception type | Single exception or list | ExceptionGroup |
| Python version | 3.7+ | 3.11+ |
| Task tracking | Must save tasks manually | tg.create_task() returns Task |
## Fix 4: Set Timeouts on Concurrent Tasks
Individual tasks should have timeouts to prevent one slow task from blocking the group:
```python
import asyncio

async def fetch_with_timeout(coro, timeout: float):
    """Wrap a coroutine with a timeout."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        raise asyncio.TimeoutError(f"Task timed out after {timeout}s")

async def main():
    results = await asyncio.gather(
        fetch_with_timeout(fetch_user(1), timeout=5.0),
        fetch_with_timeout(fetch_user(2), timeout=5.0),
        fetch_with_timeout(fetch_user(3), timeout=5.0),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, asyncio.TimeoutError):
            print(f"Task {i} timed out")
        elif isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
```

Global timeout with asyncio.wait_for:
```python
async def main():
    try:
        # All tasks must complete within 10 seconds
        results = await asyncio.wait_for(
            asyncio.gather(
                fetch_user(1),
                fetch_user(2),
                fetch_user(3),
                return_exceptions=True,
            ),
            timeout=10.0,
        )
    except asyncio.TimeoutError:
        print("Overall operation timed out")
```

## Fix 5: asyncio.wait for More Control
asyncio.wait() gives more control than gather() — you get explicit done and pending task sets and can choose when it returns:
```python
import asyncio

async def main():
    tasks = {
        asyncio.create_task(fetch_user(uid), name=f"fetch-{uid}")
        for uid in [1, 2, 3, 4, 5]
    }
    # Wait until the first failure (or everything completes)
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION,  # Stop after first error
        # Options: FIRST_EXCEPTION, FIRST_COMPLETED, ALL_COMPLETED
    )
    # Process completed tasks
    for task in done:
        if task.exception():
            print(f"Task {task.get_name()} failed: {task.exception()}")
        else:
            print(f"Task {task.get_name()} result: {task.result()}")
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
    # Wait for cancellations
    if pending:
        await asyncio.wait(pending)
```

Process results as they arrive:
```python
async def main():
    tasks = [
        asyncio.create_task(fetch_user(uid))
        for uid in range(1, 11)  # 10 users
    ]
    # Process each task as it completes
    for coro in asyncio.as_completed(tasks):
        try:
            result = await coro
            print(f"Got result: {result}")
        except Exception as e:
            print(f"Task failed: {e}")
        # Other tasks continue running
```

## Fix 6: Common gather() Patterns
Batch processing — limit concurrent tasks:
```python
import asyncio

async def process_in_batches(items, batch_size: int, processor):
    """Process items in batches to limit concurrency."""
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[processor(item) for item in batch],
            return_exceptions=True,
        )
        results.extend(batch_results)
    return results

# Or use a semaphore for fine-grained control
async def limited_gather(coros, max_concurrent: int):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def with_semaphore(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *[with_semaphore(coro) for coro in coros],
        return_exceptions=True,
    )

# Usage — max 5 concurrent HTTP requests
results = await limited_gather(
    [fetch_url(url) for url in urls],
    max_concurrent=5,
)
```

Retry individual failed tasks:
```python
import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar('T')

async def retry(
    coro_factory: Callable[[], Awaitable[T]],
    retries: int = 3,
    delay: float = 1.0,
) -> T:
    last_error = None
    for attempt in range(retries):
        try:
            return await coro_factory()
        except asyncio.CancelledError:
            raise  # Never retry cancellation
        except Exception as e:
            last_error = e
            if attempt < retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))  # Exponential backoff
    raise last_error

# Usage with gather
results = await asyncio.gather(
    retry(lambda: fetch_user(1)),
    retry(lambda: fetch_user(2), retries=5),
    retry(lambda: fetch_user(3)),
    return_exceptions=True,
)
```

## Fix 7: Debug asyncio.gather Issues
Identify which tasks failed and why:
```python
import asyncio
import traceback

async def debug_gather(*coros):
    """gather() with detailed error reporting."""
    tasks = [asyncio.create_task(coro) for coro in coros]
    # Add names to tasks for easier debugging
    for i, task in enumerate(tasks):
        task.set_name(f"task-{i}")
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for task, result in zip(tasks, results):
        if isinstance(result, Exception):
            print(f"\nTask '{task.get_name()}' FAILED:")
            traceback.print_exception(type(result), result, result.__traceback__)
        else:
            print(f"Task '{task.get_name()}' succeeded: {result}")
    return results

# Enable asyncio debug mode for more verbose output
asyncio.run(debug_gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(3),
), debug=True)
```

Asyncio debug mode catches common mistakes:
```bash
# Enable via environment variable
PYTHONASYNCIODEBUG=1 python main.py
```

```python
# Or in code
import asyncio

asyncio.get_event_loop().set_debug(True)

# Debug mode warns about:
# - Coroutines that were never awaited
# - Slow callbacks (>100ms blocking the event loop)
# - Misuse of thread-unsafe operations
```

## Still Not Working?
**BaseException vs Exception** — asyncio.CancelledError is a BaseException, not an Exception, in Python 3.8+. Using isinstance(result, Exception) to check for failures won’t catch CancelledError. Use isinstance(result, BaseException) or check for CancelledError separately.
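A short sketch demonstrating the distinction: a cancelled child task shows up in the gather() results as a CancelledError that an `isinstance(..., Exception)` check misses:

```python
import asyncio

async def sleeper():
    await asyncio.sleep(10)

async def main():
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    results = await asyncio.gather(task, return_exceptions=True)
    cancelled = results[0]
    # CancelledError inherits from BaseException, not Exception (3.8+)
    return (isinstance(cancelled, Exception), isinstance(cancelled, BaseException))

flags = asyncio.run(main())
print(flags)  # (False, True)
```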
**Tasks created before gather** — tasks created with asyncio.create_task() start immediately, even before gather() is called. If you create_task() and then never await the result (and never call gather()), the task runs independently and exceptions are silently logged as “unhandled exception in task.”
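One way to make such a fire-and-forget failure visible is to keep a reference and read task.exception() once the task is done. A sketch, using a hypothetical `fails()` coroutine:

```python
import asyncio

async def fails():
    raise RuntimeError("nobody awaited me")

async def main():
    task = asyncio.create_task(fails())  # starts running immediately
    await asyncio.sleep(0)  # yield once so the task gets to run
    # Without this, the exception is only logged when the task is garbage-collected.
    # Retrieving it explicitly also suppresses the "never retrieved" warning:
    return task.done(), task.exception()

done, exc = asyncio.run(main())
print(done, exc)
```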
**return_exceptions and re-raising** — with return_exceptions=True, exceptions are returned as values instead of being raised at the call site, so they never propagate through your surrounding except blocks. The exception objects themselves still carry their tracebacks and chained context (`__cause__`/`__context__`); if you need normal raise-and-propagate semantics, use return_exceptions=False with try/except, or re-raise the stored exception with `raise exc`.
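It is easy to check what survives on the returned object. A quick sketch with a hypothetical `boom()` coroutine that raises a chained exception:

```python
import asyncio

async def boom():
    try:
        {}["missing"]  # original low-level error
    except KeyError as e:
        raise RuntimeError("lookup failed") from e  # chained exception

async def main():
    results = await asyncio.gather(boom(), return_exceptions=True)
    return results[0]

exc = asyncio.run(main())
# The chain survives on the returned object — __cause__ is the original KeyError
print(type(exc).__name__, "caused by", type(exc.__cause__).__name__)
```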
For related Python issues, see Fix: Python asyncio Blocking the Event Loop and Fix: Python Decorator Not Working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.