
Fix: FastAPI BackgroundTasks Not Working — Task Not Running or Dependency Errors

FixDevs

Quick Answer

How to fix FastAPI BackgroundTasks — task not executing, dependency injection in tasks, error handling, Celery for heavy tasks, and lifespan-managed background workers.

The Problem

A FastAPI BackgroundTasks function never runs:

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_email(email: str):
    print(f"Sending email to {email}")  # Never prints

@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email, email)
    return {"message": "Registered"}

Or the task runs but raises an unhandled exception that silently disappears:

def process_data(data: dict):
    result = data["key"]  # KeyError — but the endpoint returns 200 anyway

Or you need to use a database session inside a background task, but get:

sqlalchemy.exc.InvalidRequestError: Instance <User> is not bound to a Session

Or a task takes too long and blocks the server response.

Why This Happens

FastAPI BackgroundTasks runs after the response is sent to the client. Common failures:

  • Task function isn’t actually being added — background_tasks.add_task() must be called before the endpoint returns. If an early return or exception happens first, the task is never queued.
  • Async vs sync task confusion — BackgroundTasks can run both async def and def functions, but they execute differently: async def tasks run on the event loop, while def tasks run in a thread pool.
  • Database session closed before task runs — the database session created per-request is closed when the request ends. By the time BackgroundTasks runs, the session is gone. Passing ORM objects to tasks causes “detached instance” errors.
  • Exceptions swallowed silently — if a background task raises an exception, FastAPI logs it but doesn’t crash the server. The error is silent from the client’s perspective.
  • BackgroundTasks is not for heavy work — it runs in the same process, blocking the event loop for CPU-bound tasks or tying up a worker for long-running I/O tasks. For production workloads, use Celery or similar.

Fix 1: Verify the Task Is Registered Before Return

add_task() must be called before the function returns:

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_welcome_email(email: str, name: str):
    # Simulate sending email
    import time
    time.sleep(2)
    print(f"Email sent to {email}")

@app.post("/register")
async def register(
    email: str,
    name: str,
    background_tasks: BackgroundTasks,
):
    # WRONG — early return before adding the task
    if not email:
        return {"error": "Email required"}
    # Task never added if we returned above

    # CORRECT — add task, then return
    background_tasks.add_task(send_welcome_email, email, name)
    return {"message": "Registered successfully"}


# Multiple background tasks — all run after the response
@app.post("/order")
async def create_order(order_id: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_confirmation_email, order_id)
    background_tasks.add_task(update_inventory, order_id)
    background_tasks.add_task(notify_warehouse, order_id)
    return {"order_id": order_id, "status": "processing"}

Pass arguments correctly:

# Positional args after the function
background_tasks.add_task(my_function, arg1, arg2)

# Keyword args
background_tasks.add_task(my_function, email=email, subject="Welcome")

# Mixed
background_tasks.add_task(my_function, email, subject="Welcome", delay=5)

Fix 2: Use Async Tasks for I/O-Bound Work

BackgroundTasks supports both async def and regular def functions. An async def task runs on the event loop; a plain def task runs in a thread pool, so blocking I/O inside it won’t stall the loop. The real pitfall is a blocking call inside an async def task:

import httpx
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# WRONG — blocking call inside an async task freezes the event loop
async def bad_send_webhook(url: str, data: dict):
    import requests
    requests.post(url, json=data)  # Blocks every other request while it runs

# CORRECT — async function with an async client for I/O-bound work
async def async_send_webhook(url: str, data: dict):
    async with httpx.AsyncClient() as client:
        await client.post(url, json=data)

# ALSO FINE — a plain def task runs in a thread pool; blocking I/O won't
# stall the event loop, though it does occupy a worker thread
def sync_process_image(image_path: str):
    from PIL import Image
    img = Image.open(image_path)
    img.thumbnail((200, 200))
    img.save(image_path.replace('.jpg', '_thumb.jpg'))

@app.post("/process")
async def process(background_tasks: BackgroundTasks):
    background_tasks.add_task(async_send_webhook, "https://webhook.site/...", {"event": "processed"})
    background_tasks.add_task(sync_process_image, "/tmp/upload.jpg")
    return {"status": "processing"}
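If a background task has no choice but to call blocking code from an async def function (a sync-only SDK, for example), it can offload the call with asyncio.to_thread (Python 3.9+) so the event loop stays free. A minimal, self-contained sketch with a stand-in for the blocking call:

```python
import asyncio
import time

def blocking_call(payload: str) -> str:
    # Stand-in for a sync-only SDK call (illustrative)
    time.sleep(0.1)
    return f"sent:{payload}"

async def async_task(payload: str) -> str:
    # Runs the blocking call in a worker thread; the event loop
    # keeps serving other requests in the meantime
    return await asyncio.to_thread(blocking_call, payload)

result = asyncio.run(async_task("hello"))
print(result)  # sent:hello
```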

Fix 3: Create a New Database Session Inside the Task

Never reuse the request’s database session inside a background task. Create a new session:

from datetime import datetime

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from sqlalchemy.orm import Session
from database import SessionLocal, User, get_db

app = FastAPI()

# WRONG — passing the request session to the background task
def update_last_login_wrong(user: User, db: Session):
    # db session is already closed by the time this runs
    user.last_login = datetime.utcnow()
    db.commit()  # sqlalchemy.exc.InvalidRequestError

# CORRECT — create a fresh session inside the task
def update_last_login(user_id: int):
    db = SessionLocal()  # New session
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if user:
            user.last_login = datetime.utcnow()
            db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()  # Always close

@app.post("/login")
async def login(
    user_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Pass the ID (a plain value), not the ORM object
    background_tasks.add_task(update_last_login, user.id)

    return {"message": "Logged in"}

With SQLAlchemy async sessions:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/app")  # example DSN
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)

async def async_update_stats(user_id: int, event: str):
    async with async_session_factory() as session:
        async with session.begin():
            user = await session.get(User, user_id)
            if user:
                user.event_count += 1

Fix 4: Add Error Handling to Background Tasks

Exceptions in background tasks are silently swallowed from the client’s perspective. Add explicit error handling and logging:

import logging
import smtplib

from fastapi import FastAPI, BackgroundTasks

logger = logging.getLogger(__name__)
app = FastAPI()

def send_email(to: str, subject: str, body: str):
    try:
        # ... build and send the message with smtplib ...
        logger.info(f"Email sent to {to}")
    except smtplib.SMTPException as e:
        logger.error(f"Failed to send email to {to}: {e}")
        # Optionally: retry logic, dead letter queue, alert
    except Exception as e:
        logger.exception(f"Unexpected error sending email to {to}")
        raise  # Re-raise if you want FastAPI to log the traceback

@app.post("/notify")
async def notify(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(
        send_email,
        to=email,
        subject="Notification",
        body="You have a new message",
    )
    return {"message": "Notification queued"}

Wrap tasks in an error handler:

import asyncio
import functools

def with_error_handling(func):
    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.exception(f"Background task {func.__name__} failed: {e}")

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.exception(f"Background task {func.__name__} failed: {e}")

    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper


@with_error_handling
def send_email(to: str, body: str):
    # Any exception here is caught and logged
    ...
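To confirm the wrapper actually contains failures, here is a self-contained version of the sync path with its own logging setup; flaky_task is a made-up example function:

```python
import functools
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

def with_error_handling(func):
    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Log the full traceback instead of letting it vanish silently
            logger.exception(f"Background task {func.__name__} failed")
    return sync_wrapper

@with_error_handling
def flaky_task(data: dict):
    return data["missing"]  # KeyError on an empty dict

result = flaky_task({})  # logs the traceback, does not raise
print(result)            # None — the failure is contained
```

Note the trade-off: swallowed exceptions return None, so callers that need the result must check for it or use the re-raise variant above.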

Fix 5: Use Lifespan for Long-Running Background Workers

BackgroundTasks is per-request. For tasks that need to run continuously (polling, periodic cleanup), use the lifespan context manager:

import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI

logger = logging.getLogger(__name__)

async def periodic_cleanup():
    """Runs in the background for the lifetime of the application."""
    while True:
        try:
            # Clean up expired sessions, temp files, etc.
            await cleanup_expired_sessions()
            logger.info("Cleanup completed")
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
        await asyncio.sleep(300)  # Run every 5 minutes

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: launch background task
    task = asyncio.create_task(periodic_cleanup())
    yield
    # Shutdown: cancel the task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

app = FastAPI(lifespan=lifespan)

@app.get("/health")
async def health():
    return {"status": "ok"}
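The cancel-on-shutdown pattern can be exercised without FastAPI at all. A stdlib-only sketch of a cancellable periodic worker (the interval is shortened purely for demonstration):

```python
import asyncio

ticks = []

async def periodic_worker(interval: float):
    # Loops until cancelled; CancelledError propagates out of sleep()
    while True:
        ticks.append("tick")
        await asyncio.sleep(interval)

async def main():
    task = asyncio.create_task(periodic_worker(0.01))
    await asyncio.sleep(0.05)   # let a few iterations run
    task.cancel()               # shutdown: request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass                    # expected on a clean shutdown

asyncio.run(main())
print(len(ticks))  # several iterations before cancellation
```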

Fix 6: Use Celery for Heavy or Reliable Tasks

BackgroundTasks is fire-and-forget: if the server crashes, tasks are lost. For reliability or CPU-heavy work, use Celery:

# celery_app.py
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

@celery_app.task
def send_email_task(to: str, subject: str, body: str):
    # This runs in a separate Celery worker process
    send_email(to, subject, body)

@celery_app.task(bind=True, max_retries=3)
def process_payment(self, order_id: str):
    try:
        charge_payment(order_id)
    except PaymentError as e:
        # Retry after 60 seconds, up to 3 times
        raise self.retry(exc=e, countdown=60)

# main.py
from fastapi import FastAPI
from celery_app import send_email_task, process_payment

app = FastAPI()

@app.post("/order")
async def create_order(order_id: str, email: str):
    # Dispatch to Celery — survives server restarts
    process_payment.delay(order_id)
    send_email_task.delay(email, "Order Confirmation", f"Order {order_id} received")
    return {"order_id": order_id}

When to use BackgroundTasks vs Celery:

                 BackgroundTasks          Celery
Setup            Zero config              Requires broker (Redis/RabbitMQ)
Durability       Lost on crash            Persisted in broker
Retries          Manual                   Built-in
Monitoring       None                     Flower dashboard
Use case         Quick fire-and-forget    Reliable, retriable, scheduled

Still Not Working?

Task runs but changes aren’t visible — if the task writes to a database and you query immediately after the endpoint returns, the task may not have finished yet. BackgroundTasks runs after the response is sent but there’s no guarantee of when it completes.

BackgroundTasks in test mode — Starlette’s TestClient awaits background tasks as part of the request, so they finish before the client call returns and do execute during tests. The same generally holds for httpx.AsyncClient with ASGITransport, since the response’s background tasks are awaited inside the app call. If tasks seem not to run in tests, check that add_task() was actually reached before the endpoint returned.

BackgroundTasks doesn’t receive dependency-injected values — you can’t use Depends() inside the task function itself (it’s not an endpoint). Pass the values you need as arguments when calling add_task().

CPU-bound tasks freeze the server — BackgroundTasks runs in the same process. Computationally heavy work (image processing, data analysis) blocks other requests. Use an executor (run_in_executor) or Celery workers for CPU-bound tasks.
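A minimal run_in_executor sketch, using the default thread-pool executor and a made-up heavy_compute function as the stand-in workload:

```python
import asyncio

def heavy_compute(n: int) -> int:
    # Stand-in for CPU-bound work (illustrative)
    total = 0
    for i in range(n):
        total += i * i
    return total

async def main() -> int:
    loop = asyncio.get_running_loop()
    # None → default ThreadPoolExecutor; swap in a ProcessPoolExecutor
    # for pure-Python CPU work that must escape the GIL
    return await loop.run_in_executor(None, heavy_compute, 10_000)

result = asyncio.run(main())
print(result)
```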

For related FastAPI issues, see Fix: FastAPI 422 Unprocessable Entity and Fix: FastAPI Dependency Injection Error.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
