Queues
Stratal provides a built-in queue system backed by Cloudflare Queues. The QueueModule gives you a provider-based abstraction over queue operations with type-safe queue names, automatic message enrichment, and consumer routing by message type.
1. Configure your queue binding
Section titled “1. Configure your queue binding”Add a queue binding to your wrangler.jsonc:
{ "queues": { "producers": [ { "binding": "NOTIFICATIONS_QUEUE", "queue": "notifications-queue" } ], "consumers": [ { "queue": "notifications-queue", "max_retries": 3, "max_batch_timeout": 30, "max_batch_size": 10, "dead_letter_queue": "notifications-queue-dlq" } ] }}The binding name must be the UPPER_SNAKE_CASE version of the queue name. Stratal converts kebab-case queue names automatically (for example, notifications-queue resolves to the NOTIFICATIONS_QUEUE binding).
2. Register the QueueModule
Section titled “2. Register the QueueModule”Import QueueModule in your root module and call forRootAsync to configure the provider, then call registerQueue for each queue your application uses:
import { Module } from 'stratal/module'import { DI_TOKENS } from 'stratal/di'import type { StratalEnv } from 'stratal'import { QueueModule } from 'stratal/queue'
@Module({ imports: [ QueueModule.forRootAsync({ inject: [DI_TOKENS.CloudflareEnv], useFactory: (env: StratalEnv) => ({ provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare', }), }), QueueModule.registerQueue('notifications-queue'), ],})export class AppModule {}The provider option accepts 'cloudflare' for production or 'sync' for testing. When no options are provided, it defaults to 'cloudflare'.
registerQueue creates a DI binding that maps the queue name to an IQueueSender instance, so you can inject it anywhere in your application.
Module options
Section titled “Module options”The factory returns a QueueModuleOptions object:
interface QueueModuleOptions { provider: 'cloudflare' | 'sync' store?: { binding?: string // KV namespace binding name. Default: 'CACHE' } idempotency?: { ttl?: number // Seconds a processed idempotency key is remembered. Default: 86400 (24h) } failedJobs?: { retention?: number // Age (seconds) past which FailedJobCleanupJob deletes a failed job. Default: 604800 (7d) } maxRetries?: number // Attempts before a message is stored as a failed job. Default: 3}KV store binding
Section titled “KV store binding”The queue subsystem persists idempotency claims and failed jobs to a KV namespace. The store.binding option names that namespace and defaults to CACHE:
{ "kv_namespaces": [ { "binding": "CACHE", "id": "..." } ]}Override store.binding only if your KV namespace uses a different binding name. For the cloudflare provider, the binding is validated at app boot: if it is missing, QueueModule throws a QueueError during initialization rather than failing on every queue invocation. The sync provider processes inline and never touches the store, so it does not require the binding.
Type-safe queue names
Section titled “Type-safe queue names”By default, QueueName is typed as string. You can narrow it by augmenting the QueueNames interface so that QueueName becomes a union of the declared keys. Any call to registerQueue or @InjectQueue with an invalid name will then produce a type error.
The recommended approach is to derive queue names automatically from your Cloudflare.Env bindings rather than maintaining a separate manual list. A binding named NOTIFICATIONS_QUEUE produces the queue name notifications-queue, and ORDER_EVENTS_QUEUE produces order-events-queue. This way you only maintain bindings in one place - your Wrangler config.
See the Environment Typing - Typing queue bindings guide for the full setup, including the TypeScript utility types that perform this conversion.
Sending messages
Section titled “Sending messages”Inject a queue sender using the queue name as the injection token:
import { Transient } from 'stratal/di'import { InjectQueue, type IQueueSender } from 'stratal/queue'
@Transient()export class NotificationService { constructor( @InjectQueue('notifications-queue') private readonly queue: IQueueSender, ) {}
async sendWelcome(userId: string, email: string) { await this.queue.dispatch({ type: 'notification.welcome', payload: { userId, email }, }) }}The dispatch method accepts a DispatchMessage object (Omit<QueueMessage, 'id'>). You provide type, payload, and optionally metadata. The id is generated automatically:
| Field | Type | Description |
|---|---|---|
type | string | Message type used for consumer routing |
payload | T | The message payload (generic) |
metadata | object? | Optional metadata; locale, idempotencyKey, and binding are auto-populated if not provided |
Idempotency
Section titled “Idempotency”Cloudflare Queues deliver at least once, so a consumer can see the same message more than once. Stratal attaches an idempotency key to every dispatch and skips a message that has already been processed.
If you do not supply metadata.idempotencyKey, the sender derives one from a deterministic SHA-256 hash of the message type and payload. Key ordering in the payload does not matter: the hash uses a key-sorted serialization, so two semantically identical payloads produce the same key. Identical dispatches are therefore deduplicated automatically. Supply your own key to control deduplication explicitly:
// Auto-derived key (hash of type + payload)await this.queue.dispatch({ type: 'email.send', payload: { to: 'user@example.com', subject: 'Hello' },})
// Explicit idempotency keyawait this.queue.dispatch({ type: 'order.process', payload: { orderId: '123' }, metadata: { idempotencyKey: 'order:123' },})When a message is processed successfully, its key is recorded in the KV store and remembered for idempotency.ttl seconds (default 24 hours). A redelivered message whose key is still recorded is acknowledged without re-running the consumer.
Consuming messages
Section titled “Consuming messages”Create a consumer class that implements the IQueueConsumer interface:
import { Transient } from 'stratal/di'import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
interface WelcomePayload { userId: string email: string}
@Transient()export class WelcomeConsumer implements IQueueConsumer<WelcomePayload> { readonly messageTypes = ['notification.welcome']
async handle(message: QueueMessage<WelcomePayload>) { const { userId, email } = message.payload // Send welcome email, create onboarding tasks, etc. }
async onError(error: Error, message: QueueMessage<WelcomePayload>) { console.error(`Failed to process welcome for ${message.payload.userId}`, error) }}The messageTypes array declares which message types this consumer handles. A consumer can listen to multiple types. Use the wildcard '*' to match all message types.
The onError method is optional. When handle throws, onError is called and the message is retried automatically.
Register consumers
Section titled “Register consumers”Add consumer classes to the consumers array in any module:
@Module({ consumers: [WelcomeConsumer],})export class NotificationsModule {}Stratal resolves each consumer from the DI container and indexes it by its declared message types. When a queue message arrives, every consumer whose messageTypes includes the message’s type will receive it.
Advanced consumer patterns
Section titled “Advanced consumer patterns”Multi-type consumer
Section titled “Multi-type consumer”A single consumer can handle multiple message types by listing them in its messageTypes array. Use a switch on message.type to branch logic:
import { Transient } from 'stratal/di'import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
@Transient()export class OrderEventsConsumer implements IQueueConsumer { readonly messageTypes = ['order.created', 'order.updated', 'order.cancelled']
async handle(message: QueueMessage) { switch (message.type) { case 'order.created': await this.handleCreated(message) break case 'order.updated': await this.handleUpdated(message) break case 'order.cancelled': await this.handleCancelled(message) break } }
private async handleCreated(message: QueueMessage) { /* ... */ } private async handleUpdated(message: QueueMessage) { /* ... */ } private async handleCancelled(message: QueueMessage) { /* ... */ }}Wildcard consumer
Section titled “Wildcard consumer”Use '*' to match every message type. This is useful for cross-cutting concerns like audit logging:
import { Transient } from 'stratal/di'import type { IQueueConsumer, QueueMessage } from 'stratal/queue'
@Transient()export class AuditLogConsumer implements IQueueConsumer { readonly messageTypes = ['*']
async handle(message: QueueMessage) { await this.auditLog.record({ id: message.id, type: message.type, payload: message.payload, }) }}Message structure
Section titled “Message structure”Every queue message follows the QueueMessage interface:
interface QueueMessage<T = unknown> { id: string // Auto-generated UUID type: string // Routing key for consumers payload: T // Your message data metadata?: { locale?: string // Auto-set from i18n context idempotencyKey?: string // Auto-derived from type + payload if omitted binding?: string // Producer binding the message was dispatched through [key: string]: unknown }}When you call dispatch, the QueueSender enriches your message with:
- A unique
idviacrypto.randomUUID() - The current
localefrom the i18n service (if not already set in metadata) - An
idempotencyKeyderived from the messagetypeandpayload(if not already set in metadata) - The
bindingthe message was dispatched through, so a failed message can later be re-enqueued to the correct producer
See Idempotency for how the derived key deduplicates redelivered messages.
The sync provider
Section titled “The sync provider”The sync provider is designed for testing. Instead of sending messages to a Cloudflare Queue, it processes them synchronously by calling the matching consumer’s handle method directly:
QueueModule.forRootAsync({ inject: [DI_TOKENS.CloudflareEnv], useFactory: (env: StratalEnv) => ({ provider: env.ENVIRONMENT === 'test' ? 'sync' : 'cloudflare', }),})With the sync provider, dispatch blocks until the consumer finishes processing. If a consumer throws, the error propagates immediately, making it straightforward to assert on failures in tests.
Worker setup
Section titled “Worker setup”To handle queue messages in production, export a Stratal instance. The queue handler is built in:
import { Stratal } from 'stratal'import { AppModule } from './app.module'
export default new Stratal({ module: AppModule })When a batch of messages arrives from Cloudflare Queues, Stratal calls app.handleQueue(batch, queueName), which:
- Extracts the locale from the first message’s metadata for i18n context.
- Creates a request scope so consumers can use request-scoped dependencies.
- Iterates over each message in the batch.
- Finds matching consumers by message type.
- Calls
handleon each consumer. On success, the message is acknowledged and its idempotency key is recorded. On failure,onErroris called and the message is retried untilmaxRetriesis exhausted, after which it is recorded as a failed job.
Failed job management
Section titled “Failed job management”When a consumer keeps throwing past the configured maxRetries (default 3), Stratal stops retrying, acknowledges the message so Cloudflare does not redeliver it forever, and persists it to the KV store as a FailedJob. Failed jobs are kept indefinitely until you retry or purge them, so you can inspect and replay them later.
A failed job captures the original message plus failure context:
interface FailedJob { id: string // The message id queue: string // Cloudflare queue name the message was consumed from binding: string // Producer binding to re-enqueue through on retry type: string consumer: string // Class name of the consumer that failed attempts: number failedAt: string // ISO timestamp message: QueueMessage error: { name: string; message: string; stack?: string }}Inspecting and replaying failed jobs
Section titled “Inspecting and replaying failed jobs”Use the Quarry CLI to work with failed jobs from the command line:
# List failed jobs (default limit: 50)npx quarry queue:failednpx quarry queue:failed --queue=NOTIFICATIONS_QUEUE --limit=100
# Retry a single job by message id, or all of themnpx quarry queue:retry <message-id>npx quarry queue:retry --allnpx quarry queue:retry --all --queue=NOTIFICATIONS_QUEUE
# Delete a single failed job, or all of themnpx quarry queue:purge <message-id>npx quarry queue:purge --allnpx quarry queue:purge --all --queue=NOTIFICATIONS_QUEUERetrying re-enqueues the stored message through the producer binding it was originally dispatched from.
Automatic cleanup (opt-in cron)
Section titled “Automatic cleanup (opt-in cron)”Failed jobs persist indefinitely, so unbounded failures can grow KV usage over time. To bound this, register the FailedJobCleanupJob cron, which deletes failed jobs older than failedJobs.retention (default 7 days). It is not registered for you:
import { FailedJobCleanupJob, failedJobCleanupJob, QueueModule } from 'stratal/queue'
@Module({ imports: [ QueueModule.forRootAsync({ inject: [DI_TOKENS.CloudflareEnv], useFactory: () => ({ provider: 'cloudflare', failedJobs: { retention: 1209600 }, // 14 days }), }), ], jobs: [FailedJobCleanupJob], // daily at 00:00 UTC // ...or a custom schedule: jobs: [failedJobCleanupJob('0 3 * * 0')]})export class AppModule {}FailedJobCleanupJob runs daily at 00:00 UTC. Use failedJobCleanupJob(schedule) to bind it to a different cron expression. Either way, add a matching trigger to wrangler.jsonc ("0 0 * * *" for the default schedule). See Cron Jobs for how the jobs array and scheduling work.
Programmatic access
Section titled “Programmatic access”For custom tooling, inject the QueueStore via the QUEUE_TOKENS.QueueStore token:
import { Transient, inject } from 'stratal/di'import { QUEUE_TOKENS, type QueueStore } from 'stratal/queue'
@Transient()export class FailedJobInspector { constructor( @inject(QUEUE_TOKENS.QueueStore) private readonly store: QueueStore, ) {}
async inspect() { const { keys, cursor } = await this.store.listFailedJobs({ limit: 50 }) const job = await this.store.getFailedJob(keys[0].id) await this.store.removeFailedJob(keys[0].id) await this.store.purgeFailedJobs() }}Testing
Section titled “Testing”The sync provider processes messages synchronously, so you can write integration tests that dispatch a message and immediately assert on the consumer’s behavior. Use @stratal/testing to create a test module with the sync provider:
import { Test, type TestingModule } from '@stratal/testing'import { DI_TOKENS, Transient } from 'stratal/di'import { QueueModule, QUEUE_TOKENS } from 'stratal/queue'import type { IQueueConsumer, ConsumerRegistry, QueueRegistry } from 'stratal/queue'
describe('Queue Integration', () => { let module: TestingModule
beforeAll(async () => { module = await Test.createTestingModule({ imports: [ QueueModule.forRootAsync({ useFactory: () => ({ provider: 'sync' }), }), QueueModule.registerQueue('notifications-queue'), ], }).compile() })
it('should process messages immediately', async () => { const handleFn = vi.fn().mockResolvedValue(undefined)
@Transient() class TestConsumer implements IQueueConsumer { readonly messageTypes = ['user.created'] handle = handleFn }
// Register the consumer class with the message types it handles. // A fresh instance is resolved from the container per message. const registry = module.get<ConsumerRegistry>(DI_TOKENS.ConsumerRegistry) registry.register(TestConsumer, ['user.created'])
// Dispatch within a request scope await module.runInRequestScope(async () => { const queueRegistry = module.container.resolve<QueueRegistry>( QUEUE_TOKENS.QueueRegistry, ) const sender = queueRegistry.getQueue('notifications-queue')
await sender.dispatch({ type: 'user.created', payload: { userId: '123' }, }) })
expect(handleFn).toHaveBeenCalledWith( expect.objectContaining({ type: 'user.created', payload: { userId: '123' }, }), ) })})Because the sync provider calls consumers inline, any error thrown in handle propagates directly to the dispatch call, making failure scenarios easy to test.
Best practices
Section titled “Best practices”- Use descriptive message types. Prefer hierarchical names like
user.createdororder.shippedover vague names likecreatedorupdate. - Keep payloads minimal. Dispatch only IDs or references, then fetch fresh data inside the consumer. This avoids stale-data issues and keeps queue messages small.
- Design for idempotency. Stratal deduplicates on the idempotency key, but delivery is best-effort, not exactly-once. Keep handler side effects idempotent, and pass an explicit
metadata.idempotencyKeywhen the default hash oftypepluspayloadis too coarse or too fine. - Separate queues for different domains. Use dedicated queues (
notifications-queue,analytics-queue,email-queue) rather than routing everything through a single global queue.
Inspecting consumers
Section titled “Inspecting consumers”Use the queue:list Quarry CLI command to see all registered queue consumers in your application:
npx quarry queue:listThis outputs a table with the following columns:
| Column | Description |
|---|---|
| Consumer | The consumer class name |
| Message Types | Comma-separated list of message types the consumer handles |
Next steps
Section titled “Next steps”- Modules to learn how the
consumersarray fits into module configuration. - Email to see how the email integration uses queues for async dispatch.
- Cron Jobs for scheduled task execution, including the
FailedJobCleanupJob. - Quarry CLI for the
queue:failed,queue:retry, andqueue:purgecommands.