Статьи

Отправьте ваши данные асинхронно на Kafka

Для проекта я пытаюсь регистрировать основные транзакции пользователя, такие как добавление и удаление элемента и для нескольких типов элементов, и отправлять сообщение в kafka для каждой транзакции. Точность механизма регистрации не имеет решающего значения, и я не хочу, чтобы он блокировал мой бизнес-код в случае простоя сервера kafka. В этом случае лучше использовать асинхронный подход для отправки данных в kafka.

Мой код производителя kafka находится в его загрузочном проекте. Чтобы сделать его асинхронным, мне просто нужно добавить две аннотации: @EnableAsync и @Async.

@EnableAsync будет использоваться в вашем классе конфигурации (также помните, что ваш класс с @SpringBootApplication также является классом конфигурации) и попытается найти bean-компонент TaskExecutor. Если нет, то создается SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor подходит для игрушечных проектов, но для чего-то большего, чем это, это немного рискованно, поскольку не ограничивает параллельные потоки и не использует потоки повторно. Чтобы быть в безопасности, мы также добавим bean-компонент executor.

Так,

1
2
3
4
5
6
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }
}

станет

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
@EnableAsync
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }
 
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }
}

Как видите, здесь не так много изменений. Значения по умолчанию, которые я установил, должны быть настроены в соответствии с потребностями вашего приложения.

Второе, что нам нужно, это добавление @Async.

Мой старый код был:

01
02
03
04
05
06
07
08
09
10
11
12
13
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {
 
    private static final String TOPIC = "logs";
 
    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;
 
    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus);
    }
}

Как видите, код синхронизации довольно прост. Он просто берет kafkaTemplate и отправляет объект сообщения в раздел «logs». Мой новый код немного длиннее.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {
 
    private static final String TOPIC = "logs";
 
    @Autowired
    private KafkaTemplate kafkaTemplate;
 
    @Async
    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(final SendResult<String, KafkaInfo> message) {
                // left empty intentionally
            }
 
            @Override
            public void onFailure(final Throwable throwable) {
                // left empty intentionally
 
            }
        });
    }
}

Здесь onSuccess () не очень важен для меня. Но с помощью onFailure () я могу зарегистрировать исключение, чтобы получить информацию о проблеме с моим сервером kafka.

Есть еще одна вещь, которую я должен поделиться с вами. Для отправки объекта через kafkatemplate я должен снабдить его файлом сериализатора, который у меня есть.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
public class KafkaInfoSerializer implements Serializer<kafkainfo> {
 
    @Override
    public void configure(Map map, boolean b) {
    }
 
    @Override
    public byte[] serialize(String arg0, KafkaInfo info) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(info).getBytes();
        } catch (Exception e) {
            // log the exception
        }
        return retVal;
    }
 
    @Override
    public void close() {
    }
}

Также не забудьте добавить конфигурацию для него. Существует несколько способов определения сериализаторов для кафки. Один из самых простых способов — добавить его в application.properties.

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer

Теперь у вас есть загрузочный проект, который может отправлять асинхронные объекты в нужную тему.

Опубликовано на Java Code Geeks с разрешения Сезина Карли, партнера нашей программы JCG . Смотрите оригинальную статью здесь: отправьте свои данные асинхронно на Kafka

Мнения, высказанные участниками Java Code Geeks, являются их собственными.