Fix: arq Not Working — Worker Not Picking Jobs, WorkerSettings, Cron, Retries, and Result Expiry
Quick Answer
How to fix Python arq errors — worker can't find tasks, WorkerSettings class structure, cron syntax differences, msgpack serialization errors, job_id deduplication, result expiration, and Redis connection pooling.
The Error
You enqueue a job from FastAPI and the worker logs it as unknown:
WARNING - task my_app.tasks.send_email is not in the registered tasks
Or your worker starts but does nothing:
$ arq tasks.WorkerSettings
14:32:01: Starting worker for 0 functions
14:32:01: redis_version=7.2.4 mem_usage=2.34M clients_connected=1 db_keys=0
# Sits idle. Never runs anything.
Or you call enqueue_job and get this:
TypeError: Object of type Decimal is not msgpack serializable
Or cron jobs aren’t firing at the times you expect:
cron_jobs = [
    cron("send_report", hour=3, minute=0),
]
# Configured. Job never runs at 3:00 AM.
Why This Happens
arq is small and opinionated. Most failures map to one of:
- WorkerSettings must list every task. Unlike Celery’s auto-discovery, arq requires you to explicitly enumerate task functions in WorkerSettings.functions. Missing one → worker doesn’t register it → “not in the registered tasks” when enqueued.
- Serializers are strict about types. arq pickles job arguments by default; if you configure msgpack through job_serializer, only a smaller set of types is supported. Decimal, datetime (in some cases), custom classes, and other Python objects fail.
- Cron jobs use a custom DSL, not standard cron strings. cron(..., hour=3) is a function call with keyword args, not cron("0 3 * * *"). Mixing up the syntax gives you an import error or a schedule you did not intend.
- Worker discovery is by import path. arq tasks.WorkerSettings must resolve on Python’s import path. Run it from the wrong directory and you get a module-not-found error.
Underneath these symptoms is a single design decision worth internalizing: arq is asyncio-native and trusts you to be explicit. Where Celery goes out of its way to auto-discover, marshal arbitrary objects, and patch sync code into a worker context, arq does almost none of that. The good news is the failure modes are local: a missing task in functions produces a clean error, a non-msgpack argument fails at enqueue time, and the worker’s startup banner tells you exactly which functions it loaded. The bad news is that any sloppiness in your import paths or argument types becomes a silent stall the moment a real job lands in the queue.
The other thing to understand is that arq is one process holding one event loop. There is no per-task subprocess fork like Celery’s prefork pool. A blocking call inside one task — requests.get(url), time.sleep(), a synchronous SQLAlchemy session — freezes the entire worker until it returns. The queue keeps growing in Redis, every other job waits behind the blocked one, and your monitoring sees the queue depth climb while CPU stays low. If you’re seeing “background jobs are slow” without an obvious cause, the first thing to check is whether any task is running sync code in disguise (especially through a library wrapper that hides the blocking call).
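One way to keep a blocking call from freezing the loop is to push it onto a thread. The sketch below is a minimal illustration, assuming Python 3.9+ for asyncio.to_thread; slow_lookup and the task name resolve are hypothetical stand-ins for whatever sync call your task hides.

```python
import asyncio
import time


def slow_lookup(key: str) -> str:
    """Hypothetical blocking call (stands in for requests.get, a sync DB query, etc.)."""
    time.sleep(0.2)
    return key.upper()


async def resolve(ctx: dict, key: str) -> str:
    # Run the blocking function in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_lookup, key)


async def demo() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Three "jobs" run concurrently; their blocking sleeps overlap instead of queueing up.
    results = await asyncio.gather(*(resolve({}, k) for k in ("a", "b", "c")))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Threads help with blocking I/O; CPU-bound work still competes for the GIL and is usually better moved to a separate process.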
Fix 1: Register Tasks in WorkerSettings.functions
Every task the worker should run must be in functions:
# tasks.py
from arq import create_pool
from arq.connections import RedisSettings
async def send_email(ctx, to: str, subject: str):
    print(f"Sending to {to}: {subject}")
    # ... actual send ...
    return {"ok": True}
async def process_payment(ctx, payment_id: int):
    # ...
    pass
class WorkerSettings:
    functions = [send_email, process_payment]
    redis_settings = RedisSettings(host="localhost", port=6379)
# enqueue.py (from your web app)
from arq import create_pool
from arq.connections import RedisSettings
async def main():
    redis = await create_pool(RedisSettings(host="localhost"))
    await redis.enqueue_job("send_email", "[email protected]", "Welcome")
Run the worker:
arq tasks.WorkerSettings
The first log line should list the registered functions:
Starting worker for 2 functions: send_email, process_payment
If you see 0 functions, your WorkerSettings class wasn’t loaded, usually because of a typo or a wrong import path.
Pro Tip: Put all your task functions in one tasks.py (or a tasks/ package with __init__.py re-exporting them). Easier to keep WorkerSettings.functions in sync.
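A quick pre-deploy check can catch a task that is enqueued by name but missing from functions. This sketch assumes arq registers a coroutine under its __qualname__ unless an explicit name is supplied (which the sketch ignores); ENQUEUED_NAMES and missing_tasks are hypothetical, and you would maintain or collect the name list from your own codebase.

```python
async def send_email(ctx, to: str, subject: str): ...
async def process_payment(ctx, payment_id: int): ...


class WorkerSettings:
    functions = [send_email, process_payment]


# Hypothetical: names your web app passes to enqueue_job.
ENQUEUED_NAMES = ["send_email", "process_payment", "send_invoice"]


def missing_tasks(settings, enqueued_names):
    """Return enqueued names that the worker would not recognise."""
    registered = {fn.__qualname__ for fn in settings.functions}
    return sorted(set(enqueued_names) - registered)


print(missing_tasks(WorkerSettings, ENQUEUED_NAMES))  # ['send_invoice']
```

Running a check like this in CI turns a runtime "unknown task" into a failed build.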
Fix 2: Task Function Signature Must Accept ctx
Every arq task takes ctx as the first positional argument:
async def send_email(ctx, to: str, subject: str):
    # ctx is a dict with: job_id, job_try, enqueue_time, etc.
    print(f"Job {ctx['job_id']} attempt {ctx['job_try']}")
    ...
The ctx dict carries runtime metadata. Don’t omit it: arq calls the function with ctx as the first arg unconditionally:
# Wrong:
async def send_email(to: str, subject: str): ...
# arq calls: send_email(ctx, "[email protected]", "Welcome")
# TypeError: send_email() takes 2 positional arguments but 3 were given
Inside the task, you can also pull shared resources from ctx. Set them up in on_startup:
import httpx
class WorkerSettings:
    functions = [...]
    async def on_startup(ctx):
        ctx["http"] = httpx.AsyncClient()
    async def on_shutdown(ctx):
        await ctx["http"].aclose()
async def fetch_url(ctx, url: str):
    return (await ctx["http"].get(url)).text
on_startup runs once when the worker boots. The ctx it populates is shared across all jobs that worker processes.
Fix 3: Serialize Non-msgpack Types Explicitly
msgpack supports: int, float, str, bytes, bool, None, list, dict. Other types need conversion before enqueue:
from decimal import Decimal
from datetime import datetime
# Wrong: fails with a serialization error
await redis.enqueue_job("process_payment", amount=Decimal("9.99"))
# Right: convert at the call site
await redis.enqueue_job("process_payment", amount=str(Decimal("9.99")))
For complex objects, prefer IDs:
# Wrong: Pydantic model serialization is fragile
await redis.enqueue_job("send_invoice", invoice=invoice_model)
# Right: pass the ID, fetch in the task
await redis.enqueue_job("send_invoice", invoice_id=invoice.id)
async def send_invoice(ctx, invoice_id: int):
    invoice = await fetch_invoice(invoice_id)
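    ...
arq’s built-in serializer is pickle, which accepts almost any Python object but is unsafe with untrusted data and brittle across code versions. The serializer is set through job_serializer and job_deserializer; switching to msgpack looks like this:
import msgpack  # third-party: pip install msgpack
class WorkerSettings:
    functions = [...]
    job_serializer = msgpack.packb
    job_deserializer = lambda b: msgpack.unpackb(b, raw=False)
Common Mistake: Mixing serializers between producer and consumer. The web app’s arq pool and the worker must use the same serializer. Pass the same job_serializer and job_deserializer to create_pool.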
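Converting at every call site gets repetitive, so some projects centralise it. The following is a minimal sketch of such a helper, not part of arq or msgpack; to_wire and the Payment dataclass are hypothetical names, and the string formats chosen (str for Decimal, ISO 8601 for dates) are assumptions the receiving task has to mirror when parsing.

```python
from dataclasses import asdict, dataclass, is_dataclass
from datetime import date, datetime
from decimal import Decimal
from uuid import UUID


def to_wire(value):
    """Recursively convert common non-msgpack types into plain ones."""
    if isinstance(value, Decimal):
        return str(value)          # keeps exact digits; parse with Decimal(...) in the task
    if isinstance(value, (datetime, date)):
        return value.isoformat()   # parse with datetime.fromisoformat(...) in the task
    if isinstance(value, UUID):
        return str(value)
    if is_dataclass(value) and not isinstance(value, type):
        return to_wire(asdict(value))
    if isinstance(value, dict):
        return {str(k): to_wire(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_wire(v) for v in value]
    return value


@dataclass
class Payment:  # hypothetical payload type
    amount: Decimal
    created: datetime


print(to_wire(Payment(Decimal("9.99"), datetime(2024, 1, 2, 3, 4, 5))))
# {'amount': '9.99', 'created': '2024-01-02T03:04:05'}
```

A call site would then look like await redis.enqueue_job("process_payment", **to_wire(payload)), with the task converting the strings back.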
Fix 4: Cron Jobs With cron()
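arq’s cron uses keyword args, not a cron string:
from arq.cron import cron
async def send_report(ctx): ...
async def morning_digest(ctx): ...
async def health_check(ctx): ...
cron_jobs = [
    # Every day at 03:00 UTC
    cron(send_report, hour={3}, minute={0}),
    # Every weekday at 09:30 (weekday numbers follow datetime.weekday(): Monday is 0)
    cron(morning_digest, weekday={0, 1, 2, 3, 4}, hour={9}, minute={30}),
    # Every 15 minutes
    cron(health_check, minute={0, 15, 30, 45}),
]
class WorkerSettings:
    functions = [...]
    cron_jobs = cron_jobs
The keyword args (hour, minute, weekday, month, day) accept an int or a set of ints. arq fires the job when the current time matches all specified fields. The first argument is the coroutine function itself or a dotted import path such as "tasks.send_report"; a bare name like "send_report" cannot be imported.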
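The matching rule can be sketched in plain Python. This illustrates the "all specified fields must match" logic only and is not arq's actual implementation; cron_matches is a hypothetical helper, and it uses datetime.weekday() numbering (Monday is 0).

```python
from datetime import datetime


def cron_matches(dt: datetime, **fields) -> bool:
    """True if dt satisfies every given field; omitted fields match anything."""
    current = {
        "month": dt.month,
        "day": dt.day,
        "weekday": dt.weekday(),  # Monday is 0
        "hour": dt.hour,
        "minute": dt.minute,
    }
    for name, wanted in fields.items():
        allowed = {wanted} if isinstance(wanted, int) else set(wanted)
        if current[name] not in allowed:
            return False
    return True


weekdays = {0, 1, 2, 3, 4}
# 2024-01-01 was a Monday, 2024-01-06 a Saturday.
print(cron_matches(datetime(2024, 1, 1, 9, 30), weekday=weekdays, hour=9, minute=30))  # True
print(cron_matches(datetime(2024, 1, 6, 9, 30), weekday=weekdays, hour=9, minute=30))  # False
print(cron_matches(datetime(2024, 1, 1, 9, 45), minute={0, 15, 30, 45}))               # True
```

Leaving a field out widens the schedule: a job with only minute set runs every hour of every day.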
Pro Tip: Test cron schedules in the REPL before deploying:
from datetime import datetime, timezone
from arq.cron import next_cron
print(next_cron(datetime.now(timezone.utc), hour=3, minute=0))
# datetime of the next scheduled firing
Fix 5: Job Deduplication With _job_id
If you enqueue the same job multiple times for the same external event, use _job_id to deduplicate:
from datetime import date
# Only one job per user per day, for as long as the job or its result exists in Redis
job_id = f"daily_email:{user_id}:{date.today().isoformat()}"
job = await redis.enqueue_job("send_daily_email", user_id=user_id, _job_id=job_id)
arq treats the _job_id as a unique key in Redis. A second enqueue_job with the same _job_id is a no-op and returns None instead of a Job, for as long as the first job is queued, running, or still has a stored result.
For one-shot delayed scheduling:
from datetime import datetime, timedelta
await redis.enqueue_job(
    "reminder",
    user_id=42,
    _defer_until=datetime.utcnow() + timedelta(hours=2),
)
_defer_until (or _defer_by for a timedelta) schedules the job for later execution.
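Because a duplicate enqueue comes back as None, callers usually branch on the return value. The sketch below shows that pattern under stated assumptions: FakePool is a hypothetical in-memory stand-in for the arq pool so the example runs without Redis, and it only imitates the None-on-duplicate behaviour; daily_job_id and enqueue_daily_email are likewise illustrative names.

```python
import asyncio
from datetime import date


class FakePool:
    """Hypothetical stand-in for an arq pool: returns None for a repeated _job_id."""

    def __init__(self):
        self._seen = set()

    async def enqueue_job(self, function, *args, _job_id=None, **kwargs):
        if _job_id is not None and _job_id in self._seen:
            return None
        self._seen.add(_job_id)
        return {"job_id": _job_id, "function": function}


def daily_job_id(user_id: int, day: date) -> str:
    return f"daily_email:{user_id}:{day.isoformat()}"


async def enqueue_daily_email(pool, user_id: int, day: date) -> bool:
    """Return True if a new job was queued, False if it was a duplicate."""
    job = await pool.enqueue_job(
        "send_daily_email", user_id=user_id, _job_id=daily_job_id(user_id, day)
    )
    return job is not None


async def demo():
    pool = FakePool()
    day = date(2024, 1, 1)
    return [
        await enqueue_daily_email(pool, 42, day),
        await enqueue_daily_email(pool, 42, day),
        await enqueue_daily_email(pool, 7, day),
    ]


print(asyncio.run(demo()))  # [True, False, True]
```

Building the ID in one function keeps producers from drifting apart on the key format, which would silently defeat the deduplication.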
Fix 6: Retries and max_tries
By default, max_tries is 5. arq retries a job when it raises Retry or is cancelled (for example during worker shutdown); any other exception fails the job immediately, and the delay between attempts is whatever you pass to Retry. Tune per task or globally:
from arq import Retry
async def call_flaky_api(ctx, payload):
    try:
        return await external_call(payload)
    except TransientError:
        # Tell arq to retry with a custom delay.
        raise Retry(defer=60)
class WorkerSettings:
    functions = [...]
    max_tries = 3  # Globally cap retries
Retry(defer=N) schedules the next attempt N seconds out. For permanent failures, raise a regular exception: the job is recorded as failed (success=False) with the exception as its result. arq has no separate failed queue.
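If you want the delay to grow between attempts, you can compute it from the attempt number yourself. This is a minimal sketch of capped exponential backoff; backoff_seconds, base, and cap are hypothetical names and values, and it assumes the attempt counter starts at 1, as ctx["job_try"] does.

```python
def backoff_seconds(job_try: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Delay before the next attempt: base * 2^(job_try - 1), capped at cap."""
    return min(cap, base * 2 ** (max(job_try, 1) - 1))


print([backoff_seconds(n) for n in range(1, 8)])
# [5.0, 10.0, 20.0, 40.0, 80.0, 160.0, 300.0]
```

Inside a task this would be used as raise Retry(defer=backoff_seconds(ctx["job_try"])). Adding a little random jitter is a common refinement when many jobs fail at the same moment.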
To inspect failures:
from arq import create_pool
from arq.connections import RedisSettings
redis = await create_pool(RedisSettings(...))
results = await redis.all_job_results()
failed = [r for r in results if not r.success]
# Failed job results stay around until keep_result expires
Fix 7: Result Expiry and Job Status
By default, job results expire after a short window (around an hour in recent arq versions; check your installed version’s defaults). Configure explicitly:
class WorkerSettings:
    functions = [...]
    keep_result = 86400  # 24 hours in seconds
To wait for and read a result:
job = await redis.enqueue_job("compute", input_data)
result = await job.result(timeout=60)  # Waits up to 60s
print(result)
To check status without blocking:
status = await job.status()  # JobStatus.queued / .deferred / .in_progress / .complete / .not_found
Note: .result() blocks the calling coroutine. In a web request handler, this turns your async queue into a sync RPC, which is usually wrong. Return the job ID and let the client poll.
Fix 8: Connection Pooling and Web Integration
For FastAPI / Starlette apps that enqueue jobs, share one Redis pool across requests:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings
redis_pool = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_pool
    redis_pool = await create_pool(RedisSettings(host="localhost"))
    yield
    await redis_pool.close()
app = FastAPI(lifespan=lifespan)
@app.post("/email")
async def trigger_email(to: str):
    job = await redis_pool.enqueue_job("send_email", to, "Welcome")
    return {"job_id": job.job_id}
Don’t create_pool per request: opening a Redis connection per request adds 1-10ms of latency and exhausts the connection limit fast under load.
Common Mistake: Forgetting await redis_pool.close() in shutdown. Without it, you leak connections that may interfere with redeploys (Redis maxclients reached).
Production Incident Lens: When All Background Jobs Stop
When arq stops processing jobs in production, the blast radius is every async path that depended on a worker: signup welcome emails, payment webhooks, scheduled reports, retry queues, cache warmers. The user-visible foreground stays up (your FastAPI handlers are still responding), which makes the incident harder to detect on conventional uptime monitors. The first signal is usually a stale dashboard metric — emails-sent flatlines, last-job-completed climbs past the SLO. By the time someone notices, you may have thousands of queued jobs in Redis and a worker that’s either crashed, stuck in a blocking call, or pointed at the wrong queue.
The triage order matters. First, check that a worker process exists: ps aux | grep arq on the worker host, or the equivalent in your container orchestrator. A surprising number of “arq isn’t working” incidents are simply a deploy that restarted the API but never started a new worker. Second, check the queue depth directly in Redis: redis-cli ZCARD arq:queue (or your custom queue name). arq keeps the queue in a sorted set, so LLEN returns a WRONGTYPE error. A steady climb means producers are still enqueuing but no consumer is draining. Third, if the worker is up but idle, check whether it’s stuck on one job: look at the worker logs for the most recent “starting job” line without a corresponding “completed” or “failed.”
For monitoring, instrument three numbers and alert on each independently: queue depth (climbing means producer-consumer imbalance), oldest-job age (climbing means head-of-line blocking), and successful-completions per minute (dropping to zero means a worker outage). Page on any one of them crossing a threshold rather than waiting for a composite SLO to fire — by the time queue depth and oldest age both alert, you’ve usually been broken for ten minutes already.
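The three-signal rule can be expressed as a small check that evaluates each number on its own. This is a sketch only: QueueSnapshot, the threshold constants, and their values are hypothetical, and collecting the numbers from Redis or your metrics system is left out.

```python
from dataclasses import dataclass


@dataclass
class QueueSnapshot:
    depth: int                  # jobs waiting in the queue
    oldest_job_age_s: float     # seconds since the oldest waiting job was enqueued
    completions_per_min: float  # successful completions in the last minute


# Hypothetical thresholds; tune to your own traffic.
MAX_DEPTH = 1000
MAX_OLDEST_AGE_S = 300
MIN_COMPLETIONS_PER_MIN = 1


def alerts(snap: QueueSnapshot) -> list[str]:
    """Evaluate each signal independently and return the ones that fired."""
    fired = []
    if snap.depth > MAX_DEPTH:
        fired.append("queue_depth")
    if snap.oldest_job_age_s > MAX_OLDEST_AGE_S:
        fired.append("oldest_job_age")
    if snap.completions_per_min < MIN_COMPLETIONS_PER_MIN:
        fired.append("completions")
    return fired


print(alerts(QueueSnapshot(depth=40, oldest_job_age_s=900, completions_per_min=0)))
# ['oldest_job_age', 'completions']
```

In the example the queue is short, so a depth-only alert would stay quiet even though the worker has stopped completing jobs.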
Still Not Working?
A few less-obvious failures:
- Worker shows 0 functions even with functions=[...]. Wrong import: arq tasks.WorkerSettings needs tasks.py in the working dir. Try python -c "from tasks import WorkerSettings; print(WorkerSettings.functions)" to confirm.
- Cron jobs run twice when you have two workers. cron() defaults to unique=True, which is meant to make each scheduled run execute once even with several workers. If you still see duplicates, check that unique=False was not passed, that all workers share the same Redis and queue, and that their clocks agree. Alternatively, run only one worker for cron and others for general jobs.
- max_jobs doesn’t limit concurrency the way you think. It caps concurrent jobs on a single worker. To limit across workers, scale workers down or use Redis-based semaphores.
- enqueue_job returns None instead of a job. That happens when _job_id already exists (deduplication). Check for None if you’re using _job_id.
- Health checks fail silently. Set health_check_interval and arq logs its own health periodically; for monitoring, read the health-check key from Redis (by default the queue name plus :health-check, so arq:queue:health-check) or run arq --check tasks.WorkerSettings.
- on_job_start / on_job_end not firing. They must be attributes of WorkerSettings; a free function that is never assigned to the class is ignored. Check indentation.
- Worker memory grows over time. A task leaks (open file, lingering HTTP connection). Use on_shutdown to close shared resources, and consider recycling the worker process periodically, for example with burst mode and max_burst_jobs under a supervisor that restarts it.
- Multiple queues, jobs land on the wrong one. Specify _queue_name on enqueue and pass queue_name in WorkerSettings. Default queue is arq:queue.
- Worker crashes after a Redis failover and never reconnects. arq’s reconnect logic is best-effort; some Redis HA setups break in ways that leave the worker holding a dead pool. Run workers under a supervisor (systemd, Kubernetes) with a quick restart policy so a process crash is the recovery path.
- Jobs vanish without trace after _defer_until. A clock skew between producer and Redis can place the deferred timestamp in the past, making the job eligible immediately; it then runs earlier than expected, and once its result expires there is no record. Pin both producer and worker clocks via NTP and store a paper trail in your own DB before enqueuing.
- A single slow task blocks the whole worker. arq runs up to max_jobs coroutines but they share one loop; CPU-heavy work or accidental sync I/O starves everything else. Move CPU-heavy work to a separate worker pool or off-process executor.
For related Python async task queue issues, see Dramatiq not working, Celery task not received, APScheduler not working, and Redis connection refused.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.