Статьи

Агрегирование асинхронных результатов с использованием Spring Integration

Привет, я столкнулся с проблемой, которая имеет очень хорошее решение с помощью Spring Integration. Много раз нам нужен сценарий отправки сообщения неизвестному количеству адресатов. Для этого у нас есть методология Темы. Но в некоторых случаях мы также хотим получать ответы от всех получателей, которые получили сообщение, и объединять их в единый результат. Для этого мы можем использовать каналы в сочетании с интерфейсами Aggregator и ReleaseStrategy. В этом посте я не буду концентрироваться на «реализации каналов».

Допустим, у нас есть продюсер, который отправляет свое сообщение в тему. Теперь у нас есть потребитель, который получает это сообщение. Используя шлюз и интерфейс процессора, мы можем отправить это сообщение в любом виде:

1
2
3
4
public interface Processor
{
    public void sendResponse(String response);
}

Код потребителя:

01
02
03
04
05
06
07
08
09
10
11
12
@Override public void onMessage(Message message)
{
  String resultMessage = "";
    try
    {
          processor.sendResponse(resultMessage);
    }
    catch (Exception e)
    {
         log.error("Error while processing message in channel consumer. errorMsg=" + e.getMessage(), e);
     }
}

Теперь сообщение будет доставлено на канал («In Channel»). Мы можем добавить к этому сообщению дополнительную информацию (в случае, если у нас разные группы сообщений). После добавления дополнительной информации мы отправляем это сообщение в другой канал («Out channel»). Теперь вот волшебство: мы создаем два pojo, которые позже будут привязаны к интерфейсам, используя конфигурацию XML. ReleaseStrategy:

01
02
03
04
05
06
07
08
09
10
11
12
13
public class ReleaseStrategy
{
    public boolean canRelease(List results)
    {
            // check if all 5  subscribers sent responses
        if (results.size() == 5)
                {
 
            return true;
        }
        return false;
    }
}

Накопитель:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
public class Aggregator
{
    public String aggregate(List results)
    {
               String finalResult= "SUCCESS_RESULT";
        for (String result: results) {
 
            if (result.equals("ERROR_RESULT")) {
                finalResult= "ERROR_RESULT";
                break;
            }
        }
 
        return finalResult;
    }
}

В основном здесь происходит то, что после того, как мы вернем «истинное» значение через метод canRelease интерфейса ReleaseStrategy, агрегатор сможет получать агрегированное сообщение и отправлять один результат в конечный пункт назначения (это может быть очередь, в которую получит другой потребитель сообщения) и обработать результат) Конфигурация Xml:

1
< ?xml version="1.0" encoding="UTF-8"?>