Queues

Stratal provides a built-in queue system backed by Cloudflare Queues. The QueueModule gives you a provider-based abstraction over queue operations with type-safe queue names, automatic message enrichment, and consumer routing by message type.

Add a queue binding to your wrangler.jsonc:

{
  "queues": {
    "producers": [
      {
        "binding": "NOTIFICATIONS_QUEUE",
        "queue": "notifications-queue"
      }
    ],
    "consumers": [
      {
        "queue": "notifications-queue",
        "max_retries": 3,
        "max_batch_timeout": 30,
        "max_batch_size": 10,
        "dead_letter_queue": "notifications-queue-dlq"
      }
    ]
  }
}

The binding name must be the UPPER_SNAKE_CASE version of the queue name. Stratal converts kebab-case queue names automatically (for example, notifications-queue resolves to the NOTIFICATIONS_QUEUE binding).
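The naming rule above can be sketched as a pair of string conversions. The helper names `toBindingName` and `toQueueName` are hypothetical and only illustrate the convention; they are not part of Stratal's documented API, and Stratal's internal conversion may differ in detail.

```typescript
// Hypothetical helpers illustrating the kebab-case <-> UPPER_SNAKE_CASE convention.
// These are NOT Stratal functions; they only mirror the rule described in the text.

// 'notifications-queue' -> 'NOTIFICATIONS_QUEUE'
function toBindingName(queueName: string): string {
  return queueName.replace(/-/g, '_').toUpperCase()
}

// 'ORDER_EVENTS_QUEUE' -> 'order-events-queue'
function toQueueName(bindingName: string): string {
  return bindingName.replace(/_/g, '-').toLowerCase()
}
```

A quick check such as `toBindingName('notifications-queue')` can help confirm that a binding in wrangler.jsonc matches the name passed to registerQueue.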

Import QueueModule in your root module and call forRootAsync to configure the provider, then call registerQueue for each queue your application uses:

import { Module } from 'stratal/module'
import { DI_TOKENS } from 'stratal/di'
import type { StratalEnv } from 'stratal'
import { QueueModule } from 'stratal/queue'

@Module({
  imports: [
    QueueModule.forRootAsync({
      inject: [DI_TOKENS.CloudflareEnv],
      useFactory: (env: StratalEnv) => ({
        provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare',
      }),
    }),
    QueueModule.registerQueue('notifications-queue'),
  ],
})
export class AppModule {}

The provider option accepts 'cloudflare' for production or 'sync' for testing. When no options are provided, it defaults to 'cloudflare'.

registerQueue creates a DI binding that maps the queue name to an IQueueSender instance, so you can inject it anywhere in your application.

By default, QueueName is typed as string. You can narrow it by augmenting the QueueNames interface so that QueueName becomes a union of the declared keys. Any call to registerQueue or @InjectQueue with an invalid name will then produce a type error.

The recommended approach is to derive queue names automatically from your Cloudflare.Env bindings rather than maintaining a separate manual list. A binding named NOTIFICATIONS_QUEUE produces the queue name notifications-queue, and ORDER_EVENTS_QUEUE produces order-events-queue. This way you only maintain bindings in one place — your Wrangler config.

See the Environment Typing — Typing queue bindings guide for the full setup, including the TypeScript utility types that perform this conversion.
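As a rough illustration of the idea (not the utility types from that guide), the conversion from binding names to queue names can be expressed with template literal types. Everything here is an assumption made for the sketch: `ExampleEnv` stands in for your generated Cloudflare.Env, the `_QUEUE` suffix filter is an assumed convention, and the type names are hypothetical.

```typescript
// Stand-in for a generated Env interface; the property types are irrelevant here.
interface ExampleEnv {
  NOTIFICATIONS_QUEUE: unknown
  ORDER_EVENTS_QUEUE: unknown
  DATABASE: unknown // not a queue binding, so it is filtered out below
}

// Recursively convert UPPER_SNAKE_CASE to kebab-case at the type level.
type SnakeToKebab<S extends string> =
  S extends `${infer Head}_${infer Tail}`
    ? `${Lowercase<Head>}-${SnakeToKebab<Tail>}`
    : Lowercase<S>

// Keep only keys ending in _QUEUE (an assumed naming convention).
type QueueBindingKey<Env> = Extract<keyof Env, `${string}_QUEUE`>

// Union of derived queue names: 'notifications-queue' | 'order-events-queue'
type DerivedQueueName<Env> = SnakeToKebab<QueueBindingKey<Env>>

type ExampleQueueName = DerivedQueueName<ExampleEnv>

// Both literals type-check; a name like 'database' would be rejected by the compiler.
const validNames: ExampleQueueName[] = ['notifications-queue', 'order-events-queue']
```

The linked guide remains the reference for how such a union is wired into the QueueNames interface.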

Inject a queue sender using the queue name as the injection token:

import { Transient } from 'stratal/di'
import { InjectQueue, type IQueueSender } from 'stratal/queue'

@Transient()
export class NotificationService {
  constructor(
    @InjectQueue('notifications-queue') private readonly queue: IQueueSender,
  ) {}

  async sendWelcome(userId: string, email: string) {
    await this.queue.dispatch({
      type: 'notification.welcome',
      payload: { userId, email },
    })
  }
}

The dispatch method accepts a DispatchMessage object. You provide type, payload, and optionally metadata. The id and timestamp fields are generated automatically:

| Field    | Type    | Description |
| -------- | ------- | ----------- |
| type     | string  | Message type used for consumer routing |
| payload  | T       | The message payload (generic) |
| metadata | object? | Optional metadata; locale is auto-set from the i18n context if not provided |

Create a consumer class that implements the IQueueConsumer interface:

import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'

interface WelcomePayload {
  userId: string
  email: string
}

@Transient()
export class WelcomeConsumer implements IQueueConsumer<WelcomePayload> {
  readonly messageTypes = ['notification.welcome']

  async handle(message: QueueMessage<WelcomePayload>) {
    const { userId, email } = message.payload
    // Send welcome email, create onboarding tasks, etc.
  }

  async onError(error: Error, message: QueueMessage<WelcomePayload>) {
    console.error(`Failed to process welcome for ${message.payload.userId}`, error)
  }
}

The messageTypes array declares which message types this consumer handles. A consumer can listen to multiple types. Use the wildcard '*' to match all message types.

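The onError method is optional. When handle throws, Stratal calls onError if it is defined, and the message is retried automatically. Retries are bounded by the max_retries value in your Wrangler config; once they are exhausted, Cloudflare moves the message to the dead-letter queue if one is configured.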

Add consumer classes to the consumers array in any module:

@Module({
  consumers: [WelcomeConsumer],
})
export class NotificationsModule {}

Stratal resolves each consumer from the DI container and indexes it by its declared message types. When a queue message arrives, every consumer whose messageTypes includes the message’s type will receive it.
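The indexing and lookup described above can be pictured with a small stand-alone sketch. The `SketchConsumerIndex` class and its `register` and `match` methods are hypothetical names for illustration; Stratal's real consumer registry may be structured differently. The `'*'` handling reflects the wildcard behavior described in this guide.

```typescript
// Minimal local types so the sketch runs without Stratal installed.
interface SketchMessage {
  type: string
  payload: unknown
}

interface SketchConsumer {
  readonly messageTypes: string[]
  handle(message: SketchMessage): Promise<void>
}

// Hypothetical index: maps each declared message type to its consumers.
class SketchConsumerIndex {
  private readonly byType = new Map<string, SketchConsumer[]>()

  register(consumer: SketchConsumer): void {
    for (const type of consumer.messageTypes) {
      const list = this.byType.get(type) ?? []
      list.push(consumer)
      this.byType.set(type, list)
    }
  }

  // Returns consumers registered for the exact type plus any wildcard consumers.
  match(messageType: string): SketchConsumer[] {
    const exact = this.byType.get(messageType) ?? []
    const wildcard = this.byType.get('*') ?? []
    // The Set removes duplicates if a consumer appears in both lists.
    return [...new Set([...exact, ...wildcard])]
  }
}
```

With this shape, a message of type notification.welcome would reach both a consumer that lists that type and any consumer registered with '*'.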

A single consumer can handle multiple message types by listing them in its messageTypes array. Use a switch on message.type to branch logic:

import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'

@Transient()
export class OrderEventsConsumer implements IQueueConsumer {
  readonly messageTypes = ['order.created', 'order.updated', 'order.cancelled']

  async handle(message: QueueMessage) {
    switch (message.type) {
      case 'order.created':
        await this.handleCreated(message)
        break
      case 'order.updated':
        await this.handleUpdated(message)
        break
      case 'order.cancelled':
        await this.handleCancelled(message)
        break
    }
  }

  private async handleCreated(message: QueueMessage) { /* ... */ }
  private async handleUpdated(message: QueueMessage) { /* ... */ }
  private async handleCancelled(message: QueueMessage) { /* ... */ }
}

Use '*' to match every message type. This is useful for cross-cutting concerns like audit logging:

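import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
// Hypothetical application service with a record() method; substitute your own.
import { AuditLogService } from './audit-log.service'

@Transient()
export class AuditLogConsumer implements IQueueConsumer {
  readonly messageTypes = ['*']

  // handle() relies on this.auditLog, so it has to be injected.
  // Assumes the container can resolve AuditLogService by class.
  constructor(private readonly auditLog: AuditLogService) {}

  async handle(message: QueueMessage) {
    await this.auditLog.record({
      type: message.type,
      payload: message.payload,
      timestamp: message.timestamp,
    })
  }
}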

Every queue message follows the QueueMessage interface:

interface QueueMessage<T = unknown> {
  id: string          // Auto-generated UUID
  timestamp: number   // Auto-generated (ms since epoch)
  type: string        // Routing key for consumers
  payload: T          // Your message data
  metadata?: {
    locale?: string   // Auto-set from i18n context
    [key: string]: unknown
  }
}

When you call dispatch, the QueueSender enriches your message with:

  • A unique id via crypto.randomUUID()
  • A timestamp via Date.now()
  • The current locale from the i18n service (if not already set in metadata)
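The enrichment steps listed above can be sketched as a pure function. The `enrich` function, its `currentLocale` parameter, and the local interfaces are hypothetical; they illustrate the behavior described here rather than Stratal's actual QueueSender code. The sketch imports `randomUUID` from `node:crypto` so it runs in plain Node, whereas the text above refers to the global `crypto.randomUUID()` available in Workers.

```typescript
import { randomUUID } from 'node:crypto'

// Local stand-ins for the message shapes described in this guide.
interface DispatchInput<T> {
  type: string
  payload: T
  metadata?: { locale?: string; [key: string]: unknown }
}

interface EnrichedMessage<T> extends DispatchInput<T> {
  id: string
  timestamp: number
}

// Hypothetical enrichment step: adds id and timestamp, and fills in the
// locale only when the caller did not set one in metadata.
function enrich<T>(input: DispatchInput<T>, currentLocale?: string): EnrichedMessage<T> {
  const message: EnrichedMessage<T> = {
    ...input,
    id: randomUUID(),
    timestamp: Date.now(),
  }
  const locale = input.metadata?.locale ?? currentLocale
  if (locale !== undefined) {
    message.metadata = { ...input.metadata, locale }
  }
  return message
}
```

Because a locale supplied in metadata takes precedence, a caller can dispatch a message in a specific language regardless of the current request's i18n context.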

The sync provider is designed for testing. Instead of sending messages to a Cloudflare Queue, it processes them synchronously by calling the matching consumer’s handle method directly:

QueueModule.forRootAsync({
  inject: [DI_TOKENS.CloudflareEnv],
  useFactory: (env: StratalEnv) => ({
    provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare',
  }),
})

With the sync provider, dispatch blocks until the consumer finishes processing. If a consumer throws, the error propagates immediately, making it straightforward to assert on failures in tests.
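To make the blocking and error-propagation behavior concrete, here is a minimal stand-alone sketch of an inline sender. `SketchSyncSender` and the local interfaces are hypothetical and are not Stratal's sync provider; they only show why awaiting each consumer inside dispatch makes failures surface at the call site.

```typescript
// Local stand-ins so the sketch runs without Stratal installed.
interface SyncMessage {
  type: string
  payload: unknown
}

interface SyncConsumer {
  readonly messageTypes: string[]
  handle(message: SyncMessage): Promise<void>
}

// Hypothetical inline sender: no queue involved, consumers run during dispatch.
class SketchSyncSender {
  private readonly consumers: SyncConsumer[]

  constructor(consumers: SyncConsumer[]) {
    this.consumers = consumers
  }

  async dispatch(message: SyncMessage): Promise<void> {
    for (const consumer of this.consumers) {
      const matches =
        consumer.messageTypes.includes(message.type) ||
        consumer.messageTypes.includes('*')
      if (matches) {
        // Awaiting here means dispatch resolves only after the consumer
        // finishes, and a thrown error rejects the dispatch promise.
        await consumer.handle(message)
      }
    }
  }
}
```

In a test, this is what allows `await expect(sender.dispatch(...)).rejects.toThrow()` style assertions without waiting on queue delivery.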

To handle queue messages in production, export a Stratal instance. The queue handler is built in:

import { Stratal } from 'stratal'
import { AppModule } from './app.module'
export default new Stratal({ module: AppModule })

When a batch of messages arrives from Cloudflare Queues, Stratal calls app.handleQueue(batch, queueName), which:

  1. Extracts the locale from the first message’s metadata for i18n context.
  2. Creates a request scope so consumers can use request-scoped dependencies.
  3. Iterates over each message in the batch.
  4. Finds matching consumers by message type.
  5. Calls handle on each consumer. On success, the message is acknowledged. On failure, onError is called and the message is retried.
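Steps 3 to 5 above can be sketched as a loop over the batch. This is an illustration under stated assumptions, not Stratal's handleQueue implementation: `handleBatchSketch` and the local interfaces are hypothetical, the locale and request-scope steps are omitted, and the choice to keep calling the remaining consumers after one fails is an assumption. The `ack()` and `retry()` methods mirror the per-message methods Cloudflare Queues exposes on batch messages.

```typescript
// Local stand-ins modeled on the shapes described in this guide.
interface BatchBody {
  type: string
  payload: unknown
}

interface BatchMessage {
  body: BatchBody
  ack(): void
  retry(): void
}

interface BatchConsumer {
  readonly messageTypes: string[]
  handle(message: BatchBody): Promise<void>
  onError?(error: Error, message: BatchBody): Promise<void>
}

// Hypothetical batch loop: ack on success, onError + retry on failure.
async function handleBatchSketch(
  messages: BatchMessage[],
  consumers: BatchConsumer[],
): Promise<void> {
  for (const message of messages) {
    // Step 4: find consumers whose messageTypes match (or use the wildcard).
    const matching = consumers.filter(
      (c) => c.messageTypes.includes(message.body.type) || c.messageTypes.includes('*'),
    )
    let failed = false
    for (const consumer of matching) {
      try {
        await consumer.handle(message.body) // Step 5: call handle
      } catch (error) {
        failed = true
        const err = error instanceof Error ? error : new Error(String(error))
        await consumer.onError?.(err, message.body) // optional hook
      }
    }
    // Acknowledge on success; otherwise ask the queue to redeliver.
    if (failed) message.retry()
    else message.ack()
  }
}
```

Acknowledging per message rather than per batch means one failing message does not force redelivery of messages that were processed successfully.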

The sync provider processes messages synchronously, so you can write integration tests that dispatch a message and immediately assert on the consumer’s behavior. Use @stratal/testing to create a test module with the sync provider:

// Test globals are imported explicitly; skip this line if Vitest globals are enabled.
import { beforeAll, describe, expect, it, vi } from 'vitest'
import { Test, type TestingModule } from '@stratal/testing'
import { DI_TOKENS } from 'stratal/di'
import { QueueModule, QUEUE_TOKENS } from 'stratal/queue'
import type { IQueueConsumer, ConsumerRegistry, QueueRegistry } from 'stratal/queue'

describe('Queue Integration', () => {
  let module: TestingModule

  beforeAll(async () => {
    module = await Test.createTestingModule({
      imports: [
        QueueModule.forRootAsync({
          useFactory: () => ({ provider: 'sync' }),
        }),
        QueueModule.registerQueue('notifications-queue'),
      ],
    }).compile()
  })

  it('should process messages immediately', async () => {
    const handleFn = vi.fn().mockResolvedValue(undefined)
    const consumer: IQueueConsumer = {
      messageTypes: ['user.created'],
      handle: handleFn,
    }

    // Register the mock consumer
    const registry = module.get<ConsumerRegistry>(DI_TOKENS.ConsumerRegistry)
    registry.register(consumer)

    // Dispatch within a request scope
    await module.runInRequestScope(async () => {
      const queueRegistry = module.container.resolve<QueueRegistry>(
        QUEUE_TOKENS.QueueRegistry,
      )
      const sender = queueRegistry.getQueue('notifications-queue')
      await sender.dispatch({
        type: 'user.created',
        payload: { userId: '123' },
      })
    })

    // The sync provider has already run the consumer by this point
    expect(handleFn).toHaveBeenCalledWith(
      expect.objectContaining({
        type: 'user.created',
        payload: { userId: '123' },
      }),
    )
  })
})

Because the sync provider calls consumers inline, any error thrown in handle propagates directly to the dispatch call, making failure scenarios easy to test.

  • Use descriptive message types. Prefer hierarchical names like user.created or order.shipped over vague names like created or update.
  • Keep payloads minimal. Dispatch only IDs or references, then fetch fresh data inside the consumer. This avoids stale-data issues and keeps queue messages small.
  • Design for idempotency. Cloudflare Queues provides at-least-once delivery, so a message may be delivered more than once. Check message.id before processing to avoid duplicate side effects.
  • Separate queues for different domains. Use dedicated queues (notifications-queue, analytics-queue, email-queue) rather than routing everything through a single global queue.
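
The idempotency advice above can be sketched as a guard around the consumer's work. The `handleOnce` helper and the in-memory `Set` are hypothetical and for illustration only; in a deployed Worker the set of processed IDs would need to live in durable storage, since in-memory state is not shared across instances or restarts.

```typescript
// Local stand-in for the message shape described in this guide.
interface IdempotentMessage {
  id: string
  type: string
  payload: unknown
}

// Hypothetical guard: runs `work` only the first time a message id is seen.
// Returns true if the work ran, false if the message was a duplicate.
async function handleOnce(
  message: IdempotentMessage,
  processedIds: Set<string>,
  work: (message: IdempotentMessage) => Promise<void>,
): Promise<boolean> {
  if (processedIds.has(message.id)) {
    return false // duplicate delivery, skip side effects
  }
  await work(message)
  // Record the id only after success so a failed attempt can still be retried.
  processedIds.add(message.id)
  return true
}
```

Recording the ID after the work completes trades a small window for duplicate processing against the risk of silently dropping a message whose first attempt failed.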

Next steps:

  • Modules to learn how the consumers array fits into module configuration.
  • Email to see how the email integration uses queues for async dispatch.
  • Cron Jobs for scheduled task execution.