Skip to content

Queues

Stratal provides a built-in queue system backed by Cloudflare Queues. The QueueModule gives you a provider-based abstraction over queue operations with type-safe queue names, automatic message enrichment, and consumer routing by message type.

Add a queue binding to your wrangler.jsonc:

{
"queues": {
"producers": [
{
"binding": "NOTIFICATIONS_QUEUE",
"queue": "notifications-queue"
}
],
"consumers": [
{
"queue": "notifications-queue",
"max_retries": 3,
"max_batch_timeout": 30,
"max_batch_size": 10,
"dead_letter_queue": "notifications-queue-dlq"
}
]
}
}

The binding name must be the UPPER_SNAKE_CASE version of the queue name. Stratal converts kebab-case queue names automatically (for example, notifications-queue resolves to the NOTIFICATIONS_QUEUE binding).

Import QueueModule in your root module and call forRootAsync to configure the provider, then call registerQueue for each queue your application uses:

import { Module } from 'stratal/module'
import { DI_TOKENS } from 'stratal/di'
import type { StratalEnv } from 'stratal'
import { QueueModule } from 'stratal/queue'
@Module({
imports: [
QueueModule.forRootAsync({
inject: [DI_TOKENS.CloudflareEnv],
useFactory: (env: StratalEnv) => ({
provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare',
}),
}),
QueueModule.registerQueue('notifications-queue'),
],
})
export class AppModule {}

The provider option accepts 'cloudflare' for production or 'sync' for testing. When no options are provided, it defaults to 'cloudflare'.

registerQueue creates a DI binding that maps the queue name to an IQueueSender instance, so you can inject it anywhere in your application.

The factory returns a QueueModuleOptions object:

interface QueueModuleOptions {
provider: 'cloudflare' | 'sync'
store?: {
binding?: string // KV namespace binding name. Default: 'CACHE'
}
idempotency?: {
ttl?: number // Seconds a processed idempotency key is remembered. Default: 86400 (24h)
}
failedJobs?: {
retention?: number // Age (seconds) past which FailedJobCleanupJob deletes a failed job. Default: 604800 (7d)
}
maxRetries?: number // Attempts before a message is stored as a failed job. Default: 3
}

The queue subsystem persists idempotency claims and failed jobs to a KV namespace. The store.binding option names that namespace and defaults to CACHE:

{
"kv_namespaces": [
{ "binding": "CACHE", "id": "..." }
]
}

Override store.binding only if your KV namespace uses a different binding name. For the cloudflare provider, the binding is validated at app boot: if it is missing, QueueModule throws a QueueError during initialization rather than failing on every queue invocation. The sync provider processes inline and never touches the store, so it does not require the binding.

By default, QueueName is typed as string. You can narrow it by augmenting the QueueNames interface so that QueueName becomes a union of the declared keys. Any call to registerQueue or @InjectQueue with an invalid name will then produce a type error.

The recommended approach is to derive queue names automatically from your Cloudflare.Env bindings rather than maintaining a separate manual list. A binding named NOTIFICATIONS_QUEUE produces the queue name notifications-queue, and ORDER_EVENTS_QUEUE produces order-events-queue. This way you only maintain bindings in one place - your Wrangler config.

See the Environment Typing - Typing queue bindings guide for the full setup, including the TypeScript utility types that perform this conversion.

Inject a queue sender using the queue name as the injection token:

import { Transient } from 'stratal/di'
import { InjectQueue, type IQueueSender } from 'stratal/queue'
@Transient()
export class NotificationService {
constructor(
@InjectQueue('notifications-queue') private readonly queue: IQueueSender,
) {}
async sendWelcome(userId: string, email: string) {
await this.queue.dispatch({
type: 'notification.welcome',
payload: { userId, email },
})
}
}

The dispatch method accepts a DispatchMessage object (Omit<QueueMessage, 'id'>). You provide type, payload, and optionally metadata. The id is generated automatically:

FieldTypeDescription
typestringMessage type used for consumer routing
payloadTThe message payload (generic)
metadataobject?Optional metadata; locale, idempotencyKey, and binding are auto-populated if not provided

Cloudflare Queues deliver at least once, so a consumer can see the same message more than once. Stratal attaches an idempotency key to every dispatch and skips a message that has already been processed.

If you do not supply metadata.idempotencyKey, the sender derives one from a deterministic SHA-256 hash of the message type and payload. Key ordering in the payload does not matter: the hash uses a key-sorted serialization, so two semantically identical payloads produce the same key. Identical dispatches are therefore deduplicated automatically. Supply your own key to control deduplication explicitly:

// Auto-derived key (hash of type + payload)
await this.queue.dispatch({
type: 'email.send',
payload: { to: 'user@example.com', subject: 'Hello' },
})
// Explicit idempotency key
await this.queue.dispatch({
type: 'order.process',
payload: { orderId: '123' },
metadata: { idempotencyKey: 'order:123' },
})

When a message is processed successfully, its key is recorded in the KV store and remembered for idempotency.ttl seconds (default 24 hours). A redelivered message whose key is still recorded is acknowledged without re-running the consumer.

Create a consumer class that implements the IQueueConsumer interface:

import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
interface WelcomePayload {
userId: string
email: string
}
@Transient()
export class WelcomeConsumer implements IQueueConsumer<WelcomePayload> {
readonly messageTypes = ['notification.welcome']
async handle(message: QueueMessage<WelcomePayload>) {
const { userId, email } = message.payload
// Send welcome email, create onboarding tasks, etc.
}
async onError(error: Error, message: QueueMessage<WelcomePayload>) {
console.error(`Failed to process welcome for ${message.payload.userId}`, error)
}
}

The messageTypes array declares which message types this consumer handles. A consumer can listen to multiple types. Use the wildcard '*' to match all message types.

The onError method is optional. When handle throws, onError is called and the message is retried automatically.

Add consumer classes to the consumers array in any module:

@Module({
consumers: [WelcomeConsumer],
})
export class NotificationsModule {}

Stratal resolves each consumer from the DI container and indexes it by its declared message types. When a queue message arrives, every consumer whose messageTypes includes the message’s type will receive it.

A single consumer can handle multiple message types by listing them in its messageTypes array. Use a switch on message.type to branch logic:

import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
@Transient()
export class OrderEventsConsumer implements IQueueConsumer {
readonly messageTypes = ['order.created', 'order.updated', 'order.cancelled']
async handle(message: QueueMessage) {
switch (message.type) {
case 'order.created':
await this.handleCreated(message)
break
case 'order.updated':
await this.handleUpdated(message)
break
case 'order.cancelled':
await this.handleCancelled(message)
break
}
}
private async handleCreated(message: QueueMessage) { /* ... */ }
private async handleUpdated(message: QueueMessage) { /* ... */ }
private async handleCancelled(message: QueueMessage) { /* ... */ }
}

Use '*' to match every message type. This is useful for cross-cutting concerns like audit logging:

import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
@Transient()
export class AuditLogConsumer implements IQueueConsumer {
readonly messageTypes = ['*']
async handle(message: QueueMessage) {
await this.auditLog.record({
id: message.id,
type: message.type,
payload: message.payload,
})
}
}

Every queue message follows the QueueMessage interface:

interface QueueMessage<T = unknown> {
id: string // Auto-generated UUID
type: string // Routing key for consumers
payload: T // Your message data
metadata?: {
locale?: string // Auto-set from i18n context
idempotencyKey?: string // Auto-derived from type + payload if omitted
binding?: string // Producer binding the message was dispatched through
[key: string]: unknown
}
}

When you call dispatch, the QueueSender enriches your message with:

  • A unique id via crypto.randomUUID()
  • The current locale from the i18n service (if not already set in metadata)
  • An idempotencyKey derived from the message type and payload (if not already set in metadata)
  • The binding the message was dispatched through, so a failed message can later be re-enqueued to the correct producer

See Idempotency for how the derived key deduplicates redelivered messages.

The sync provider is designed for testing. Instead of sending messages to a Cloudflare Queue, it processes them synchronously by calling the matching consumer’s handle method directly:

QueueModule.forRootAsync({
inject: [DI_TOKENS.CloudflareEnv],
useFactory: (env: StratalEnv) => ({
provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare',
}),
})

With the sync provider, dispatch blocks until the consumer finishes processing. If a consumer throws, the error propagates immediately, making it straightforward to assert on failures in tests.

To handle queue messages in production, export a Stratal instance. The queue handler is built in:

import { Stratal } from 'stratal'
import { AppModule } from './app.module'
export default new Stratal({ module: AppModule })

When a batch of messages arrives from Cloudflare Queues, Stratal calls app.handleQueue(batch, queueName), which:

  1. Extracts the locale from the first message’s metadata for i18n context.
  2. Creates a request scope so consumers can use request-scoped dependencies.
  3. Iterates over each message in the batch.
  4. Finds matching consumers by message type.
  5. Calls handle on each consumer. On success, the message is acknowledged and its idempotency key is recorded. On failure, onError is called and the message is retried until maxRetries is exhausted, after which it is recorded as a failed job.

When a consumer keeps throwing past the configured maxRetries (default 3), Stratal stops retrying, acknowledges the message so Cloudflare does not redeliver it forever, and persists it to the KV store as a FailedJob. Failed jobs are kept indefinitely until you retry or purge them, so you can inspect and replay them later.

A failed job captures the original message plus failure context:

interface FailedJob {
id: string // The message id
queue: string // Cloudflare queue name the message was consumed from
binding: string // Producer binding to re-enqueue through on retry
type: string
consumer: string // Class name of the consumer that failed
attempts: number
failedAt: string // ISO timestamp
message: QueueMessage
error: { name: string; message: string; stack?: string }
}

Use the Quarry CLI to work with failed jobs from the command line:

Terminal window
# List failed jobs (default limit: 50)
npx quarry queue:failed
npx quarry queue:failed --queue=NOTIFICATIONS_QUEUE --limit=100
# Retry a single job by message id, or all of them
npx quarry queue:retry <message-id>
npx quarry queue:retry --all
npx quarry queue:retry --all --queue=NOTIFICATIONS_QUEUE
# Delete a single failed job, or all of them
npx quarry queue:purge <message-id>
npx quarry queue:purge --all
npx quarry queue:purge --all --queue=NOTIFICATIONS_QUEUE

Retrying re-enqueues the stored message through the producer binding it was originally dispatched from.

Failed jobs persist indefinitely, so unbounded failures can grow KV usage over time. To bound this, register the FailedJobCleanupJob cron, which deletes failed jobs older than failedJobs.retention (default 7 days). It is not registered for you:

import { FailedJobCleanupJob, failedJobCleanupJob, QueueModule } from 'stratal/queue'
@Module({
imports: [
QueueModule.forRootAsync({
inject: [DI_TOKENS.CloudflareEnv],
useFactory: () => ({
provider: 'cloudflare',
failedJobs: { retention: 1209600 }, // 14 days
}),
}),
],
jobs: [FailedJobCleanupJob], // daily at 00:00 UTC
// ...or a custom schedule: jobs: [failedJobCleanupJob('0 3 * * 0')]
})
export class AppModule {}

FailedJobCleanupJob runs daily at 00:00 UTC. Use failedJobCleanupJob(schedule) to bind it to a different cron expression. Either way, add a matching trigger to wrangler.jsonc ("0 0 * * *" for the default schedule). See Cron Jobs for how the jobs array and scheduling work.

For custom tooling, inject the QueueStore via the QUEUE_TOKENS.QueueStore token:

import { Transient, inject } from 'stratal/di'
import { QUEUE_TOKENS, type QueueStore } from 'stratal/queue'
@Transient()
export class FailedJobInspector {
constructor(
@inject(QUEUE_TOKENS.QueueStore) private readonly store: QueueStore,
) {}
async inspect() {
const { keys, cursor } = await this.store.listFailedJobs({ limit: 50 })
const job = await this.store.getFailedJob(keys[0].id)
await this.store.removeFailedJob(keys[0].id)
await this.store.purgeFailedJobs()
}
}

The sync provider processes messages synchronously, so you can write integration tests that dispatch a message and immediately assert on the consumer’s behavior. Use @stratal/testing to create a test module with the sync provider:

import { Test, type TestingModule } from '@stratal/testing'
import { DI_TOKENS, Transient } from 'stratal/di'
import { QueueModule, QUEUE_TOKENS } from 'stratal/queue'
import type { IQueueConsumer, ConsumerRegistry, QueueRegistry } from 'stratal/queue'
describe('Queue Integration', () => {
let module: TestingModule
beforeAll(async () => {
module = await Test.createTestingModule({
imports: [
QueueModule.forRootAsync({
useFactory: () => ({ provider: 'sync' }),
}),
QueueModule.registerQueue('notifications-queue'),
],
}).compile()
})
it('should process messages immediately', async () => {
const handleFn = vi.fn().mockResolvedValue(undefined)
@Transient()
class TestConsumer implements IQueueConsumer {
readonly messageTypes = ['user.created']
handle = handleFn
}
// Register the consumer class with the message types it handles.
// A fresh instance is resolved from the container per message.
const registry = module.get<ConsumerRegistry>(DI_TOKENS.ConsumerRegistry)
registry.register(TestConsumer, ['user.created'])
// Dispatch within a request scope
await module.runInRequestScope(async () => {
const queueRegistry = module.container.resolve<QueueRegistry>(
QUEUE_TOKENS.QueueRegistry,
)
const sender = queueRegistry.getQueue('notifications-queue')
await sender.dispatch({
type: 'user.created',
payload: { userId: '123' },
})
})
expect(handleFn).toHaveBeenCalledWith(
expect.objectContaining({
type: 'user.created',
payload: { userId: '123' },
}),
)
})
})

Because the sync provider calls consumers inline, any error thrown in handle propagates directly to the dispatch call, making failure scenarios easy to test.

  • Use descriptive message types. Prefer hierarchical names like user.created or order.shipped over vague names like created or update.
  • Keep payloads minimal. Dispatch only IDs or references, then fetch fresh data inside the consumer. This avoids stale-data issues and keeps queue messages small.
  • Design for idempotency. Stratal deduplicates on the idempotency key, but delivery is best-effort, not exactly-once. Keep handler side effects idempotent, and pass an explicit metadata.idempotencyKey when the default hash of type plus payload is too coarse or too fine.
  • Separate queues for different domains. Use dedicated queues (notifications-queue, analytics-queue, email-queue) rather than routing everything through a single global queue.

Use the queue:list Quarry CLI command to see all registered queue consumers in your application:

Terminal window
npx quarry queue:list

This outputs a table with the following columns:

ColumnDescription
ConsumerThe consumer class name
Message TypesComma-separated list of message types the consumer handles
  • Modules to learn how the consumers array fits into module configuration.
  • Email to see how the email integration uses queues for async dispatch.
  • Cron Jobs for scheduled task execution, including the FailedJobCleanupJob.
  • Quarry CLI for the queue:failed, queue:retry, and queue:purge commands.