Привет, я столкнулся с проблемой, которая имеет очень хорошее решение с помощью Spring Integration. Много раз нам нужен сценарий отправки сообщения неизвестному количеству адресатов. Для этого у нас есть методология Темы. Но в некоторых случаях мы также хотим получать ответы от всех получателей, которые получили сообщение, и объединять их в единый результат. Для этого мы можем использовать каналы в сочетании с интерфейсами Aggregator и ReleaseStrategy. В этом посте я не буду концентрироваться на «реализации каналов».
Допустим, у нас есть продюсер, который отправляет свое сообщение в тему. Теперь у нас есть потребитель, который получает это сообщение. Используя шлюз и интерфейс процессора, мы можем отправить это сообщение в любом виде:
|
1
2
3
4
|
public interface Processor{ public void sendResponse(String response);} |
Код потребителя:
|
01
02
03
04
05
06
07
08
09
10
11
12
|
@Override public void onMessage(Message message){ String resultMessage = ""; try { processor.sendResponse(resultMessage); } catch (Exception e) { log.error("Error while processing message in channel consumer. errorMsg=" + e.getMessage(), e); }} |
Теперь сообщение будет доставлено на канал («In Channel»). Мы можем добавить к этому сообщению дополнительную информацию (в случае, если у нас разные группы сообщений). После добавления дополнительной информации мы отправляем это сообщение в другой канал («Out channel»). Теперь вот волшебство: мы создаем два pojo, которые позже будут привязаны к интерфейсам, используя конфигурацию XML. ReleaseStrategy:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public class ReleaseStrategy{ public boolean canRelease(List results) { // check if all 5 subscribers sent responses if (results.size() == 5) { return true; } return false; }} |
Накопитель:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
public class Aggregator{ public String aggregate(List results) { String finalResult= "SUCCESS_RESULT"; for (String result: results) { if (result.equals("ERROR_RESULT")) { finalResult= "ERROR_RESULT"; break; } } return finalResult; }} |
В основном здесь происходит то, что после того, как мы вернем «истинное» значение через метод canRelease интерфейса ReleaseStrategy, агрегатор сможет получать агрегированное сообщение и отправлять один результат в конечный пункт назначения (это может быть очередь, в которую получит другой потребитель сообщения) и обработать результат) Конфигурация Xml:
|
1
|
< ?xml version="1.0" encoding="UTF-8"?> |