Для проекта я пытаюсь регистрировать основные транзакции пользователя, такие как добавление и удаление элемента и для нескольких типов элементов, и отправлять сообщение в kafka для каждой транзакции. Точность механизма регистрации не имеет решающего значения, и я не хочу, чтобы он блокировал мой бизнес-код в случае простоя сервера kafka. В этом случае лучше использовать асинхронный подход для отправки данных в kafka.
Мой код производителя kafka находится в его загрузочном проекте. Чтобы сделать его асинхронным, мне просто нужно добавить две аннотации: @EnableAsync и @Async.
@EnableAsync будет использоваться в вашем классе конфигурации (также помните, что ваш класс с @SpringBootApplication также является классом конфигурации) и попытается найти bean-компонент TaskExecutor. Если нет, то создается SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor подходит для игрушечных проектов, но для чего-то большего, чем это, это немного рискованно, поскольку не ограничивает параллельные потоки и не использует потоки повторно. Чтобы быть в безопасности, мы также добавим bean-компонент executor.
Так,
1
2
3
4
5
6
|
@SpringBootApplication public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); } } |
станет
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@EnableAsync @SpringBootApplication public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize( 2 ); executor.setMaxPoolSize( 2 ); executor.setQueueCapacity( 500 ); executor.setThreadNamePrefix( "KafkaMsgExecutor-" ); executor.initialize(); return executor; } } |
Как видите, здесь не так много изменений. Значения по умолчанию, которые я установил, должны быть настроены в соответствии с потребностями вашего приложения.
Второе, что нам нужно, это добавление @Async.
Мой старый код был:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
@Service public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate<String, KafkaInfo> kafkaTemplate; @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus); } } |
Как видите, код синхронизации довольно прост. Он просто берет kafkaTemplate и отправляет объект сообщения в раздел «logs». Мой новый код немного длиннее.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Service public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate kafkaTemplate; @Async @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus)); future.addCallback( new ListenableFutureCallback<>() { @Override public void onSuccess( final SendResult<String, KafkaInfo> message) { // left empty intentionally } @Override public void onFailure( final Throwable throwable) { // left empty intentionally } }); } } |
Здесь onSuccess () не очень важен для меня. Но с помощью onFailure () я могу зарегистрировать исключение, чтобы получить информацию о проблеме с моим сервером kafka.
Есть еще одна вещь, которую я должен поделиться с вами. Для отправки объекта через kafkatemplate я должен снабдить его файлом сериализатора, который у меня есть.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class KafkaInfoSerializer implements Serializer<kafkainfo> { @Override public void configure(Map map, boolean b) { } @Override public byte [] serialize(String arg0, KafkaInfo info) { byte [] retVal = null ; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(info).getBytes(); } catch (Exception e) { // log the exception } return retVal; } @Override public void close() { } } |
Также не забудьте добавить конфигурацию для него. Существует несколько способов определения сериализаторов для кафки. Один из самых простых способов — добавить его в application.properties.
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
Теперь у вас есть загрузочный проект, который может отправлять асинхронные объекты в нужную тему.
Опубликовано на Java Code Geeks с разрешения Сезина Карли, партнера нашей программы JCG . Смотрите оригинальную статью здесь: отправьте свои данные асинхронно на Kafka Мнения, высказанные участниками Java Code Geeks, являются их собственными. |