# Real-Time Streaming
Stream generated events with concurrent user sessions, timestamp-ordered interleaving, and configurable pacing.
## Basic Usage
```ts
import { stream, defineJourney } from '@synode/core';

const journey = defineJourney(/* journey definition elided */);

for await (const event of stream(journey, { users: 50 })) {
  console.log(event.name, event.timestamp);
}
```

## Pacing Modes
Control how fast events are emitted:
```ts
// Real-time: actual timeSpan delays between events
{ mode: 'realtime' }

// Accelerated: 10x faster than real-time
{ mode: 'realtime', speed: 10 }

// Fixed: constant delay between every event
{ mode: 'fixed', delayMs: 100 }

// None: as fast as possible (default)
{ mode: 'none' }
```

## Concurrent Sessions
Simulate multiple users generating events simultaneously. Events are interleaved by timestamp — like observing live platform traffic.
```ts
for await (const event of stream(journey, {
  users: 200,
  concurrency: 20, // 20 sessions at once
  pacing: { mode: 'realtime' },
})) {
  await pushToKafka(event);
}
```

With `concurrency: 20`, the stream maintains 20 active user sessions. When one finishes, the next user starts. Events from all sessions are merged in timestamp order.
## Continuous Mode
Set `users: Infinity` for an endless stream that keeps spawning new users:
```ts
const controller = new AbortController();
setTimeout(() => controller.abort(), 60_000); // stop after 1 minute

for await (const event of stream(journey, {
  users: Infinity,
  concurrency: 50,
  spawnRate: 5, // 5 new users per second
  pacing: { mode: 'realtime', speed: 10 },
  signal: controller.signal,
})) {
  await ingest(event);
}
```

### Stopping Continuous Streams
- `signal: controller.signal` + `controller.abort()` — clean external stop
- `maxEvents: 10_000` — automatic stop after N events
## Load Testing
For maximum throughput, skip pacing and use high concurrency:
```ts
let count = 0;
for await (const event of stream(journey, {
  users: 50_000,
  concurrency: 200,
  maxEvents: 100_000,
})) {
  count++;
}
console.log(`Generated ${count} events`);
```

## StreamOptions Reference
| Option | Type | Default | Description |
|---|---|---|---|
| `users` | `number` | `1` | Total users. `Infinity` for continuous. |
| `concurrency` | `number` | `1` | Simultaneous user sessions |
| `pacing` | `PacingOptions` | `{ mode: 'none' }` | Event emission timing |
| `spawnRate` | `number` | `1` | Users/second in continuous mode |
| `maxEvents` | `number` | `Infinity` | Event count safety limit |
| `signal` | `AbortSignal` | — | External stop control |
| `persona` | `PersonaDefinition` | — | User persona |
| `datasets` | `DatasetDefinition[]` | — | Datasets to pre-generate |
| `preloadedDatasets` | `Dataset[]` | — | Pre-loaded datasets |
| `startDate` / `endDate` | `Date` | — | Date range for user start times |
| `eventSchema` | `EventSchemaConfig` | — | Schema validation |
## How It Works

Internally, `stream()` uses a priority queue (min-heap) to merge events from concurrent sessions by timestamp.
When a session ends, a new user is spawned (if any remain) and pushed into the heap.
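The merge loop can be sketched roughly as follows. This is an illustrative reconstruction, not the library's actual internals; the `Event` shape, the `MinHeap` class, and the `mergeSessions` generator are all assumptions made for the example:

```ts
// Illustrative sketch of timestamp-ordered merging -- NOT the library's
// actual implementation. Event shape and session iterators are assumed.
type Event = { name: string; timestamp: number };
type Entry = { event: Event; session: Iterator<Event> };

class MinHeap<T> {
  private items: T[] = [];
  constructor(private key: (item: T) => number) {}
  get size(): number { return this.items.length; }
  push(item: T): void {
    this.items.push(item);
    let i = this.items.length - 1;
    while (i > 0) {
      const p = (i - 1) >> 1; // parent index
      if (this.key(this.items[p]) <= this.key(this.items[i])) break;
      [this.items[p], this.items[i]] = [this.items[i], this.items[p]];
      i = p;
    }
  }
  pop(): T | undefined {
    const top = this.items[0];
    const last = this.items.pop();
    if (last === undefined || this.items.length === 0) return top;
    this.items[0] = last; // sift the last element down from the root
    let i = 0;
    for (;;) {
      const l = 2 * i + 1, r = 2 * i + 2;
      let s = i;
      if (l < this.items.length && this.key(this.items[l]) < this.key(this.items[s])) s = l;
      if (r < this.items.length && this.key(this.items[r]) < this.key(this.items[s])) s = r;
      if (s === i) break;
      [this.items[i], this.items[s]] = [this.items[s], this.items[i]];
      i = s;
    }
    return top;
  }
}

// Keep up to `concurrency` live sessions; when one ends, start the
// next pending user and push its first event into the heap.
function* mergeSessions(pending: Iterator<Event>[], concurrency: number): Generator<Event> {
  const heap = new MinHeap<Entry>((e) => e.event.timestamp);
  const startNext = (): void => {
    const session = pending.shift();
    if (!session) return;
    const first = session.next();
    if (!first.done) heap.push({ event: first.value, session });
    else startNext(); // skip sessions with no events
  };
  for (let i = 0; i < concurrency; i++) startNext();
  while (heap.size > 0) {
    const { event, session } = heap.pop()!;
    yield event; // globally timestamp-ordered across live sessions
    const next = session.next();
    if (!next.done) heap.push({ event: next.value, session });
    else startNext(); // session ended: spawn the next user
  }
}
```

Because each live session contributes exactly one entry to the heap at a time, popping the minimum always yields the earliest pending event across all sessions.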
