Привет, я столкнулся с проблемой, которая имеет очень хорошее решение с помощью Spring Integration. Много раз нам нужен сценарий отправки сообщения неизвестному количеству адресатов. Для этого у нас есть методология Темы. Но в некоторых случаях мы также хотим получать ответы от всех получателей, которые получили сообщение, и объединять их в единый результат. Для этого мы можем использовать каналы в сочетании с интерфейсами Aggregator и ReleaseStrategy. В этом посте я не буду концентрироваться на «реализации каналов».
Допустим, у нас есть продюсер, который отправляет свое сообщение в тему. Теперь у нас есть потребитель, который получает это сообщение. Используя шлюз и интерфейс процессора, мы можем отправить это сообщение в любом виде:
1
2
3
4
|
public interface Processor { public void sendResponse(String response); } |
Код потребителя:
01
02
03
04
05
06
07
08
09
10
11
12
|
@Override public void onMessage(Message message) { String resultMessage = "" ; try { processor.sendResponse(resultMessage); } catch (Exception e) { log.error( "Error while processing message in channel consumer. errorMsg=" + e.getMessage(), e); } } |
Теперь сообщение будет доставлено на канал («In Channel»). Мы можем добавить к этому сообщению дополнительную информацию (в случае, если у нас разные группы сообщений). После добавления дополнительной информации мы отправляем это сообщение в другой канал («Out channel»). Теперь вот волшебство: мы создаем два pojo, которые позже будут привязаны к интерфейсам, используя конфигурацию XML. ReleaseStrategy:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public class ReleaseStrategy { public boolean canRelease(List results) { // check if all 5 subscribers sent responses if (results.size() == 5 ) { return true ; } return false ; } } |
Накопитель:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
public class Aggregator { public String aggregate(List results) { String finalResult= "SUCCESS_RESULT" ; for (String result: results) { if (result.equals( "ERROR_RESULT" )) { finalResult= "ERROR_RESULT" ; break ; } } return finalResult; } } |
В основном здесь происходит то, что после того, как мы вернем «истинное» значение через метод canRelease интерфейса ReleaseStrategy, агрегатор сможет получать агрегированное сообщение и отправлять один результат в конечный пункт назначения (это может быть очередь, в которую получит другой потребитель сообщения) и обработать результат) Конфигурация Xml:
1
|
< ? xml version = "1.0" encoding = "UTF-8" ?> |