Код IT
← Каталог

Реактивная коммуникация — Python с confluent-kafka

Фрагмент из «Реактивная коммуникация»: Python с confluent-kafka.

python infra-securityencyclopedia8-05-mikroservisy-i-integratsiya-116 embed URL статья в энциклопедии
Python main.py
from confluent_kafka import Consumer, KafkaException

import json

class OrderConsumer:
    def __init__(self, bootstrap_servers, group_id):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
            'session.timeout.ms': 30000
        })
        self.topic = 'orders'
    
    def start(self):
        self.consumer.subscribe([self.topic])
        
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                
                if msg is None:
                    continue
                
                if msg.error():
                    raise KafkaException(msg.error())
                
                try:
                    event = json.loads(msg.value().decode('utf-8'))
                    self.process_event(event)
                    
                    # Подтверждение обработки
                    self.consumer.commit(asynchronous=False)
                    
                except json.JSONDecodeError as e:
                    print(f'Ошибка парсинга JSON: {e}')
                except Exception as e:
                    print(f'Ошибка обработки события: {e}')
                    # Можно отправить в DLQ (dead letter queue)
        
        except KeyboardInterrupt:
            print('Потребление остановлено')
        finally:
            self.consumer.close()
    
    def process_event(self, event):
        event_type = event.get('type')
        
        if event_type == 'OrderCreated':
            print(f"Создан заказ: {event['order_id']}")
            # Логика обработки...
        elif event_type == 'OrderCompleted':
            print(f"Завершён заказ: {event['order_id']}")
        else:
            print(f"Неизвестный тип события: {event_type}")

# Использование
consumer = OrderConsumer('localhost:9092', 'order-processing-group')
consumer.start()
from confluent_kafka import Consumer, KafkaException

import json

class OrderConsumer:
    def __init__(self, bootstrap_servers, group_id):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
            'session.timeout.ms': 30000
        })
        self.topic = 'orders'
    
    def start(self):
        self.consumer.subscribe([self.topic])
        
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                
                if msg is None:
                    continue
                
                if msg.error():
                    raise KafkaException(msg.error())
                
                try:
                    event = json.loads(msg.value().decode('utf-8'))
                    self.process_event(event)
                    
                    # Подтверждение обработки
                    self.consumer.commit(asynchronous=False)
                    
                except json.JSONDecodeError as e:
                    print(f'Ошибка парсинга JSON: {e}')
                except Exception as e:
                    print(f'Ошибка обработки события: {e}')
                    # Можно отправить в DLQ (dead letter queue)
        
        except KeyboardInterrupt:
            print('Потребление остановлено')
        finally:
            self.consumer.close()
    
    def process_event(self, event):
        event_type = event.get('type')
        
        if event_type == 'OrderCreated':
            print(f"Создан заказ: {event['order_id']}")
            # Логика обработки...
        elif event_type == 'OrderCompleted':
            print(f"Завершён заказ: {event['order_id']}")
        else:
            print(f"Неизвестный тип события: {event_type}")

# Использование
consumer = OrderConsumer('localhost:9092', 'order-processing-group')
consumer.start()