Skip to content

Real-Time Streaming

Stream generated events with concurrent user sessions, timestamp-ordered interleaving, and configurable pacing.

Basic Usage

typescript
import { stream, defineJourney } from '@synode/core';

for await (const event of stream(journey, { users: 50 })) {
  console.log(event.name, event.timestamp);
}

Pacing Modes

Control how fast events are emitted:

typescript
// Real-time: actual timeSpan delays between events
{ mode: 'realtime' }

// Accelerated: 10x faster than real-time
{ mode: 'realtime', speed: 10 }

// Fixed: constant delay between every event
{ mode: 'fixed', delayMs: 100 }

// None: as fast as possible (default)
{ mode: 'none' }

Concurrent Sessions

Simulate multiple users generating events simultaneously. Events are interleaved by timestamp — like observing live platform traffic.

typescript
for await (const event of stream(journey, {
  users: 200,
  concurrency: 20,          // 20 sessions at once
  pacing: { mode: 'realtime' },
})) {
  await pushToKafka(event);
}

With concurrency: 20, the stream maintains 20 active user sessions. When one finishes, the next user starts. Events from all sessions are merged in timestamp order.

Continuous Mode

Set users: Infinity for an endless stream that keeps spawning new users:

typescript
const controller = new AbortController();
setTimeout(() => controller.abort(), 60_000); // stop after 1 minute

for await (const event of stream(journey, {
  users: Infinity,
  concurrency: 50,
  spawnRate: 5,              // 5 new users per second
  pacing: { mode: 'realtime', speed: 10 },
  signal: controller.signal,
})) {
  await ingest(event);
}

Stopping Continuous Streams

  • signal: controller.signal + controller.abort() — clean external stop
  • maxEvents: 10_000 — automatic stop after N events

Load Testing

For maximum throughput, skip pacing and use high concurrency:

typescript
let count = 0;
for await (const event of stream(journey, {
  users: 50_000,
  concurrency: 200,
  maxEvents: 100_000,
})) {
  count++;
}
console.log(`Generated ${count} events`);

StreamOptions Reference

OptionTypeDefaultDescription
usersnumber1Total users. Infinity for continuous.
concurrencynumber1Simultaneous user sessions
pacingPacingOptions{ mode: 'none' }Event emission timing
spawnRatenumber1Users/second in continuous mode
maxEventsnumberInfinityEvent count safety limit
signalAbortSignalExternal stop control
personaPersonaDefinitionUser persona
datasetsDatasetDefinition[]Datasets to pre-generate
preloadedDatasetsDataset[]Pre-loaded datasets
startDate / endDateDateDate range for user start times
eventSchemaEventSchemaConfigSchema validation

How It Works

Internally, stream() uses a priority queue (min-heap) to merge events from concurrent sessions by timestamp:

When a session ends, a new user is spawned (if any remain) and pushed into the heap.