SSE Testing
Stratal’s testing package provides an SSE testing API that mirrors the HTTP and WebSocket testing patterns. Use module.sse() to build requests, connect to your streaming endpoints, and assert on received events.
Creating a connection
Access the SSE builder through module.sse() and call connect() to establish a connection:

    const sse = await module.sse('/events').connect()
    await sse.assertEventData('hello')

The connect() method sends an HTTP request to your application and returns a TestSseConnection that reads from the event stream.
Building the request
module.sse(path) returns a TestSseRequest builder that lets you configure the request before connecting:
| Method | Returns | Description |
|---|---|---|
| withHeaders(headers) | this | Add custom headers to the request |
| actingAs(user) | this | Authenticate the connection as a specific user |
| connect() | Promise<TestSseConnection> | Send the request and return a live connection |
    const sse = await module.sse('/events')
      .withHeaders({ 'X-Request-ID': 'test-123' })
      .connect()

Authenticated SSE
Use actingAs() to authenticate the SSE connection. This works the same way as actingAs() in HTTP testing. It creates a session and attaches the auth headers to the request:
    const sse = await module.sse('/events')
      .actingAs({ id: user.id })
      .connect()

TestSseConnection
Once connected, TestSseConnection provides methods for waiting on events, asserting on their content, and accessing the raw response.
Waiting for events
| Method | Returns | Description |
|---|---|---|
| waitForEvent(timeout?) | Promise<TestSseEvent> | Wait for and return the next event |
| waitForEnd(timeout?) | Promise<void> | Wait for the stream to end |
| collectEvents(timeout?) | Promise<TestSseEvent[]> | Collect all events until the stream ends |
All timeout parameters default to 5000ms. If no event is received within the timeout, the test fails with a descriptive error.
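The timeout behaviour described above follows a common pattern: race the awaited value against a timer and reject with a descriptive message if the timer wins. The sketch below illustrates that pattern only. The helper name withTimeout and the error text are hypothetical and are not taken from Stratal's implementation.

```typescript
// Illustrative sketch only: `withTimeout` is a hypothetical helper,
// not part of the Stratal testing package.
function withTimeout<T>(
  promise: Promise<T>,
  ms: number,
  label: string,
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined

  // Rejects with a descriptive error once `ms` milliseconds have elapsed.
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      reject(new Error(`${label} timed out after ${ms}ms`))
    }, ms)
  })

  // Whichever settles first wins; the timer is cleared either way
  // so it does not keep the test process alive.
  return Promise.race([promise, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer)
  })
}
```

Under this pattern, a call with an explicit timeout argument, such as waitForEvent(1000), would fail after one second instead of the default five if nothing arrives on the stream.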
Assertions
| Method | Returns | Description |
|---|---|---|
| assertEvent(expected, timeout?) | Promise<void> | Assert the next event matches the expected shape (partial match) |
| assertEventData(expected, timeout?) | Promise<void> | Assert the next event’s data field equals the expected string |
| assertJsonEventData(expected, timeout?) | Promise<void> | Assert the next event’s data field, parsed as JSON, matches the expected value |
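To make the matching rules in the table concrete, here is a minimal sketch of what a partial match and a JSON data match could look like. It assumes that "partial match" means only the fields present in the expected object are compared, and that the JSON comparison is a deep equality check. Both are assumptions; the helpers matchesEvent and matchesJsonData and the local SseEvent type are hypothetical stand-ins, not Stratal's API.

```typescript
// Hypothetical stand-in for the documented TestSseEvent shape.
interface SseEvent {
  data: string
  event?: string
  id?: string
  retry?: number
}

// Partial match: only the keys present in `expected` are compared.
function matchesEvent(actual: SseEvent, expected: Partial<SseEvent>): boolean {
  return (Object.keys(expected) as Array<keyof SseEvent>).every(
    (key) => actual[key] === expected[key],
  )
}

// Structural equality for JSON-compatible values.
function deepEqual(a: unknown, b: unknown): boolean {
  if (a === b) return true
  if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) {
    return false
  }
  if (Array.isArray(a) !== Array.isArray(b)) return false
  const aKeys = Object.keys(a)
  const bKeys = Object.keys(b)
  if (aKeys.length !== bKeys.length) return false
  return aKeys.every(
    (key) =>
      Object.prototype.hasOwnProperty.call(b, key) &&
      deepEqual(
        (a as Record<string, unknown>)[key],
        (b as Record<string, unknown>)[key],
      ),
  )
}

// JSON match: parse the data field, then compare it with the expected value.
function matchesJsonData(actual: SseEvent, expected: unknown): boolean {
  return deepEqual(JSON.parse(actual.data), expected)
}
```

With these rules, an event carrying an id would still match { event: 'message', data: 'hello' }, because id is not part of the expected shape, and key order in the JSON payload would not affect the result.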
Raw access
Use the raw property to access the underlying Response object for advanced scenarios:
    const sse = await module.sse('/events').connect()
    console.log(sse.raw.headers.get('content-type')) // text/event-stream

TestSseEvent
Each event received from the stream is represented as a TestSseEvent:
| Field | Type | Description |
|---|---|---|
| data | string | The event data |
| event | string \| undefined | The event type (from the event: field) |
| id | string \| undefined | The event ID (from the id: field) |
| retry | number \| undefined | The reconnection time in milliseconds (from the retry: field) |
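The fields in the table map directly onto the lines of the text/event-stream wire format. The following sketch shows how a raw stream body could be turned into objects of that shape, following the field rules of the server-sent events specification. The function parseSseEvents and the local SseEvent type are hypothetical and written for illustration. As a simplification, id and retry are attached to the event block they appear in rather than tracked as connection-level state.

```typescript
// Hypothetical stand-in for the documented TestSseEvent shape.
interface SseEvent {
  data: string
  event?: string
  id?: string
  retry?: number
}

function parseSseEvents(body: string): SseEvent[] {
  const events: SseEvent[] = []
  let dataLines: string[] = []
  let current: Omit<SseEvent, 'data'> = {}

  // Lines may be terminated by CRLF, CR, or LF.
  for (const line of body.split(/\r\n|\r|\n/)) {
    if (line === '') {
      // A blank line ends the current event; blocks without data are dropped.
      if (dataLines.length > 0) {
        events.push({ data: dataLines.join('\n'), ...current })
      }
      dataLines = []
      current = {}
      continue
    }

    // Lines starting with a colon are comments.
    if (line.startsWith(':')) continue

    const colon = line.indexOf(':')
    const field = colon === -1 ? line : line.slice(0, colon)
    let value = colon === -1 ? '' : line.slice(colon + 1)
    // A single leading space after the colon is not part of the value.
    if (value.startsWith(' ')) value = value.slice(1)

    switch (field) {
      case 'data':
        // Multiple data lines are joined with newlines.
        dataLines.push(value)
        break
      case 'event':
        current.event = value
        break
      case 'id':
        current.id = value
        break
      case 'retry':
        // Only values made entirely of digits are accepted.
        if (/^\d+$/.test(value)) current.retry = Number(value)
        break
    }
  }

  return events
}
```

A block such as "event: message", "id: 1", "data: hello" followed by a blank line would yield one object with event, id, and data set and retry left undefined.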
Full test example
Here is a complete test suite for a streaming controller that emits events:
    import { Test, type TestingModule } from '@stratal/testing'
    import { afterEach, beforeEach, describe, expect, it } from 'vitest'
    import { StreamingModule } from '../streaming.module'

    describe('StreamingController', () => {
      let module: TestingModule

      // Build a fresh module for every test
      beforeEach(async () => {
        module = await Test.createTestingModule({
          imports: [StreamingModule],
        }).compile()
      })

      // Close each module after its test so instances do not accumulate
      afterEach(async () => {
        await module.close()
      })

      it('should stream events', async () => {
        const sse = await module.sse('/events/stream').connect()

        await sse.assertEventData('event-1')
        await sse.assertEventData('event-2')
        await sse.waitForEnd()
      })

      it('should stream named events', async () => {
        const sse = await module.sse('/events/stream').connect()

        await sse.assertEvent({ event: 'message', data: 'hello' })
        await sse.waitForEnd()
      })

      it('should stream JSON data', async () => {
        const sse = await module.sse('/events/json').connect()

        await sse.assertJsonEventData({ status: 'ok', count: 1 })
        await sse.waitForEnd()
      })

      it('should collect all events', async () => {
        const sse = await module.sse('/events/stream').connect()

        const events = await sse.collectEvents()
        expect(events).toHaveLength(3)
        expect(events[0].data).toBe('event-1')
      })

      it('should support authenticated connections', async () => {
        const sse = await module.sse('/events/stream')
          .actingAs({ id: 'user-123' })
          .connect()

        await sse.assertEventData('event-1')
        await sse.waitForEnd()
      })
    })

Next steps
- Controllers and Routing for building streaming endpoints.
- HTTP Testing for the equivalent HTTP testing API.
- WebSocket Testing for the WebSocket testing API.
- Testing Module for creating modules and provider overrides.