Building Custom Adapters
Implement the OutputAdapter interface to route events to any destination: databases, message queues, cloud storage, or custom pipelines.
OutputAdapter Interface
ts
interface OutputAdapter {
  write(event: Event): Promise<void> | void;
  close?(): Promise<void> | void;
}
- write(event): called once per generated event. Can be sync or async.
- close(): optional. Called once after all events are written. Use it to flush buffers, close connections, and release resources.
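As a small illustration of the interface, here is a sketch of an adapter that keeps events in memory, which can be handy in unit tests. `MemoryAdapter`, `EventLike`, and `OutputAdapterLike` are hypothetical names: the two interfaces are local stand-ins for the types exported by `@synode/core`, and the event fields are assumed from the database example on this page.

```ts
// Local stand-ins so the sketch is self-contained; in a real project these
// types would come from '@synode/core'. The event fields are assumptions.
interface EventLike {
  id: string;
  userId: string;
  name: string;
  timestamp: number;
  payload: Record<string, unknown>;
}

interface OutputAdapterLike {
  write(event: EventLike): Promise<void> | void;
  close?(): Promise<void> | void;
}

// Hypothetical adapter: collects events in an array instead of sending them anywhere.
class MemoryAdapter implements OutputAdapterLike {
  readonly events: EventLike[] = [];
  closed = false;

  // Synchronous write: the interface allows returning void directly.
  write(event: EventLike): void {
    if (this.closed) {
      throw new Error('MemoryAdapter: write() called after close()');
    }
    this.events.push(event);
  }

  // Nothing to flush here, so close() only marks the adapter as closed.
  close(): void {
    this.closed = true;
  }
}
```

Because `write` returns `void` rather than a promise, the caller can still `await` it safely; awaiting a non-promise value simply resolves immediately.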
Example: Database Adapter
Insert events directly into a database.
ts
import type { OutputAdapter, Event } from '@synode/core';
// DatabaseConnection and createConnection are not defined here;
// substitute your database client's connection type and factory.
interface DatabaseAdapterOptions {
  connectionString: string;
  tableName: string;
  batchSize?: number;
}
class DatabaseAdapter implements OutputAdapter {
  private buffer: Event[] = [];
  private readonly batchSize: number;
  private db: DatabaseConnection;
  constructor(private options: DatabaseAdapterOptions) {
    this.batchSize = options.batchSize ?? 100;
    this.db = createConnection(options.connectionString);
  }
  // Buffer each event and insert once batchSize events have accumulated.
  async write(event: Event): Promise<void> {
    this.buffer.push(event);
    if (this.buffer.length >= this.batchSize) {
      await this.flush();
    }
  }
  // Flush the remaining events, then close the connection even if the flush fails.
  async close(): Promise<void> {
    try {
      if (this.buffer.length > 0) {
        await this.flush();
      }
    } finally {
      await this.db.close();
    }
  }
  // Swap the buffer before awaiting so events written during the insert start a new batch.
  private async flush(): Promise<void> {
    const batch = this.buffer;
    this.buffer = [];
    await this.db.insertMany(
      this.options.tableName,
      batch.map((e) => ({
        id: e.id,
        user_id: e.userId,
        event_name: e.name,
        timestamp: e.timestamp,
        payload: JSON.stringify(e.payload),
      })),
    );
  }
}
Usage:
ts
import { generate } from '@synode/core';
const adapter = new DatabaseAdapter({
  connectionString: 'postgres://localhost:5432/analytics',
  tableName: 'events',
  batchSize: 500,
});
await generate(journey, { users: 10000, adapter });
// close() is called automatically by generate
Example: Filter Adapter
Wraps another adapter and forwards only the events for which a predicate returns true. Demonstrates the decorator pattern.
ts
import type { OutputAdapter, Event } from '@synode/core';
class FilterAdapter implements OutputAdapter {
  constructor(
    private inner: OutputAdapter,
    private predicate: (event: Event) => boolean,
  ) {}
  async write(event: Event): Promise<void> {
    if (this.predicate(event)) {
      await this.inner.write(event);
    }
  }
  async close(): Promise<void> {
    await this.inner.close?.();
  }
}
Usage:
ts
import { generate } from '@synode/core';
import { FileAdapter } from '@synode/adapter-file';
// Only write purchase events to file
const adapter = new FilterAdapter(
  new FileAdapter({ path: './purchases.jsonl', format: 'jsonl' }),
  (event) => event.name === 'purchase',
);
await generate(journeys, { users: 5000, adapter });
Composing Custom Adapters
Use CompositeAdapter to combine custom adapters with built-in ones.
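The composition example in this section passes a `CountingAdapter`, which is not defined on this page. A minimal sketch of what such an adapter might look like, assuming it only tallies events by name; `EventLike` and `OutputAdapterLike` are local stand-ins for the `@synode/core` types, and `total` and `countFor` are hypothetical members:

```ts
// Local stand-ins so the sketch is self-contained; in a real project these
// types would come from '@synode/core'.
interface EventLike {
  name: string;
}

interface OutputAdapterLike {
  write(event: EventLike): Promise<void> | void;
  close?(): Promise<void> | void;
}

// Hypothetical adapter: counts events per name and writes nothing anywhere.
class CountingAdapter implements OutputAdapterLike {
  private readonly counts = new Map<string, number>();
  private totalCount = 0;

  write(event: EventLike): void {
    this.counts.set(event.name, (this.counts.get(event.name) ?? 0) + 1);
    this.totalCount += 1;
  }

  // Total number of events seen so far.
  get total(): number {
    return this.totalCount;
  }

  // Number of events seen with the given name (0 if none).
  countFor(name: string): number {
    return this.counts.get(name) ?? 0;
  }
}
```

To read the counts afterwards, keep a reference to the instance before handing it to the composite.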
ts
import { generate } from '@synode/core';
import { CompositeAdapter } from '@synode/adapter-composite';
import { FileAdapter } from '@synode/adapter-file';
// DatabaseAdapter is defined above; CountingAdapter is another custom adapter.
const adapter = new CompositeAdapter([
  new DatabaseAdapter({ connectionString: '...', tableName: 'events' }),
  new FileAdapter({ path: './backup.jsonl', format: 'jsonl' }),
  new CountingAdapter(),
]);
await generate(journey, { users: 10000, adapter });
close() Lifecycle
The generate function calls adapter.close() automatically after all users are processed. If you use an adapter outside of generate, call close() manually.
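When driving an adapter by hand, a `try`/`finally` block is one way to make sure `close()` still runs if a write fails. The helper below is a sketch under that assumption; `writeAll`, `EventLike`, and `OutputAdapterLike` are hypothetical names, with the two interfaces standing in for the `@synode/core` types.

```ts
// Local stand-ins so the sketch is self-contained; in a real project these
// types would come from '@synode/core'.
interface EventLike {
  name: string;
}

interface OutputAdapterLike {
  write(event: EventLike): Promise<void> | void;
  close?(): Promise<void> | void;
}

// Hypothetical helper: writes events to an adapter without generate().
async function writeAll(
  adapter: OutputAdapterLike,
  events: EventLike[],
): Promise<void> {
  try {
    for (const event of events) {
      // Await each write so async adapters see events in order.
      await adapter.write(event);
    }
  } finally {
    // close is optional on the interface, hence the optional call.
    // It runs whether the loop finished or a write threw.
    await adapter.close?.();
  }
}
```

A failed write still propagates to the caller; the `finally` block only guarantees that the adapter gets a chance to release its resources first.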
Key patterns for close():
- Flush any buffered data
- Close database connections, file handles, network sockets
- Log summary statistics
- Release resources
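The patterns above can be combined in a single `close()` method. The following is a sketch, not a prescribed implementation: `BufferedAdapter` and the `Sink` interface (`send`, `end`) are hypothetical stand-ins for a real database client or socket, and making `close()` idempotent is an extra safeguard this page does not require.

```ts
// Local stand-ins so the sketch is self-contained; in a real project the
// event and adapter types would come from '@synode/core'.
interface EventLike {
  name: string;
}

interface OutputAdapterLike {
  write(event: EventLike): Promise<void> | void;
  close?(): Promise<void> | void;
}

// Hypothetical destination, e.g. a database client or network socket.
interface Sink {
  send(batch: EventLike[]): Promise<void>;
  end(): Promise<void>;
}

class BufferedAdapter implements OutputAdapterLike {
  private buffer: EventLike[] = [];
  private written = 0;
  private closed = false;

  constructor(
    private readonly sink: Sink,
    private readonly log: (message: string) => void = console.log,
  ) {}

  write(event: EventLike): void {
    this.buffer.push(event);
  }

  async close(): Promise<void> {
    if (this.closed) return; // a second call is a no-op
    this.closed = true;
    try {
      // Flush any buffered data.
      if (this.buffer.length > 0) {
        const batch = this.buffer;
        this.buffer = [];
        await this.sink.send(batch);
        this.written += batch.length;
      }
    } finally {
      // Release the connection even if the flush failed, then log a summary.
      await this.sink.end();
      this.log(`BufferedAdapter closed: ${this.written} event(s) written`);
    }
  }
}
```

Putting the cleanup in `finally` means a failed flush does not leave the connection open; the flush error is still raised to the caller.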
