Skip to content

Building Custom Adapters

Implement the OutputAdapter interface to route events to any destination: databases, message queues, cloud storage, or custom pipelines.

OutputAdapter Interface

ts
interface OutputAdapter {
  write(event: Event): Promise<void> | void;
  close?(): Promise<void> | void;
}
  • write(event) -- called once per generated event. Can be sync or async.
  • close() -- optional. Called once after all events are written. Use for flushing buffers, closing connections, releasing resources.

Example: Database Adapter

Insert events directly into a database.

ts
import type { OutputAdapter, Event } from '@synode/core';

interface DatabaseAdapterOptions {
  connectionString: string;
  tableName: string;
  batchSize?: number;
}

class DatabaseAdapter implements OutputAdapter {
  private buffer: Event[] = [];
  private readonly batchSize: number;
  private db: DatabaseConnection;

  constructor(private options: DatabaseAdapterOptions) {
    this.batchSize = options.batchSize ?? 100;
    this.db = createConnection(options.connectionString);
  }

  async write(event: Event): Promise<void> {
    this.buffer.push(event);

    if (this.buffer.length >= this.batchSize) {
      await this.flush();
    }
  }

  async close(): Promise<void> {
    if (this.buffer.length > 0) {
      await this.flush();
    }
    await this.db.close();
  }

  private async flush(): Promise<void> {
    const batch = this.buffer;
    this.buffer = [];
    await this.db.insertMany(
      this.options.tableName,
      batch.map((e) => ({
        id: e.id,
        user_id: e.userId,
        event_name: e.name,
        timestamp: e.timestamp,
        payload: JSON.stringify(e.payload),
      })),
    );
  }
}

Usage:

ts
import { generate } from '@synode/core';

const adapter = new DatabaseAdapter({
  connectionString: 'postgres://localhost:5432/analytics',
  tableName: 'events',
  batchSize: 500,
});

await generate(journey, { users: 10000, adapter });
// close() is called automatically by generate

Example: Filter Adapter

Wraps another adapter and filters events based on a predicate. Demonstrates the decorator pattern.

ts
import type { OutputAdapter, Event } from '@synode/core';

class FilterAdapter implements OutputAdapter {
  constructor(
    private inner: OutputAdapter,
    private predicate: (event: Event) => boolean,
  ) {}

  async write(event: Event): Promise<void> {
    if (this.predicate(event)) {
      await this.inner.write(event);
    }
  }

  async close(): Promise<void> {
    await this.inner.close?.();
  }
}

Usage:

ts
import { FileAdapter } from '@synode/adapter-file';

// Only write purchase events to file
const adapter = new FilterAdapter(
  new FileAdapter({ path: './purchases.jsonl', format: 'jsonl' }),
  (event) => event.name === 'purchase',
);

await generate(journeys, { users: 5000, adapter });

Composing Custom Adapters

Use CompositeAdapter to combine custom adapters with built-in ones.

ts
import { CompositeAdapter } from '@synode/adapter-composite';
import { FileAdapter } from '@synode/adapter-file';

const adapter = new CompositeAdapter([
  new DatabaseAdapter({ connectionString: '...', tableName: 'events' }),
  new FileAdapter({ path: './backup.jsonl', format: 'jsonl' }),
  new CountingAdapter(),
]);

await generate(journey, { users: 10000, adapter });

close() Lifecycle

The generate function calls adapter.close() automatically after all users are processed. If you use an adapter outside of generate, call close() manually.

Key patterns for close():

  • Flush any buffered data
  • Close database connections, file handles, network sockets
  • Log summary statistics
  • Release resources