
Fix: arq Not Working — Worker Not Picking Jobs, WorkerSettings, Cron, Retries, and Result Expiry

FixDevs


Quick Answer

How to fix Python arq errors — worker can't find tasks, WorkerSettings class structure, cron syntax differences, msgpack serialization errors, job_id deduplication, result expiration, and Redis connection pooling.

The Error

You enqueue a job from FastAPI and the worker logs it as unknown:

WARNING - task my_app.tasks.send_email is not in the registered tasks

Or your worker starts but does nothing:

$ arq tasks.WorkerSettings
14:32:01: Starting worker for 0 functions
14:32:01: redis_version=7.2.4 mem_usage=2.34M clients_connected=1 db_keys=0
# Sits idle. Never runs anything.

Or you enqueue_job and get this:

TypeError: Object of type Decimal is not msgpack serializable

Or cron jobs aren’t firing at the times you expect:

cron_jobs = [
    cron("send_report", hour=3, minute=0),
]
# Configured. Job never runs at 3:00 AM.

Why This Happens

arq is small and opinionated. Most failures map to one of:

  • WorkerSettings must list every task. Unlike Celery’s auto-discovery, arq requires you to explicitly enumerate task functions in WorkerSettings.functions. Missing one → worker doesn’t register it → “not in the registered tasks” when enqueued.
  • Job arguments must survive the serializer. arq pickles jobs by default, but many setups swap in msgpack through job_serializer, and msgpack supports a much smaller set of types than pickle. Decimal, datetime, custom classes, and other Python objects then fail at enqueue time.
  • Cron jobs use a custom DSL, not standard cron strings. cron("...", hour=3) is a function call with keyword args — not cron("0 3 * * *"). Mixing up the syntax silently produces wrong schedules.
  • Worker discovery is by import path. arq tasks.WorkerSettings must resolve in Python’s import path. Run from the wrong directory, get module-not-found.

Underneath these symptoms is a single design decision worth internalizing: arq is asyncio-native and trusts you to be explicit. Where Celery goes out of its way to auto-discover, marshal arbitrary objects, and patch sync code into a worker context, arq does almost none of that. The good news is the failure modes are local: a missing task in functions produces a clean error, a non-msgpack argument fails at enqueue time, and the worker’s startup banner tells you exactly which functions it loaded. The bad news is that any sloppiness in your import paths or argument types becomes a silent stall the moment a real job lands in the queue.

The other thing to understand is that arq is one process holding one event loop. There is no per-task subprocess fork like Celery’s prefork pool. A blocking call inside one task — requests.get(url), time.sleep(), a synchronous SQLAlchemy session — freezes the entire worker until it returns. The queue keeps growing in Redis, every other job waits behind the blocked one, and your monitoring sees the queue depth climb while CPU stays low. If you’re seeing “background jobs are slow” without an obvious cause, the first thing to check is whether any task is running sync code in disguise (especially through a library wrapper that hides the blocking call).
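One way to keep a blocking call from freezing the loop is to hand it to a thread with asyncio.to_thread. The sketch below is a minimal illustration using only the standard library, not arq's own API: slow_lookup and fetch_report are hypothetical names, and the ctx argument simply mirrors arq's task signature.

```python
import asyncio
import time


def slow_lookup(key: str) -> str:
    """Hypothetical blocking call (stands in for requests.get or a sync DB query)."""
    time.sleep(0.2)
    return f"value-for-{key}"


async def fetch_report(ctx: dict, key: str) -> str:
    """arq-style task: the blocking call runs in a thread, so the loop stays free."""
    return await asyncio.to_thread(slow_lookup, key)


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Three tasks share one event loop, as they would inside one worker.
    results = await asyncio.gather(*(fetch_report({}, k) for k in ("a", "b", "c")))
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Calling slow_lookup directly inside the coroutine would make the three sleeps run one after another. With to_thread they overlap, which is the behavior you want from every task that wraps sync code.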

Fix 1: Register Tasks in WorkerSettings.functions

Every task the worker should run must be in functions:

# tasks.py
from arq import create_pool
from arq.connections import RedisSettings

async def send_email(ctx, to: str, subject: str):
    print(f"Sending to {to}: {subject}")
    # ... actual send ...
    return {"ok": True}

async def process_payment(ctx, payment_id: int):
    # ...
    pass

class WorkerSettings:
    functions = [send_email, process_payment]
    redis_settings = RedisSettings(host="localhost", port=6379)

# enqueue.py (from your web app)
import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def main():
    redis = await create_pool(RedisSettings(host="localhost"))
    # The first argument is the registered function name
    await redis.enqueue_job("send_email", "user@example.com", "Welcome")

asyncio.run(main())

Run the worker:

arq tasks.WorkerSettings

The first log line should list the registered functions:

Starting worker for 2 functions: send_email, process_payment

If you see 0 functions, your WorkerSettings class wasn’t loaded — usually a typo or wrong import path.

Pro Tip: Put all your task functions in one tasks.py (or a tasks/ package with __init__.py re-exporting them). Easier to keep WorkerSettings.functions in sync.

Fix 2: Task Function Signature Must Accept ctx

Every arq task takes ctx as the first positional argument:

async def send_email(ctx, to: str, subject: str):
    # ctx is a dict-like with: job_id, job_try, enqueue_time, etc.
    print(f"Job {ctx['job_id']} attempt {ctx['job_try']}")
    ...

The ctx dict carries runtime metadata. Don’t omit it — arq calls the function with ctx as the first arg unconditionally:

# Wrong:
async def send_email(to: str, subject: str): ...
# arq calls: send_email(ctx, "user@example.com", "Welcome")
# TypeError: send_email() takes 2 positional arguments but 3 were given

Inside the task, you can also pull shared resources from ctx. Set them up in on_startup:

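import httpx

async def fetch_url(ctx, url: str):
    # Reuse the client created once in on_startup
    return (await ctx["http"].get(url)).text

class WorkerSettings:
    functions = [fetch_url]

    async def on_startup(ctx):
        # Runs once at boot; ctx is shared by every job on this worker
        ctx["http"] = httpx.AsyncClient()

    async def on_shutdown(ctx):
        await ctx["http"].aclose()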

on_startup runs once when the worker boots. The ctx it populates is shared across all jobs that worker processes.

Fix 3: Serialize Non-msgpack Types Explicitly

msgpack supports: int, float, str, bytes, bool, None, list, dict. Other types need conversion before enqueue:

from decimal import Decimal
from datetime import datetime

# Wrong — fails with serialization error:
await redis.enqueue_job("process_payment", amount=Decimal("9.99"))

# Right — convert at the call site:
await redis.enqueue_job("process_payment", amount=str(Decimal("9.99")))
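If many call sites need the same conversion, a small helper keeps it in one place. The following is a sketch under the assumption that strings are an acceptable wire format for Decimal and datetime values; to_wire is a hypothetical name, not part of arq or msgpack.

```python
from datetime import date, datetime, timezone
from decimal import Decimal
from typing import Any


def to_wire(value: Any) -> Any:
    """Recursively convert common Python types into msgpack-friendly primitives."""
    if isinstance(value, Decimal):
        return str(value)  # keeps exact precision; parse with Decimal(...) in the task
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(k): to_wire(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_wire(v) for v in value]
    return value


payload = to_wire({
    "amount": Decimal("9.99"),
    "due": datetime(2024, 1, 31, 12, 0, tzinfo=timezone.utc),
    "tags": ("a", "b"),
})
print(payload)
```

The task on the other side has to reverse the conversion (Decimal(amount), datetime.fromisoformat(due)), so keep the two halves next to each other in your codebase.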

For complex objects, prefer IDs:

# Wrong — Pydantic model serialization is fragile:
await redis.enqueue_job("send_invoice", invoice=invoice_model)

# Right — pass the ID, fetch in the task:
await redis.enqueue_job("send_invoice", invoice_id=invoice.id)

async def send_invoice(ctx, invoice_id: int):
    invoice = await fetch_invoice(invoice_id)
    ...

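pickle is arq’s default serializer. If your project has overridden it (with msgpack, for example) and you need arbitrary Python objects, you can set it back explicitly. pickle handles almost anything, but it is unsafe with untrusted data and brittle when producer and worker run different code versions: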

import pickle

class WorkerSettings:
    functions = [...]
    job_serializer = pickle.dumps
    job_deserializer = pickle.loads

Common Mistake: Mixing serializers between producer and consumer. The web app’s arq pool and the worker must use the same pair: pass job_serializer and job_deserializer to create_pool as well as setting them on WorkerSettings.

Fix 4: Cron Jobs With cron()

arq’s cron uses keyword args, not a cron string:

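from arq.cron import cron

async def send_report(ctx): ...
async def morning_digest(ctx): ...
async def health_check(ctx): ...

cron_jobs = [
    # Pass the coroutine itself (or a dotted import path), not a bare name.
    # Every day at 03:00 (worker timezone, UTC by default)
    cron(send_report, hour={3}, minute={0}),

    # Every weekday at 09:30 (weekday is numeric, Monday is 0)
    cron(morning_digest, weekday={0, 1, 2, 3, 4}, hour={9}, minute={30}),

    # Every 15 minutes
    cron(health_check, minute={0, 15, 30, 45}),
]

class WorkerSettings:
    functions = [...]
    cron_jobs = cron_jobs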

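The keyword args (month, day, weekday, hour, minute, second) each accept an int or a set of ints; weekday counts from Monday as 0. A field you leave out matches any value, except second, which defaults to 0. arq fires the job when the current time matches all specified fields.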
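To make the "matches all specified fields" rule concrete, here is an illustrative matcher written with the standard library only. It is a sketch of the idea, not arq's implementation; field_matches and fires_at are hypothetical names.

```python
from datetime import datetime
from typing import Set, Union

Field = Union[None, int, Set[int]]


def field_matches(actual: int, wanted: Field) -> bool:
    """None matches anything; an int must be equal; a set must contain the value."""
    if wanted is None:
        return True
    if isinstance(wanted, int):
        return actual == wanted
    return actual in wanted


def fires_at(dt: datetime, *, weekday: Field = None, hour: Field = None,
             minute: Field = None) -> bool:
    """True when every specified field matches dt (Monday is weekday 0)."""
    return (
        field_matches(dt.weekday(), weekday)
        and field_matches(dt.hour, hour)
        and field_matches(dt.minute, minute)
    )


monday_0930 = datetime(2024, 1, 1, 9, 30)    # 1 Jan 2024 is a Monday
saturday_0930 = datetime(2024, 1, 6, 9, 30)  # 6 Jan 2024 is a Saturday
weekdays = {0, 1, 2, 3, 4}
print(fires_at(monday_0930, weekday=weekdays, hour=9, minute=30))
print(fires_at(saturday_0930, weekday=weekdays, hour=9, minute=30))
```

Note how an omitted field widens the schedule: fires_at(dt, minute=0) is true at the top of every hour, which is a common source of jobs running far more often than intended.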

Pro Tip: Test cron schedules in the REPL before deploying:

from datetime import datetime, timezone
from arq.cron import next_cron

# next_cron takes the same field arguments as cron()
print(next_cron(datetime.now(timezone.utc), hour=3, minute=0))
# datetime of the next scheduled firing

Fix 5: Job Deduplication With _job_id

If you enqueue the same job multiple times for the same external event, use _job_id for at-most-once semantics:

from datetime import date

# At-most-once: only one job runs per user per day
job_id = f"daily_email:{user_id}:{date.today().isoformat()}"
await redis.enqueue_job("send_daily_email", user_id=user_id, _job_id=job_id)

arq treats the _job_id as a unique key in Redis. A second enqueue_job with the same _job_id is a no-op and returns None instead of a Job, for as long as the first job or its stored result still exists.
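Because enqueue_job can come back as None when the _job_id is already taken (the same case listed under Still Not Working), it helps to wrap the call so callers get an explicit answer. This is a minimal sketch: daily_job_id and enqueue_once are hypothetical helpers, and FakePool is a stand-in that only exists so the example runs without Redis.

```python
import asyncio
from datetime import date
from typing import Any, Optional


def daily_job_id(prefix: str, user_id: int, day: date) -> str:
    """Deterministic ID: the same user and day always produce the same key."""
    return f"{prefix}:{user_id}:{day.isoformat()}"


async def enqueue_once(pool: Any, function: str, *args: Any, job_id: str) -> bool:
    """Return True if a new job was queued, False if the ID was already taken."""
    job = await pool.enqueue_job(function, *args, _job_id=job_id)
    return job is not None


class FakePool:
    """Stand-in for an arq pool so the sketch runs without Redis."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    async def enqueue_job(self, function: str, *args: Any,
                          _job_id: Optional[str] = None) -> Optional[str]:
        if _job_id in self.seen:
            return None  # duplicate ID: nothing is queued
        self.seen.add(_job_id)
        return _job_id


async def demo() -> list[bool]:
    pool = FakePool()
    jid = daily_job_id("daily_email", 42, date(2024, 1, 31))
    return [await enqueue_once(pool, "send_daily_email", 42, job_id=jid) for _ in range(2)]


outcomes = asyncio.run(demo())
print(outcomes)
```

In a real app you would pass the pool returned by create_pool in place of FakePool; the helper itself does not change.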

For one-shot delayed scheduling:

from datetime import datetime, timedelta

await redis.enqueue_job(
    "reminder",
    user_id=42,
    _defer_until=datetime.utcnow() + timedelta(hours=2),
)

_defer_until (or _defer_by for a timedelta) schedules the job for later execution.

Fix 6: Retries and max_tries

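arq allows up to 5 attempts per job by default (max_tries), but it only re-runs a job when the task raises Retry or is cancelled, for example during a worker shutdown. Any other exception fails the job on the spot, and there is no built-in backoff: you choose the delay yourself. Tune per task or globally: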

from arq import Retry

async def call_flaky_api(ctx, payload):
    try:
        return await external_call(payload)
    except TransientError:
        # Tell arq to retry with custom delay.
        raise Retry(defer=60)

class WorkerSettings:
    functions = [...]
    max_tries = 3  # Globally cap retries

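Retry(defer=N) schedules the next attempt N seconds out. For permanent failures, raise a regular exception: the job is marked failed immediately and the exception is stored as its result (success=False) until the result expires. arq has no separate failed queue. Once max_tries is exhausted, a further Retry fails the job the same way.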
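Since the delay passed to Retry is yours to choose, exponential backoff is a matter of computing it from the attempt number. The helper below is a sketch with assumed defaults (5 second base, 300 second cap); retry_delay is a hypothetical name, not an arq function.

```python
def retry_delay(job_try: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Exponential backoff in seconds: base * 2**(job_try - 1), limited to cap."""
    if job_try < 1:
        raise ValueError("job_try starts at 1")
    return min(cap, base * 2 ** (job_try - 1))


delays = [retry_delay(n) for n in range(1, 9)]
print(delays)
```

Inside a task this would be used as raise Retry(defer=retry_delay(ctx["job_try"])), with ctx["job_try"] being 1 on the first attempt.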

To inspect failures:

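from arq import create_pool
from arq.connections import RedisSettings

redis = await create_pool(RedisSettings(...))
# Results live under one key per job, so read them through the pool
results = await redis.all_job_results()
failed = [r for r in results if not r.success]
# Failed job results stay around until keep_result expires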

Fix 7: Result Expiry and Job Status

By default, job results expire after a short window (around an hour in recent arq versions — check your installed version’s defaults). Configure explicitly:

class WorkerSettings:
    functions = [...]
    keep_result = 86400  # 24 hours in seconds — bumped to a full day

To wait for and read a result:

job = await redis.enqueue_job("compute", input_data)
result = await job.result(timeout=60)  # Blocks up to 60s
print(result)

To check status without blocking:

status = await job.status()  # JobStatus.queued / .deferred / .in_progress / .complete / .not_found

Note: .result() blocks the calling coroutine. In a web request handler, this turns your async queue into a sync RPC — usually wrong. Return the job ID, let the client poll.

Fix 8: Connection Pooling and Web Integration

For FastAPI / Starlette apps that enqueue jobs, share one Redis pool across requests:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings

redis_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_pool
    redis_pool = await create_pool(RedisSettings(host="localhost"))
    yield
    await redis_pool.close()

app = FastAPI(lifespan=lifespan)

@app.post("/email")
async def trigger_email(to: str):
    job = await redis_pool.enqueue_job("send_email", to, "Welcome")
    return {"job_id": job.job_id}

Don’t create_pool per request — opening a Redis connection per request adds 1-10ms of latency and exhausts the connection limit fast under load.

Common Mistake: Forgetting await redis_pool.close() in shutdown. Without it, you leak connections that may interfere with redeploys (Redis maxclients reached).

Production Incident Lens: When All Background Jobs Stop

When arq stops processing jobs in production, the blast radius is every async path that depended on a worker: signup welcome emails, payment webhooks, scheduled reports, retry queues, cache warmers. The user-visible foreground stays up (your FastAPI handlers are still responding), which makes the incident harder to detect on conventional uptime monitors. The first signal is usually a stale dashboard metric — emails-sent flatlines, last-job-completed climbs past the SLO. By the time someone notices, you may have thousands of queued jobs in Redis and a worker that’s either crashed, stuck in a blocking call, or pointed at the wrong queue.

The triage order matters. First, check that a worker process exists: ps aux | grep arq on the worker host, or the equivalent in your container orchestrator. A surprising number of “arq isn’t working” incidents are simply a deploy that restarted the API but never started a new worker. Second, check the queue depth directly in Redis: redis-cli ZCARD arq:queue (or your custom queue name). arq keeps the queue in a sorted set, so LLEN fails with a WRONGTYPE error. A steady climb means producers are still enqueuing but no consumer is draining. Third, if the worker is up but idle, check whether it’s stuck on one job: look at the worker logs for the most recent “starting job” line without a corresponding “completed” or “failed.”

For monitoring, instrument three numbers and alert on each independently: queue depth (climbing means producer-consumer imbalance), oldest-job age (climbing means head-of-line blocking), and successful-completions per minute (dropping to zero means a worker outage). Page on any one of them crossing a threshold rather than waiting for a composite SLO to fire — by the time queue depth and oldest age both alert, you’ve usually been broken for ten minutes already.
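The three signals can be evaluated independently with very little code. The sketch below assumes you have already collected the numbers (for example queue depth from ZCARD, and the smallest score in the queue's sorted set as a millisecond timestamp); QueueSnapshot, alerts, and the thresholds are all hypothetical and should be tuned to your own SLOs.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class QueueSnapshot:
    depth: int                      # jobs waiting in the queue
    oldest_score_ms: Optional[int]  # smallest run-at timestamp in ms, None if empty
    completed_last_minute: int


def alerts(snap: QueueSnapshot, now_ms: int, *, max_depth: int = 1000,
           max_age_s: float = 300.0) -> list[str]:
    """Evaluate each signal on its own so one breach is enough to page."""
    fired = []
    if snap.depth > max_depth:
        fired.append("queue_depth")
    if snap.oldest_score_ms is not None and (now_ms - snap.oldest_score_ms) / 1000 > max_age_s:
        fired.append("oldest_job_age")
    if snap.depth > 0 and snap.completed_last_minute == 0:
        fired.append("no_completions")
    return fired


now_ms = int(time.time() * 1000)
stuck = QueueSnapshot(depth=40, oldest_score_ms=now_ms - 600_000, completed_last_minute=0)
print(alerts(stuck, now_ms))
```

The example snapshot shows why depth alone is not enough: 40 queued jobs is well under the depth threshold, yet the oldest job has waited ten minutes and nothing has completed.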

Still Not Working?

A few less-obvious failures:

  • Worker shows 0 functions even with functions=[...]. Wrong import — arq tasks.WorkerSettings needs tasks.py in the working dir. Try python -c "from tasks import WorkerSettings; print(WorkerSettings.functions)" to confirm.
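  • Cron jobs run twice when you have two workers. By default cron() is created with unique=True, so arq derives a job ID from the schedule time and only one worker should run each firing. Duplicates usually mean unique=False was passed, or the workers are not sharing the same Redis database and queue. If you want a hard guarantee, run cron on a single dedicated worker and use the others for general jobs.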
  • max_jobs doesn’t limit concurrency the way you think. It caps concurrent jobs on a single worker. To limit across workers, scale workers down or use Redis-based semaphores.
  • enqueue_job returns None instead of a job. That happens when _job_id already exists (deduplication). Check for None if you’re using _job_id.
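  • Health checks fail silently. The worker writes a health-check key to Redis every health_check_interval seconds; by default the key is the queue name plus :health-check (arq:queue:health-check). For monitoring, read that key or run arq --check tasks.WorkerSettings, which exits non-zero when the key is missing.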
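  • on_job_start / on_job_end not firing. They must be attributes of WorkerSettings, either defined in the class body or assigned there (on_job_start = my_hook). A free function that is never assigned to the class is ignored. Check indentation.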
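  • Worker memory grows over time. A task leaks (open file, lingering HTTP connection). Use on_shutdown to close shared resources, and consider recycling the worker periodically, for example by running it in burst mode with max_burst_jobs under a supervisor that restarts it.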
  • Multiple queues, jobs land on the wrong one. Specify _queue_name on enqueue and pass queue_name in WorkerSettings. Default queue is arq:queue.
  • Worker crashes after a Redis failover and never reconnects. arq’s reconnect logic is best-effort; some Redis HA setups break in ways that leave the worker holding a dead pool. Run workers under a supervisor (systemd, Kubernetes) with a quick restart policy so a process crash is the recovery path.
  • Jobs vanish without trace after _defer_until. A clock skew between producer and Redis can place the deferred timestamp in the past, making the job eligible immediately — and if no worker is up at that moment, the job runs and disappears with no record. Pin both producer and worker clocks via NTP and store a paper trail in your own DB before enqueuing.
  • A single slow task blocks the whole worker. arq runs max_jobs coroutines but they share one loop; CPU-heavy work or accidental sync I/O starves everything else. Move CPU-heavy work to a separate worker pool or off-process executor.

For related Python async task queue issues, see Dramatiq not working, Celery task not received, APScheduler not working, and Redis connection refused.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
