SSE Transport
The SSE (Server-Sent Events) transport enables efficient server-to-client streaming over HTTP.
Installation
    bun add @pubsubjs/transport-sse

What is SSE?
Server-Sent Events is a standard for pushing updates from server to client over HTTP:
- Unidirectional: Server → Client only
- Auto-reconnect: Built-in reconnection handling
- Simple: Works over standard HTTP
- Firewall-friendly: Uses the regular HTTP(S) ports
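
To make "works over standard HTTP" concrete, the sketch below serializes one event into the `text/event-stream` wire format: optional `id:`, then `event:` and `data:` lines, terminated by a blank line. `formatSSE` is a hypothetical helper, not part of `@pubsubjs/transport-sse`, and encoding the payload as JSON is an assumption about how a transport might do it.

```typescript
// Hypothetical helper (not a @pubsubjs API): builds one SSE frame.
function formatSSE(event: string, data: unknown, id?: string): string {
  const lines: string[] = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  lines.push(`event: ${event}`);
  // Compact JSON contains no raw newlines, so a single data: line is enough.
  lines.push(`data: ${JSON.stringify(data)}`);
  // A blank line terminates the event.
  return lines.join("\n") + "\n\n";
}

const frame = formatSSE("notification", { message: "Hello" }, "1");
console.log(frame);
```

The server only has to write frames like this to a long-lived HTTP response with the `Content-Type: text/event-stream` header, which is why SSE passes through most proxies unchanged.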
Server Transport
Basic Setup

    import { SSEServerTransport } from "@pubsubjs/transport-sse";
    import { Publisher } from "@pubsubjs/core";
    import { events } from "./events";

    const transport = new SSEServerTransport();
    const publisher = new Publisher({ events, transport });

    Bun.serve({
      port: 3000,
      routes: {
        "/events": (req) => transport.handleRequest(req),
        "/api/notify": async (req) => {
          const body = await req.json();
          await publisher.publish("notification", body);
          return new Response("OK");
        },
      },
    });

Server Options
    const transport = new SSEServerTransport({
      // Keep-alive interval (default: 15000ms)
      keepAliveInterval: 15000,

      // Retry interval for clients (default: 3000ms)
      retryInterval: 3000,

      // Custom headers
      headers: {
        "Access-Control-Allow-Origin": "*",
      },
    });

Broadcasting to All Clients
    // All connected clients receive this
    await publisher.publish("announcement", {
      message: "Server will restart in 5 minutes",
    });

Targeting Specific Clients
    // Only specific clients receive this
    await publisher.publish("private.message", payload, {
      targetIds: ["client-123"],
    });

Client Connection Tracking
    transport.on("connection", (clientId) => {
      console.log(`Client connected: ${clientId}`);
      // Store client info, associate with user, etc.
    });

    transport.on("disconnection", (clientId) => {
      console.log(`Client disconnected: ${clientId}`);
    });

Client Transport
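
For orientation, this is roughly what any SSE client has to do with the raw stream before event handlers can run: split on blank lines, skip comment lines (often used as keep-alives), and read the `event:`, `data:` and `id:` fields. `parseSSE` is a simplified, hypothetical sketch. It is not the library's implementation, and browsers do this work inside `EventSource`.

```typescript
interface ParsedEvent {
  event: string;
  data: string;
  id?: string;
}

// Simplified parser for a complete chunk of text/event-stream data.
function parseSSE(chunk: string): ParsedEvent[] {
  const events: ParsedEvent[] = [];
  // Events are separated by a blank line.
  for (const block of chunk.split(/\r?\n\r?\n/)) {
    let event = "message"; // default event type when no event: field is sent
    let id: string | undefined;
    const data: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line === "" || line.startsWith(":")) continue; // comment or keep-alive
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // drop one leading space
      if (field === "event") event = value;
      else if (field === "data") data.push(value);
      else if (field === "id") id = value;
    }
    // Blocks without any data: line do not produce an event.
    if (data.length > 0) events.push({ event, data: data.join("\n"), id });
  }
  return events;
}

const parsed = parseSSE(
  ': keep-alive\n\nevent: notification\ndata: {"message":"Hi"}\n\n',
);
console.log(parsed);
```

A production parser also has to buffer partial chunks and remember the last event ID for reconnection, which this sketch leaves out.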
Browser Usage

    import { SSEClientTransport } from "@pubsubjs/transport-sse";
    import { Subscriber } from "@pubsubjs/core";
    import { events } from "./events";

    const transport = new SSEClientTransport({
      url: "http://localhost:3000/events",
    });

    const subscriber = new Subscriber({ events, transport });

    subscriber.on("notification", (payload) => {
      showNotification(payload.message);
    });

    await subscriber.subscribe();

Client Options
    const transport = new SSEClientTransport({
      // SSE endpoint URL
      url: "https://api.example.com/events",

      // Custom headers (via query params or EventSource polyfill)
      headers: {
        Authorization: `Bearer ${token}`,
      },

      // Reconnection
      reconnect: true,
      reconnectInterval: 3000,
    });

React Integration
    import { useEffect, useState } from "react";
    import { SSEClientTransport } from "@pubsubjs/transport-sse";
    import { Subscriber } from "@pubsubjs/core";
    import { events } from "./events";

    function useSSENotifications() {
      const [notifications, setNotifications] = useState([]);

      useEffect(() => {
        const transport = new SSEClientTransport({
          url: "/api/events",
        });

        const subscriber = new Subscriber({ events, transport });

        subscriber.on("notification", (payload) => {
          setNotifications((prev) => [...prev, payload]);
        });

        subscriber.subscribe();

        // Clean up on unmount; the braces keep the cleanup from returning a value
        return () => {
          subscriber.unsubscribe();
        };
      }, []);

      return notifications;
    }

Use Cases
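
Several of the use cases below target individual users, which requires knowing which connected client IDs belong to which user. If the transport does not already key clients by user ID, one option is a small in-memory registry fed from the `connection` and `disconnection` events. `ClientRegistry` is a hypothetical helper, not part of `@pubsubjs`, and how the user ID becomes known at connection time is left to the application.

```typescript
// Hypothetical helper: tracks which client connections belong to which user.
class ClientRegistry {
  private clientsByUser = new Map<string, Set<string>>();
  private userByClient = new Map<string, string>();

  add(clientId: string, userId: string): void {
    this.remove(clientId); // drop any stale mapping for this client ID
    this.userByClient.set(clientId, userId);
    let clients = this.clientsByUser.get(userId);
    if (!clients) {
      clients = new Set<string>();
      this.clientsByUser.set(userId, clients);
    }
    clients.add(clientId);
  }

  remove(clientId: string): void {
    const userId = this.userByClient.get(clientId);
    if (userId === undefined) return;
    this.userByClient.delete(clientId);
    const clients = this.clientsByUser.get(userId);
    if (!clients) return;
    clients.delete(clientId);
    if (clients.size === 0) this.clientsByUser.delete(userId);
  }

  // All currently connected client IDs for a user (one per open tab or device).
  clientsFor(userId: string): string[] {
    return Array.from(this.clientsByUser.get(userId) ?? []);
  }
}

const registry = new ClientRegistry();
registry.add("client-123", "user-1");
registry.add("client-456", "user-1"); // same user, second tab
registry.remove("client-123"); // first tab closed
console.log(registry.clientsFor("user-1"));
```

With such a registry, a targeted publish could pass `targetIds: registry.clientsFor(userId)` so that every open tab of that user receives the event.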
Live Notifications

    // Server
    subscriber.on("order.shipped", async (payload, { publisher }) => {
      await publisher.publish(
        "notification",
        {
          userId: payload.customerId,
          type: "shipping",
          message: `Your order ${payload.orderId} has shipped!`,
        },
        // Publish options belong here, where `payload` is in scope
        { targetIds: [payload.customerId] },
      );
    });

    // Client
    subscriber.on("notification", (payload) => {
      toast.show(payload.message);
    });

Real-time Dashboard
    // Server: Broadcast metrics every second
    setInterval(async () => {
      const metrics = await collectMetrics();
      await publisher.publish("metrics.update", metrics);
    }, 1000);

    // Client: Update dashboard
    subscriber.on("metrics.update", (payload) => {
      updateChart(payload.cpu, payload.memory);
      updateTable(payload.requests);
    });

Live Feed
    // Server
    subscriber.on("post.created", async (payload) => {
      await publisher.publish("feed.update", {
        type: "new_post",
        post: payload,
      });
    });

    // Client
    subscriber.on("feed.update", (payload) => {
      if (payload.type === "new_post") {
        prependToFeed(payload.post);
      }
    });

Authentication
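
The server snippets in this section call a `verifyToken` helper that this guide does not define. A minimal sketch of one possible shape, assuming tokens of the form `<userId>.<hex HMAC-SHA256 of userId>` and a shared secret: the token format, `SECRET`, `sign` and `issueToken` are all assumptions for illustration. A real application would more likely use an established token library.

```typescript
import { Buffer } from "node:buffer";
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical secret; load it from configuration in a real deployment.
const SECRET = "replace-with-a-real-secret";

function sign(userId: string): string {
  return createHmac("sha256", SECRET).update(userId).digest("hex");
}

// Assumed token format: "<userId>.<signature>".
function issueToken(userId: string): string {
  return `${userId}.${sign(userId)}`;
}

// Returns the user for a valid token, or null for a missing or forged one.
function verifyToken(token: string | null | undefined): { id: string } | null {
  if (!token) return null;
  const dot = token.lastIndexOf(".");
  if (dot <= 0) return null;
  const userId = token.slice(0, dot);
  const given = Buffer.from(token.slice(dot + 1), "hex");
  const expected = Buffer.from(sign(userId), "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  if (given.length !== expected.length) return null;
  return timingSafeEqual(given, expected) ? { id: userId } : null;
}

const sampleToken = issueToken("user-42");
console.log(verifyToken(sampleToken)); // valid token
console.log(verifyToken("user-42.deadbeef")); // forged signature
```

The constant-time comparison avoids leaking how many signature bytes matched, which a plain string comparison could do.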
Token in URL

    // Client
    const transport = new SSEClientTransport({
      // Encode the token so special characters survive in the query string
      url: `https://api.example.com/events?token=${encodeURIComponent(token)}`,
    });

    // Server
    app.get("/events", (req) => {
      const token = req.query.token;
      const user = verifyToken(token);
      if (!user) {
        return new Response("Unauthorized", { status: 401 });
      }
      return transport.handleRequest(req, { userId: user.id });
    });

Cookie-based
    // Client (cookies sent automatically)
    const transport = new SSEClientTransport({
      url: "https://api.example.com/events",
      withCredentials: true,
    });

    // Server
    app.get("/events", (req) => {
      const session = getSession(req.cookies);
      if (!session) {
        return new Response("Unauthorized", { status: 401 });
      }
      return transport.handleRequest(req, { userId: session.userId });
    });

SSE vs WebSocket
| Feature | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client | Bidirectional |
| Protocol | HTTP | WebSocket |
| Reconnection | Built-in | Manual |
| Binary data | No (text only) | Yes |
| Browser support | All modern | All modern |
| Proxy/firewall | Usually works | May be blocked |
Use SSE when:
- You only need server-to-client communication
- Firewalls block WebSocket
- You want simpler infrastructure
Use WebSocket when:
- You need bidirectional communication
- You’re sending binary data
- You need lower latency
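
The "text only" row in the table does not rule out small binary payloads: they can be base64-encoded into the text `data` field and decoded on the client. A minimal sketch with hypothetical helper names, using the standard `btoa` and `atob` globals:

```typescript
// Encode bytes as base64 text so they can travel in an SSE data field.
function bytesToBase64(bytes: Uint8Array): string {
  let binary = "";
  for (let i = 0; i < bytes.length; i++) {
    binary += String.fromCharCode(bytes[i]);
  }
  return btoa(binary);
}

// Decode base64 text back into the original bytes on the receiving side.
function base64ToBytes(text: string): Uint8Array {
  const binary = atob(text);
  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i);
  }
  return bytes;
}

const original = new Uint8Array([0, 127, 128, 255]);
const encoded = bytesToBase64(original);
const decoded = base64ToBytes(encoded);
console.log(encoded, decoded);
```

Base64 inflates the payload by roughly a third, so for frequent or large binary messages WebSocket remains the better fit.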
Best Practices
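Connection Limits
Over HTTP/1.1, browsers limit the number of open SSE connections per domain (usually 6, shared across all tabs). Over HTTP/2 the limit is negotiated between client and server and is much higher (100 streams by default). Either way, use a single connection with channel multiplexing: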

    // Client subscribes to one endpoint, receives multiple event types
    subscriber.on("notification", handleNotification);
    subscriber.on("metrics", handleMetrics);
    subscriber.on("feed", handleFeed);

Graceful Degradation
    // Fall back to polling if SSE not supported
    if (typeof EventSource === "undefined") {
      startPolling();
    } else {
      startSSE();
    }

Next Steps
- WebSocket Transport - Bidirectional communication
- Redis Transport - Distributed systems
- React Integration - Use with React