Skip to content

Context

Context provides metadata and request-specific data to event handlers.

By default, handlers receive a BaseContext:

interface BaseContext {
messageId: string; // Unique message identifier
timestamp: Date; // When the message was created
}

Usage:

subscriber.on("user.created", (payload, { ctx }) => {
console.log(`Message ID: ${ctx.messageId}`);
console.log(`Timestamp: ${ctx.timestamp}`);
});

Create custom context with additional data using a context factory:

interface MyContext extends BaseContext {
userId?: string;
traceId?: string;
requestId?: string;
environment: string;
}
const subscriber = new Subscriber<typeof events, MyContext>({
events,
transport,
contextFactory: (metadata) => ({
messageId: metadata.messageId,
timestamp: new Date(),
userId: metadata.userId as string | undefined,
traceId: metadata.traceId as string | undefined,
requestId: metadata.requestId as string | undefined,
environment: process.env.NODE_ENV || "development",
}),
});
subscriber.on("user.created", (payload, { ctx }) => {
console.log(`User: ${ctx.userId}`);
console.log(`Trace: ${ctx.traceId}`);
console.log(`Environment: ${ctx.environment}`);
});

The context factory receives metadata from the transport:

interface TransportMetadata {
messageId: string;
channel?: string;
connectionId?: string;
[key: string]: unknown; // Additional metadata
}

Publishers can include custom metadata:

await publisher.publish("user.created", payload, {
metadata: {
userId: "user-123",
traceId: "trace-abc",
source: "api",
},
});

This metadata is passed to the subscriber’s context factory.

interface AuthContext extends BaseContext {
userId?: string;
roles: string[];
isAuthenticated: boolean;
}
const subscriber = new Subscriber<typeof events, AuthContext>({
events,
transport,
contextFactory: async (metadata) => {
const userId = metadata.userId as string | undefined;
const user = userId ? await userService.getUser(userId) : null;
return {
messageId: metadata.messageId,
timestamp: new Date(),
userId,
roles: user?.roles || [],
isAuthenticated: !!user,
};
},
});
subscriber.on("admin.action", (payload, { ctx }) => {
if (!ctx.roles.includes("admin")) {
throw new Error("Unauthorized");
}
// Handle admin action
});
interface TracingContext extends BaseContext {
traceId: string;
spanId: string;
parentSpanId?: string;
}
const subscriber = new Subscriber<typeof events, TracingContext>({
events,
transport,
contextFactory: (metadata) => ({
messageId: metadata.messageId,
timestamp: new Date(),
traceId: (metadata.traceId as string) || generateTraceId(),
spanId: generateSpanId(),
parentSpanId: metadata.spanId as string | undefined,
}),
});
subscriber.on("order.placed", async (payload, { ctx, publisher }) => {
// Start a span
const span = tracer.startSpan("process-order", {
traceId: ctx.traceId,
parentSpanId: ctx.spanId,
});
try {
await processOrder(payload);
// Propagate trace context to downstream events
await publisher.publish("order.confirmed", confirmation, {
metadata: {
traceId: ctx.traceId,
spanId: span.spanId,
},
});
} finally {
span.end();
}
});
interface RequestContext extends BaseContext {
requestId: string;
startTime: number;
logger: Logger;
}
const subscriber = new Subscriber<typeof events, RequestContext>({
events,
transport,
contextFactory: (metadata) => {
const requestId = metadata.requestId as string || generateRequestId();
return {
messageId: metadata.messageId,
timestamp: new Date(),
requestId,
startTime: Date.now(),
logger: createLogger({ requestId }),
};
},
});
subscriber.on("data.process", (payload, { ctx }) => {
ctx.logger.info("Processing started");
// Process data...
ctx.logger.info(`Completed in ${Date.now() - ctx.startTime}ms`);
});

Middleware receives the same context:

const loggingMiddleware: SubscribeMiddleware<typeof events, MyContext> = async (
eventName,
payload,
context,
next
) => {
console.log(`[${context.traceId}] Processing ${eventName}`);
await next();
console.log(`[${context.traceId}] Completed ${eventName}`);
};

The context factory can be async:

const subscriber = new Subscriber<typeof events, MyContext>({
events,
transport,
contextFactory: async (metadata) => {
// Fetch user from database
const user = await userService.getUser(metadata.userId as string);
// Validate token
const isValid = await authService.validateToken(metadata.token as string);
return {
messageId: metadata.messageId,
timestamp: new Date(),
user,
isAuthenticated: isValid,
};
},
});

TypeScript enforces context types:

interface MyContext extends BaseContext {
userId: string;
}
subscriber.on("user.created", (payload, { ctx }) => {
// TypeScript knows ctx.userId is a string
console.log(ctx.userId.toUpperCase());
// TypeScript error: Property 'invalid' does not exist
console.log(ctx.invalid);
});