Queues
Stratal provides a built-in queue system backed by Cloudflare Queues. The QueueModule gives you a provider-based abstraction over queue operations with type-safe queue names, automatic message enrichment, and consumer routing by message type.
1. Configure your queue binding
Add a queue binding to your wrangler.jsonc:
```jsonc
{
  "queues": {
    "producers": [
      {
        "binding": "NOTIFICATIONS_QUEUE",
        "queue": "notifications-queue"
      }
    ],
    "consumers": [
      {
        "queue": "notifications-queue",
        "max_retries": 3,
        "max_batch_timeout": 30,
        "max_batch_size": 10,
        "dead_letter_queue": "notifications-queue-dlq"
      }
    ]
  }
}
```

The binding name must be the UPPER_SNAKE_CASE version of the queue name. Stratal converts kebab-case queue names automatically (for example, notifications-queue resolves to the NOTIFICATIONS_QUEUE binding).
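The naming rule can be illustrated with a small helper. This is only a sketch of the convention described above, not Stratal's internal code; `toBindingName` is a hypothetical name chosen for the example.

```typescript
// Hypothetical helper illustrating the kebab-case -> UPPER_SNAKE_CASE rule.
// Stratal performs this conversion internally; this function is not part of its API.
function toBindingName(queueName: string): string {
  return queueName.replace(/-/g, '_').toUpperCase()
}

console.log(toBindingName('notifications-queue')) // NOTIFICATIONS_QUEUE
console.log(toBindingName('order-events-queue')) // ORDER_EVENTS_QUEUE
```

If a dispatch fails because a binding cannot be found, comparing the output of a helper like this against the binding names in wrangler.jsonc is a quick first check.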
2. Register the QueueModule
Import QueueModule in your root module and call forRootAsync to configure the provider, then call registerQueue for each queue your application uses:
```typescript
import { Module } from 'stratal/module'
import { DI_TOKENS } from 'stratal/di'
import type { StratalEnv } from 'stratal'
import { QueueModule } from 'stratal/queue'

@Module({
  imports: [
    QueueModule.forRootAsync({
      inject: [DI_TOKENS.CloudflareEnv],
      useFactory: (env: StratalEnv) => ({
        provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare',
      }),
    }),
    QueueModule.registerQueue('notifications-queue'),
  ],
})
export class AppModule {}
```

The provider option accepts 'cloudflare' for production or 'sync' for testing. When no options are provided, it defaults to 'cloudflare'.
registerQueue creates a DI binding that maps the queue name to an IQueueSender instance, so you can inject it anywhere in your application.
Type-safe queue names
By default, QueueName is typed as string. You can narrow it by augmenting the QueueNames interface so that QueueName becomes a union of the declared keys. Any call to registerQueue or @InjectQueue with an invalid name will then produce a type error.
The recommended approach is to derive queue names automatically from your Cloudflare.Env bindings rather than maintaining a separate manual list. A binding named NOTIFICATIONS_QUEUE produces the queue name notifications-queue, and ORDER_EVENTS_QUEUE produces order-events-queue. This way you only maintain bindings in one place — your Wrangler config.
See the Environment Typing — Typing queue bindings guide for the full setup, including the TypeScript utility types that perform this conversion.
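To give a rough idea of how such a conversion can work at the type level, here is a sketch using template literal types. The names `KebabCase`, `QueueNamesFrom`, and `ExampleEnv` are hypothetical and may differ from the utility types in the guide; `ExampleEnv` stands in for Cloudflare.Env.

```typescript
// Sketch only: these type names are hypothetical and may not match the
// utility types shown in the Environment Typing guide.

// Convert an UPPER_SNAKE_CASE string literal type to kebab-case.
type KebabCase<S extends string> = S extends `${infer Head}_${infer Tail}`
  ? `${Lowercase<Head>}-${KebabCase<Tail>}`
  : Lowercase<S>

// Keep only the binding keys that end in _QUEUE, then convert each one.
type QueueNamesFrom<Env> = KebabCase<Extract<keyof Env, `${string}_QUEUE`>>

// Example environment standing in for Cloudflare.Env.
interface ExampleEnv {
  NOTIFICATIONS_QUEUE: unknown
  ORDER_EVENTS_QUEUE: unknown
  DATABASE: unknown
}

// Resolves to 'notifications-queue' | 'order-events-queue'.
type ExampleQueueName = QueueNamesFrom<ExampleEnv>

const valid: ExampleQueueName = 'order-events-queue'
// const invalid: ExampleQueueName = 'database' // would be a type error
console.log(valid)

// Runtime counterpart of the same conversion, for comparison.
function toQueueName(bindingName: string): string {
  return bindingName.toLowerCase().replace(/_/g, '-')
}

console.log(toQueueName('NOTIFICATIONS_QUEUE')) // notifications-queue
```

Because the conditional type distributes over unions, every matching binding key contributes one member to the resulting queue-name union, and non-queue bindings such as `DATABASE` are filtered out.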
Sending messages
Inject a queue sender using the queue name as the injection token:
```typescript
import { Transient } from 'stratal/di'
import { InjectQueue, type IQueueSender } from 'stratal/queue'

@Transient()
export class NotificationService {
  constructor(
    @InjectQueue('notifications-queue') private readonly queue: IQueueSender,
  ) {}

  async sendWelcome(userId: string, email: string) {
    await this.queue.dispatch({
      type: 'notification.welcome',
      payload: { userId, email },
    })
  }
}
```

The dispatch method accepts a DispatchMessage object. You provide type, payload, and optionally metadata. The id and timestamp fields are generated automatically:
| Field | Type | Description |
|---|---|---|
| `type` | `string` | Message type used for consumer routing |
| `payload` | `T` | The message payload (generic) |
| `metadata` | `object?` | Optional metadata; `locale` is auto-set from the i18n context if not provided |
Consuming messages
Create a consumer class that implements the IQueueConsumer interface:
```typescript
import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'

interface WelcomePayload {
  userId: string
  email: string
}

@Transient()
export class WelcomeConsumer implements IQueueConsumer<WelcomePayload> {
  readonly messageTypes = ['notification.welcome']

  async handle(message: QueueMessage<WelcomePayload>) {
    const { userId, email } = message.payload
    // Send welcome email, create onboarding tasks, etc.
  }

  async onError(error: Error, message: QueueMessage<WelcomePayload>) {
    console.error(`Failed to process welcome for ${message.payload.userId}`, error)
  }
}
```

The messageTypes array declares which message types this consumer handles. A consumer can listen to multiple types. Use the wildcard '*' to match all message types.
The onError method is optional. When handle throws, onError is called and the message is retried automatically.
Register consumers
Add consumer classes to the consumers array in any module:
```typescript
@Module({
  consumers: [WelcomeConsumer],
})
export class NotificationsModule {}
```

Stratal resolves each consumer from the DI container and indexes it by its declared message types. When a queue message arrives, every consumer whose messageTypes includes the message’s type will receive it.
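The indexing and lookup described here can be pictured with a small in-memory registry. This is a sketch under assumptions, not Stratal's actual ConsumerRegistry: the class name `SketchConsumerRegistry`, its methods, and the trimmed-down interfaces are all hypothetical.

```typescript
// Minimal stand-ins for the Stratal types; only the fields used here are modeled.
interface QueueMessage<T = unknown> {
  id: string
  timestamp: number
  type: string
  payload: T
}

interface IQueueConsumer<T = unknown> {
  readonly messageTypes: string[]
  handle(message: QueueMessage<T>): Promise<void>
}

// Hypothetical registry: indexes consumers by message type at registration time.
class SketchConsumerRegistry {
  private readonly byType = new Map<string, IQueueConsumer[]>()

  register(consumer: IQueueConsumer): void {
    for (const type of consumer.messageTypes) {
      const existing = this.byType.get(type) ?? []
      this.byType.set(type, [...existing, consumer])
    }
  }

  // Exact matches first, then consumers registered under the '*' wildcard.
  // The Set removes a consumer that lists both the exact type and '*'.
  getConsumers(messageType: string): IQueueConsumer[] {
    const exact = this.byType.get(messageType) ?? []
    const wildcard = this.byType.get('*') ?? []
    return Array.from(new Set([...exact, ...wildcard]))
  }
}

const registry = new SketchConsumerRegistry()
const welcome: IQueueConsumer = {
  messageTypes: ['notification.welcome'],
  handle: async () => {},
}
const audit: IQueueConsumer = { messageTypes: ['*'], handle: async () => {} }
registry.register(welcome)
registry.register(audit)

console.log(registry.getConsumers('notification.welcome').length) // 2
console.log(registry.getConsumers('order.created').length) // 1
```

Indexing at registration time keeps the per-message lookup to two map reads, regardless of how many consumers are registered.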
Advanced consumer patterns
Multi-type consumer
A single consumer can handle multiple message types by listing them in its messageTypes array. Use a switch on message.type to branch logic:
```typescript
import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'

@Transient()
export class OrderEventsConsumer implements IQueueConsumer {
  readonly messageTypes = ['order.created', 'order.updated', 'order.cancelled']

  async handle(message: QueueMessage) {
    switch (message.type) {
      case 'order.created':
        await this.handleCreated(message)
        break
      case 'order.updated':
        await this.handleUpdated(message)
        break
      case 'order.cancelled':
        await this.handleCancelled(message)
        break
    }
  }

  private async handleCreated(message: QueueMessage) { /* ... */ }
  private async handleUpdated(message: QueueMessage) { /* ... */ }
  private async handleCancelled(message: QueueMessage) { /* ... */ }
}
```

Wildcard consumer
Use '*' to match every message type. This is useful for cross-cutting concerns like audit logging:
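```typescript
import { Transient } from 'stratal/di'
import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
// AuditLogService is a placeholder for your own injectable service; it is not part of Stratal.
import { AuditLogService } from './audit-log.service'

@Transient()
export class AuditLogConsumer implements IQueueConsumer {
  readonly messageTypes = ['*']

  // Injected so that this.auditLog is defined when handle runs.
  constructor(private readonly auditLog: AuditLogService) {}

  async handle(message: QueueMessage) {
    await this.auditLog.record({
      type: message.type,
      payload: message.payload,
      timestamp: message.timestamp,
    })
  }
}
```

Message structure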
Every queue message follows the QueueMessage interface:
```typescript
interface QueueMessage<T = unknown> {
  id: string // Auto-generated UUID
  timestamp: number // Auto-generated (ms since epoch)
  type: string // Routing key for consumers
  payload: T // Your message data
  metadata?: {
    locale?: string // Auto-set from i18n context
    [key: string]: unknown
  }
}
```

When you call dispatch, the QueueSender enriches your message with:
- A unique `id` via `crypto.randomUUID()`
- A `timestamp` via `Date.now()`
- The current `locale` from the i18n service (if not already set in metadata)
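The enrichment step can be sketched as a pure function. This is an illustration under assumptions rather than the QueueSender's real code: `enrichMessage` is a hypothetical name, and the `currentLocale` parameter stands in for whatever the i18n service reports.

```typescript
// Shapes mirror the interfaces described above; the function name is hypothetical.
interface DispatchMessage<T = unknown> {
  type: string
  payload: T
  metadata?: { locale?: string; [key: string]: unknown }
}

interface QueueMessage<T = unknown> extends DispatchMessage<T> {
  id: string
  timestamp: number
}

// currentLocale stands in for the value an i18n service would provide.
function enrichMessage<T>(
  message: DispatchMessage<T>,
  currentLocale: string,
): QueueMessage<T> {
  return {
    ...message,
    id: crypto.randomUUID(),
    timestamp: Date.now(),
    metadata: {
      ...message.metadata,
      // A locale supplied by the caller wins over the ambient one.
      locale: message.metadata?.locale ?? currentLocale,
    },
  }
}

const enriched = enrichMessage(
  { type: 'notification.welcome', payload: { userId: '123' } },
  'en',
)
console.log(enriched.type, enriched.metadata?.locale)
```

Other metadata keys pass through untouched, so callers can attach their own fields (for example a trace ID) without losing them during enrichment.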
The sync provider
The sync provider is designed for testing. Instead of sending messages to a Cloudflare Queue, it processes them synchronously by calling the matching consumer’s handle method directly:
```typescript
QueueModule.forRootAsync({
  inject: [DI_TOKENS.CloudflareEnv],
  useFactory: (env: StratalEnv) => ({
    provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare',
  }),
})
```

With the sync provider, dispatch blocks until the consumer finishes processing. If a consumer throws, the error propagates immediately, making it straightforward to assert on failures in tests.
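The blocking and error-propagation behavior can be pictured with a tiny in-memory sender. This is a sketch of the idea only; `SketchSyncSender` and its simplified types are hypothetical and do not reflect how Stratal's sync provider is implemented.

```typescript
interface DispatchMessage<T = unknown> {
  type: string
  payload: T
}

interface SyncConsumer {
  messageTypes: string[]
  handle(message: DispatchMessage): Promise<void>
}

// Hypothetical in-memory sender: awaits each matching consumer before returning.
class SketchSyncSender {
  private readonly consumers: SyncConsumer[]

  constructor(consumers: SyncConsumer[]) {
    this.consumers = consumers
  }

  async dispatch(message: DispatchMessage): Promise<void> {
    for (const consumer of this.consumers) {
      const matches =
        consumer.messageTypes.includes(message.type) ||
        consumer.messageTypes.includes('*')
      if (matches) {
        // No try/catch here: a thrown error rejects the dispatch() promise.
        await consumer.handle(message)
      }
    }
  }
}

async function demo(): Promise<string> {
  const sender = new SketchSyncSender([
    {
      messageTypes: ['user.created'],
      handle: async () => {
        throw new Error('boom')
      },
    },
  ])
  try {
    await sender.dispatch({ type: 'user.created', payload: { userId: '123' } })
    return 'no error'
  } catch (error) {
    return (error as Error).message
  }
}

demo().then((result) => console.log(result)) // boom
```

Because nothing is queued, a test can `await` the dispatch call and assert on the outcome in the very next line.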
Worker setup
To handle queue messages in production, export a Stratal instance. The queue handler is built in:
```typescript
import { Stratal } from 'stratal'
import { AppModule } from './app.module'

export default new Stratal({ module: AppModule })
```

When a batch of messages arrives from Cloudflare Queues, Stratal calls app.handleQueue(batch, queueName), which:
- Extracts the locale from the first message’s metadata for i18n context.
- Creates a request scope so consumers can use request-scoped dependencies.
- Iterates over each message in the batch.
- Finds matching consumers by message type.
- Calls `handle` on each consumer. On success, the message is acknowledged. On failure, `onError` is called and the message is retried.
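The steps above can be sketched as a plain loop over the batch. Everything in this block is an assumption made for illustration: the `handleBatchSketch` function, the simplified interfaces, and in particular the choice to retry a message when any one of several matching consumers fails. The request scope and i18n setup are only noted in comments.

```typescript
// Minimal shapes modeled on a Cloudflare queue batch; names here are illustrative.
interface Envelope {
  type: string
  payload: unknown
  metadata?: { locale?: string }
}

interface BatchMessage {
  body: Envelope
  ack(): void
  retry(): void
}

interface BatchConsumer {
  messageTypes: string[]
  handle(message: Envelope): Promise<void>
  onError?(error: Error, message: Envelope): Promise<void>
}

// Hypothetical stand-in for the batch loop described above.
async function handleBatchSketch(
  messages: BatchMessage[],
  consumers: BatchConsumer[],
): Promise<string | undefined> {
  // The first message's locale would seed the i18n context and request scope.
  const locale = messages[0]?.body.metadata?.locale

  for (const message of messages) {
    const matching = consumers.filter(
      (consumer) =>
        consumer.messageTypes.includes(message.body.type) ||
        consumer.messageTypes.includes('*'),
    )

    let failed = false
    for (const consumer of matching) {
      try {
        await consumer.handle(message.body)
      } catch (error) {
        failed = true
        await consumer.onError?.(error as Error, message.body)
      }
    }

    // Assumption: one failing consumer is enough to retry the whole message.
    if (failed) message.retry()
    else message.ack()
  }

  return locale
}
```

Acknowledging or retrying per message, rather than per batch, means one bad message does not force redelivery of the messages that were processed successfully alongside it.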
Testing
The sync provider processes messages synchronously, so you can write integration tests that dispatch a message and immediately assert on the consumer’s behavior. Use @stratal/testing to create a test module with the sync provider:
```typescript
// Vitest helpers; this import can be dropped if Vitest globals are enabled.
import { beforeAll, describe, expect, it, vi } from 'vitest'
import { Test, type TestingModule } from '@stratal/testing'
import { DI_TOKENS } from 'stratal/di'
import { QueueModule, QUEUE_TOKENS } from 'stratal/queue'
import type { IQueueConsumer, ConsumerRegistry, QueueRegistry } from 'stratal/queue'

describe('Queue Integration', () => {
  let module: TestingModule

  beforeAll(async () => {
    module = await Test.createTestingModule({
      imports: [
        QueueModule.forRootAsync({
          useFactory: () => ({ provider: 'sync' }),
        }),
        QueueModule.registerQueue('notifications-queue'),
      ],
    }).compile()
  })

  it('should process messages immediately', async () => {
    const handleFn = vi.fn().mockResolvedValue(undefined)
    const consumer: IQueueConsumer = {
      messageTypes: ['user.created'],
      handle: handleFn,
    }

    // Register the mock consumer
    const registry = module.get<ConsumerRegistry>(DI_TOKENS.ConsumerRegistry)
    registry.register(consumer)

    // Dispatch within a request scope
    await module.runInRequestScope(async () => {
      const queueRegistry = module.container.resolve<QueueRegistry>(
        QUEUE_TOKENS.QueueRegistry,
      )
      const sender = queueRegistry.getQueue('notifications-queue')

      await sender.dispatch({
        type: 'user.created',
        payload: { userId: '123' },
      })
    })

    expect(handleFn).toHaveBeenCalledWith(
      expect.objectContaining({
        type: 'user.created',
        payload: { userId: '123' },
      }),
    )
  })
})
```

Because the sync provider calls consumers inline, any error thrown in handle propagates directly to the dispatch call, making failure scenarios easy to test.
Best practices
- Use descriptive message types. Prefer hierarchical names like `user.created` or `order.shipped` over vague names like `created` or `update`.
- Keep payloads minimal. Dispatch only IDs or references, then fetch fresh data inside the consumer. This avoids stale-data issues and keeps queue messages small.
- Design for idempotency. Messages may be delivered more than once. Check `message.id` before processing to avoid duplicate side effects.
- Separate queues for different domains. Use dedicated queues (`notifications-queue`, `analytics-queue`, `email-queue`) rather than routing everything through a single global queue.
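As one way to approach the idempotency advice, the sketch below wraps a handler with a check on the message ID. It is a minimal illustration under assumptions: `ProcessedIds` and `handleOnce` are hypothetical names, and the in-memory `Set` only works within a single process. Worker isolates do not share memory, so a real deployment would need durable storage for the processed IDs.

```typescript
interface QueueMessage<T = unknown> {
  id: string
  type: string
  payload: T
}

// In-memory store for illustration only; swap in durable storage for real use.
class ProcessedIds {
  private readonly seen = new Set<string>()

  // Returns true the first time an ID is claimed, false for duplicates.
  claim(id: string): boolean {
    if (this.seen.has(id)) return false
    this.seen.add(id)
    return true
  }

  // Releases a claim so that a retry of a failed message is processed again.
  release(id: string): void {
    this.seen.delete(id)
  }
}

// Runs the handler at most once per message ID; resolves to false for duplicates.
async function handleOnce<T>(
  store: ProcessedIds,
  message: QueueMessage<T>,
  handler: (message: QueueMessage<T>) => Promise<void>,
): Promise<boolean> {
  if (!store.claim(message.id)) return false // duplicate delivery, skip side effects
  try {
    await handler(message)
    return true
  } catch (error) {
    store.release(message.id)
    throw error
  }
}
```

Inside a consumer's `handle` method, the business logic would be passed as the `handler` callback, so a redelivered message with the same `id` returns early without repeating its side effects.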