📚 Shared Packages
Kafka Package
Shared Kafka client, producer, and consumer configurations.
The Kafka Package (@repo/kafka) provides centralized Apache Kafka client configuration and shared producer and consumer utilities used across all microservices in the e-commerce platform. This ensures consistent event streaming and messaging throughout the system.
🛠️ Technology Stack
- Apache Kafka: Distributed event streaming platform
- Node.js Client: kafkajs for Kafka connectivity
- TypeScript: Full type safety for Kafka operations
- Docker: Kafka cluster setup with Docker Compose
- Configuration: Centralized Kafka configuration management
📁 Package Structure
```text
packages/kafka/
├── src/
│   ├── client.ts          # Kafka client configuration
│   ├── producer.ts        # Kafka producer utilities
│   ├── consumer.ts        # Kafka consumer utilities
│   └── index.ts           # Main exports
├── docker-compose.yml     # Kafka cluster setup
└── package.json
```
🏗️ Architecture
Kafka Cluster Setup
Docker Compose Configuration
```yaml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
```
Topic Configuration
The Kafka cluster includes predefined topics for different event types:
```typescript
export const KAFKA_TOPICS = {
  PRODUCT_EVENTS: 'product.events',
  ORDER_EVENTS: 'order.events',
  USER_EVENTS: 'user.events',
  PAYMENT_EVENTS: 'payment.events',
} as const;
```
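Since both the producer and consumer are configured with allowAutoTopicCreation: false, these topics must exist before use. A minimal sketch of creating them with the kafkajs admin client follows; the ensureTopics helper name, the partition count, and the assumption that KAFKA_TOPICS is exported from the package index are illustrative, not documented package behavior:

```typescript
// Sketch: create the predefined topics up front, since auto-creation is off.
import { kafkaClient } from './client';
import { KAFKA_TOPICS } from './index'; // assumed export location

export const ensureTopics = async () => {
  const admin = kafkaClient.admin();
  await admin.connect();
  try {
    await admin.createTopics({
      waitForLeaders: true,
      topics: Object.values(KAFKA_TOPICS).map((topic) => ({
        topic,
        numPartitions: 3,     // assumption; tune per workload
        replicationFactor: 1, // matches the single-broker dev cluster above
      })),
    });
  } finally {
    await admin.disconnect();
  }
};
```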
🔧 Core Components
Kafka Client
Client Configuration
```typescript
// packages/kafka/src/client.ts
import { Kafka, logLevel } from 'kafkajs';

export const kafkaClient = new Kafka({
  clientId: 'ecommerce-app',
  brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
  logLevel: logLevel.WARN,
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});
```
Client Features
- Connection Management: Automatic connection handling and reconnection
- Error Handling: Comprehensive error handling and logging
- Configuration: Environment-based broker configuration
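To back the connection-management and logging claims above, the producer and consumer instances created from this client also emit kafkajs instrumentation events that services can hook into. A hedged sketch (the log lines are illustrative, not emitted by the package itself):

```typescript
// Sketch: surface producer connection lifecycle via kafkajs instrumentation events.
import { producer } from './producer';

const { CONNECT, DISCONNECT, REQUEST_TIMEOUT } = producer.events;

producer.on(CONNECT, () => console.log('Kafka producer connected'));
producer.on(DISCONNECT, () => console.warn('Kafka producer disconnected'));
producer.on(REQUEST_TIMEOUT, (event) =>
  console.warn('Kafka request timed out:', event.payload)
);
```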
Producer Utilities
Producer Setup
```typescript
// packages/kafka/src/producer.ts
import { kafkaClient } from './client';

export const producer = kafkaClient.producer({
  allowAutoTopicCreation: false,
  transactionTimeout: 30000,
});

export const connectProducer = async () => {
  try {
    await producer.connect();
    console.log('Kafka Producer connected successfully');
  } catch (error) {
    console.error('Failed to connect Kafka Producer:', error);
    throw error;
  }
};

export const disconnectProducer = async () => {
  try {
    await producer.disconnect();
    console.log('Kafka Producer disconnected successfully');
  } catch (error) {
    console.error('Failed to disconnect Kafka Producer:', error);
  }
};
```
Event Publishing
```typescript
export const publishEvent = async (
  topic: string,
  event: {
    eventId: string;
    eventType: string;
    timestamp: string;
    data: any;
  }
) => {
  try {
    await producer.send({
      topic,
      messages: [
        {
          key: event.eventId,
          value: JSON.stringify(event),
          // kafkajs expects the message timestamp as a string of epoch millis
          timestamp: new Date(event.timestamp).getTime().toString(),
        },
      ],
    });
    console.log(`Event published to ${topic}:`, event.eventType);
  } catch (error) {
    console.error(`Failed to publish event to ${topic}:`, error);
    throw error;
  }
};
```
Consumer Utilities
Consumer Setup
```typescript
// packages/kafka/src/consumer.ts
import { Consumer } from 'kafkajs';
import { kafkaClient } from './client';

export const createConsumer = (groupId: string): Consumer => {
  return kafkaClient.consumer({
    groupId,
    allowAutoTopicCreation: false,
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
  });
};

export const connectConsumer = async (consumer: Consumer) => {
  try {
    await consumer.connect();
    console.log('Kafka Consumer connected successfully');
  } catch (error) {
    console.error('Failed to connect Kafka Consumer:', error);
    throw error;
  }
};

export const disconnectConsumer = async (consumer: Consumer) => {
  try {
    await consumer.disconnect();
    console.log('Kafka Consumer disconnected successfully');
  } catch (error) {
    console.error('Failed to disconnect Kafka Consumer:', error);
  }
};
```
Event Subscription
```typescript
export const subscribeToTopic = async (
  consumer: Consumer,
  topic: string,
  handler: (event: any) => Promise<void>
) => {
  try {
    await consumer.subscribe({
      topic,
      fromBeginning: false,
    });

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const event = JSON.parse(message.value?.toString() || '{}');
          console.log(`Processing event from ${topic}:`, event.eventType);
          await handler(event);
        } catch (error) {
          console.error('Error processing message:', error);
          // Dead letter queue logic can hook in here (see Error Handling below)
        }
      },
    });
  } catch (error) {
    console.error(`Failed to subscribe to topic ${topic}:`, error);
    throw error;
  }
};
```
🔄 Usage Across Services
Product Service Usage
```typescript
// In Product Service
import { publishEvent, KAFKA_TOPICS } from '@repo/kafka';

export class ProductService {
  async createProduct(productData: CreateProductData) {
    // Create product in database (Product and generateId are the
    // service's own model and helper)
    const product = await Product.create(productData);

    // Publish event to Kafka
    await publishEvent(KAFKA_TOPICS.PRODUCT_EVENTS, {
      eventId: generateId(),
      eventType: 'product.created',
      timestamp: new Date().toISOString(),
      data: {
        id: product.id,
        name: product.name,
        price: product.price,
        categorySlug: product.categorySlug,
        sizes: product.sizes,
        colors: product.colors,
        images: product.images,
      },
    });

    return product;
  }
}
```
Order Service Usage
```typescript
// In Order Service
import { createConsumer, connectConsumer, subscribeToTopic, KAFKA_TOPICS } from '@repo/kafka';

export class OrderService {
  async initialize() {
    const consumer = createConsumer('order-service-group');
    // The consumer must be connected before subscribing
    await connectConsumer(consumer);
    await subscribeToTopic(
      consumer,
      KAFKA_TOPICS.PAYMENT_EVENTS,
      this.handlePaymentEvent.bind(this)
    );
  }

  private async handlePaymentEvent(event: any) {
    if (event.eventType === 'payment.successful') {
      await this.createOrderFromPayment(event.data);
    }
  }
}
```
Email Service Usage
```typescript
// In Email Service
import { createConsumer, connectConsumer, KAFKA_TOPICS } from '@repo/kafka';

export class EmailService {
  async initialize() {
    const consumer = createConsumer('email-service-group');
    await connectConsumer(consumer);

    // subscribeToTopic starts the consumer via consumer.run(), which kafkajs
    // allows only once per consumer. To consume several topics with a single
    // consumer, subscribe to each topic first, then dispatch in one run().
    await consumer.subscribe({ topic: KAFKA_TOPICS.USER_EVENTS, fromBeginning: false });
    await consumer.subscribe({ topic: KAFKA_TOPICS.ORDER_EVENTS, fromBeginning: false });

    await consumer.run({
      eachMessage: async ({ topic, message }) => {
        try {
          const event = JSON.parse(message.value?.toString() || '{}');
          if (topic === KAFKA_TOPICS.USER_EVENTS) {
            await this.handleUserEvent(event);
          } else if (topic === KAFKA_TOPICS.ORDER_EVENTS) {
            await this.handleOrderEvent(event);
          }
        } catch (error) {
          console.error('Error processing message:', error);
        }
      },
    });
  }

  private async handleUserEvent(event: any) {
    if (event.eventType === 'user.created') {
      await this.sendWelcomeEmail(event.data);
    }
  }

  private async handleOrderEvent(event: any) {
    if (event.eventType === 'order.created') {
      await this.sendOrderConfirmation(event.data);
    }
  }
}
```
🛡️ Error Handling & Resilience
Producer Error Handling
```typescript
export const publishEventWithRetry = async (
  topic: string,
  event: any,
  maxRetries: number = 3
) => {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await publishEvent(topic, event);
      return;
    } catch (error) {
      console.error(`Failed to publish event (attempt ${attempt}/${maxRetries}):`, error);
      if (attempt === maxRetries) {
        throw new Error(`Failed to publish event after ${maxRetries} attempts`);
      }
      // Exponential backoff between attempts
      await new Promise(resolve =>
        setTimeout(resolve, Math.pow(2, attempt) * 1000)
      );
    }
  }
};
```
Consumer Error Handling
```typescript
export const subscribeWithErrorHandling = async (
  consumer: Consumer,
  topic: string,
  handler: (event: any) => Promise<void>
) => {
  await consumer.subscribe({ topic, fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ message }) => {
      try {
        const event = JSON.parse(message.value?.toString() || '{}');
        await handler(event);
      } catch (error) {
        console.error('Error processing message:', error);
        // Send the failed message to the dead letter queue (sketch below)
        await sendToDeadLetterQueue(topic, message, error);
      }
    },
  });
};
```
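sendToDeadLetterQueue is referenced above but not defined in the package (a proper dead letter queue is listed under Future Enhancements). A hedged sketch of one possible shape, republishing the failed message to a parallel `<topic>.dlq` topic; the naming convention and header fields are assumptions, not package behavior:

```typescript
// Sketch: republish a failed message to "<topic>.dlq" with error context.
import { KafkaMessage } from 'kafkajs';
import { producer } from './producer';

export const sendToDeadLetterQueue = async (
  topic: string,
  message: KafkaMessage,
  error: unknown
) => {
  await producer.send({
    topic: `${topic}.dlq`, // assumed per-topic DLQ naming convention
    messages: [
      {
        key: message.key,
        value: message.value,
        headers: {
          'x-original-topic': topic,
          'x-error': error instanceof Error ? error.message : String(error),
        },
      },
    ],
  });
};
```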
🚀 Performance & Scalability
Producer Optimization
- Batching: Send multiple events in a single request for efficient delivery
- Compression: Enable message compression for better throughput
- Partitioning: Key-based partitioning for even load distribution (all three are shown in the sketch below)
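A hedged sketch combining all three techniques with kafkajs; the publishBatch helper is illustrative, not a package export:

```typescript
// Sketch: batched, compressed, key-partitioned publishing.
import { CompressionTypes } from 'kafkajs';
import { producer } from './producer';

export const publishBatch = async (
  topic: string,
  events: Array<{ eventId: string; payload: unknown }>
) => {
  await producer.send({
    topic,
    compression: CompressionTypes.GZIP, // trades CPU for network throughput
    messages: events.map((e) => ({
      // Messages with the same key land on the same partition, preserving
      // per-entity ordering while spreading load across partitions.
      key: e.eventId,
      value: JSON.stringify(e.payload),
    })),
  });
};
```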
Consumer Optimization
- Consumer Groups: Multiple consumers in one group for parallel processing
- Offset Management: Explicit offset commit strategies (see the sketch below)
- Rebalancing: Handle partition rebalancing gracefully
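A hedged sketch of explicit offset management with kafkajs; runWithManualCommits is an illustrative helper, not a package export. With autoCommit: false, offsets are committed only after the handler succeeds, so a crash re-delivers unprocessed messages:

```typescript
// Sketch: commit offsets manually, only after successful processing.
import { Consumer } from 'kafkajs';

export const runWithManualCommits = async (
  consumer: Consumer,
  handler: (event: any) => Promise<void>
) => {
  await consumer.run({
    autoCommit: false,
    partitionsConsumedConcurrently: 3, // parallelism across partitions
    eachMessage: async ({ topic, partition, message }) => {
      await handler(JSON.parse(message.value?.toString() || '{}'));
      // Commit the *next* offset to consume, per Kafka convention.
      await consumer.commitOffsets([
        { topic, partition, offset: (Number(message.offset) + 1).toString() },
      ]);
    },
  });
};
```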
📊 Monitoring & Observability
Metrics Collection
```typescript
// Producer metrics
const producerMetrics = {
  messagesSent: 0,
  messagesFailed: 0,
  averageLatency: 0,
};

// Consumer metrics
const consumerMetrics = {
  messagesProcessed: 0,
  messagesFailed: 0,
  lag: 0,
};
```
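A hedged sketch of wiring these counters into the producer path; instrumentedPublish is illustrative and assumes it lives in the same module as producerMetrics above:

```typescript
// Sketch: update the counters above around each publish.
import { publishEvent } from './producer';

export const instrumentedPublish = async (topic: string, event: any) => {
  const start = Date.now();
  try {
    await publishEvent(topic, event);
    producerMetrics.messagesSent += 1;
  } catch (error) {
    producerMetrics.messagesFailed += 1;
    throw error;
  } finally {
    // Simple running average over all attempts so far
    const total = producerMetrics.messagesSent + producerMetrics.messagesFailed;
    producerMetrics.averageLatency =
      (producerMetrics.averageLatency * (total - 1) + (Date.now() - start)) / total;
  }
};
```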
Health Checks
```typescript
export const checkKafkaHealth = async () => {
  try {
    // Check the producer path. Note: with allowAutoTopicCreation disabled,
    // the 'health-check' topic must already exist for this probe to succeed.
    await producer.send({
      topic: 'health-check',
      messages: [{ value: 'ping' }],
    });

    // Check broker connectivity via the admin client
    const admin = kafkaClient.admin();
    await admin.connect();
    const topics = await admin.listTopics();
    await admin.disconnect();

    return {
      status: 'healthy',
      topics: topics.length,
    };
  } catch (error) {
    return {
      status: 'unhealthy',
      error: error instanceof Error ? error.message : String(error),
    };
  }
};
```
🔧 Development & Deployment
Environment Configuration
```bash
# Kafka Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=ecommerce-app
KAFKA_LOG_LEVEL=WARN

# Producer Configuration
KAFKA_PRODUCER_ALLOW_AUTO_TOPIC_CREATION=false
KAFKA_PRODUCER_TRANSACTION_TIMEOUT=30000

# Consumer Configuration
KAFKA_CONSUMER_GROUP_ID=your-service-group
KAFKA_CONSUMER_SESSION_TIMEOUT=30000
```
Local Development Setup
```bash
# Start Kafka cluster
cd packages/kafka
docker-compose up -d

# Verify Kafka is running
docker-compose ps

# Check logs if needed
docker-compose logs kafka
```
Service Integration
```typescript
// In each service's entry point
import { connectProducer, disconnectProducer } from '@repo/kafka';

export const startService = async () => {
  try {
    // Connect to Kafka before accepting traffic
    await connectProducer();

    // Start your service (app and PORT come from the service itself)
    app.listen(PORT, () => {
      console.log(`Service running on port ${PORT}`);
    });

    // Graceful shutdown
    process.on('SIGTERM', async () => {
      await disconnectProducer();
      process.exit(0);
    });
  } catch (error) {
    console.error('Failed to start service:', error);
    process.exit(1);
  }
};
```
🧪 Testing
Unit Tests
```typescript
import {
  producer,
  connectProducer,
  disconnectProducer,
  publishEvent,
  KAFKA_TOPICS,
} from '@repo/kafka';

describe('Kafka Producer', () => {
  beforeAll(async () => {
    await connectProducer();
  });

  afterAll(async () => {
    await disconnectProducer();
  });

  it('should publish event successfully', async () => {
    const event = {
      eventId: 'test-event-id',
      eventType: 'test.event',
      timestamp: new Date().toISOString(),
      data: { test: 'data' },
    };

    // Mock producer.send to avoid an actual Kafka call
    const sendSpy = jest.spyOn(producer, 'send').mockResolvedValue([]);

    await publishEvent(KAFKA_TOPICS.PRODUCT_EVENTS, event);

    expect(sendSpy).toHaveBeenCalledWith({
      topic: KAFKA_TOPICS.PRODUCT_EVENTS,
      messages: [
        expect.objectContaining({
          key: event.eventId,
          value: JSON.stringify(event),
        }),
      ],
    });
  });
});
```
Integration Tests
```typescript
import {
  createConsumer,
  connectConsumer,
  disconnectConsumer,
  connectProducer,
  disconnectProducer,
  subscribeToTopic,
  publishEvent,
} from '@repo/kafka';

describe('Kafka Integration', () => {
  let consumer: any;

  beforeAll(async () => {
    // Set up test consumer and producer against a running broker
    consumer = createConsumer('test-group');
    await connectConsumer(consumer);
    await connectProducer();
  });

  afterAll(async () => {
    await disconnectConsumer(consumer);
    await disconnectProducer();
  });

  it('should handle event publishing and consumption', async () => {
    const testEvent = {
      eventId: 'integration-test-event',
      eventType: 'test.integration',
      timestamp: new Date().toISOString(),
      data: { test: 'integration-data' },
    };

    // Resolve once the handler has seen the event, so the test fails on
    // timeout instead of passing without ever running the assertion.
    let resolveReceived!: () => void;
    const received = new Promise<void>((resolve) => (resolveReceived = resolve));

    // Subscribe to the test topic before publishing
    await subscribeToTopic(consumer, 'test.events', async (event) => {
      expect(event).toEqual(testEvent);
      resolveReceived();
    });

    // Publish the test event and wait for it to be consumed
    await publishEvent('test.events', testEvent);
    await received;
  });
});
```
🔮 Future Enhancements
Planned Features
- Schema Registry: Confluent Schema Registry integration for event schemas
- Dead Letter Queue: Proper dead letter queue implementation
- Event Sourcing: Event sourcing patterns for audit trails
- Stream Processing: Kafka Streams for real-time data processing
- Monitoring Dashboard: Grafana dashboard for Kafka metrics
Performance Improvements
- Connection Pooling: Advanced connection pool management
- Message Batching: Optimized batch processing
- Partitioning Strategy: Dynamic partitioning based on event types
- Compression: Message compression for better throughput
Resilience Enhancements
- Circuit Breaker: Circuit breaker pattern for Kafka operations
- Fallback Strategies: Fallback mechanisms when Kafka is unavailable
- Idempotency: Ensure idempotent event processing (see the sketch after this list)
- Transaction Support: Kafka transaction support for consistency
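As a hedged illustration of the planned idempotency work, consumers could deduplicate on eventId before invoking handlers. The in-memory Set below is for illustration only; a real implementation would need a shared store such as Redis:

```typescript
// Sketch: skip events whose eventId has already been processed.
const processedEventIds = new Set<string>();

export const handleIdempotently = async (
  event: { eventId: string },
  handler: (event: any) => Promise<void>
) => {
  if (processedEventIds.has(event.eventId)) {
    return; // duplicate delivery (e.g. redelivery after a rebalance)
  }
  await handler(event);
  processedEventIds.add(event.eventId);
};
```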