E-commerce Docs
📚 Shared Packages

Kafka Package

Shared Kafka client, producer, and consumer configurations

Kafka Package

The Kafka Package (@repo/kafka) provides centralized Apache Kafka client configurations, producer, and consumer utilities used across all microservices in the e-commerce platform. This ensures consistent event streaming and messaging throughout the system.

🛠️ Technology Stack

  • Apache Kafka: Distributed event streaming platform
  • Node.js Client: kafka-js for Kafka connectivity
  • TypeScript: Full type safety for Kafka operations
  • Docker: Kafka cluster setup with Docker Compose
  • Configuration: Centralized Kafka configuration management

📁 Package Structure

packages/kafka/
├── src/
│   ├── client.ts        # Kafka client configuration
│   ├── producer.ts      # Kafka producer utilities
│   ├── consumer.ts      # Kafka consumer utilities
│   └── index.ts         # Main exports
├── docker-compose.yml   # Kafka cluster setup
└── package.json

🏗️ Architecture

Kafka Cluster Setup

Docker Compose Configuration

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

Topic Configuration

The Kafka cluster includes predefined topics for different event types:

export const KAFKA_TOPICS = {
  PRODUCT_EVENTS: 'product.events',
  ORDER_EVENTS: 'order.events',
  USER_EVENTS: 'user.events',
  PAYMENT_EVENTS: 'payment.events',
} as const;

🔧 Core Components

Kafka Client

Client Configuration

// packages/kafka/src/client.ts
import { Kafka, logLevel } from 'kafkajs';

export const kafkaClient = new Kafka({
  clientId: 'ecommerce-app',
  brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
  logLevel: logLevel.WARN,
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});

Client Features

  • Connection Management: Automatic connection handling and reconnection
  • Error Handling: Comprehensive error handling and logging
  • Configuration: Environment-based broker configuration

Producer Utilities

Producer Setup

// packages/kafka/src/producer.ts
import { kafkaClient } from './client';

export const producer = kafkaClient.producer({
  allowAutoTopicCreation: false,
  transactionTimeout: 30000,
});

export const connectProducer = async () => {
  try {
    await producer.connect();
    console.log('Kafka Producer connected successfully');
  } catch (error) {
    console.error('Failed to connect Kafka Producer:', error);
    throw error;
  }
};

export const disconnectProducer = async () => {
  try {
    await producer.disconnect();
    console.log('Kafka Producer disconnected successfully');
  } catch (error) {
    console.error('Failed to disconnect Kafka Producer:', error);
  }
};

Event Publishing

export const publishEvent = async (
  topic: string,
  event: {
    eventId: string;
    eventType: string;
    timestamp: string;
    data: any;
  }
) => {
  try {
    await producer.send({
      topic,
      messages: [
        {
          key: event.eventId,
          value: JSON.stringify(event),
          timestamp: new Date(event.timestamp).getTime(),
        },
      ],
    });

    console.log(`Event published to ${topic}:`, event.eventType);
  } catch (error) {
    console.error(`Failed to publish event to ${topic}:`, error);
    throw error;
  }
};

Consumer Utilities

Consumer Setup

// packages/kafka/src/consumer.ts
import { kafkaClient } from './client';

export const createConsumer = (groupId: string) => {
  return kafkaClient.consumer({
    groupId,
    allowAutoTopicCreation: false,
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
  });
};

export const connectConsumer = async (consumer: any) => {
  try {
    await consumer.connect();
    console.log('Kafka Consumer connected successfully');
  } catch (error) {
    console.error('Failed to connect Kafka Consumer:', error);
    throw error;
  }
};

export const disconnectConsumer = async (consumer: any) => {
  try {
    await consumer.disconnect();
    console.log('Kafka Consumer disconnected successfully');
  } catch (error) {
    console.error('Failed to disconnect Kafka Consumer:', error);
  }
};

Event Subscription

export const subscribeToTopic = async (
  consumer: any,
  topic: string,
  handler: (event: any) => Promise<void>
) => {
  try {
    await consumer.subscribe({
      topic,
      fromBeginning: false,
    });

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const event = JSON.parse(message.value?.toString() || '{}');

          console.log(`Processing event from ${topic}:`, event.eventType);

          await handler(event);
        } catch (error) {
          console.error('Error processing message:', error);
          // Implement dead letter queue logic here
        }
      },
    });
  } catch (error) {
    console.error(`Failed to subscribe to topic ${topic}:`, error);
    throw error;
  }
};

🔄 Usage Across Services

Product Service Usage

// In Product Service
import { producer, publishEvent, KAFKA_TOPICS } from '@repo/kafka';

export class ProductService {
  async createProduct(productData: CreateProductData) {
    // Create product in database
    const product = await Product.create(productData);

    // Publish event to Kafka
    await publishEvent(KAFKA_TOPICS.PRODUCT_EVENTS, {
      eventId: generateId(),
      eventType: 'product.created',
      timestamp: new Date().toISOString(),
      data: {
        id: product.id,
        name: product.name,
        price: product.price,
        categorySlug: product.categorySlug,
        sizes: product.sizes,
        colors: product.colors,
        images: product.images,
      },
    });

    return product;
  }
}

Order Service Usage

// In Order Service
import { createConsumer, subscribeToTopic, KAFKA_TOPICS } from '@repo/kafka';

export class OrderService {
  async initialize() {
    const consumer = createConsumer('order-service-group');

    await subscribeToTopic(
      consumer,
      KAFKA_TOPICS.PAYMENT_EVENTS,
      this.handlePaymentEvent.bind(this)
    );
  }

  private async handlePaymentEvent(event: any) {
    if (event.eventType === 'payment.successful') {
      await this.createOrderFromPayment(event.data);
    }
  }
}

Email Service Usage

// In Email Service
import { createConsumer, subscribeToTopic, KAFKA_TOPICS } from '@repo/kafka';

export class EmailService {
  async initialize() {
    const consumer = createConsumer('email-service-group');

    // Subscribe to multiple topics
    await subscribeToTopic(
      consumer,
      KAFKA_TOPICS.USER_EVENTS,
      this.handleUserEvent.bind(this)
    );

    await subscribeToTopic(
      consumer,
      KAFKA_TOPICS.ORDER_EVENTS,
      this.handleOrderEvent.bind(this)
    );
  }

  private async handleUserEvent(event: any) {
    if (event.eventType === 'user.created') {
      await this.sendWelcomeEmail(event.data);
    }
  }

  private async handleOrderEvent(event: any) {
    if (event.eventType === 'order.created') {
      await this.sendOrderConfirmation(event.data);
    }
  }
}

🛡️ Error Handling & Resilience

Producer Error Handling

export const publishEventWithRetry = async (
  topic: string,
  event: any,
  maxRetries: number = 3
) => {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await publishEvent(topic, event);
      return;
    } catch (error) {
      console.error(`Failed to publish event (attempt ${attempt}/${maxRetries}):`, error);

      if (attempt === maxRetries) {
        throw new Error(`Failed to publish event after ${maxRetries} attempts`);
      }

      // Exponential backoff
      await new Promise(resolve =>
        setTimeout(resolve, Math.pow(2, attempt) * 1000)
      );
    }
  }
};

Consumer Error Handling

export const subscribeWithErrorHandling = async (
  consumer: any,
  topic: string,
  handler: (event: any) => Promise<void>
) => {
  await consumer.subscribe({ topic, fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      try {
        const event = JSON.parse(message.value?.toString() || '{}');
        await handler(event);
      } catch (error) {
        console.error('Error processing message:', error);

        // Send to dead letter queue
        await sendToDeadLetterQueue(topic, message, error);
      }
    },
  });
};

🚀 Performance & Scalability

Producer Optimization

  • Batching: Batch multiple events for efficient sending
  • Compression: Enable message compression for better throughput
  • Partitioning: Proper partitioning strategy for load distribution

Consumer Optimization

  • Consumer Groups: Multiple consumers for parallel processing
  • Offset Management: Proper offset commit strategies
  • Rebalancing: Handle partition rebalancing gracefully

📊 Monitoring & Observability

Metrics Collection

// Producer metrics
const producerMetrics = {
  messagesSent: 0,
  messagesFailed: 0,
  averageLatency: 0,
};

// Consumer metrics
const consumerMetrics = {
  messagesProcessed: 0,
  messagesFailed: 0,
  lag: 0,
};

Health Checks

export const checkKafkaHealth = async () => {
  try {
    // Check producer connection
    await producer.send({
      topic: 'health-check',
      messages: [{ value: 'ping' }],
    });

    // Check consumer connection
    const admin = kafkaClient.admin();
    await admin.connect();

    const topics = await admin.listTopics();
    await admin.disconnect();

    return {
      status: 'healthy',
      topics: topics.length,
    };
  } catch (error) {
    return {
      status: 'unhealthy',
      error: error.message,
    };
  }
};

🔧 Development & Deployment

Environment Configuration

# Kafka Configuration
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=ecommerce-app
KAFKA_LOG_LEVEL=WARN

# Producer Configuration
KAFKA_PRODUCER_ALLOW_AUTO_TOPIC_CREATION=false
KAFKA_PRODUCER_TRANSACTION_TIMEOUT=30000

# Consumer Configuration
KAFKA_CONSUMER_GROUP_ID=your-service-group
KAFKA_CONSUMER_SESSION_TIMEOUT=30000

Local Development Setup

# Start Kafka cluster
cd packages/kafka
docker-compose up -d

# Verify Kafka is running
docker-compose ps

# Check logs if needed
docker-compose logs kafka

Service Integration

// In each service's entry point
import { connectProducer, disconnectProducer } from '@repo/kafka';

export const startService = async () => {
  try {
    // Connect to Kafka
    await connectProducer();

    // Start your service
    app.listen(PORT, () => {
      console.log(`Service running on port ${PORT}`);
    });

    // Graceful shutdown
    process.on('SIGTERM', async () => {
      await disconnectProducer();
      process.exit(0);
    });
  } catch (error) {
    console.error('Failed to start service:', error);
    process.exit(1);
  }
};

🧪 Testing

Unit Tests

import { publishEvent, KAFKA_TOPICS } from '@repo/kafka';

describe('Kafka Producer', () => {
  beforeAll(async () => {
    await connectProducer();
  });

  afterAll(async () => {
    await disconnectProducer();
  });

  it('should publish event successfully', async () => {
    const event = {
      eventId: 'test-event-id',
      eventType: 'test.event',
      timestamp: new Date().toISOString(),
      data: { test: 'data' },
    };

    // Mock the producer.send method to avoid actual Kafka call
    const sendSpy = jest.spyOn(producer, 'send').mockResolvedValue();

    await publishEvent(KAFKA_TOPICS.PRODUCT_EVENTS, event);

    expect(sendSpy).toHaveBeenCalledWith({
      topic: KAFKA_TOPICS.PRODUCT_EVENTS,
      messages: [
        expect.objectContaining({
          key: event.eventId,
          value: JSON.stringify(event),
        }),
      ],
    });
  });
});

Integration Tests

describe('Kafka Integration', () => {
  let consumer: any;
  let producer: any;

  beforeAll(async () => {
    // Set up test consumer and producer
    consumer = createConsumer('test-group');
    await connectConsumer(consumer);

    await connectProducer();
  });

  afterAll(async () => {
    await disconnectConsumer(consumer);
    await disconnectProducer();
  });

  it('should handle event publishing and consumption', async () => {
    const testEvent = {
      eventId: 'integration-test-event',
      eventType: 'test.integration',
      timestamp: new Date().toISOString(),
      data: { test: 'integration-data' },
    };

    // Subscribe to test topic
    await subscribeToTopic(consumer, 'test.events', async (event) => {
      expect(event).toEqual(testEvent);
    });

    // Publish test event
    await publishEvent('test.events', testEvent);

    // Wait for event processing
    await new Promise(resolve => setTimeout(resolve, 1000));
  });
});

🔮 Future Enhancements

Planned Features

  • Schema Registry: Confluent Schema Registry integration for event schemas
  • Dead Letter Queue: Proper dead letter queue implementation
  • Event Sourcing: Event sourcing patterns for audit trails
  • Stream Processing: Kafka Streams for real-time data processing
  • Monitoring Dashboard: Grafana dashboard for Kafka metrics

Performance Improvements

  • Connection Pooling: Advanced connection pool management
  • Message Batching: Optimized batch processing
  • Partitioning Strategy: Dynamic partitioning based on event types
  • Compression: Message compression for better throughput

Resilience Enhancements

  • Circuit Breaker: Circuit breaker pattern for Kafka operations
  • Fallback Strategies: Fallback mechanisms when Kafka is unavailable
  • Idempotency: Ensure idempotent event processing
  • Transaction Support: Kafka transaction support for consistency