Skip to content

Core API

Creates a type-safe event registry from event definitions.

function defineEvent<T extends EventDefinitionInput[]>(
definitions: T
): EventRegistry<T>
ParameterTypeDescription
definitionsEventDefinitionInput[]Array of event definitions
interface EventDefinitionInput {
name: string;
schema: StandardSchema;
options?: EventOptions;
}
interface EventOptions {
channel?: string;
description?: string;
}
import { defineEvent } from "@pubsubjs/core";
import { z } from "zod";
const events = defineEvent([
{
name: "user.created",
schema: z.object({
userId: z.string(),
email: z.string().email(),
}),
options: {
channel: "users",
description: "Emitted when a new user is created",
},
},
]);

Class for publishing type-safe events.

class Publisher<TEvents extends EventRegistry>
new Publisher(options: PublisherOptions<TEvents>)
OptionTypeRequiredDescription
eventsTEventsYesEvent registry
transportTransportYesTransport to use
middlewarePublishMiddleware[]NoMiddleware chain
channelStrategy(name: string) => stringNoChannel naming strategy
skipValidationbooleanNoSkip payload validation
autoReconnectbooleanNoEnable auto-reconnection
reconnectIntervalnumberNoReconnection interval (ms)
maxReconnectAttemptsnumberNoMax reconnection attempts
async publish<TEventName extends EventNames<TEvents>>(
eventName: TEventName,
payload: EventPayload<TEvents, TEventName>,
options?: PublishOptions
): Promise<void>
async connect(): Promise<void>
async disconnect(): Promise<void>
PropertyTypeDescription
stateConnectionStateCurrent connection state
isConnectedbooleanWhether connected

Class for subscribing to events.

class Subscriber<
TEvents extends EventRegistry,
TContext extends BaseContext = BaseContext,
TPublisher extends PublisherInterface | undefined = undefined
>
new Subscriber(options: SubscriberOptions<TEvents, TContext, TPublisher>)
OptionTypeRequiredDescription
eventsTEventsYesEvent registry
transportTransportYesTransport to use
middlewareSubscribeMiddleware[]NoMiddleware chain
contextFactoryContextFactory<TContext>NoCustom context factory
publisherTPublisherNoPublisher for reply patterns
onErrorSubscriberErrorHandlerNoError handler
channelStrategy(name: string) => stringNoChannel naming strategy
skipValidationbooleanNoSkip payload validation
on<TEventName extends EventNames<TEvents>>(
eventName: TEventName,
handler: EventHandler<EventPayload<TEvents, TEventName>, TContext, TPublisher>
): this
off<TEventName extends EventNames<TEvents>>(eventName: TEventName): this
onMany(handlers: HandlerMap<TEvents, TContext, TPublisher>): this
async subscribe(): Promise<void>
async unsubscribe(): Promise<void>
PropertyTypeDescription
stateConnectionStateCurrent connection state
isConnectedbooleanWhether connected

Publisher middleware that logs events.

function createLoggingMiddleware<TEvents>(): PublishMiddleware<TEvents>

Subscriber middleware that logs events.

function createSubscriberLoggingMiddleware<TEvents, TContext>(): SubscribeMiddleware<TEvents, TContext>

Reports handler duration.

function createSubscriberTimingMiddleware<TEvents, TContext>(
onTiming: (eventName: string, durationMs: number) => void
): SubscribeMiddleware<TEvents, TContext>

Prevents duplicate message processing.

function createIdempotencyMiddleware<TEvents, TContext>(
options: IdempotencyOptions
): SubscribeMiddleware<TEvents, TContext>
interface IdempotencyOptions {
hasProcessed: (messageId: string) => boolean | Promise<boolean>;
markProcessed: (messageId: string) => void | Promise<void>;
}

Limits event processing rate.

function createRateLimitMiddleware<TEvents, TContext>(
options: RateLimitOptions
): SubscribeMiddleware<TEvents, TContext>
interface RateLimitOptions {
maxEvents: number;
windowMs: number;
onLimit?: (eventName: string, payload: unknown) => void;
}

Extracts event names as union type.

type EventNames<T extends EventRegistry> = keyof T & string

Extracts payload type for an event.

type EventPayload<T extends EventRegistry, K extends EventNames<T>> = InferOutput<T[K]["schema"]>
type ConnectionState = "disconnected" | "connecting" | "connected" | "reconnecting"
type PublishMiddleware<TEvents extends EventRegistry> = (
eventName: EventNames<TEvents>,
payload: unknown,
options: PublishOptions | undefined,
next: () => Promise<void>
) => Promise<void>
type SubscribeMiddleware<
TEvents extends EventRegistry,
TContext extends BaseContext = BaseContext
> = (
eventName: EventNames<TEvents>,
payload: unknown,
context: TContext,
next: () => Promise<void>
) => Promise<void>
interface BaseContext {
messageId: string;
timestamp: Date;
}

Thrown when payload validation fails.

class ValidationError extends Error {
issues: StandardSchemaIssue[];
}

Thrown when event is not defined.

class UnknownEventError extends Error {
eventName: string;
}

Thrown when connection fails.

class ConnectionError extends Error {}