Код IT
← Каталог

Реактивная коммуникация — Python с confluent-kafka

Фрагмент из «Реактивная коммуникация»: Python с confluent-kafka.

python infra-securityencyclopedia8-05-mikroservisy-i-integratsiya-116 embed URL статья в энциклопедии
Python main.py

import faust

from datetime import datetime

app = faust.App(
    'order-processing',
    broker='kafka://localhost:9092',
    store='memory://',
    version=1
)

class OrderEvent(faust.Record, serializer='json'):
    type: str
    order_id: str
    customer_id: str
    items: list
    timestamp: str

orders_topic = app.topic('orders', value_type=OrderEvent)

# Таблица для агрегации по клиентам
customer_orders = app.Table(
    'customer_orders',
    default=int
)

@app.agent(orders_topic)
async def process_orders(stream):
    async for event in stream:
        if event.type == 'OrderCreated':
            print(f"Обработка заказа {event.order_id}")
            
            # Инкремент счётчика заказов клиента
            customer_orders[event.customer_id] += 1
            
            # Логика обработки...
            await process_order(event)

async def process_order(event):
    # Бизнес-логика
    pass

# Периодическая задача для отчётов
@app.timer(interval=60.0)
async def report_stats():
    total = sum(customer_orders.values())
    print(f"Статистика: всего заказов = {total}")
    for customer, count in customer_orders.items():
        print(f"  Клиент {customer}: {count} заказов")

if __name__ == '__main__':
    app.main()

import faust

from datetime import datetime

app = faust.App(
    'order-processing',
    broker='kafka://localhost:9092',
    store='memory://',
    version=1
)

class OrderEvent(faust.Record, serializer='json'):
    type: str
    order_id: str
    customer_id: str
    items: list
    timestamp: str

orders_topic = app.topic('orders', value_type=OrderEvent)

# Таблица для агрегации по клиентам
customer_orders = app.Table(
    'customer_orders',
    default=int
)

@app.agent(orders_topic)
async def process_orders(stream):
    async for event in stream:
        if event.type == 'OrderCreated':
            print(f"Обработка заказа {event.order_id}")
            
            # Инкремент счётчика заказов клиента
            customer_orders[event.customer_id] += 1
            
            # Логика обработки...
            await process_order(event)

async def process_order(event):
    # Бизнес-логика
    pass

# Периодическая задача для отчётов
@app.timer(interval=60.0)
async def report_stats():
    total = sum(customer_orders.values())
    print(f"Статистика: всего заказов = {total}")
    for customer, count in customer_orders.items():
        print(f"  Клиент {customer}: {count} заказов")

if __name__ == '__main__':
    app.main()