Fix: Dramatiq Not Working — Actor Not Found, Broker Connection, Retries, and Django Integration
Part of: Python Errors
Quick Answer
How to fix Dramatiq errors — ActorNotFound on worker, broker connection refused, Redis vs RabbitMQ trade-offs, message retries not triggering, async actors, and django-dramatiq AppConfig setup.
The Error
You start a worker and your tasks never run, or you see:
ERROR - dramatiq.worker.WorkerThread - Failed to process message myapp.tasks.send_email with unhandled exception.
ActorNotFound: send_emailOr the worker won’t even start because it can’t reach the broker:
redis.exceptions.ConnectionError: Error -2 connecting to redis:6379. Name or service not known.Or you sent a message but it’s just sitting in Redis:
$ redis-cli LLEN dramatiq:default
(integer) 47
# Nothing draining the queue.Or a Django-integrated project gives:
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.Why This Happens
Dramatiq splits work between a sender (your web process) and one or more worker processes. The two sides communicate through a broker (Redis or RabbitMQ). Most failures come from one of these:
- Worker doesn’t import your actor module. Dramatiq workers only know about actors that get imported when the worker boots. If you launch
dramatiq myapp.tasksbutmyapp.tasks.send_emaillives in a submodule that’s only imported when a view runs, the worker has never seen it. The result:ActorNotFoundwhen a message for that actor arrives. - Broker URL mismatch between sender and worker. Your web app might point at
redis://localhost:6379while the worker container points atredis://redis:6379(the docker-compose service name). Messages go into different brokers and never meet. - No worker process running. Sounds obvious, but easy to miss when you’ve been running
python manage.py runserverand assuming tasks are processed somewhere. Dramatiq does not auto-spawn workers. You need a separatedramatiqprocess. - Django apps not initialized. Django actors that import models at module load time fail when the Dramatiq worker boots before
django.setup()runs. The officialdjango-dramatiqpackage handles this — manual setups need to calldjango.setup()first.
The deeper architectural point is that Dramatiq is a threaded worker by default, not async or multi-process. One dramatiq process spawns N threads (default 8), each pulling messages off the broker. That model has implications: a CPU-bound actor will hold a thread for the duration of its work, and Python’s GIL means CPU-bound parallelism caps out at one core regardless of thread count. For CPU-bound work, switch to --processes. For I/O-bound work (DB queries, HTTP), threads are fine. The choice you make at deploy time silently determines whether scaling out workers actually buys you more throughput, or whether you’ve just added more threads waiting on the same GIL.
The broker choice (Redis vs RabbitMQ) also drives different failure modes. Redis is faster to set up, has lower latency, and is fine for most workloads — but visibility timeouts and message duplication semantics differ from RabbitMQ. RabbitMQ has stronger delivery guarantees, native dead-letter queue support, and better tooling for inspecting stuck messages, but adds an operational dependency. In production incidents, the broker is often the silent middleman: messages are being produced, no errors are logged, but the queue depth is climbing because workers can’t keep up or have lost their connection to the broker. Always include broker-side metrics (queue depth, consumer count, connection count) in your dashboards — not just worker-side logs.
Fix 1: Make the Worker Import All Actor Modules
Run the worker against a module (or list of modules) that imports every actor:
dramatiq myapp.tasks
# Imports myapp.tasks and any actor defined in it.
dramatiq myapp.tasks myapp.email_actors myapp.reports
# Imports multiple modules.The simplest pattern: put all actors in myapp.tasks (or import them there), and run dramatiq myapp.tasks. The worker prints which actors it registered on startup:
Worker [PID 12345] is now ready to process messages.
Actors registered: send_email, process_payment, generate_reportIf send_email isn’t on that list, your worker doesn’t know about it. The most common cause is defining @dramatiq.actor inside a views.py or signals.py that the worker never imports.
Common Mistake: Defining actors lazily inside if __name__ == "__main__": or inside function bodies. Decorators only run when the surrounding code runs. Put actors at module top level.
Fix 2: Configure the Broker Before Importing Actors
Dramatiq has a default RabbitmqBroker pointing at amqp://localhost:5672. If you want Redis or a different host, configure the broker before any @dramatiq.actor decorator runs:
# myapp/broker.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
redis_broker = RedisBroker(url="redis://localhost:6379/0")
dramatiq.set_broker(redis_broker)# myapp/tasks.py
from myapp import broker # Side effect: sets the broker.
import dramatiq
@dramatiq.actor
def send_email(to: str, subject: str): ...Order matters. @dramatiq.actor binds the actor to whichever broker is current when the decorator runs. If you call set_broker after defining actors, those actors are still bound to the old default broker — and messages disappear into the wrong place.
For RabbitMQ:
from dramatiq.brokers.rabbitmq import RabbitmqBroker
broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672/")
dramatiq.set_broker(broker)Fix 3: Verify Broker URLs Match Across Sender and Worker
Print the broker URL in both contexts and compare:
import dramatiq
print("Broker:", dramatiq.get_broker())For docker-compose, both services should point at the same broker host. Use the service name (redis://redis:6379), not localhost:
# docker-compose.yml
services:
web:
environment:
DRAMATIQ_BROKER_URL: redis://redis:6379/0
worker:
environment:
DRAMATIQ_BROKER_URL: redis://redis:6379/0
command: dramatiq myapp.tasks
redis:
image: redis:7Then in code:
import os
import dramatiq
from dramatiq.brokers.redis import RedisBroker
dramatiq.set_broker(RedisBroker(url=os.environ["DRAMATIQ_BROKER_URL"]))Fix 4: Set Up Retries and Backoff
Dramatiq retries failed messages by default (up to Retries.DEFAULT_MAX_RETRIES, which is 20) with exponential backoff up to about an hour. You can tune this per actor:
import dramatiq
@dramatiq.actor(
max_retries=3,
min_backoff=1_000, # 1 second (milliseconds)
max_backoff=60_000, # 1 minute
retry_when=lambda retries_so_far, exception: not isinstance(exception, ValueError),
)
def send_email(to: str):
...To never retry, set max_retries=0. To retry only specific exceptions, use retry_when. A common production pattern:
@dramatiq.actor(max_retries=5, min_backoff=5_000)
def call_external_api(...):
try:
return external.do_thing(...)
except external.TransientError:
raise # Retried by Dramatiq.
except external.PermanentError as e:
# Don't retry. Log and move on.
logger.error("Permanent failure", exc_info=e)Pro Tip: Dramatiq’s exponential backoff is jittered automatically. Don’t add your own time.sleep() inside the actor to “rate limit” retries — it blocks the worker thread for everyone.
Fix 5: Use Results Backend Only When You Need It
By default Dramatiq fire-and-forgets. If you call actor.send().get_result() without configuring a results backend, you get:
RuntimeError: A result backend must be configured to use the Results middleware.Add the middleware on the broker, before defining actors:
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
result_backend = RedisBackend(url="redis://localhost:6379/0")
broker = RedisBroker(url="redis://localhost:6379/0")
broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(broker)
@dramatiq.actor(store_results=True)
def add(a, b):
return a + bThen:
message = add.send(1, 2)
result = message.get_result(block=True, timeout=10_000)
print(result) # 3Note: Results blocking is convenient for tests, dangerous in web requests. Blocking inside a web handler turns a fire-and-forget queue into a sync RPC and defeats the purpose of having a queue. For request-scoped results, return a job ID and poll from the client.
Fix 6: Async Actors
Dramatiq doesn’t natively support async def actors. Three options:
Option A: wrap the coroutine. Simplest, no extra dependencies:
import asyncio
import dramatiq
@dramatiq.actor
def fetch_url(url: str):
async def _do():
async with httpx.AsyncClient() as c:
r = await c.get(url)
return r.text
return asyncio.run(_do())Option B: use a third-party async middleware which runs one event loop per worker process and lets you write async def actors directly. Search PyPI for current options — historically several have come and gone.
Option C: keep actors sync, run async client libraries via anyio.from_thread. Good if you only have one or two async calls per actor.
The asyncio.run pattern in Option A creates a fresh loop per message. That’s fine for simple HTTP or DB calls but wasteful if the actor is hot — switch to Option B in that case.
Fix 7: Django Integration With django-dramatiq
For Django projects, use the django-dramatiq package — it handles django.setup() ordering, model serialization, and provides a management command:
pip install django-dramatiq# settings.py
INSTALLED_APPS = [
...,
"django_dramatiq",
"myapp",
]
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.redis.RedisBroker",
"OPTIONS": {"url": "redis://localhost:6379/0"},
"MIDDLEWARE": [
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Retries",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
],
}
DRAMATIQ_TASKS_DATABASE = "default"# myapp/tasks.py
import dramatiq
from django.contrib.auth import get_user_model
@dramatiq.actor
def deactivate_user(user_id: int):
User = get_user_model()
User.objects.filter(id=user_id).update(is_active=False)Run the worker with the Django command (not bare dramatiq):
python manage.py rundramatiqThis handles django.setup(), discovers actors in all INSTALLED_APPS/tasks.py files, and integrates with Django’s logging.
Common Mistake: Running dramatiq myapp.tasks directly in a Django project. Models will raise AppRegistryNotReady because Django isn’t initialized. Always use rundramatiq.
Fix 8: Idempotency and the unique_together Trick
Dramatiq doesn’t deduplicate messages — at-least-once delivery means an actor can run twice (worker crash mid-task, broker redelivery, manual replays). Make actors idempotent.
If you can’t, use dramatiq.middleware.CurrentMessage plus a database constraint:
import dramatiq
from dramatiq.middleware import CurrentMessage
broker.add_middleware(CurrentMessage())
@dramatiq.actor
def process_payment(payment_id: int):
msg = CurrentMessage.get_current_message()
# Use msg.message_id as a dedup key in a unique-indexed table.
try:
ProcessedMessage.objects.create(message_id=msg.message_id, payment_id=payment_id)
except IntegrityError:
return # Already processed.
# ...real work...A unique index on message_id makes the second attempt a no-op. Cheaper than locks, survives worker crashes.
Production Incident Lens: When the Queue Quietly Backs Up
The signature pattern for a Dramatiq production incident is invisible queue growth. Your web app keeps responding to requests, every actor.send() returns instantly because it’s just a Redis/RabbitMQ enqueue, no errors fire — yet downstream side effects (emails, webhooks, indexing, billing reconciliation) stop happening. Hours later, support tickets surface: “I never got my confirmation email.” By then the broker has thousands of unprocessed messages, and once you restart the worker pool the broker dumps them all at once, hitting external rate limits and amplifying the original outage.
The triage playbook for “all background jobs stuck” looks like this: confirm the broker is reachable (redis-cli PING or rabbitmqctl list_queues), confirm worker processes are alive, confirm the actor registration banner matches what’s in the queue, and check for stuck threads (a long-running actor that exceeded time_limit may have been killed mid-message, leaving a half-acked state). On RabbitMQ, also check whether you’ve hit a per-queue or per-connection limit — RabbitMQ silently rejects messages once flow control kicks in. On Redis, check INFO clients for the connection count and LLEN for queue depth.
The other production trap is “good failover, no consumer.” When Redis or RabbitMQ fails over to a replica, the worker’s existing connection drops and Dramatiq tries to reconnect — but if the reconnect logic can’t reach the new primary (DNS lag, sidecar restart timing), the worker process keeps running but stops processing messages. Ship a healthcheck endpoint on the worker that returns 200 only when the worker has successfully drained a message in the last N seconds, and let your orchestrator restart unhealthy workers automatically. The worst Dramatiq outages are the silent ones; make sure the worker actually proves it’s working, not just that the process exists.
Still Not Working?
A few less-obvious failures:
- Worker eats messages but logs
Skipping message X.Y.Z because actor not registered. SameActorNotFoundissue — the actor exists in your code but the worker hasn’t imported it. Re-checkdramatiq myapp.tasksarguments. PickleErroron the worker. You passed a non-pickleable arg (a database connection, a request object, a lambda). Pass IDs or primitives, fetch objects inside the actor.- Messages stuck in
dramatiq:default.DQ. That’s the delay queue. Either backoff scheduled it for later, orAgeLimitmiddleware will move it to dead letters when expired. Inspect withredis-cli ZRANGE dramatiq:default.DQ 0 -1 WITHSCORES. TimeLimitExceededafter long-running task. Default time limit is 10 minutes. Override per actor:@dramatiq.actor(time_limit=3600_000)(1 hour, in ms).- Worker uses 100% CPU even when idle. You’re on
--processesmode with a low message volume. Switch to--threads(the default), which is cheaper for I/O-bound work. - Two workers process the same message. Redis broker delivers messages with a visibility timeout. If your actor exceeds it, the broker thinks the message was lost and redelivers. Either shorten the actor or extend the visibility timeout in broker options.
- Django queries hang in the worker. Connection isn’t closed between messages.
django_dramatiq.middleware.DbConnectionsMiddlewarefixes this — include it inMIDDLEWARE. - Prometheus metrics scraping fails. Dramatiq workers expose metrics on port 9191 by default. Open the port in your container/firewall or disable via
dramatiq --no-prom. - A thundering herd after broker recovery. When the broker comes back, every queued message dispatches at once. Downstream APIs may be unprepared. Either rate-limit at the actor level or use a token-bucket middleware to smooth the recovery.
- Worker restarts on every deploy lose in-flight messages. If you SIGTERM the worker without a graceful drain period, in-flight actors are interrupted. Configure your orchestrator to allow at least the longest
time_limitworth of grace, and preferdramatiq --skip-loggingplus a graceful shutdown signal. - Same actor runs in two regions consume the same queue. A multi-region deploy where both regions point at the same Redis URL will have workers in both regions fighting for messages. Either dedicate a queue per region or pick one region as the active consumer.
For related Python task queue and async-job issues, see Celery beat not working, Celery task not received, Redis connection refused, and APScheduler not working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
Was this article helpful?
Related Articles
Fix: arq Not Working — Worker Not Picking Jobs, WorkerSettings, Cron, Retries, and Result Expiry
How to fix Python arq errors — worker can't find tasks, WorkerSettings class structure, cron syntax differences, msgpack serialization errors, job_id deduplication, result expiration, and Redis connection pooling.
Fix: Gunicorn Not Working — Worker Timeout, Boot Errors, and Signal Handling
How to fix Gunicorn errors — WORKER TIMEOUT killed, ImportError cannot import app, worker class not found, connection refused 502 behind nginx, graceful reload not working, and sync vs async worker selection.
Fix: Django REST Framework 403 Permission Denied
How to fix Django REST Framework 403 Forbidden and permission denied errors — authentication classes, permission classes, IsAuthenticated vs AllowAny, object-level permissions, and CSRF issues.
Fix: Django Migration Conflict (Conflicting Migrations Cannot Be Applied)
How to fix Django migration conflicts — why multiple leaf migrations conflict, how to merge conflicting migrations, resolve dependency chains, and set up a team workflow to prevent migration conflicts.