Для проекта я пытаюсь регистрировать основные транзакции пользователя, такие как добавление и удаление элемента и для нескольких типов элементов, и отправлять сообщение в kafka для каждой транзакции. Точность механизма регистрации не имеет решающего значения, и я не хочу, чтобы он блокировал мой бизнес-код в случае простоя сервера kafka. В этом случае лучше использовать асинхронный подход для отправки данных в kafka.
Мой код производителя kafka находится в его загрузочном проекте. Чтобы сделать его асинхронным, мне просто нужно добавить две аннотации: @EnableAsync и @Async.
@EnableAsync будет использоваться в вашем классе конфигурации (также помните, что ваш класс с @SpringBootApplication также является классом конфигурации) и попытается найти bean-компонент TaskExecutor. Если нет, то создается SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor подходит для игрушечных проектов, но для чего-то большего, чем это, это немного рискованно, поскольку не ограничивает параллельные потоки и не использует потоки повторно. Чтобы быть в безопасности, мы также добавим bean-компонент executor.
Так,
|
1
2
3
4
5
6
|
@SpringBootApplicationpublic class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication.class, args); }} |
станет
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@EnableAsync@SpringBootApplicationpublic class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication.class, args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(500); executor.setThreadNamePrefix("KafkaMsgExecutor-"); executor.initialize(); return executor; }} |
Как видите, здесь не так много изменений. Значения по умолчанию, которые я установил, должны быть настроены в соответствии с потребностями вашего приложения.
Второе, что нам нужно, это добавление @Async.
Мой старый код был:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
@Servicepublic class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs"; @Autowired private KafkaTemplate<String, KafkaInfo> kafkaTemplate; @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus); }} |
Как видите, код синхронизации довольно прост. Он просто берет kafkaTemplate и отправляет объект сообщения в раздел «logs». Мой новый код немного длиннее.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Servicepublic class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs"; @Autowired private KafkaTemplate kafkaTemplate; @Async @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus)); future.addCallback(new ListenableFutureCallback<>() { @Override public void onSuccess(final SendResult<String, KafkaInfo> message) { // left empty intentionally } @Override public void onFailure(final Throwable throwable) { // left empty intentionally } }); }} |
Здесь onSuccess () не очень важен для меня. Но с помощью onFailure () я могу зарегистрировать исключение, чтобы получить информацию о проблеме с моим сервером kafka.
Есть еще одна вещь, которую я должен поделиться с вами. Для отправки объекта через kafkatemplate я должен снабдить его файлом сериализатора, который у меня есть.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class KafkaInfoSerializer implements Serializer<kafkainfo> { @Override public void configure(Map map, boolean b) { } @Override public byte[] serialize(String arg0, KafkaInfo info) { byte[] retVal = null; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(info).getBytes(); } catch (Exception e) { // log the exception } return retVal; } @Override public void close() { }} |
Также не забудьте добавить конфигурацию для него. Существует несколько способов определения сериализаторов для кафки. Один из самых простых способов — добавить его в application.properties.
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
Теперь у вас есть загрузочный проект, который может отправлять асинхронные объекты в нужную тему.
|
Опубликовано на Java Code Geeks с разрешения Сезина Карли, партнера нашей программы JCG . Смотрите оригинальную статью здесь: отправьте свои данные асинхронно на Kafka Мнения, высказанные участниками Java Code Geeks, являются их собственными. |