Код IT
← Каталог

Java-приложение с Apache Kafka и PostgreSQL — consumer

Фрагмент из «Java-приложение с Apache Kafka и PostgreSQL»: consumer.

java infra-securityencyclopedia8-05-mikroservisy-i-integratsiya-1191 embed URL статья в энциклопедии
Java main.java
// src/main/java/ru/timur/crm/consumer/CustomerConsumer.java
package ru.timur.crm.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import ru.timur.crm.config.KafkaConfig;
import ru.timur.crm.model.Customer;
import ru.timur.crm.service.CustomerService;

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;

public class CustomerConsumer implements Runnable {
    private final KafkaConsumer<String, Customer> consumer;
    private final CustomerService customerService;

    public CustomerConsumer(String groupId) throws SQLException {
        this.consumer = new KafkaConsumer<>(KafkaConfig.consumerProps(groupId));
        this.customerService = new CustomerService();
        this.consumer.subscribe(Collections.singletonList(KafkaConfig.CUSTOMER_TOPIC));
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, Customer> records =
                        consumer.poll(Duration.ofMillis(1000));

                for (var record : records) {
                    Customer customer = record.value();
                    System.out.println("Получен клиент: " + customer);

                    // Сохранение в базу данных
                    try {
                        customerService.saveCustomer(customer);
                        System.out.println("Клиент сохранён в БД: " + customer.getId());
                    } catch (SQLException e) {
                        System.err.println("Ошибка сохранения клиента в БД: " + e.getMessage());
                        // В продакшене здесь нужно реализовать retry-логику или DLQ
                    }
                }

                // Коммит оффсетов после успешной обработки
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            if (!(e instanceof InterruptedException)) {
                e.printStackTrace();
            }
        } finally {
            try {
                customerService.close();
                consumer.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
// src/main/java/ru/timur/crm/consumer/CustomerConsumer.java
package ru.timur.crm.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import ru.timur.crm.config.KafkaConfig;
import ru.timur.crm.model.Customer;
import ru.timur.crm.service.CustomerService;

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;

public class CustomerConsumer implements Runnable {
    private final KafkaConsumer<String, Customer> consumer;
    private final CustomerService customerService;

    public CustomerConsumer(String groupId) throws SQLException {
        this.consumer = new KafkaConsumer<>(KafkaConfig.consumerProps(groupId));
        this.customerService = new CustomerService();
        this.consumer.subscribe(Collections.singletonList(KafkaConfig.CUSTOMER_TOPIC));
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, Customer> records =
                        consumer.poll(Duration.ofMillis(1000));

                for (var record : records) {
                    Customer customer = record.value();
                    System.out.println("Получен клиент: " + customer);

                    // Сохранение в базу данных
                    try {
                        customerService.saveCustomer(customer);
                        System.out.println("Клиент сохранён в БД: " + customer.getId());
                    } catch (SQLException e) {
                        System.err.println("Ошибка сохранения клиента в БД: " + e.getMessage());
                        // В продакшене здесь нужно реализовать retry-логику или DLQ
                    }
                }

                // Коммит оффсетов после успешной обработки
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            if (!(e instanceof InterruptedException)) {
                e.printStackTrace();
            }
        } finally {
            try {
                customerService.close();
                consumer.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}