Fix: FastAPI BackgroundTasks Not Working — Task Not Running or Dependency Errors
Quick Answer
How to fix FastAPI BackgroundTasks — task not executing, dependency injection in tasks, error handling, Celery for heavy tasks, and lifespan-managed background workers.
The Problem
A FastAPI BackgroundTasks function never runs:
    from fastapi import FastAPI, BackgroundTasks

    app = FastAPI()

    def send_email(email: str):
        print(f"Sending email to {email}")  # Never prints

    @app.post("/register")
    async def register(email: str, background_tasks: BackgroundTasks):
        background_tasks.add_task(send_email, email)
        return {"message": "Registered"}

Or the task runs but raises an unhandled exception that silently disappears:

    def process_data(data: dict):
        result = data["key"]  # KeyError, but the endpoint returns 200 anyway

Or you need to use a database session inside a background task, but get:

    sqlalchemy.exc.InvalidRequestError: Instance <User> is not bound to a Session

Or a task takes too long and blocks the server response.
Why This Happens
FastAPI's BackgroundTasks runs queued functions after the response has been sent to the client. Common failures:
- Task function isn't actually being added: background_tasks.add_task() must be called before the endpoint returns. If an early return or exception happens first, the task is never queued.
- Async vs sync task confusion: BackgroundTasks can run both async def and def functions, but an async def task is awaited on the event loop while a def task is run in a worker thread. A blocking call inside an async def task therefore stalls every other request.
- Database session closed before task runs: the database session created per request is closed when the request ends. By the time BackgroundTasks runs, the session is gone, and passing ORM objects to tasks causes "detached instance" errors.
- Exceptions swallowed silently: if a background task raises an exception, FastAPI logs it but doesn't crash the server. The client has already received its response, so the error is invisible from the client's perspective.
- BackgroundTasks is not for heavy work: it runs in the same process as the server, so CPU-bound tasks compete with request handling and long-running I/O tasks tie up a worker. For production workloads, use Celery or similar.
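The execution model described in the list above can be illustrated with a toy stand-in written with the standard library only. MiniBackgroundTasks is a hypothetical class for illustration, not FastAPI's or Starlette's actual code; it only mimics the behaviour that matters here: tasks run one after another once the response is out, async def tasks on the event loop and def tasks in a worker thread.

```python
import asyncio
import inspect
import threading


class MiniBackgroundTasks:
    """Toy model of a background task queue (hypothetical, for illustration)."""

    def __init__(self):
        self._tasks = []

    def add_task(self, func, *args, **kwargs):
        # Queue only; nothing runs until run_all() is awaited.
        self._tasks.append((func, args, kwargs))

    async def run_all(self):
        # Tasks run sequentially, in the order they were added.
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)  # runs on the event loop
            else:
                await asyncio.to_thread(func, *args, **kwargs)  # worker thread


events = []


def sync_task():
    events.append(("sync task", threading.get_ident()))


async def async_task():
    events.append(("async task", threading.get_ident()))


async def handle_request():
    tasks = MiniBackgroundTasks()
    tasks.add_task(sync_task)
    tasks.add_task(async_task)
    # Stand-in for "the response has been sent to the client".
    events.append(("response sent", threading.get_ident()))
    await tasks.run_all()


asyncio.run(handle_request())
```

Because the tasks run one after another in this model, an exception raised by an earlier task would also prevent the later ones from running.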
Fix 1: Verify the Task Is Registered Before Return
add_task() must be called before the function returns:
    import time

    from fastapi import FastAPI, BackgroundTasks

    app = FastAPI()

    def send_welcome_email(email: str, name: str):
        # Simulate sending email
        time.sleep(2)
        print(f"Email sent to {email}")

    @app.post("/register")
    async def register(
        email: str,
        name: str,
        background_tasks: BackgroundTasks,
    ):
        # PITFALL: an early return before add_task() means the task is never queued
        if not email:
            return {"error": "Email required"}

        # CORRECT: add the task, then return
        background_tasks.add_task(send_welcome_email, email, name)
        return {"message": "Registered successfully"}

    # Multiple background tasks: all run after the response.
    # send_confirmation_email, update_inventory, and notify_warehouse are
    # assumed to be defined elsewhere.
    @app.post("/order")
    async def create_order(order_id: str, background_tasks: BackgroundTasks):
        background_tasks.add_task(send_confirmation_email, order_id)
        background_tasks.add_task(update_inventory, order_id)
        background_tasks.add_task(notify_warehouse, order_id)
        return {"order_id": order_id, "status": "processing"}

Pass arguments correctly:

    # Positional args after the function
    background_tasks.add_task(my_function, arg1, arg2)

    # Keyword args
    background_tasks.add_task(my_function, email=email, subject="Welcome")

    # Mixed
    background_tasks.add_task(my_function, email, subject="Welcome", delay=5)

Fix 2: Use Async Tasks for I/O-Bound Work
BackgroundTasks supports both async def and regular def functions. For I/O operations such as HTTP requests, use async def with an async library so the task yields to the event loop while it waits:

    import httpx
    from fastapi import FastAPI, BackgroundTasks

    app = FastAPI()

    # AVOID: a sync function doing blocking I/O runs in the thread pool and
    # holds a worker thread for as long as the call takes
    def sync_send_webhook(url: str, data: dict):
        import requests
        requests.post(url, json=data)  # Blocks the worker thread

    # CORRECT: async function for I/O-bound work
    async def async_send_webhook(url: str, data: dict):
        async with httpx.AsyncClient() as client:
            await client.post(url, json=data)

    # CORRECT: sync function for blocking library calls (runs in the thread pool)
    def sync_process_image(image_path: str):
        # Runs in a worker thread, so the event loop itself is not blocked
        from PIL import Image
        img = Image.open(image_path)
        img.thumbnail((200, 200))
        img.save(image_path.replace('.jpg', '_thumb.jpg'))

    @app.post("/process")
    async def process(background_tasks: BackgroundTasks):
        background_tasks.add_task(async_send_webhook, "https://webhook.site/...", {"event": "processed"})
        background_tasks.add_task(sync_process_image, "/tmp/upload.jpg")
        return {"status": "processing"}

Fix 3: Create a New Database Session Inside the Task
Never reuse the request’s database session inside a background task. Create a new session:
    from datetime import datetime

    from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
    from sqlalchemy.orm import Session

    from database import SessionLocal, User

    app = FastAPI()

    # Per-request session dependency: the session is closed when the request ends
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()

    # WRONG: passing the request session to the background task
    def update_last_login_wrong(user: User, db: Session):
        # db session is already closed by the time this runs
        user.last_login = datetime.utcnow()
        db.commit()  # sqlalchemy.exc.InvalidRequestError

    # CORRECT: create a fresh session inside the task
    def update_last_login(user_id: int):
        db = SessionLocal()  # New session
        try:
            user = db.query(User).filter(User.id == user_id).first()
            if user:
                user.last_login = datetime.utcnow()
                db.commit()
        except Exception:
            db.rollback()
            raise
        finally:
            db.close()  # Always close

    @app.post("/login")
    async def login(
        user_id: int,
        background_tasks: BackgroundTasks,
        db: Session = Depends(get_db),
    ):
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        # Pass the ID (a plain value), not the ORM object
        background_tasks.add_task(update_last_login, user.id)
        return {"message": "Logged in"}

With SQLAlchemy async sessions:

    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

    # engine is assumed to be an AsyncEngine created elsewhere with create_async_engine(...)
    async_session_factory = async_sessionmaker(engine, expire_on_commit=False)

    async def async_update_stats(user_id: int, event: str):
        async with async_session_factory() as session:
            async with session.begin():  # Commits on exit, rolls back on error
                user = await session.get(User, user_id)
                if user:
                    user.event_count += 1

Fix 4: Add Error Handling to Background Tasks
Exceptions in background tasks are silently swallowed from the client’s perspective. Add explicit error handling and logging:
    import logging
    import smtplib

    from fastapi import FastAPI, BackgroundTasks

    logger = logging.getLogger(__name__)
    app = FastAPI()

    def send_email(to: str, subject: str, body: str):
        try:
            # Email sending logic
            # ... send email ...
            logger.info(f"Email sent to {to}")
        except smtplib.SMTPException as e:
            logger.error(f"Failed to send email to {to}: {e}")
            # Optionally: retry logic, dead letter queue, alert
        except Exception:
            logger.exception(f"Unexpected error sending email to {to}")
            raise  # Re-raise if you want FastAPI to log the traceback

    @app.post("/notify")
    async def notify(email: str, background_tasks: BackgroundTasks):
        background_tasks.add_task(
            send_email,
            to=email,
            subject="Notification",
            body="You have a new message",
        )
        return {"message": "Notification queued"}

Wrap tasks in an error handler:

    import functools
    import inspect

    def with_error_handling(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.exception(f"Background task {func.__name__} failed: {e}")

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.exception(f"Background task {func.__name__} failed: {e}")

        # Pick the wrapper that matches the wrapped function's type
        if inspect.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    @with_error_handling
    def send_email(to: str, body: str):
        # Any exception here is caught and logged
        ...

Fix 5: Use Lifespan for Long-Running Background Workers
BackgroundTasks is per-request. For tasks that need to run continuously (polling, periodic cleanup), use the lifespan context manager:
    import asyncio
    import logging
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    logger = logging.getLogger(__name__)

    async def periodic_cleanup():
        """Runs in the background for the lifetime of the application."""
        while True:
            try:
                # Clean up expired sessions, temp files, etc.
                # cleanup_expired_sessions is assumed to be defined elsewhere.
                await cleanup_expired_sessions()
                logger.info("Cleanup completed")
            except Exception as e:
                logger.error(f"Cleanup failed: {e}")
            await asyncio.sleep(300)  # Run every 5 minutes

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup: launch background task
        task = asyncio.create_task(periodic_cleanup())
        yield
        # Shutdown: cancel the task
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    app = FastAPI(lifespan=lifespan)

    @app.get("/health")
    async def health():
        return {"status": "ok"}

Fix 6: Use Celery for Heavy or Reliable Tasks
BackgroundTasks is fire-and-forget: if the server crashes, tasks are lost. For reliability or CPU-heavy work, use Celery:
    # celery_app.py
    # send_email, charge_payment, and PaymentError are assumed to be
    # defined elsewhere in your application.
    from celery import Celery

    celery_app = Celery(
        "worker",
        broker="redis://localhost:6379/0",
        backend="redis://localhost:6379/0",
    )

    @celery_app.task
    def send_email_task(to: str, subject: str, body: str):
        # This runs in a separate Celery worker process
        send_email(to, subject, body)

    @celery_app.task(bind=True, max_retries=3)
    def process_payment(self, order_id: str):
        try:
            charge_payment(order_id)
        except PaymentError as e:
            # Retry after 60 seconds, up to 3 times
            raise self.retry(exc=e, countdown=60)

    # main.py
    from fastapi import FastAPI
    from celery_app import send_email_task, process_payment

    app = FastAPI()

    @app.post("/order")
    async def create_order(order_id: str, email: str):
        # Dispatch to Celery: queued jobs survive server restarts
        process_payment.delay(order_id)
        send_email_task.delay(email, "Order Confirmation", f"Order {order_id} received")
        return {"order_id": order_id}

When to use BackgroundTasks vs Celery:
| | BackgroundTasks | Celery |
|---|---|---|
| Setup | Zero config | Requires broker (Redis/RabbitMQ) |
| Durability | Lost on crash | Persisted in broker |
| Retries | Manual | Built-in |
| Monitoring | None | Flower dashboard |
| Use case | Quick fire-and-forget | Reliable, retriable, scheduled |
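The "Manual" retries entry for BackgroundTasks might look something like the following sketch. run_with_retries is a hypothetical helper, not part of FastAPI; the attempt count and the exponential delay are assumptions chosen for illustration, and the sleep parameter exists only so the delay can be replaced in tests.

```python
import logging
import time

logger = logging.getLogger(__name__)


def run_with_retries(func, *args, attempts=3, base_delay=1.0, sleep=time.sleep, **kwargs):
    """Call func, retrying on any exception with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception("Attempt %d/%d of %s failed", attempt, attempts, func.__name__)
            if attempt == attempts:
                raise  # Out of attempts: let the error reach the server log
            # Wait base_delay, then 2x, 4x, ... before the next attempt
            sleep(base_delay * 2 ** (attempt - 1))


# Demo: a task that fails twice, then succeeds
calls = {"count": 0}


def flaky(value):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return value.upper()


delays = []  # Records the requested delays instead of really sleeping
result = run_with_retries(flaky, "ok", attempts=3, base_delay=1.0, sleep=delays.append)
```

In an endpoint this would be queued as background_tasks.add_task(run_with_retries, send_email, email). Unlike Celery's retries, the state lives in memory, so a crash during the delay still loses the task.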
Still Not Working?
Task runs but changes aren’t visible — if the task writes to a database and you query immediately after the endpoint returns, the task may not have finished yet. BackgroundTasks runs after the response is sent but there’s no guarantee of when it completes.
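If a client needs to know when the work has finished, one option is to record a status that it can poll. The sketch below uses an in-memory dictionary and hypothetical helper names (create_job, run_tracked, get_status); it assumes a single server process, and a shared store such as a database would be needed with multiple workers.

```python
import threading
import uuid

# In-memory job registry (hypothetical; only valid within one process)
_jobs: dict[str, str] = {}
_jobs_lock = threading.Lock()


def _set_status(job_id: str, status: str) -> None:
    with _jobs_lock:
        _jobs[job_id] = status


def create_job() -> str:
    """Register a new job as 'pending' and return its ID."""
    job_id = uuid.uuid4().hex
    _set_status(job_id, "pending")
    return job_id


def get_status(job_id: str) -> str:
    with _jobs_lock:
        return _jobs.get(job_id, "unknown")


def run_tracked(job_id: str, func, *args, **kwargs) -> None:
    """Run func and record whether it finished or failed."""
    _set_status(job_id, "running")
    try:
        func(*args, **kwargs)
    except Exception:
        _set_status(job_id, "failed")
        raise
    _set_status(job_id, "done")


# Demo: one job that succeeds and one that fails
written = []
ok_job = create_job()
status_before = get_status(ok_job)
run_tracked(ok_job, written.append, "row")

bad_job = create_job()
try:
    run_tracked(bad_job, lambda: 1 / 0)
except ZeroDivisionError:
    pass
```

An endpoint would call create_job(), queue the work with background_tasks.add_task(run_tracked, job_id, ...), and return the job ID; a second endpoint could then return get_status(job_id).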
BackgroundTasks in test mode — by default, FastAPI’s TestClient (Starlette’s TestClient) runs background tasks synchronously after the response, so they do execute during tests. If you’re using an async test client (httpx.AsyncClient), background tasks may not run — use TestClient for testing background tasks.
BackgroundTasks doesn’t receive dependency-injected values — you can’t use Depends() inside the task function itself (it’s not an endpoint). Pass the values you need as arguments when calling add_task().
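CPU-bound tasks freeze the server: BackgroundTasks runs in the same process. A def task is moved to a worker thread, but computationally heavy Python code (image processing, data analysis) still competes for the GIL and slows other requests, and the same work inside an async def task blocks the event loop outright. Use a process pool (run_in_executor with a ProcessPoolExecutor) or Celery workers for CPU-bound tasks.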
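One way to move CPU-bound work out of the server process is run_in_executor with a ProcessPoolExecutor from the standard library. The following is a minimal sketch: count_primes is a made-up stand-in for real CPU-heavy work, and count_primes_in_process is a hypothetical helper name.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """CPU-bound example: count the primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def count_primes_in_process(pool: ProcessPoolExecutor, limit: int) -> int:
    """Run count_primes in a separate process so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    # The function and its arguments are pickled and sent to a worker process,
    # so count_primes must be defined at module level.
    return await loop.run_in_executor(pool, count_primes, limit)
```

The pool would typically be created once, for example in the lifespan handler, and shut down on exit. An async def background task can then await count_primes_in_process(pool, limit). When trying this as a standalone script, start the pool from inside an if __name__ == "__main__": block so that worker processes can import the module safely.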
For related FastAPI issues, see Fix: FastAPI 422 Unprocessable Entity and Fix: FastAPI Dependency Injection Error.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.