Event-Driven Architecture Guide: Kafka, RabbitMQ, Event Sourcing, CQRS, Sagas & Stream Processing
Master event-driven architecture — Apache Kafka deep dive, RabbitMQ patterns, event sourcing with projections and snapshots, CQRS command/query separation, saga orchestration vs choreography, async messaging patterns, schema evolution (Avro/Protobuf), serverless event processing, stream processing with Kafka Streams and Flink, testing strategies, and monitoring with distributed tracing.
- Events are immutable facts describing "something that happened"
- Kafka for high-throughput event streaming and durable logs; RabbitMQ for flexible routing and task queues
- Event sourcing stores every state change as an immutable event; replay to derive current state
- CQRS separates read and write models, allowing each to scale and optimize independently
- Saga pattern for distributed transactions — orchestration for complex flows, choreography for loose coupling
- Use Avro/Protobuf + Schema Registry for event schema evolution
- Kafka Streams / Flink for stateful stream processing (windowing, aggregation, joins)
- Monitor consumer lag, dead letter queues, and end-to-end latency as core health indicators
- EDA achieves temporal and spatial decoupling between services via async communication
- Kafka partitions and consumer groups natively support horizontal scaling and message replay
- RabbitMQ exchange types (direct/topic/fanout/headers) provide flexible message routing
- Event sourcing + projections = full audit trail + multi-view read optimization
- Schema evolution requires backward compatibility: only add optional fields, never remove required ones
- Use contract testing, embedded brokers, and event replay to ensure system correctness
- Distributed tracing (OpenTelemetry) + consumer lag monitoring are the two pillars of observability
1. Event-Driven Architecture Fundamentals
Event-driven architecture (EDA) is a software design paradigm where systems communicate by producing and consuming events. An event represents a state change — a fact that has already occurred. Unlike request-response models, producers in EDA do not know or care who consumes their events.
Core Concepts
| Concept | Description | Example |
|---|---|---|
| Event | Immutable fact recording a past state change | OrderPlaced, UserRegistered |
| Command | Intent to perform an action (may be rejected) | PlaceOrder, RegisterUser |
| Query | Request data without modifying state | GetOrderById, ListUsers |
| Event Bus | Transport channel connecting producers and consumers | Kafka, RabbitMQ, NATS |
| Producer | Service or component that publishes events | Order Service, Payment Service |
| Consumer | Service or component that subscribes to and processes events | Notification Service, Analytics |
// Event interface — the foundation of EDA
interface DomainEvent {
eventId: string; // Unique event identifier (UUID)
eventType: string; // e.g., "OrderPlaced"
aggregateId: string; // ID of the entity that produced the event
aggregateType: string; // e.g., "Order"
timestamp: string; // ISO 8601 timestamp
version: number; // Schema version for evolution
payload: Record<string, unknown>;
metadata: {
correlationId: string; // Traces a request across services
causationId: string; // ID of the event/command that caused this
userId?: string;
};
}
// Example: OrderPlaced event
const orderPlaced: DomainEvent = {
eventId: "evt-a1b2c3d4",
eventType: "OrderPlaced",
aggregateId: "order-12345",
aggregateType: "Order",
timestamp: "2026-02-28T10:30:00Z",
version: 1,
payload: {
customerId: "cust-67890",
items: [
{ productId: "prod-001", quantity: 2, price: 29.99 },
{ productId: "prod-002", quantity: 1, price: 49.99 },
],
totalAmount: 109.97,
currency: "USD",
},
metadata: {
correlationId: "corr-xyz-789",
causationId: "cmd-place-order-456",
userId: "user-abc",
},
};
2. Apache Kafka Deep Dive
Apache Kafka is a distributed event streaming platform designed for high-throughput, durable, and replayable event processing. Kafka organizes data into topics, each divided into partitions for parallel processing and horizontal scaling.
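The key-to-partition mapping is what makes per-key ordering work. A minimal sketch of the idea (Kafka's default partitioner actually uses murmur2; the simple rolling hash below is an illustrative stand-in):

```typescript
// Toy illustration of keyed partitioning: same key → same partition.
// Kafka's default partitioner uses murmur2; this hash is a stand-in.
function toyHash(key: string): number {
  let h = 0;
  for (const ch of key) {
    h = (h * 31 + ch.charCodeAt(0)) >>> 0; // unsigned 32-bit rolling hash
  }
  return h;
}

function partitionFor(key: string, numPartitions: number): number {
  return toyHash(key) % numPartitions;
}

// Every event for the same order lands on the same partition,
// so per-order ordering is preserved within that partition.
const p1 = partitionFor("order-12345", 6);
const p2 = partitionFor("order-12345", 6);
console.log(p1 === p2); // true — deterministic for the same key
```

This is why the producer examples below use the order ID as the message key: all events for one order are totally ordered, while different orders spread across partitions for parallelism.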
Topics, Partitions, and Consumer Groups
// Kafka producer — Node.js with kafkajs
import { Kafka, Partitioners } from "kafkajs";
const kafka = new Kafka({
clientId: "order-service",
brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
});
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
idempotent: true, // Deduplicates messages on producer retries
maxInFlightRequests: 1, // kafkajs allows at most 1 in-flight request when idempotent
transactionalId: "order-producer-txn", // Needed for the transactional API below
});
async function publishOrderEvent(order: Order): Promise<void> {
await producer.connect();
// Key determines partition — same orderId always goes to same partition
await producer.send({
topic: "orders.events",
messages: [
{
key: order.id,
value: JSON.stringify({
eventType: "OrderPlaced",
payload: order,
timestamp: new Date().toISOString(),
}),
headers: {
"correlation-id": order.correlationId,
"event-type": "OrderPlaced",
"schema-version": "1",
},
},
],
});
}
// Kafka consumer with consumer group
const consumer = kafka.consumer({
groupId: "inventory-service-group",
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
});
async function startConsumer(): Promise<void> {
await consumer.connect();
await consumer.subscribe({
topics: ["orders.events"],
fromBeginning: false,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString());
const correlationId = message.headers?.["correlation-id"]?.toString();
console.log(
`[Partition ${partition}] Processing ${event.eventType}`,
`offset=${message.offset}, key=${message.key}`
);
switch (event.eventType) {
case "OrderPlaced":
await reserveInventory(event.payload);
break;
case "OrderCancelled":
await releaseInventory(event.payload);
break;
default:
console.warn(`Unknown event type: ${event.eventType}`);
}
},
});
}
Kafka Exactly-Once Semantics
Kafka achieves exactly-once semantics (EOS) through idempotent producers and the transactional API, ensuring messages are not duplicated on producer retries and consumers only see messages after transaction commit.
// Exactly-once: transactional produce + consume
async function processAndForward(inputTopic: string, outputTopic: string) {
const transaction = await producer.transaction();
try {
// Read from input topic
const messages = await consumeBatch(inputTopic);
for (const msg of messages) {
const enriched = await enrichEvent(msg);
// Write to output topic within the same transaction
await transaction.send({
topic: outputTopic,
messages: [{ key: msg.key, value: JSON.stringify(enriched) }],
});
}
// Commit offsets and produced messages atomically
await transaction.sendOffsets({
consumerGroupId: "enrichment-group",
topics: [{ topic: inputTopic, partitions: messages.map(m => ({
partition: m.partition,
offset: (parseInt(m.offset) + 1).toString(),
}))}],
});
await transaction.commit();
} catch (err) {
await transaction.abort();
throw err;
}
}
3. RabbitMQ Patterns
RabbitMQ is a feature-rich message broker that provides flexible routing through exchanges and queues. It supports multiple exchange types, each suited for different messaging scenarios.
| Exchange Type | Routing | Use Case |
|---|---|---|
| Direct | Exact routing key match | Task distribution, log levels |
| Topic | Wildcard pattern match on routing key | Multi-tenant events, geo routing |
| Fanout | Broadcast to all bound queues | Notifications, cache invalidation |
| Headers | Match on message header attributes | Complex routing, content-type routing |
// RabbitMQ with amqplib — Topic exchange + Dead Letter Queue
import amqp from "amqplib";
async function setupRabbitMQ() {
const connection = await amqp.connect("amqp://localhost:5672");
const channel = await connection.createChannel();
// Dead letter exchange for failed messages
await channel.assertExchange("dlx.events", "fanout", { durable: true });
await channel.assertQueue("dlq.events", {
durable: true,
arguments: { "x-message-ttl": 86400000 }, // 24h retention
});
await channel.bindQueue("dlq.events", "dlx.events", "");
// Main topic exchange
await channel.assertExchange("domain.events", "topic", { durable: true });
// Order events queue with DLQ
await channel.assertQueue("order.processing", {
durable: true,
arguments: {
"x-dead-letter-exchange": "dlx.events",
"x-dead-letter-routing-key": "order.failed",
// RabbitMQ has no built-in max-retries queue argument; retry counting
// is done in the consumer below via the x-retry-count header
},
});
// Bind queue with topic pattern
// order.* matches order.placed, order.shipped, etc.
await channel.bindQueue("order.processing", "domain.events", "order.*");
// Publish an event
channel.publish(
"domain.events",
"order.placed",
Buffer.from(JSON.stringify({
eventType: "OrderPlaced",
orderId: "order-12345",
timestamp: new Date().toISOString(),
})),
{
persistent: true,
messageId: "msg-uuid-123",
contentType: "application/json",
headers: { "x-retry-count": 0 },
}
);
// Consumer with manual acknowledgment
await channel.prefetch(10); // At most 10 unacknowledged messages in flight
await channel.consume("order.processing", async (msg) => {
if (!msg) return;
try {
const event = JSON.parse(msg.content.toString());
await processOrderEvent(event);
channel.ack(msg); // Acknowledge success
} catch (err) {
const retryCount = (Number(msg.properties.headers?.["x-retry-count"]) || 0) + 1;
if (retryCount >= 3) {
channel.nack(msg, false, false); // Reject without requeue — routed to the DLX
} else {
// A plain requeue keeps the original headers, so the retry count would
// never advance; republish with the incremented count and ack the original
channel.publish("domain.events", msg.fields.routingKey, msg.content, {
...msg.properties,
headers: { ...msg.properties.headers, "x-retry-count": retryCount },
});
channel.ack(msg);
}
}
});
}
4. Event Sourcing
Event sourcing stores state as a sequence of immutable events rather than just the current state. Current state is derived by replaying events in order. This provides a complete audit trail, time-travel query capability, and the ability to build multiple read-optimized views from the event stream.
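At its core, "replaying events in order" is a left fold over the event stream. A minimal sketch, with event and state shapes simplified for illustration:

```typescript
// Minimal replay: current state is a left fold over the event stream.
// These event/state shapes are simplified assumptions for illustration.
type OrderEvent =
  | { type: "OrderPlaced"; total: number }
  | { type: "OrderShipped" }
  | { type: "OrderCancelled" };

interface OrderState {
  status: "none" | "placed" | "shipped" | "cancelled";
  total: number;
}

function replay(events: OrderEvent[]): OrderState {
  return events.reduce<OrderState>(
    (state, event) => {
      switch (event.type) {
        case "OrderPlaced":
          return { status: "placed", total: event.total };
        case "OrderShipped":
          return { ...state, status: "shipped" };
        case "OrderCancelled":
          return { ...state, status: "cancelled" };
      }
    },
    { status: "none", total: 0 }
  );
}

const state = replay([
  { type: "OrderPlaced", total: 109.97 },
  { type: "OrderShipped" },
]);
console.log(state.status, state.total); // shipped 109.97
```

Because the fold is deterministic, replaying the same events always yields the same state — which is what makes projections, audits, and time-travel queries possible.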
Event Store Implementation
// Event Store — append-only with optimistic concurrency
interface StoredEvent {
streamId: string;
version: number;
eventType: string;
payload: Record<string, unknown>;
metadata: Record<string, string>;
timestamp: Date;
}
class EventStore {
constructor(private db: Database) {}
async appendToStream(
streamId: string,
events: StoredEvent[],
expectedVersion: number
): Promise<void> {
const currentVersion = await this.getStreamVersion(streamId);
// Optimistic concurrency check
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, got ${currentVersion}`
);
}
const transaction = await this.db.beginTransaction();
try {
for (let i = 0; i < events.length; i++) {
await transaction.execute(
`INSERT INTO events (stream_id, version, event_type, payload, metadata, timestamp)
VALUES (?, ?, ?, ?, ?, ?)`,
[
streamId,
expectedVersion + i + 1,
events[i].eventType,
JSON.stringify(events[i].payload),
JSON.stringify(events[i].metadata),
events[i].timestamp,
]
);
}
await transaction.commit();
} catch (err) {
await transaction.rollback();
throw err;
}
}
async readStream(streamId: string, fromVersion = 0): Promise<StoredEvent[]> {
return this.db.query(
`SELECT * FROM events
WHERE stream_id = ? AND version > ?
ORDER BY version ASC`,
[streamId, fromVersion]
);
}
}
Projections and Snapshots
// Aggregate with snapshot support
class OrderAggregate {
private state: OrderState = { status: "draft", items: [], total: 0 };
private version = 0;
private uncommittedEvents: StoredEvent[] = [];
// Rebuild from events with snapshot optimization
static async load(
eventStore: EventStore,
snapshotStore: SnapshotStore,
orderId: string
): Promise<OrderAggregate> {
const aggregate = new OrderAggregate();
// Try loading from snapshot first
const snapshot = await snapshotStore.get(orderId);
if (snapshot) {
aggregate.state = snapshot.state;
aggregate.version = snapshot.version;
}
// Replay events after the snapshot
const events = await eventStore.readStream(orderId, aggregate.version);
for (const event of events) {
aggregate.apply(event, false);
}
// Snapshot when many events had to be replayed since the last snapshot
if (events.length > 100) {
await snapshotStore.save(orderId, aggregate.state, aggregate.version);
}
return aggregate;
}
placeOrder(customerId: string, items: OrderItem[]): void {
if (this.state.status !== "draft") {
throw new Error("Order already placed");
}
this.apply({
eventType: "OrderPlaced",
payload: { customerId, items, total: this.calculateTotal(items) },
} as StoredEvent, true);
}
private apply(event: StoredEvent, isNew: boolean): void {
// State mutation based on event type
switch (event.eventType) {
case "OrderPlaced":
this.state.status = "placed";
this.state.items = event.payload.items as OrderItem[];
this.state.total = event.payload.total as number;
break;
case "OrderShipped":
this.state.status = "shipped";
break;
case "OrderCancelled":
this.state.status = "cancelled";
break;
}
this.version++;
if (isNew) this.uncommittedEvents.push(event);
}
getUncommittedEvents(): StoredEvent[] {
return [...this.uncommittedEvents];
}
getVersion(): number {
return this.version;
}
}
Projections (Read Models)
// Projection — build read-optimized views from events
class OrderSummaryProjection {
constructor(private readDb: Database) {}
async handle(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case "OrderPlaced":
await this.readDb.execute(
`INSERT INTO order_summaries
(order_id, customer_id, status, total, item_count, placed_at)
VALUES (?, ?, ?, ?, ?, ?)`,
[
event.streamId,
event.payload.customerId,
"placed",
event.payload.total,
(event.payload.items as unknown[]).length,
event.timestamp,
]
);
break;
case "OrderShipped":
await this.readDb.execute(
`UPDATE order_summaries
SET status = ?, shipped_at = ?
WHERE order_id = ?`,
["shipped", event.timestamp, event.streamId]
);
break;
case "OrderCancelled":
await this.readDb.execute(
`UPDATE order_summaries
SET status = ?, cancelled_at = ?
WHERE order_id = ?`,
["cancelled", event.timestamp, event.streamId]
);
break;
}
}
}
5. CQRS Implementation
CQRS (Command Query Responsibility Segregation) separates write operations (commands) and read operations (queries) into distinct models. The write model is optimized for data consistency and business rule validation, while the read model is optimized for query performance. The two models are synchronized via events.
// CQRS — Command side
interface Command {
type: string;
payload: Record<string, unknown>;
metadata: { userId: string; correlationId: string };
}
class CommandBus {
private handlers = new Map<string, CommandHandler>();
register(commandType: string, handler: CommandHandler): void {
this.handlers.set(commandType, handler);
}
async dispatch(command: Command): Promise<void> {
const handler = this.handlers.get(command.type);
if (!handler) throw new Error(`No handler for: ${command.type}`);
await handler.execute(command);
}
}
class PlaceOrderHandler implements CommandHandler {
constructor(
private eventStore: EventStore,
private snapshotStore: SnapshotStore
) {}
async execute(command: Command): Promise<void> {
const { customerId, items } = command.payload;
// Load aggregate from event store
const order = await OrderAggregate.load(
this.eventStore, this.snapshotStore, command.payload.orderId as string
);
// Execute business logic (may throw if invalid)
order.placeOrder(customerId as string, items as OrderItem[]);
// Persist new events — the expected version is the stream version
// before the uncommitted events were applied to the aggregate
const newEvents = order.getUncommittedEvents();
await this.eventStore.appendToStream(
command.payload.orderId as string,
newEvents,
order.getVersion() - newEvents.length
);
}
}
// CQRS — Query side
interface Query {
type: string;
params: Record<string, unknown>;
}
class QueryBus {
private handlers = new Map<string, QueryHandler>();
register(queryType: string, handler: QueryHandler): void {
this.handlers.set(queryType, handler);
}
async dispatch<T>(query: Query): Promise<T> {
const handler = this.handlers.get(query.type);
if (!handler) throw new Error(`No handler for: ${query.type}`);
return handler.execute(query) as Promise<T>;
}
}
class GetOrderSummaryHandler implements QueryHandler {
constructor(private readDb: Database) {}
async execute(query: Query): Promise<OrderSummary> {
// Query the read-optimized projection table
const result = await this.readDb.queryOne(
`SELECT order_id, customer_id, status, total, item_count,
placed_at, shipped_at, cancelled_at
FROM order_summaries WHERE order_id = ?`,
[query.params.orderId]
);
if (!result) throw new NotFoundError("Order not found");
return result;
}
}
6. Saga Pattern
The saga pattern manages distributed transactions across multiple services. Each saga step is a local transaction paired with a compensating action for rollback. Sagas come in two flavors: orchestration (central coordinator) and choreography (event-driven).
| Aspect | Orchestration | Choreography |
|---|---|---|
| Coordination | Central orchestrator directs all steps | Each service listens for events and decides |
| Coupling | Orchestrator knows all steps | Services only know their own events |
| Visibility | Centralized state tracking | Distributed, needs tracing tools |
| Best For | Complex multi-step workflows (5+ steps) | Simple flows (2-4 steps) |
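The choreography column can be sketched with an in-memory bus where each service subscribes to the previous service's event and emits its own — no coordinator knows the full flow. The bus, service reactions, and event names below are illustrative assumptions, not a real broker:

```typescript
// Choreography sketch: no central coordinator — each service reacts to
// the previous event and publishes the next. Illustrative only.
type Handler = (payload: Record<string, unknown>) => void;

class InMemoryBus {
  private subs = new Map<string, Handler[]>();
  subscribe(eventType: string, handler: Handler): void {
    this.subs.set(eventType, [...(this.subs.get(eventType) ?? []), handler]);
  }
  publish(eventType: string, payload: Record<string, unknown>): void {
    for (const h of this.subs.get(eventType) ?? []) h(payload);
  }
}

const bus = new InMemoryBus();
const log: string[] = [];

// Inventory service reacts to OrderPlaced
bus.subscribe("OrderPlaced", (p) => {
  log.push(`inventory reserved for ${p.orderId}`);
  bus.publish("InventoryReserved", p);
});
// Payment service reacts to InventoryReserved
bus.subscribe("InventoryReserved", (p) => {
  log.push(`payment charged for ${p.orderId}`);
  bus.publish("PaymentCompleted", p);
});
// Shipping service reacts to PaymentCompleted
bus.subscribe("PaymentCompleted", (p) => {
  log.push(`shipment created for ${p.orderId}`);
});

bus.publish("OrderPlaced", { orderId: "order-12345" });
console.log(log.length); // 3 — the flow ran end to end with no orchestrator
```

Note the trade-off the table describes: the flow emerges from subscriptions, so no single place documents it — which is why choreography benefits from distributed tracing.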
Orchestration Saga Implementation
// Orchestration Saga — Order fulfillment
type SagaStep = {
name: string;
execute: (context: SagaContext) => Promise<void>;
compensate: (context: SagaContext) => Promise<void>;
};
class OrderSaga {
private steps: SagaStep[] = [
{
name: "ReserveInventory",
execute: async (ctx) => {
ctx.inventoryReservationId = await inventoryService.reserve(
ctx.orderId, ctx.items
);
},
compensate: async (ctx) => {
await inventoryService.cancelReservation(ctx.inventoryReservationId);
},
},
{
name: "ProcessPayment",
execute: async (ctx) => {
ctx.paymentId = await paymentService.charge(
ctx.customerId, ctx.totalAmount
);
},
compensate: async (ctx) => {
await paymentService.refund(ctx.paymentId);
},
},
{
name: "ArrangeShipping",
execute: async (ctx) => {
ctx.shipmentId = await shippingService.createShipment(
ctx.orderId, ctx.shippingAddress
);
},
compensate: async (ctx) => {
await shippingService.cancelShipment(ctx.shipmentId);
},
},
];
async run(context: SagaContext): Promise<void> {
const completedSteps: SagaStep[] = [];
for (const step of this.steps) {
try {
console.log(`Executing step: ${step.name}`);
await step.execute(context);
completedSteps.push(step);
} catch (error) {
console.error(`Step ${step.name} failed:`, error);
// Compensate in reverse order
for (const completed of completedSteps.reverse()) {
try {
console.log(`Compensating step: ${completed.name}`);
await completed.compensate(context);
} catch (compError) {
console.error(`Compensation failed for ${completed.name}:`, compError);
// Log to dead letter queue for manual intervention
await deadLetterQueue.publish({
sagaId: context.sagaId,
failedStep: completed.name,
error: compError,
});
}
}
throw new SagaFailedError(step.name, error);
}
}
}
}
7. Domain Events in DDD
In Domain-Driven Design, domain events represent meaningful state changes in the business domain. Aggregate roots produce domain events after executing commands, and event handlers respond to these events within the same bounded context or across bounded contexts, achieving eventual consistency.
// Domain Events in DDD — Aggregate producing events
abstract class AggregateRoot {
private domainEvents: DomainEvent[] = [];
protected version = 0;
protected addDomainEvent(event: DomainEvent): void {
this.domainEvents.push(event);
}
getDomainEvents(): DomainEvent[] {
return [...this.domainEvents];
}
clearDomainEvents(): void {
this.domainEvents = [];
}
}
class Order extends AggregateRoot {
private status: OrderStatus = "draft";
private items: OrderItem[] = [];
place(customerId: string, items: OrderItem[]): void {
// Business rule validation
if (items.length === 0) throw new Error("Order must have items");
if (this.status !== "draft") throw new Error("Order already placed");
this.status = "placed";
this.items = items;
this.addDomainEvent({
eventId: generateUUID(),
eventType: "OrderPlaced",
aggregateId: this.id,
aggregateType: "Order",
timestamp: new Date().toISOString(),
version: ++this.version,
payload: { customerId, items, total: this.calculateTotal() },
metadata: { correlationId: generateUUID(), causationId: "" },
});
}
cancel(reason: string): void {
if (this.status === "shipped") {
throw new Error("Cannot cancel shipped order");
}
this.status = "cancelled";
this.addDomainEvent({
eventId: generateUUID(),
eventType: "OrderCancelled",
aggregateId: this.id,
aggregateType: "Order",
timestamp: new Date().toISOString(),
version: ++this.version,
payload: { reason },
metadata: { correlationId: generateUUID(), causationId: "" },
});
}
}
// Event dispatcher — publish domain events after persistence
class DomainEventDispatcher {
private handlers = new Map<string, DomainEventHandler[]>();
subscribe(eventType: string, handler: DomainEventHandler): void {
const existing = this.handlers.get(eventType) || [];
this.handlers.set(eventType, [...existing, handler]);
}
async dispatch(events: DomainEvent[]): Promise<void> {
for (const event of events) {
const handlers = this.handlers.get(event.eventType) || [];
await Promise.all(
handlers.map(h => h.handle(event))
);
}
}
}
8. Async Messaging Patterns
Async messaging patterns define how services exchange messages. Choosing the right pattern depends on message semantics (event vs command), number of consumers, and delivery guarantee requirements.
| Pattern | Description | Consumers | Use Case |
|---|---|---|---|
| Pub/Sub | One-to-many broadcast, all subscribers receive a copy | Multiple | Event notification, cache invalidation |
| Point-to-Point | One-to-one, message processed by exactly one consumer | One | Task queues, command processing |
| Request-Reply | Async request-response via temporary reply queue | One | Async RPC, long operations |
| Competing Consumers | Multiple consumers compete for messages from one queue | Multiple (competing) | Load balancing, horizontal scaling |
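The key distinction in the table — every subscriber gets a copy versus exactly one worker processes each message — can be shown in a few lines. The functions below are illustrative; a real broker enforces these semantics for you:

```typescript
// Pub/sub vs competing consumers, reduced to their delivery semantics.
// Illustrative sketch — a real broker enforces these guarantees.
function publishToAll(subscribers: ((msg: string) => void)[], msg: string): void {
  for (const sub of subscribers) sub(msg); // one-to-many: every subscriber gets a copy
}

function dispatchRoundRobin(
  workers: ((msg: string) => void)[],
  messages: string[]
): void {
  // one-to-one: each message goes to exactly one worker
  messages.forEach((msg, i) => workers[i % workers.length](msg));
}

const copies: string[] = [];
publishToAll([m => copies.push(`a:${m}`), m => copies.push(`b:${m}`)], "evt-1");
// pub/sub: both subscribers received evt-1

const workA: string[] = [];
const workB: string[] = [];
dispatchRoundRobin([m => workA.push(m), m => workB.push(m)], ["t1", "t2", "t3", "t4"]);
// competing consumers: 4 tasks split across 2 workers, no duplicates
```

This is also the practical rule of thumb: use pub/sub for events (facts others may care about), point-to-point or competing consumers for commands and tasks (work that must happen exactly once).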
// Request-Reply pattern with correlation IDs
class AsyncRequestReply {
private pendingRequests = new Map<string, {
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
timer: NodeJS.Timeout;
}>();
constructor(
private channel: MessageChannel,
private replyQueue: string,
private timeoutMs = 30000
) {
// Listen for replies on the dedicated reply queue
this.channel.consume(this.replyQueue, (msg) => {
if (!msg) return;
const correlationId = msg.properties.correlationId;
const pending = this.pendingRequests.get(correlationId);
if (pending) {
clearTimeout(pending.timer);
this.pendingRequests.delete(correlationId);
const reply = JSON.parse(msg.content.toString());
if (reply.error) {
pending.reject(new Error(reply.error));
} else {
pending.resolve(reply.data);
}
}
this.channel.ack(msg);
});
}
async request(queue: string, payload: unknown): Promise<unknown> {
const correlationId = generateUUID();
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingRequests.delete(correlationId);
reject(new Error("Request timed out"));
}, this.timeoutMs);
this.pendingRequests.set(correlationId, { resolve, reject, timer });
this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
correlationId,
replyTo: this.replyQueue,
expiration: String(this.timeoutMs),
});
});
}
}
9. Event Schema Evolution
In long-lived event-driven systems, event schemas inevitably need to evolve. The key to schema evolution is maintaining backward compatibility, ensuring new consumers can read old events and old consumers are not broken by new events.
| Compatibility Type | Description | Rules |
|---|---|---|
| Backward | New consumers can read old events | New fields must have defaults |
| Forward | Old consumers can read new events | No field removal, no type changes |
| Full | Both forward and backward compatible | Only add optional fields with defaults |
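Backward compatibility in practice means a new reader fills missing old-event fields from schema defaults — which is what an Avro decoder does automatically. A minimal sketch of that behavior; the helper and field names (matching the v2 fields used elsewhere in this section) are illustrative:

```typescript
// Sketch of backward compatibility: a v2 reader supplies defaults for
// fields a v1 event never carried, mirroring Avro's decode behavior.
const v2Defaults = {
  discountCode: null as string | null,
  priority: "normal",
  channel: "web",
};

type OrderPlacedV2 = {
  orderId: string;
  totalAmount: number;
} & typeof v2Defaults;

function readAsV2(raw: Record<string, unknown>): OrderPlacedV2 {
  // Defaults first, then whatever the event actually carries
  return { ...v2Defaults, ...raw } as OrderPlacedV2;
}

// A v1 event has no discountCode/priority/channel — the reader fills them in
const fromV1 = readAsV2({ orderId: "order-1", totalAmount: 99.5 });
console.log(fromV1.priority, fromV1.channel); // normal web
```

The converse is why removing a required field breaks compatibility: no default exists to fill the hole when an old reader meets a new event, or a new reader meets an old one.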
// Avro schema evolution example
// Version 1 — Original OrderPlaced schema
const orderPlacedV1 = {
type: "record",
name: "OrderPlaced",
namespace: "com.example.events",
fields: [
{ name: "orderId", type: "string" },
{ name: "customerId", type: "string" },
{ name: "totalAmount", type: "double" },
{ name: "currency", type: "string" },
{ name: "timestamp", type: "string" },
],
};
// Version 2 — Added optional fields (backward compatible)
const orderPlacedV2 = {
type: "record",
name: "OrderPlaced",
namespace: "com.example.events",
fields: [
{ name: "orderId", type: "string" },
{ name: "customerId", type: "string" },
{ name: "totalAmount", type: "double" },
{ name: "currency", type: "string" },
{ name: "timestamp", type: "string" },
// New optional fields with defaults — backward compatible
{ name: "discountCode", type: ["null", "string"], default: null },
{ name: "priority", type: "string", default: "normal" },
{ name: "channel", type: "string", default: "web" },
],
};
// Schema Registry client
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";
const registry = new SchemaRegistry({
host: "http://schema-registry:8081",
});
// Register schema with compatibility check
async function registerSchema() {
const { id } = await registry.register({
type: "AVRO",
schema: JSON.stringify(orderPlacedV2),
}, {
subject: "orders.events-value",
});
console.log("Registered schema with id:", id);
}
// Produce with schema validation
async function produceWithSchema(event: OrderPlacedEvent) {
const schemaId = await registry.getLatestSchemaId("orders.events-value");
const encodedValue = await registry.encode(schemaId, event);
await producer.send({
topic: "orders.events",
messages: [{ key: event.orderId, value: encodedValue }],
});
}
10. Serverless Event Processing
Serverless platforms are a natural fit for event-driven architecture because functions are inherently event handlers. AWS Lambda + EventBridge and Azure Functions + Event Grid are two mainstream serverless event processing solutions.
// AWS Lambda + EventBridge — Order event processing
// serverless.yml (Serverless Framework)
// service: order-processor
// provider:
// name: aws
// runtime: nodejs20.x
// functions:
// processOrder:
// handler: handler.processOrder
// events:
// - eventBridge:
// pattern:
// source: ["order-service"]
// detail-type: ["OrderPlaced", "OrderCancelled"]
import { EventBridgeEvent, Context } from "aws-lambda";
import { EventBridgeClient, PutEventsCommand } from "@aws-sdk/client-eventbridge";
const eventBridge = new EventBridgeClient({});
// Producer — publish event to EventBridge
export async function publishOrderEvent(order: Order): Promise<void> {
await eventBridge.send(new PutEventsCommand({
Entries: [{
Source: "order-service",
DetailType: "OrderPlaced",
Detail: JSON.stringify({
orderId: order.id,
customerId: order.customerId,
totalAmount: order.total,
items: order.items,
}),
EventBusName: "orders-bus",
}],
}));
}
// Consumer — Lambda handler triggered by EventBridge
export async function processOrder(
event: EventBridgeEvent<"OrderPlaced", OrderPayload>,
context: Context
): Promise<void> {
const { orderId, customerId, totalAmount } = event.detail;
console.log(
`Processing ${event["detail-type"]} for order ${orderId}`,
`requestId=${context.awsRequestId}`
);
// Idempotency check — prevent duplicate processing
const alreadyProcessed = await checkIdempotencyKey(orderId);
if (alreadyProcessed) {
console.log(`Order ${orderId} already processed, skipping`);
return;
}
await reserveInventory(orderId);
await sendConfirmationEmail(customerId, orderId);
await storeIdempotencyKey(orderId);
// Chain next event
await eventBridge.send(new PutEventsCommand({
Entries: [{
Source: "fulfillment-service",
DetailType: "InventoryReserved",
Detail: JSON.stringify({ orderId, customerId }),
EventBusName: "orders-bus",
}],
}));
}
11. Stream Processing
Stream processing engines like Kafka Streams and Apache Flink perform real-time stateful computations over unbounded event streams, including windowed aggregations, stream-stream joins, and complex event processing.
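The core of a windowed aggregation is simple: bucket each event by `floor(timestamp / windowSize)` and aggregate per bucket. A toy in-memory version of a tumbling sum — real engines add watermarks, state stores, and late-event handling on top of this idea:

```typescript
// Toy tumbling-window aggregation: bucket events by window start, sum per
// bucket. Real engines (Kafka Streams, Flink) layer watermarks and fault-
// tolerant state on top of this core idea.
interface TimedEvent { ts: number; amount: number } // epoch millis + value

function tumblingSum(events: TimedEvent[], windowMs: number): Map<number, number> {
  const windows = new Map<number, number>();
  for (const e of events) {
    const windowStart = Math.floor(e.ts / windowMs) * windowMs;
    windows.set(windowStart, (windows.get(windowStart) ?? 0) + e.amount);
  }
  return windows;
}

// Two events in minute 0, one in minute 1 (60_000 ms windows)
const result = tumblingSum(
  [{ ts: 1_000, amount: 30 }, { ts: 59_000, amount: 20 }, { ts: 61_000, amount: 50 }],
  60_000
);
console.log(result.get(0), result.get(60_000)); // 50 50
```

Tumbling windows are non-overlapping; hopping and sliding windows differ only in how the bucketing step assigns an event to (possibly several) windows.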
Kafka Streams Example
// Kafka Streams topology — Java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
public class OrderAnalyticsTopology {
public static Topology build() {
StreamsBuilder builder = new StreamsBuilder();
// Source: read order events
KStream<String, OrderEvent> orders = builder
.stream("orders.events",
Consumed.with(Serdes.String(), orderEventSerde));
// Windowed aggregation: order count per customer per hour
KTable<Windowed<String>, Long> hourlyOrderCounts = orders
.filter((key, event) -> "OrderPlaced".equals(event.getType()))
.selectKey((key, event) -> event.getCustomerId())
.groupByKey()
.windowedBy(
TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))
)
.count(Materialized.as("hourly-order-counts"));
// Real-time revenue per product
KTable<String, Double> revenueByProduct = orders
.filter((key, event) -> "OrderPlaced".equals(event.getType()))
.flatMapValues(event -> event.getItems())
.selectKey((key, item) -> item.getProductId())
.groupByKey()
.aggregate(
() -> 0.0,
(productId, item, total) -> total + item.getPrice() * item.getQuantity(),
Materialized.as("revenue-by-product")
);
// Stream-stream join: match orders with payments within 5 min
KStream<String, PaymentEvent> payments = builder
.stream("payments.events",
Consumed.with(Serdes.String(), paymentEventSerde));
KStream<String, OrderWithPayment> matched = orders
.join(
payments,
(order, payment) -> new OrderWithPayment(order, payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(), orderEventSerde, paymentEventSerde)
);
matched.to("orders.matched",
Produced.with(Serdes.String(), orderWithPaymentSerde));
return builder.build();
}
}
Apache Flink Windowing Example
# Apache Flink — Python (PyFlink) windowed aggregation
from pyflink.common import Duration, Time, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.window import TumblingEventTimeWindows
import json
env = StreamExecutionEnvironment.get_execution_environment()
# Configure Kafka source
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("orders.events") \
.set_group_id("flink-analytics") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
orders = env.from_source(
kafka_source,
# for_bounded_out_of_orderness takes a Duration in PyFlink
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
"OrderEvents"
)
# Tumbling window: revenue per minute
# Map each event into the accumulator shape first, then reduce pairwise
revenue_per_minute = orders \
.map(lambda raw: json.loads(raw)) \
.filter(lambda e: e["eventType"] == "OrderPlaced") \
.map(lambda e: {"totalRevenue": e["payload"]["totalAmount"], "orderCount": 1}) \
.key_by(lambda e: "global") \
.window(TumblingEventTimeWindows.of(Time.minutes(1))) \
.reduce(lambda a, b: {
"totalRevenue": a["totalRevenue"] + b["totalRevenue"],
"orderCount": a["orderCount"] + b["orderCount"],
})
revenue_per_minute.print()
env.execute("Order Revenue Analytics")
12. Testing Event-Driven Systems
Testing event-driven systems requires special strategies: contract testing validates schema compatibility between producers and consumers, event replay testing verifies projection correctness, and chaos engineering tests system resilience under broker failures.
// Contract testing with a throwaway Kafka container (Jest + Testcontainers)
// Note: @testcontainers/kafka exports KafkaContainer; a kafkajs client
// connects to the container's mapped broker port (9093 by default)
import { KafkaContainer, StartedKafkaContainer } from "@testcontainers/kafka";
import { Kafka } from "kafkajs";
describe("OrderPlaced event contract", () => {
let container: StartedKafkaContainer;
let kafka: Kafka;
beforeAll(async () => {
container = await new KafkaContainer().start();
kafka = new Kafka({
brokers: [`${container.getHost()}:${container.getMappedPort(9093)}`],
});
});
afterAll(async () => {
await container.stop();
});
test("producer publishes valid OrderPlaced event", async () => {
const producer = kafka.producer();
await producer.connect();
// Produce event
await producer.send({
topic: "orders.events",
messages: [{
key: "order-123",
value: JSON.stringify({
eventType: "OrderPlaced",
orderId: "order-123",
customerId: "cust-456",
totalAmount: 99.99,
items: [{ productId: "p1", quantity: 2, price: 49.995 }],
timestamp: new Date().toISOString(),
}),
}],
});
// Consume and validate schema
const consumer = kafka.consumer({ groupId: "test-group" });
await consumer.connect();
await consumer.subscribe({ topics: ["orders.events"] });
const received = await consumeOneMessage(consumer);
const event = JSON.parse(received.value!.toString());
// Contract assertions
expect(event).toHaveProperty("eventType", "OrderPlaced");
expect(event).toHaveProperty("orderId");
expect(event).toHaveProperty("customerId");
expect(event).toHaveProperty("totalAmount");
expect(typeof event.totalAmount).toBe("number");
expect(Array.isArray(event.items)).toBe(true);
expect(event.items[0]).toHaveProperty("productId");
expect(event.items[0]).toHaveProperty("quantity");
expect(event.items[0]).toHaveProperty("price");
});
});
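Replay tests like the next one drive a projection with a known event sequence and assert on the resulting read model. As a point of reference, here is a minimal in-memory stand-in for such a projection — a sketch only; the projection in the test below persists to `testDb` rather than a `Map`:

```typescript
// Illustrative in-memory order-summary projection (sketch, not the db-backed version).
interface StoredEvent {
  streamId: string;
  version: number;
  eventType: string;
  payload: Record<string, any>;
  metadata: Record<string, any>;
  timestamp: Date;
}

interface OrderSummary {
  orderId: string;
  status: string;
  total: number;
  itemCount: number;
}

class InMemoryOrderSummaryProjection {
  private summaries = new Map<string, OrderSummary>();

  // Apply one event to the read model; unknown event types are ignored.
  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case "OrderPlaced":
        this.summaries.set(event.streamId, {
          orderId: event.streamId,
          status: "placed",
          total: event.payload.total,
          itemCount: event.payload.items.length,
        });
        break;
      case "OrderShipped": {
        const summary = this.summaries.get(event.streamId);
        if (summary) summary.status = "shipped";
        break;
      }
    }
  }

  get(orderId: string): OrderSummary | undefined {
    return this.summaries.get(orderId);
  }
}
```

Because the projection is deterministic, replaying the same event sequence must always yield the same summary — which is exactly what the replay test checks.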
// Event replay testing — verify projections
describe("OrderSummaryProjection", () => {
test("rebuilds correct state from event sequence", async () => {
const events: StoredEvent[] = [
{
streamId: "order-001",
version: 1,
eventType: "OrderPlaced",
payload: {
customerId: "cust-1",
items: [{ productId: "p1", quantity: 2, price: 25 }],
total: 50,
},
metadata: {},
timestamp: new Date("2026-01-01"),
},
{
streamId: "order-001",
version: 2,
eventType: "OrderShipped",
payload: { trackingNumber: "TRACK-123" },
metadata: {},
timestamp: new Date("2026-01-02"),
},
];
const projection = new OrderSummaryProjection(testDb);
for (const event of events) {
await projection.handle(event);
}
const summary = await testDb.queryOne(
"SELECT * FROM order_summaries WHERE order_id = ?",
["order-001"]
);
expect(summary.status).toBe("shipped");
expect(summary.total).toBe(50);
expect(summary.item_count).toBe(1);
});
});
Chaos Engineering Tests
// Chaos engineering — simulate broker failures
describe("Resilience under Kafka broker failure", () => {
test("consumer recovers after broker restart", async () => {
const kafka = await new KafkaContainer().start();
// Produce messages
await produceTestMessages(kafka, 100);
// Start consuming
const consumed: string[] = [];
const consumer = startTestConsumer(kafka, (msg) => {
consumed.push(msg);
});
// Wait for partial consumption
await waitUntil(() => consumed.length >= 50);
// Simulate a broker outage: restart the container in place
// (stop() would remove the container entirely, so start() could not revive it)
await kafka.restart();
// Verify consumer reconnects and processes remaining messages
await waitUntil(() => consumed.length === 100, 30000);
expect(consumed.length).toBe(100);
// Verify no duplicates (idempotent processing)
const uniqueIds = new Set(consumed.map(m => JSON.parse(m).eventId));
expect(uniqueIds.size).toBe(100);
});
});
13. Monitoring & Observability
Observability in event-driven systems revolves around three key dimensions: distributed tracing (request chains across services), consumer lag monitoring (message processing speed), and dead letter queue alerting (failed messages).
// Distributed tracing with OpenTelemetry
import { trace, context, propagation, SpanKind } from "@opentelemetry/api";
// Tracer provider and exporter setup (e.g. NodeTracerProvider + an OTLP exporter)
// is assumed to have been registered at application startup.
const tracer = trace.getTracer("order-service", "1.0.0");
// Producer — inject trace context into message headers
async function produceWithTracing(
topic: string,
event: DomainEvent
): Promise<void> {
const span = tracer.startSpan(`publish ${event.eventType}`, {
kind: SpanKind.PRODUCER,
attributes: {
"messaging.system": "kafka",
"messaging.destination": topic,
"messaging.message_id": event.eventId,
"event.type": event.eventType,
},
});
// Inject trace context into Kafka headers
const headers: Record<string, string> = {};
propagation.inject(context.active(), headers);
try {
await producer.send({
topic,
messages: [{
key: event.aggregateId,
value: JSON.stringify(event),
headers,
}],
});
span.setStatus({ code: 1 }); // SpanStatusCode.OK (0 is UNSET, not OK)
} catch (err) {
span.setStatus({ code: 2, message: String(err) }); // SpanStatusCode.ERROR
throw err;
} finally {
span.end();
}
}
// Consumer — extract trace context from message headers
async function consumeWithTracing(
message: KafkaMessage
): Promise<void> {
// Extract parent trace context from Kafka headers
const parentCtx = propagation.extract(
context.active(),
message.headers
);
const span = tracer.startSpan(
"process OrderPlaced",
{ kind: SpanKind.CONSUMER },
parentCtx
);
return context.with(
trace.setSpan(parentCtx, span),
async () => {
try {
const event = JSON.parse(message.value!.toString());
await processEvent(event);
span.setStatus({ code: 1 }); // SpanStatusCode.OK
} catch (err) {
span.setStatus({ code: 2, message: String(err) }); // SpanStatusCode.ERROR
throw err;
} finally {
span.end();
}
}
);
}
Consumer Lag & DLQ Monitoring
// Consumer lag monitoring with Prometheus metrics
import { Counter, Gauge, Histogram } from "prom-client";
const consumerLag = new Gauge({
name: "kafka_consumer_lag",
help: "Consumer lag in messages",
labelNames: ["topic", "partition", "consumer_group"],
});
const messagesProcessed = new Counter({
name: "messages_processed_total",
help: "Total messages processed",
labelNames: ["topic", "event_type", "status"],
});
const processingDuration = new Histogram({
name: "message_processing_duration_seconds",
help: "Time to process a single message",
labelNames: ["topic", "event_type"],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
});
const dlqMessages = new Counter({
name: "dlq_messages_total",
help: "Messages sent to dead letter queue",
labelNames: ["topic", "error_type"],
});
// Monitor consumer lag periodically (Admin type from kafkajs)
async function monitorLag(admin: Admin, groupId: string) {
const [groupOffsets] = await admin.fetchOffsets({
groupId,
topics: ["orders.events"],
});
const topicOffsets = await admin.fetchTopicOffsets("orders.events");
for (const partition of groupOffsets.partitions) {
const latest = topicOffsets.find(
t => t.partition === partition.partition
);
if (latest) {
const lag = parseInt(latest.offset, 10) - parseInt(partition.offset, 10);
consumerLag.set(
{ topic: "orders.events", partition: String(partition.partition), consumer_group: groupId },
lag
);
}
}
}
// Alerting rules (Prometheus rule file, YAML)
// groups:
//   - name: eda-alerts
//     rules:
//       - alert: ConsumerLagHigh
//         expr: kafka_consumer_lag > 10000
//         for: 5m
//         labels: { severity: warning }
//         annotations: { summary: "Consumer lag exceeding 10K messages" }
//       - alert: DLQMessagesIncreasing
//         expr: rate(dlq_messages_total[5m]) > 0
//         for: 1m
//         labels: { severity: critical }
//         annotations: { summary: "Messages being sent to DLQ" }
Technology Comparison
| Dimension | Kafka | RabbitMQ | AWS EventBridge |
|---|---|---|---|
| Throughput | Very high (millions/sec) | High (tens of thousands/sec) | Moderate (quota-limited) |
| Message Retention | Configurable (days/infinite) | Deleted after consumption | 24h replay |
| Message Replay | Native (offset seeking) | Not supported | Limited (event archive) |
| Routing Flexibility | Key-based partitioning | Very high (exchanges + bindings) | Rule-based content routing |
| Ops Complexity | High (ZooKeeper/KRaft) | Moderate | Low (fully managed) |
| Best For | Event sourcing, streaming, data pipelines | Task queues, RPC, complex routing | Serverless event routing, AWS integration |
Frequently Asked Questions
What is event-driven architecture and when should I use it?
Event-driven architecture (EDA) is a software design pattern where state changes are captured as immutable events and propagated asynchronously between loosely coupled services. Use EDA when you need temporal decoupling between producers and consumers, high throughput and scalability, real-time data processing, audit trails, or when multiple downstream services need to react to the same business event independently.
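Stripped of any broker, the decoupling claim reduces to this: the producer publishes an event without knowing who, if anyone, consumes it. A toy in-memory bus (all names here are illustrative) makes that concrete:

```typescript
// Toy in-memory event bus: the publisher never references its subscribers.
type Handler = (event: unknown) => void;

class EventBus {
  private handlers = new Map<string, Handler[]>();

  subscribe(type: string, handler: Handler): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  publish(type: string, event: unknown): void {
    // Fire and forget: zero, one, or many consumers may react independently.
    for (const handler of this.handlers.get(type) ?? []) handler(event);
  }
}

const bus = new EventBus();
bus.subscribe("OrderPlaced", (e) => console.log("billing saw", e));
bus.subscribe("OrderPlaced", (e) => console.log("shipping saw", e));
bus.publish("OrderPlaced", { orderId: "o-1" }); // both consumers react, producer unaware
```

A real broker adds durability, delivery guarantees, and temporal decoupling (consumers may be offline at publish time), but the dependency direction is the same.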
What is the difference between Apache Kafka and RabbitMQ?
Kafka is a distributed log designed for high-throughput event streaming with persistent storage, consumer groups, and replay capability. RabbitMQ is a traditional message broker optimized for flexible routing, message acknowledgment, and complex exchange patterns. Use Kafka for event sourcing, stream processing, and high-volume data pipelines. Use RabbitMQ for task queues, RPC-style communication, and scenarios requiring complex routing logic with exchanges.
What is event sourcing and how does it differ from traditional CRUD?
Event sourcing stores every state change as an immutable event in an append-only event store, rather than overwriting the current state in a database. The current state is derived by replaying events. This provides a complete audit trail, enables temporal queries, supports event replay for debugging, and allows building multiple read-optimized projections. Traditional CRUD only stores the latest state and loses change history.
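The "derive current state by replay" idea is just a left fold over the event history. A toy sketch with illustrative event shapes (not a real event store):

```typescript
// Minimal event-sourcing fold: state is never stored, only derived from events.
type AccountEvent =
  | { type: "Deposited"; amount: number }
  | { type: "Withdrawn"; amount: number };

// Pure apply function: (state, event) -> new state.
function applyEvent(balance: number, event: AccountEvent): number {
  switch (event.type) {
    case "Deposited": return balance + event.amount;
    case "Withdrawn": return balance - event.amount;
  }
}

// The append-only history is the source of truth...
const history: AccountEvent[] = [
  { type: "Deposited", amount: 100 },
  { type: "Withdrawn", amount: 30 },
  { type: "Deposited", amount: 5 },
];

// ...and current state is a left fold over it.
const balance = history.reduce(applyEvent, 0); // 75
```

Snapshots are an optimization of the same idea: fold up to a checkpoint once, persist that intermediate state, and replay only later events on top of it.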
What is CQRS and why combine it with event sourcing?
CQRS (Command Query Responsibility Segregation) separates write operations (commands) from read operations (queries) into distinct models. When combined with event sourcing, commands produce events that are stored in the event store, and projections consume these events to build read-optimized views. This allows independent scaling of reads and writes, optimized query models, and eventual consistency between write and read sides.
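A toy sketch of the separation (in-memory, illustrative names; a real system would put a broker between the event log and the projection, making the read side eventually consistent):

```typescript
// Minimal CQRS sketch: commands append events, a projection maintains the read model,
// queries only ever touch the read model.
type ProductEvent = { type: "PriceChanged"; productId: string; price: number };

const eventLog: ProductEvent[] = [];         // write side: append-only
const priceView = new Map<string, number>(); // read side: query-optimized

// Projection: updates the read model from events.
function project(event: ProductEvent): void {
  priceView.set(event.productId, event.price);
}

// Command handler: validates, then appends an event.
function changePrice(productId: string, price: number): void {
  if (price <= 0) throw new Error("price must be positive");
  const event: ProductEvent = { type: "PriceChanged", productId, price };
  eventLog.push(event);
  project(event); // in production this would be an async subscriber
}

// Query handler: reads only from the read model.
function getPrice(productId: string): number | undefined {
  return priceView.get(productId);
}

changePrice("p1", 19.99);
console.log(getPrice("p1")); // 19.99
```

Because reads never touch the event log, the two sides can be scaled, indexed, and even stored in different databases independently.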
What is the saga pattern for distributed transactions?
The saga pattern manages distributed transactions across multiple microservices by breaking them into a sequence of local transactions, each paired with a compensating action for rollback. Orchestration uses a central coordinator that directs the saga steps; choreography has no coordinator, with each service listening for events and triggering the next step itself. Use orchestration for complex workflows with many steps, and choreography for simpler flows with fewer services.
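A compensation-based orchestrator can be sketched in a few lines (illustrative only, not a production saga framework — real ones also persist saga state so a crashed coordinator can resume):

```typescript
// Minimal saga orchestrator: run steps in order; on failure, compensate
// completed steps in reverse order.
interface SagaStep {
  name: string;
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
}

async function runSaga(steps: SagaStep[]): Promise<boolean> {
  const completed: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.execute();
      completed.push(step);
    } catch {
      // Roll back: undo completed steps in reverse order.
      for (const done of completed.reverse()) {
        await done.compensate();
      }
      return false; // saga failed but left the system consistent
    }
  }
  return true;
}
```

For an order saga, a declined payment after a successful inventory reservation would trigger the reservation's compensating action (releasing the stock), restoring consistency without a distributed lock or two-phase commit.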
How do I handle schema evolution in event-driven systems?
Use a schema registry (Confluent Schema Registry) with serialization formats like Avro or Protobuf that support schema evolution. Follow compatibility rules: backward compatibility allows new consumers to read old events, forward compatibility allows old consumers to read new events. Always add optional fields with defaults, never remove required fields, and version your event schemas. Use the schema registry to enforce compatibility checks at publish time.
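The backward-compatibility rule — new readers must tolerate old events — reduces to supplying defaults for fields that did not exist when the event was written. A hand-rolled JSON sketch of the idea (field names illustrative; Avro and Protobuf with a schema registry automate exactly this via reader/writer schema resolution):

```typescript
// v2 of the event adds an optional `currency` field with a default,
// so events written by v1 producers remain readable.
interface OrderPlacedV2 {
  orderId: string;
  totalAmount: number;
  currency: string; // added in v2, optional-with-default on the wire
}

function readOrderPlaced(raw: string): OrderPlacedV2 {
  const data = JSON.parse(raw);
  return {
    orderId: data.orderId,
    totalAmount: data.totalAmount,
    // Default for the later-added field keeps old events readable.
    currency: data.currency ?? "USD",
  };
}

const v1Event = '{"orderId":"o-1","totalAmount":42}'; // written before currency existed
console.log(readOrderPlaced(v1Event).currency);        // falls back to the default "USD"
```

Removing a required field breaks this in the other direction: a reader that still expects it would fail on new events, which is why required fields are never removed, only deprecated.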
How do I test event-driven systems effectively?
Use contract testing to verify event schemas between producers and consumers. Use embedded brokers (EmbeddedKafka, TestContainers) for integration tests. Implement event replay testing by storing known event sequences and verifying projections produce correct state. Use consumer-driven contract testing with tools like Pact. For resilience, apply chaos engineering to simulate broker failures, network partitions, and consumer lag.
How do I monitor and observe event-driven architectures?
Monitor consumer lag (difference between latest offset and consumer offset) as the primary health indicator. Use distributed tracing (OpenTelemetry) to propagate correlation IDs through event headers across service boundaries. Set up dead letter queue monitoring with alerts for poison messages. Track end-to-end latency from event production to final consumption. Use tools like Kafka Manager, Grafana with Prometheus exporters, and Jaeger for trace visualization.
Conclusion
Event-driven architecture is a powerful paradigm for building scalable, resilient, and loosely coupled distributed systems. Apache Kafka and RabbitMQ provide messaging infrastructure for different scenarios. Event sourcing and CQRS offer audit trails and performance optimization for complex business logic. The saga pattern addresses distributed transaction challenges. Schema evolution ensures long-term maintainability. Stream processing engines enable real-time analytics. Comprehensive testing and monitoring strategies ensure production reliability. Choose the right combination of patterns and tools for your specific needs, start simple, and introduce more patterns as complexity grows.