Fix: Celery Task Not Executing — Worker Not Processing Tasks

FixDevs ·

Quick Answer

How to fix Celery tasks not executing — worker configuration, broker connection issues, task routing, serialization errors, and debugging stuck or lost tasks.

The Problem

A Celery task is called but never executes:

result = send_email.delay(user_id=42)
print(result.id)  # Task ID is returned
# But the email is never sent — worker isn't picking up the task

Or the task appears in the queue but stays there indefinitely:

celery -A myapp inspect active
# Nothing is reported as running, even though tasks are waiting in the broker queue

Or a worker is running but shows tasks as failed without executing the function body:

[ERROR/ForkPoolWorker-1] Task myapp.tasks.send_email[abc-123] raised unexpected: SerializationError

Or tasks disappear silently:

result = heavy_task.delay()
result.state   # 'PENDING' forever — task was never received

Why This Happens

Celery has multiple layers — the task producer (your app), the broker (Redis/RabbitMQ), and the worker (Celery process). Issues at any layer cause tasks not to execute:

  • Worker not running: the most common cause. task.delay() puts the message in the broker queue, but if no worker is consuming that queue, it stays there.
  • Wrong broker URL: the app and the worker connect to different broker instances (for example, different Redis databases or hosts). The app publishes to one broker; the worker reads from another.
  • Task not registered: the worker never imported the module that defines the task, or the task belongs to a different Celery app than the one the worker runs. The worker receives the message but cannot find a task with that name.
  • Serialization error: the task arguments contain objects the serializer cannot encode (Django model instances, file handles, open connections). The message cannot be built, so nothing reaches the broker.
  • Wrong queue: the task is sent to a queue that no running worker is consuming.
  • Concurrency settings: worker concurrency is too low, so every pool process is busy with other tasks.
  • Task acknowledgment before execution: with acks_late=False (the default), a task is acknowledged just before it starts executing. If the worker crashes mid-task, the task is lost.

Fix 1: Verify the Worker Is Running

The first step is confirming a Celery worker is actually running and connected to the right broker:

# Start a worker in the foreground (verbose output)
celery -A myapp worker --loglevel=debug

# Check running workers
celery -A myapp inspect ping
# Expected: {'celery@hostname': {'ok': 'pong'}}
# No response = no workers running

# List registered tasks
celery -A myapp inspect registered
# Should list your task names

# Check active tasks
celery -A myapp inspect active

# Check tasks a worker has prefetched but not yet started
celery -A myapp inspect reserved

Check the Celery status:

celery -A myapp status
# celery@hostname: OK
# 1 node online.  ← At least one worker must be online

Common issue — worker and app using different broker URLs:

# settings.py or celery.py — broker URL
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Check what the worker is using
celery -A myapp inspect conf | grep broker_url
# Must match the app's CELERY_BROKER_URL
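
Two broker URLs can look different yet point at the same place (or the reverse), so it can help to normalize them before comparing. The following is a minimal sketch using only the standard library; the helper names and the default-port table are assumptions made for illustration, not part of Celery.

```python
from urllib.parse import urlsplit

# Assumed default ports for the two most common brokers.
DEFAULT_PORTS = {"redis": 6379, "rediss": 6379, "amqp": 5672, "amqps": 5671}


def broker_identity(url):
    """Reduce a broker URL to (scheme, host, port, path) for comparison."""
    parts = urlsplit(url)
    port = parts.port or DEFAULT_PORTS.get(parts.scheme)
    path = parts.path.strip("/")
    # For Redis the path is the database number; an empty path means db 0.
    if parts.scheme in ("redis", "rediss") and path == "":
        path = "0"
    return (parts.scheme, (parts.hostname or "localhost").lower(), port, path)


def same_broker(producer_url, worker_url):
    """True when both URLs resolve to the same scheme, host, port, and path."""
    return broker_identity(producer_url) == broker_identity(worker_url)


if __name__ == "__main__":
    print(same_broker("redis://localhost:6379/0", "redis://localhost:6379/1"))
```

Credentials are deliberately ignored, and "localhost" and "127.0.0.1" are treated as different hosts, so a mismatch here is a prompt to look closer rather than proof of a problem.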

Fix 2: Configure Celery Correctly

A correct Celery configuration in Django:

# myapp/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')

app = Celery('myapp')

# Read config from Django settings, using CELERY_ prefix
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover tasks in all installed apps
app.autodiscover_tasks()

# myapp/__init__.py: import the Celery app so it's initialized with Django
from .celery import app as celery_app

__all__ = ('celery_app',)

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

Standalone Celery (non-Django):

# tasks.py
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

@app.task
def add(x, y):
    return x + y

# Start worker for standalone setup (shell)
celery -A tasks worker --loglevel=info

Fix 3: Fix Task Registration Issues

Tasks are registered when the worker imports the module that defines them. autodiscover_tasks() handles this for Django, but it only looks for a module named tasks (tasks.py) inside each app listed in INSTALLED_APPS:

myapp/
├── __init__.py
├── celery.py
├── settings.py
├── models.py
├── views.py
└── tasks.py     ← Tasks must be here for autodiscover_tasks()

Verify the task is registered after starting the worker:

celery -A myapp inspect registered
# Expected:
# celery@hostname:
#   - myapp.tasks.send_email
#   - myapp.tasks.process_payment
#   - myapp.tasks.generate_report

If your task doesn’t appear in the list, the module isn’t being imported.
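
The same check can be scripted. The sketch below is a hypothetical helper (the name missing_tasks is not a Celery API) that takes a reply shaped like the one from app.control.inspect().registered(), which is a dict mapping worker names to task-name lists, or None when no worker answers.

```python
def missing_tasks(registered_reply, expected):
    """Return {worker: [expected task names the worker has not registered]}."""
    if not registered_reply:
        raise RuntimeError("No workers replied; is a worker running?")
    report = {}
    for worker, names in registered_reply.items():
        # Entries may carry extra info after the name, so keep the first token.
        known = {name.split(" ")[0] for name in names}
        absent = sorted(set(expected) - known)
        if absent:
            report[worker] = absent
    return report


if __name__ == "__main__":
    # In a real project the reply would come from:
    #   from myapp.celery import app
    #   reply = app.control.inspect().registered()
    reply = {"celery@hostname": ["myapp.tasks.send_email"]}
    print(missing_tasks(reply, ["myapp.tasks.send_email",
                                "myapp.tasks.process_payment"]))
```

An empty result means every worker that replied knows every expected task.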

Explicitly include task modules:

# celery.py
app = Celery('myapp')
app.config_from_object('django.conf:settings', namespace='CELERY')

# Pass an explicit list of packages; each one must contain a tasks.py module
app.autodiscover_tasks([
    'myapp.notifications',
    'myapp.billing',
    'myapp.reports',
])

# Or list the task modules directly with the include setting
app.conf.update(
    include=['myapp.tasks', 'myapp.notifications.tasks']
)

Check the task decorator:

# WRONG: the legacy module-level decorator is not bound to your app instance
# (the celery.task module it came from was removed in Celery 5.0)
from celery import task

@task
def send_email(user_id):   # Not registered with the app your worker runs
    pass

# CORRECT for Django — use shared_task
from celery import shared_task

@shared_task
def send_email(user_id):
    pass

# CORRECT for standalone — use @app.task
from .celery import app

@app.task
def send_email(user_id):
    pass

Common Mistake: Mixing @app.task and @shared_task. In Django projects, prefer @shared_task: it binds to the current Celery app without importing it directly, which avoids circular imports. This relies on myapp/__init__.py importing the app, as shown in Fix 2.

Fix 4: Fix Serialization Errors

Celery serializes task arguments to JSON (or pickle) before sending them to the broker. Arguments that cannot be serialized make .delay() raise on the producer side, so the task never reaches the broker:

# WRONG — passing a Django model instance (not JSON serializable)
from myapp.models import User

user = User.objects.get(id=42)
send_email.delay(user=user)   # Raises kombu.exceptions.EncodeError

# CORRECT: pass the ID (a primitive) and look up the object inside the task
send_email.delay(user_id=42)

@shared_task
def send_email(user_id):
    user = User.objects.get(id=user_id)   # Fetch inside the task
    # ... send email

Datetime objects: pass them as timezone-aware ISO strings:

# RISKY: a naive local datetime carries no timezone, so the worker may interpret it differently
from datetime import datetime, timezone
process_at = datetime.now()
schedule_task.delay(process_at=process_at)

# CORRECT: convert an aware UTC datetime to an ISO 8601 string
process_at = datetime.now(timezone.utc).isoformat()
schedule_task.delay(process_at=process_at)

# In the task, parse it back
from datetime import datetime
@shared_task
def schedule_task(process_at):
    dt = datetime.fromisoformat(process_at)   # Aware datetime, offset preserved
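
To make the round trip explicit, here is a small standard-library sketch. The helper names to_task_arg and from_task_arg are hypothetical; the point is only that an aware datetime survives conversion to a string and back, while a naive one is rejected early.

```python
from datetime import datetime, timezone


def to_task_arg(dt):
    """Convert an aware datetime to a UTC ISO 8601 string for a task argument."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime; attach a timezone first")
    return dt.astimezone(timezone.utc).isoformat()


def from_task_arg(value):
    """Parse the string produced by to_task_arg back into an aware datetime."""
    return datetime.fromisoformat(value)


if __name__ == "__main__":
    original = datetime(2026, 3, 20, 10, 0, tzinfo=timezone.utc)
    wire = to_task_arg(original)
    print(wire, from_task_arg(wire) == original)
```

Rejecting naive datetimes on the producer side is a design choice: it surfaces the ambiguity where the value is created instead of inside the worker.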

Verify JSON serialization before calling .delay():

import json

def test_serializable(**kwargs):
    try:
        json.dumps(kwargs)
        return True
    except TypeError as e:
        print(f"Not serializable: {e}")
        return False

# Test before sending
test_serializable(user_id=42, amount=99.99, process_at='2026-03-20T10:00:00')
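
When a call has many arguments, it is more useful to know which one is the problem. The sketch below checks each keyword argument separately with the standard json module; the function name is hypothetical. Note the assumption: the standard encoder may be stricter than the one Celery uses, so a value flagged here is a candidate to simplify, not a guaranteed failure.

```python
import json


def unserializable_kwargs(**kwargs):
    """Return {argument name: type name} for values json.dumps cannot encode."""
    bad = {}
    for key, value in kwargs.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            bad[key] = type(value).__name__
    return bad


if __name__ == "__main__":
    # object() stands in for a model instance or file handle.
    print(unserializable_kwargs(user_id=42, attachment=object()))
```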

Fix 5: Fix Queue Routing Issues

By default, all tasks go to the celery queue. If you have multiple queues, tasks sent to the wrong queue won’t be processed:

# settings.py — define queue routing
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'email'},
    'myapp.tasks.process_payment': {'queue': 'payments'},
    'myapp.tasks.generate_report': {'queue': 'reports'},
}

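# Or set the default queue for tasks without an explicit route
# (with namespace='CELERY', the setting name is CELERY_TASK_DEFAULT_QUEUE)
CELERY_TASK_DEFAULT_QUEUE = 'celery'

# Start workers for specific queues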
celery -A myapp worker -Q email --loglevel=info          # Email queue worker
celery -A myapp worker -Q payments --loglevel=info       # Payments queue worker
celery -A myapp worker -Q celery,reports --loglevel=info # Default + reports

# If you send to 'email' queue but no worker listens to 'email' → tasks pile up

Check queue lengths in Redis:

# Redis CLI — check the queue length
redis-cli LLEN celery         # Default queue
redis-cli LLEN email          # Email queue
redis-cli LLEN payments       # Payments queue

Send a task to a specific queue explicitly:

# In code
send_email.apply_async(args=[user_id], queue='email')

# Or via task decorator default
@shared_task(queue='email')
def send_email(user_id):
    pass
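
A routing table and a list of worker queues can drift apart. The following sketch cross-checks the two; unconsumed_queues is a hypothetical helper, and it assumes routes keyed by exact task name (glob patterns are not handled). The set of consumed queues would have to be collected separately, for example from the -Q flags of your running workers.

```python
def unconsumed_queues(task_routes, consumed_queues, default_queue="celery"):
    """Map each queue that has no consumer to the tasks routed to it."""
    consumed = set(consumed_queues)
    orphans = {}
    for task_name, route in task_routes.items():
        queue = route.get("queue", default_queue)
        if queue not in consumed:
            orphans.setdefault(queue, []).append(task_name)
    return orphans


if __name__ == "__main__":
    routes = {
        "myapp.tasks.send_email": {"queue": "email"},
        "myapp.tasks.process_payment": {"queue": "payments"},
        "myapp.tasks.generate_report": {"queue": "reports"},
    }
    # Workers were started with -Q email and -Q celery,reports only.
    print(unconsumed_queues(routes, ["email", "celery", "reports"]))
```

Any queue that appears in the result will accumulate tasks until a worker is started for it.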

Fix 6: Handle Task Failures and Retries

Tasks that fail silently look like they’re “not executing” — they run but raise an exception that’s not logged:

# WRONG — exception swallowed, task appears to succeed
@shared_task
def send_email(user_id):
    try:
        user = User.objects.get(id=user_id)
        email_service.send(user.email, 'Hello!')
    except Exception:
        pass   # Silent failure: task is marked SUCCESS even though nothing was sent

# CORRECT: let exceptions propagate so Celery marks the task as FAILURE
@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,   # Retry after 60 seconds
)
def send_email(self, user_id):
    try:
        user = User.objects.get(id=user_id)
        email_service.send(user.email, 'Hello!')
    except User.DoesNotExist:
        # Don't retry — user doesn't exist
        raise
    except ConnectionError as exc:
        # Retry on transient errors
        raise self.retry(exc=exc)
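
A fixed 60-second delay can hammer a service that is already struggling. One common alternative is to grow the delay with each attempt. The sketch below computes such a delay; retry_countdown is a hypothetical helper, and the base and cap values are assumptions to tune for your workload.

```python
def retry_countdown(retries, base=60, cap=3600):
    """Exponential backoff in seconds: base, 2*base, 4*base, ... up to cap."""
    return min(base * (2 ** retries), cap)


if __name__ == "__main__":
    # Inside a bound task this could be used as:
    #   raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries))
    for attempt in range(4):
        print(attempt, retry_countdown(attempt))
```

The cap keeps a long outage from pushing retries hours into the future.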

Monitor task results to detect failures:

result = send_email.delay(user_id=42)

# Check status after some time (requires a result backend)
print(result.state)   # PENDING, STARTED, SUCCESS, FAILURE, RETRY
                      # (STARTED is only reported when task_track_started is enabled)
print(result.result)  # Return value (if SUCCESS) or exception (if FAILURE)

# If FAILURE, get the traceback
if result.state == 'FAILURE':
    print(result.traceback)
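
Checking the state once is rarely enough, so a small polling loop with a deadline can help. This sketch works with any object that exposes a .state attribute, such as the result returned by .delay(); wait_for_state and its defaults are assumptions for illustration.

```python
import time

# States after which the task will not change again.
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_state(result, timeout=30.0, interval=0.5):
    """Poll result.state until it is final or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        state = result.state
        if state in READY_STATES or time.monotonic() >= deadline:
            return state
        time.sleep(interval)


if __name__ == "__main__":
    class FakeResult:
        """Stand-in for a task result that finishes on the third poll."""
        def __init__(self):
            self._states = ["PENDING", "STARTED", "SUCCESS"]

        @property
        def state(self):
            return self._states.pop(0) if len(self._states) > 1 else self._states[0]

    print(wait_for_state(FakeResult(), timeout=5, interval=0.01))
```

If the function still returns PENDING at the deadline, the task was most likely never received by a worker, or no result backend is configured.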

Enable Flower for real-time task monitoring:

pip install flower
celery -A myapp flower --port=5555
# Visit http://localhost:5555 — see all workers, tasks, and failures

Fix 7: Ensure Tasks Survive Worker Crashes

By default, a task is acknowledged (removed from the queue) just before a worker starts executing it. If the worker crashes mid-execution, the task is lost:

# settings.py — acknowledge tasks AFTER execution (not before)
CELERY_TASK_ACKS_LATE = True

# Also prevent worker from prefetching too many tasks
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Per-task: use acks_late for critical tasks
@shared_task(acks_late=True)
def process_payment(payment_id):
    # If the worker is lost here, the unacknowledged message can be redelivered
    payment = Payment.objects.get(id=payment_id)
    payment_gateway.charge(payment)
    payment.mark_complete()

Use task_reject_on_worker_lost to requeue a task when the pool process executing it exits abruptly or is killed:

CELERY_TASK_REJECT_ON_WORKER_LOST = True
CELERY_TASK_ACKS_LATE = True

Warning: acks_late=True with non-idempotent tasks risks double execution if the worker crashes after completing the task but before acknowledging it. Make tasks idempotent (safe to run multiple times) or use database-level deduplication.
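
As an illustration of the idempotency idea, the sketch below skips work that an earlier delivery already completed. Everything here is hypothetical: charge_once is not a Celery feature, and the in-memory set stands in for a database table with a unique constraint. In a real system the check and the record must be atomic, which this sketch does not attempt.

```python
def charge_once(payment_id, processed_ids, charge):
    """Run charge(payment_id) only if this payment was not handled before."""
    if payment_id in processed_ids:
        return False          # Already handled by an earlier delivery
    charge(payment_id)
    processed_ids.add(payment_id)
    return True


if __name__ == "__main__":
    processed = set()
    charged = []
    print(charge_once(7, processed, charged.append))   # First delivery
    print(charge_once(7, processed, charged.append))   # Redelivery is skipped
    print(charged)
```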

Still Not Working?

Check the broker connection directly:

# Test Redis connection
redis-cli ping
# PONG — Redis is running

redis-cli -h <broker-host> -p 6379 ping
# If this fails, the broker is unreachable from the worker machine
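
If redis-cli is not installed on the worker machine, a plain TCP check answers the same reachability question. This is a minimal standard-library sketch; broker_reachable is a hypothetical name, and a successful connection only shows that the port is open, not that Redis is healthy.

```python
import socket


def broker_reachable(host, port=6379, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print(broker_reachable("localhost", 6379))
```

Run it from the same machine or container as the worker, since that is the network path that matters.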

Test with a simple task:

# myapp/tasks.py
from myapp.celery import app

@app.task
def debug_task():
    print('Task executed!')
    return 'done'

# Restart the worker so it registers the new task, then open a Django shell:
#   python manage.py shell
>>> from myapp.tasks import debug_task
>>> result = debug_task.delay()
>>> result.get(timeout=10)
# Should print 'Task executed!' in worker logs and return 'done'

Check worker concurrency: if concurrency is 1 and a long task is running, every other task waits behind it:

celery -A myapp worker --concurrency=4 --loglevel=info

# Check current concurrency
celery -A myapp inspect stats | grep max-concurrency

For Docker deployments, ensure the worker container can reach the broker:

# docker-compose.yml
services:
  worker:
    build: .
    command: celery -A myapp worker --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0  # Use service name, not localhost
    depends_on:
      - redis

  redis:
    image: redis:7-alpine

For related issues, see Fix: Redis Connection Refused and Fix: Python Asyncio Event Loop Error.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
