Fix: BullMQ Not Working — Jobs Not Processing, Workers Not Starting, or Redis Connection Failing
Quick Answer
How to fix BullMQ issues — queue and worker setup, Redis connection, job scheduling, retry strategies, concurrency, rate limiting, event listeners, and dashboard monitoring.
The Problem
Jobs are added to the queue but never process:
import { Queue, Worker } from 'bullmq';
const queue = new Queue('email');
await queue.add('send', { to: '[email protected]', subject: 'Hello' });
// Job sits in the queue forever — worker never picks it up

Or the worker starts but immediately crashes:

Error: connect ECONNREFUSED 127.0.0.1:6379

Or jobs process but fail silently:

Job completed with status "failed" but no error message visible

Why This Happens
BullMQ is a job queue library built on Redis. It requires a running Redis instance and separate queue/worker processes:
- Queue and Worker are separate concepts — Queue adds jobs, Worker processes them. Creating a Queue without a Worker means jobs pile up with nothing to process them. The two can run in different processes or even on different servers.
- Redis must be running and accessible — BullMQ stores all job data in Redis. Without a Redis connection, neither queue nor worker can operate. The default connection is localhost:6379.
- Workers must use the same queue name — new Worker('email', ...) only processes jobs from the queue named 'email'. A typo or a different name means the worker listens on an empty queue.
- Job processors must handle errors — if the processor function throws and you don't listen for 'failed' events, errors vanish silently. BullMQ only retries failed jobs when attempts is set greater than 1; once all attempts are exhausted, the job is marked as permanently failed.
Fix 1: Basic Queue and Worker Setup
npm install bullmq ioredis

// lib/queue.ts — shared connection and queue definition
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';
// Shared Redis connection
const connection = new IORedis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: null, // Required for BullMQ
});
// Define queues
export const emailQueue = new Queue('email', { connection });
export const reportQueue = new Queue('report', { connection });
// Add a job
export async function sendEmail(to: string, subject: string, html: string) {
return emailQueue.add('send-email', { to, subject, html }, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000, // retries after 2s, then 4s
},
removeOnComplete: { age: 3600 }, // Remove completed jobs after 1 hour
removeOnFail: { age: 86400 }, // Keep failed jobs for 24 hours
});
}
// Schedule a delayed job
export async function scheduleReminder(userId: string, message: string, delay: number) {
return emailQueue.add('reminder', { userId, message }, {
delay, // milliseconds
});
}
// Schedule recurring job (cron)
export async function setupRecurringJobs() {
await reportQueue.upsertJobScheduler('daily-report', {
pattern: '0 9 * * *', // Every day at 9 AM
}, {
name: 'generate-daily-report',
data: { type: 'daily' },
});
await reportQueue.upsertJobScheduler('weekly-cleanup', {
pattern: '0 3 * * 0', // Every Sunday at 3 AM
}, {
name: 'cleanup-old-data',
data: { type: 'cleanup' },
});
}

// workers/email.worker.ts — processes email jobs
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
const connection = new IORedis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: null,
});
const emailWorker = new Worker(
'email',
async (job: Job) => {
// Job processing logic
switch (job.name) {
case 'send-email': {
const { to, subject, html } = job.data;
console.log(`Sending email to ${to}: ${subject}`);
await sendEmailViaProvider(to, subject, html);
// Return value is stored as job result
return { sent: true, to, timestamp: new Date().toISOString() };
}
case 'reminder': {
const { userId, message } = job.data;
await sendPushNotification(userId, message);
return { notified: true };
}
default:
throw new Error(`Unknown job name: ${job.name}`);
}
},
{
connection,
concurrency: 5, // Process up to 5 jobs simultaneously
limiter: {
max: 10, // Max 10 jobs
duration: 1000, // Per 1 second (rate limiting)
},
},
);
// Event listeners — critical for monitoring
emailWorker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
emailWorker.on('failed', (job, error) => {
console.error(`Job ${job?.id} failed:`, error.message);
// Alert on repeated failures
if (job && job.attemptsMade >= job.opts.attempts!) {
alertOncall(`Email job permanently failed: ${error.message}`);
}
});
emailWorker.on('error', (error) => {
console.error('Worker error:', error);
});
emailWorker.on('stalled', (jobId) => {
console.warn(`Job ${jobId} stalled — will be reprocessed`);
});
console.log('Email worker started');

Fix 2: Job Progress and Events
// Worker with progress reporting
const videoWorker = new Worker(
'video',
async (job: Job) => {
const { videoId, format } = job.data;
// Step 1: Download
await job.updateProgress(25);
await job.log('Downloading video...');
const file = await downloadVideo(videoId);
// Step 2: Transcode
await job.updateProgress(50);
await job.log(`Transcoding to ${format}...`);
const transcoded = await transcodeVideo(file, format);
// Step 3: Upload
await job.updateProgress(75);
await job.log('Uploading result...');
const url = await uploadToStorage(transcoded);
// Step 4: Complete
await job.updateProgress(100);
await job.log('Complete!');
return { url, format };
},
{ connection, concurrency: 2 },
);
// Listen for progress on the client side
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('video', { connection });
queueEvents.on('progress', ({ jobId, data }) => {
console.log(`Job ${jobId} progress: ${data}%`);
});
queueEvents.on('completed', ({ jobId, returnvalue }) => {
console.log(`Job ${jobId} completed:`, returnvalue);
});
// API route — check job status
export async function GET(req: Request) {
const jobId = new URL(req.url).searchParams.get('id');
if (!jobId) return Response.json({ error: 'Missing id' }, { status: 400 });
const job = await videoQueue.getJob(jobId);
if (!job) return Response.json({ error: 'Not found' }, { status: 404 });
const state = await job.getState();
const progress = job.progress;
const logs = await videoQueue.getJobLogs(jobId);
return Response.json({ state, progress, logs: logs.logs, result: job.returnvalue });
}

Fix 3: Retry Strategies
// Per-job retry configuration
await queue.add('critical-task', data, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000, // retries after 1s, 2s, 4s, 8s
},
});
// Fixed delay retries
await queue.add('api-call', data, {
attempts: 3,
backoff: {
type: 'fixed',
delay: 5000, // Always wait 5 seconds between retries
},
});
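To make the delay comments above concrete, here is a small sketch of the arithmetic (my own helper, mirroring the formulas described in the BullMQ docs: exponential backoff waits delay * 2^(attemptsMade - 1), fixed backoff always waits the configured delay):

```typescript
// Sketch: the delay before the retry numbered `attemptsMade` for BullMQ's
// built-in backoff types. Not a BullMQ API — just the math made explicit.
function retryDelay(
  type: 'exponential' | 'fixed',
  baseDelay: number,
  attemptsMade: number,
): number {
  return type === 'exponential' ? baseDelay * 2 ** (attemptsMade - 1) : baseDelay;
}

// attempts: 5 with exponential delay 1000 means up to 4 retries:
const delays = [1, 2, 3, 4].map((n) => retryDelay('exponential', 1000, n));
// → [1000, 2000, 4000, 8000]
```

Note that attempts counts total attempts, not retries — attempts: 5 runs the job once plus at most four retries, which is why only four delays appear above.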
// Custom backoff strategy
await queue.add('smart-retry', data, {
attempts: 4,
backoff: {
type: 'custom',
},
});
// In worker — custom backoff logic
const worker = new Worker('smart-retry', processor, {
connection,
settings: {
backoffStrategy: (attemptsMade: number, type: string, err: Error) => {
// Rate limited — wait longer
if (err.message.includes('429')) {
return 60000; // 1 minute
}
// Server error — exponential
if (err.message.includes('500')) {
return Math.min(2 ** attemptsMade * 1000, 30000);
}
// Default
return 2000;
},
},
});

Fix 4: Job Priorities and Flows
// Priority queues — lower number = higher priority
await queue.add('urgent', data, { priority: 1 }); // Processed first
await queue.add('normal', data, { priority: 5 });
await queue.add('low', data, { priority: 10 }); // Processed last
// Job flows — parent-child dependencies
import { FlowProducer } from 'bullmq';
const flow = new FlowProducer({ connection });
// Parent job waits for all children to complete
await flow.add({
name: 'generate-report',
queueName: 'report',
data: { reportId: 'monthly-2026-03' },
children: [
{
name: 'fetch-sales-data',
queueName: 'data',
data: { source: 'sales', month: '2026-03' },
},
{
name: 'fetch-user-data',
queueName: 'data',
data: { source: 'users', month: '2026-03' },
},
{
name: 'fetch-analytics',
queueName: 'data',
data: { source: 'analytics', month: '2026-03' },
},
],
});
// The 'generate-report' job only runs after all 3 children complete
// Access children results in the parent worker:
const reportWorker = new Worker('report', async (job) => {
const childrenValues = await job.getChildrenValues();
// childrenValues = { 'data:fetch-sales-data:123': {...}, ... }
return generateReport(childrenValues);
}, { connection });

Fix 5: Dashboard Monitoring
npm install @bull-board/api @bull-board/express
# Or for Next.js: npm install @bull-board/api @bull-board/next

// Express dashboard
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import express from 'express';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(reportQueue),
new BullMQAdapter(videoQueue),
],
serverAdapter,
});
const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3001, () => console.log('Bull Board on http://localhost:3001/admin/queues'));

Fix 6: Graceful Shutdown
// Proper cleanup on process exit
async function shutdown() {
console.log('Shutting down workers...');
// Close workers — wait for current jobs to finish
await emailWorker.close();
await reportWorker.close();
// Close queues
await emailQueue.close();
await reportQueue.close();
// Close Redis connection
await connection.quit();
console.log('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

Still Not Working?
Jobs added but never process — the worker isn’t running or uses a different queue name. Verify new Worker('email', ...) matches new Queue('email') exactly. Also check that the worker process is actually started — it’s a separate script, not automatically spawned by the queue.
“ECONNREFUSED 127.0.0.1:6379” — Redis isn’t running. Start Redis: redis-server (local) or configure the connection for your Redis provider (Upstash, Redis Cloud, etc.). Set maxRetriesPerRequest: null in the IORedis config — BullMQ requires this.
Jobs fail with no error message — add worker.on('failed', (job, error) => console.error(error)) to see failure reasons. Also check worker.on('error', ...) for connection-level errors. Without event listeners, failures are silent.
Stalled jobs reprocess unexpectedly — a job is "stalled" when its worker stops renewing the job's lock, either because the process crashed or because CPU-heavy synchronous work blocked the event loop. BullMQ automatically retries stalled jobs. Increase lockDuration in the worker options if your jobs legitimately run long, and break up CPU-heavy work so the event loop stays free to renew the lock — calling job.updateProgress() won't help if the loop is blocked.
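One way to keep the event loop free during CPU-heavy jobs is to yield between chunks of work. A minimal sketch (processInChunks is my own helper, not a BullMQ API):

```typescript
// Sketch: run CPU-heavy work in chunks, yielding to the event loop between
// chunks so background tasks (such as the worker's lock renewal) can run.
async function processInChunks<T>(
  items: T[],
  chunkSize: number,
  handle: (item: T) => void,
): Promise<void> {
  for (let i = 0; i < items.length; i += chunkSize) {
    for (const item of items.slice(i, i + chunkSize)) handle(item);
    // Yield: lets timers and I/O callbacks run before the next chunk
    await new Promise<void>((resolve) => setImmediate(() => resolve()));
  }
}
```

Inside a job processor you would call await processInChunks(rows, 500, handleRow) instead of looping synchronously over every row in one go.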
For related backend issues, see Fix: Upstash Not Working and Fix: Inngest Not Working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
Related Articles
Fix: Fastify Not Working — 404, Plugin Encapsulation, and Schema Validation Errors
How to fix Fastify issues — route 404 from plugin encapsulation, reply already sent, FST_ERR_VALIDATION, request.body undefined, @fastify/cors, hooks not running, and TypeScript type inference.
Fix: Better Auth Not Working — Login Failing, Session Null, or OAuth Callback Error
How to fix Better Auth issues — server and client setup, email/password and OAuth providers, session management, middleware protection, database adapters, and plugin configuration.
Fix: GraphQL Yoga Not Working — Schema Errors, Resolvers Not Executing, or Subscriptions Failing
How to fix GraphQL Yoga issues — schema definition, resolver patterns, context and authentication, file uploads, subscriptions with SSE, error handling, and Next.js integration.
Fix: LangChain.js Not Working — Chain Not Executing, Retriever Returning Empty, or Memory Lost Between Calls
How to fix LangChain.js issues — model setup, prompt templates, RAG with vector stores, conversational memory, structured output, tool agents, and streaming with LangChain Expression Language.