Fix: BullMQ Not Working — Jobs Not Processing, Workers Not Starting, or Redis Connection Failing

FixDevs ·

Quick Answer

How to fix BullMQ issues — queue and worker setup, Redis connection, job scheduling, retry strategies, concurrency, rate limiting, event listeners, and dashboard monitoring.

The Problem

Jobs are added to the queue but never process:

import { Queue, Worker } from 'bullmq';

const queue = new Queue('email');
await queue.add('send', { to: 'user@example.com', subject: 'Hello' });
// Job sits in the queue forever — worker never picks it up

Or the worker starts but immediately crashes:

Error: connect ECONNREFUSED 127.0.0.1:6379

Or jobs process but fail silently:

Job completed with status "failed" but no error message visible

Why This Happens

BullMQ is a job queue library built on Redis. It needs a reachable Redis instance, and jobs are only processed while a Worker for that queue is running:

  • Queue and Worker are separate concepts: Queue adds jobs, Worker processes them. Creating a Queue without a Worker means jobs pile up with nothing to process them. The two can run in different processes or even on different servers.
  • Redis must be running and accessible: BullMQ stores all job data in Redis. Without a Redis connection, neither queue nor worker can operate. The default connection is localhost:6379.
  • Workers must use the same queue name: new Worker('email', ...) only processes jobs from the queue named 'email'. A typo or a different name means the worker listens on an empty queue.
  • Job processors must surface errors: if the processor function throws and nothing listens for 'failed' events, the error is recorded on the job in Redis but never shows up in your logs. BullMQ does not retry failed jobs unless the attempts option is set above 1; once the configured attempts are used up, the job is marked as failed.
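
One way to rule out the queue-name mismatch described above is to keep the names in a single shared module that both the producer and the worker import. A minimal sketch, assuming a hypothetical `QUEUE_NAMES` constant and `queueName` helper (neither is part of BullMQ):

```typescript
// Hypothetical shared module (for example lib/queue-names.ts). In a real
// project these would be exported and imported by both producer and worker.
const QUEUE_NAMES = {
  email: 'email',
  report: 'report',
  video: 'video',
} as const;

type QueueKey = keyof typeof QUEUE_NAMES;

// Look up a queue name by key. A misspelled key is a compile-time error,
// so producer and worker cannot silently disagree on the name.
function queueName(key: QueueKey): string {
  return QUEUE_NAMES[key];
}

// Usage (assumes bullmq is installed and `connection`/`processor` exist):
//   new Queue(queueName('email'), { connection });
//   new Worker(queueName('email'), processor, { connection });
```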

Fix 1: Basic Queue and Worker Setup

npm install bullmq ioredis

// lib/queue.ts: shared connection and queue definition
import { Queue } from 'bullmq';
import IORedis from 'ioredis';

// Shared Redis connection
const connection = new IORedis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: null,  // Required for BullMQ
});

// Define queues
export const emailQueue = new Queue('email', { connection });
export const reportQueue = new Queue('report', { connection });

// Add a job
export async function sendEmail(to: string, subject: string, html: string) {
  return emailQueue.add('send-email', { to, subject, html }, {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,  // retries after 2s, then 4s (3 attempts = 2 retries)
    },
    removeOnComplete: { age: 3600 },  // Remove completed jobs after 1 hour
    removeOnFail: { age: 86400 },      // Keep failed jobs for 24 hours
  });
}

// Schedule a delayed job
export async function scheduleReminder(userId: string, message: string, delay: number) {
  return emailQueue.add('reminder', { userId, message }, {
    delay,  // milliseconds
  });
}

// Schedule recurring job (cron)
export async function setupRecurringJobs() {
  await reportQueue.upsertJobScheduler('daily-report', {
    pattern: '0 9 * * *',  // Every day at 9 AM
  }, {
    name: 'generate-daily-report',
    data: { type: 'daily' },
  });

  await reportQueue.upsertJobScheduler('weekly-cleanup', {
    pattern: '0 3 * * 0',  // Every Sunday at 3 AM
  }, {
    name: 'cleanup-old-data',
    data: { type: 'cleanup' },
  });
}

// workers/email.worker.ts: processes email jobs
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: null,
});

const emailWorker = new Worker(
  'email',
  async (job: Job) => {
    // Job processing logic
    switch (job.name) {
      case 'send-email': {
        const { to, subject, html } = job.data;
        console.log(`Sending email to ${to}: ${subject}`);

        await sendEmailViaProvider(to, subject, html);

        // Return value is stored as job result
        return { sent: true, to, timestamp: new Date().toISOString() };
      }
      case 'reminder': {
        const { userId, message } = job.data;
        await sendPushNotification(userId, message);
        return { notified: true };
      }
      default:
        throw new Error(`Unknown job name: ${job.name}`);
    }
  },
  {
    connection,
    concurrency: 5,  // Process up to 5 jobs simultaneously
    limiter: {
      max: 10,       // Max 10 jobs
      duration: 1000, // Per 1 second (rate limiting)
    },
  },
);

// Event listeners — critical for monitoring
emailWorker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailWorker.on('failed', (job, error) => {
  console.error(`Job ${job?.id} failed:`, error.message);
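  // Alert once retries are exhausted (attempts counts as 1 when unset);
  // alertOncall is a placeholder for your own alerting hook
  if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {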
    alertOncall(`Email job permanently failed: ${error.message}`);
  }
});

emailWorker.on('error', (error) => {
  console.error('Worker error:', error);
});

emailWorker.on('stalled', (jobId) => {
  console.warn(`Job ${jobId} stalled — will be reprocessed`);
});

console.log('Email worker started');
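
As the number of job names grows, the `switch` in the processor can be swapped for a handler map, which also lets each handler be tested without Redis. A sketch under the assumption that the processor only reads `name` and `data` from the job; `JobLike`, `handlers`, `resolveHandler`, and `processEmailJob` are illustrative names, and the handler bodies stand in for the real provider calls:

```typescript
// Minimal shape of what the processor reads from a BullMQ Job.
interface JobLike {
  name: string;
  data: Record<string, unknown>;
}

type Handler = (data: Record<string, unknown>) => Promise<Record<string, unknown>>;

// One handler per job name; bodies are placeholders for real provider calls.
const handlers = new Map<string, Handler>([
  ['send-email', async (data) => ({ sent: true, to: data.to })],
  ['reminder', async (data) => ({ notified: true, userId: data.userId })],
]);

// Throw on unknown names so the job is marked failed instead of
// completing without doing any work.
function resolveHandler(name: string): Handler {
  const handler = handlers.get(name);
  if (!handler) {
    throw new Error(`Unknown job name: ${name}`);
  }
  return handler;
}

// Processor with the same contract as the switch-based version above.
async function processEmailJob(job: JobLike): Promise<Record<string, unknown>> {
  return resolveHandler(job.name)(job.data);
}
```

Passing `processEmailJob` as the second argument to `new Worker('email', ...)` should keep the behaviour of the original processor, since a BullMQ Job exposes both `name` and `data`.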

Fix 2: Job Progress and Events

// Worker with progress reporting
const videoWorker = new Worker(
  'video',
  async (job: Job) => {
    const { videoId, format } = job.data;

    // Step 1: Download
    await job.updateProgress(25);
    await job.log('Downloading video...');
    const file = await downloadVideo(videoId);

    // Step 2: Transcode
    await job.updateProgress(50);
    await job.log(`Transcoding to ${format}...`);
    const transcoded = await transcodeVideo(file, format);

    // Step 3: Upload
    await job.updateProgress(75);
    await job.log('Uploading result...');
    const url = await uploadToStorage(transcoded);

    // Step 4: Complete
    await job.updateProgress(100);
    await job.log('Complete!');

    return { url, format };
  },
  { connection, concurrency: 2 },
);

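// Listen for progress on the client side
import { Queue, QueueEvents } from 'bullmq';

// Queue handle used by the status route below
const videoQueue = new Queue('video', { connection });
const queueEvents = new QueueEvents('video', { connection });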

queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress: ${data}%`);
});

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed:`, returnvalue);
});

// API route — check job status
export async function GET(req: Request) {
  const jobId = new URL(req.url).searchParams.get('id');
  if (!jobId) return Response.json({ error: 'Missing id' }, { status: 400 });

  const job = await videoQueue.getJob(jobId);
  if (!job) return Response.json({ error: 'Not found' }, { status: 404 });

  const state = await job.getState();
  const progress = job.progress;
  const logs = await videoQueue.getJobLogs(jobId);

  return Response.json({ state, progress, logs: logs.logs, result: job.returnvalue });
}
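
The hard-coded percentages in the worker above can drift when a step is added or removed. A small sketch that derives the percentage from the step count instead; `progressFor` is a hypothetical helper, not a BullMQ function:

```typescript
// Percentage complete after finishing `completedSteps` of `totalSteps`,
// rounded to a whole number and clamped to the 0-100 range.
function progressFor(completedSteps: number, totalSteps: number): number {
  if (!Number.isInteger(totalSteps) || totalSteps <= 0) {
    throw new RangeError('totalSteps must be a positive integer');
  }
  const ratio = completedSteps / totalSteps;
  return Math.min(100, Math.max(0, Math.round(ratio * 100)));
}

// Usage inside a processor (assumes `job` is a BullMQ Job):
//   await job.updateProgress(progressFor(1, 4)); // first of four steps
```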

Fix 3: Retry Strategies

// Per-job retry configuration
await queue.add('critical-task', data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000,  // retries after 1s, 2s, 4s, 8s (5 attempts = 4 retries)
  },
});

// Fixed delay retries
await queue.add('api-call', data, {
  attempts: 3,
  backoff: {
    type: 'fixed',
    delay: 5000,  // Always wait 5 seconds between retries
  },
});

// Custom backoff strategy
await queue.add('smart-retry', data, {
  attempts: 4,
  backoff: {
    type: 'custom',
  },
});

// In the worker: custom backoff logic. The first argument is the queue name,
// so it must match the name of the queue the job was added to.
const worker = new Worker('smart-retry', processor, {
  connection,
  settings: {
    backoffStrategy: (attemptsMade: number, type?: string, err?: Error) => {
      const message = err?.message ?? '';
      // Rate limited: wait longer
      if (message.includes('429')) {
        return 60000;  // 1 minute
      }
      // Server error: exponential, capped at 30 seconds
      if (message.includes('500')) {
        return Math.min(2 ** attemptsMade * 1000, 30000);
      }
      // Default
      return 2000;
    },
  },
});
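
Because `attempts` counts the first run as well, a job with `attempts: 3` is retried at most twice. The sketch below lists the wait before each retry, assuming the built-in exponential backoff waits 2^(attemptsMade - 1) × delay and that no jitter is configured; `retryDelays` is a hypothetical helper, not part of BullMQ:

```typescript
// Delays (ms) before each retry for the 'fixed' and 'exponential' types.
// Assumption: exponential backoff waits 2^(attemptsMade - 1) * delay.
function retryDelays(
  attempts: number,
  delay: number,
  type: 'fixed' | 'exponential',
): number[] {
  const delays: number[] = [];
  // `attempts` includes the first run, so there are attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(type === 'fixed' ? delay : 2 ** (attemptsMade - 1) * delay);
  }
  return delays;
}
```

Under those assumptions, `retryDelays(3, 2000, 'exponential')` gives `[2000, 4000]` and `retryDelays(3, 5000, 'fixed')` gives `[5000, 5000]`.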

Fix 4: Job Priorities and Flows

// Priority queues — lower number = higher priority
await queue.add('urgent', data, { priority: 1 });   // Processed first
await queue.add('normal', data, { priority: 5 });
await queue.add('low', data, { priority: 10 });      // Processed last

// Job flows — parent-child dependencies
import { FlowProducer } from 'bullmq';

const flow = new FlowProducer({ connection });

// Parent job waits for all children to complete
await flow.add({
  name: 'generate-report',
  queueName: 'report',
  data: { reportId: 'monthly-2026-03' },
  children: [
    {
      name: 'fetch-sales-data',
      queueName: 'data',
      data: { source: 'sales', month: '2026-03' },
    },
    {
      name: 'fetch-user-data',
      queueName: 'data',
      data: { source: 'users', month: '2026-03' },
    },
    {
      name: 'fetch-analytics',
      queueName: 'data',
      data: { source: 'analytics', month: '2026-03' },
    },
  ],
});

// The 'generate-report' job only runs after all 3 children complete
// Access children results in the parent worker:
const reportWorker = new Worker('report', async (job) => {
  const childrenValues = await job.getChildrenValues();
  // Keyed by each child's job key, e.g. { 'bull:data:123': {...}, ... }

  return generateReport(childrenValues);
}, { connection });
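
When a parent has children in several queues, it can help to regroup the results before building the report. A sketch that assumes the keys returned by `job.getChildrenValues()` end in `<queueName>:<jobId>` (for example 'bull:data:123' with the default prefix); `groupChildrenByQueue` is a hypothetical helper:

```typescript
// Group children results by queue name.
// Assumption: each key has the form '<prefix>:<queueName>:<jobId>'.
function groupChildrenByQueue<T>(
  childrenValues: Record<string, T>,
): Record<string, T[]> {
  const grouped: Record<string, T[]> = {};
  for (const [key, value] of Object.entries(childrenValues)) {
    const parts = key.split(':');
    // Read from the end so a prefix containing ':' does not shift the fields.
    const queueName = parts[parts.length - 2] ?? 'unknown';
    const bucket = grouped[queueName] ?? [];
    bucket.push(value);
    grouped[queueName] = bucket;
  }
  return grouped;
}
```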

Fix 5: Dashboard Monitoring

npm install @bull-board/api @bull-board/express
# Or for Next.js: npm install @bull-board/api @bull-board/next

// Express dashboard
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import express from 'express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(reportQueue),
    new BullMQAdapter(videoQueue),
  ],
  serverAdapter,
});

const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3001, () => console.log('Bull Board on http://localhost:3001/admin/queues'));

Fix 6: Graceful Shutdown

// Proper cleanup on process exit
async function shutdown() {
  console.log('Shutting down workers...');

  // Close workers — wait for current jobs to finish
  await emailWorker.close();
  await reportWorker.close();

  // Close queues
  await emailQueue.close();
  await reportQueue.close();

  // Close Redis connection
  await connection.quit();

  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
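
If both signals arrive (Ctrl+C followed by a SIGTERM from a process manager, say), the `shutdown` function above runs twice and tries to close connections that are already closed. A small guard can prevent that; `once` here is a hypothetical helper, not something BullMQ provides:

```typescript
// Wrap an async routine so repeated calls trigger it only once and all
// callers share the same promise.
function once<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (!pending) {
      pending = fn();
    }
    return pending;
  };
}

// Usage (assumes the `shutdown` function defined above):
//   const shutdownOnce = once(shutdown);
//   process.on('SIGTERM', shutdownOnce);
//   process.on('SIGINT', shutdownOnce);
```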

Still Not Working?

Jobs added but never process — the worker isn’t running or uses a different queue name. Verify new Worker('email', ...) matches new Queue('email') exactly. Also check that the worker process is actually started — it’s a separate script, not automatically spawned by the queue.

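“ECONNREFUSED 127.0.0.1:6379”: nothing is accepting connections at that address, which usually means Redis isn’t running. Start Redis with redis-server (local), or point the connection at your Redis provider (Upstash, Redis Cloud, etc.). Separately, set maxRetriesPerRequest: null in the IORedis config; BullMQ requires it for workers, but it does not fix a refused connection.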
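
A misconfigured host or port produces the same ECONNREFUSED error as a stopped server, so it can help to validate the settings before they reach the client. A minimal sketch using the same environment variable names as the connection code above; `RedisConnectionSettings` and `redisOptionsFromEnv` are hypothetical names:

```typescript
interface RedisConnectionSettings {
  host: string;
  port: number;
  password?: string | undefined;
  maxRetriesPerRequest: null;
}

// Build connection options from environment-style variables and fail fast
// on a malformed port instead of connecting to the wrong place.
function redisOptionsFromEnv(
  env: Record<string, string | undefined>,
): RedisConnectionSettings {
  const port = Number(env.REDIS_PORT || '6379');
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return {
    host: env.REDIS_HOST || 'localhost',
    port,
    password: env.REDIS_PASSWORD,
    maxRetriesPerRequest: null, // required by BullMQ workers
  };
}

// Usage (assumes ioredis is installed):
//   const connection = new IORedis(redisOptionsFromEnv(process.env));
```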

Jobs fail with no error message — add worker.on('failed', (job, error) => console.error(error)) to see failure reasons. Also check worker.on('error', ...) for connection-level errors. Without event listeners, failures are silent.

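Stalled jobs reprocess unexpectedly: a job is “stalled” when its worker fails to renew the job’s lock in time, typically because the process crashed or a CPU-heavy processor blocked the Node.js event loop. BullMQ moves stalled jobs back to the waiting list and fails them once they exceed maxStalledCount (default 1). Calling job.updateProgress() does not renew the lock. Keep processors from blocking the event loop (for example by running CPU-bound work in a sandboxed processor), or raise lockDuration for jobs that legitimately hold the loop for long stretches.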
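
The stall-related worker options can be kept together in one object. A sketch with illustrative values only: `lockDuration`, `stalledInterval`, and `maxStalledCount` are BullMQ worker options, while `StallSettings` and `longJobStallSettings` are hypothetical names and the numbers are assumptions to tune per workload:

```typescript
// Subset of BullMQ worker options that govern stall detection.
interface StallSettings {
  lockDuration: number;    // ms a job lock lasts before it must be renewed
  stalledInterval: number; // ms between checks for stalled jobs
  maxStalledCount: number; // times a job may stall before it is failed
}

// Illustrative values for jobs that may hold the event loop for a while.
const longJobStallSettings: StallSettings = {
  lockDuration: 120000,
  stalledInterval: 60000,
  maxStalledCount: 2,
};

// Usage (assumes bullmq is installed and `processor`/`connection` exist):
//   new Worker('video', processor, { connection, ...longJobStallSettings });
```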

For related backend issues, see Fix: Upstash Not Working and Fix: Inngest Not Working.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
