Статьи

Обработка сообщений с помощью Spring Integration

Spring Integration предоставляет расширение среды Spring для поддержки хорошо известных шаблонов корпоративной интеграции. Он обеспечивает легкий обмен сообщениями в приложениях на базе Spring и поддерживает интеграцию с внешними системами. Одна из важнейших целей Spring Integration — предоставить простую модель для построения поддерживаемых и тестируемых решений для интеграции предприятия.

Основные компоненты

Сообщение: это универсальная оболочка для любого объекта Java в сочетании с метаданными, используемыми платформой при обработке этого объекта. Он состоит из полезной нагрузки и заголовка (ов). Полезная нагрузка сообщения может быть любым объектом Java, а заголовок сообщения — это строка / карта объекта, содержащая имя и значение заголовка. MessageBuilder используется для создания сообщений, охватывающих полезную нагрузку и заголовки, следующим образом:

1
2
3
4
5
6
7
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
 
Message message = MessageBuilder.withPayload("Message Payload")
                .setHeader("Message_Header1", "Message_Header1_Value")
                .setHeader("Message_Header2", "Message_Header2_Value")
                .build();

Канал сообщений. Канал сообщений — это компонент, через который передаются сообщения, поэтому его можно рассматривать как канал между производителем сообщения и потребителем. Производитель отправляет сообщение на канал, а потребитель получает сообщение от канала. Канал сообщений может следовать семантике «точка-точка» или «публикация / подписка». С каналом «точка-точка» самое большее один потребитель может получить каждое сообщение, отправленное на канал. С помощью каналов публикации / подписки несколько подписчиков могут получать каждое сообщение, отправленное на канал. Spring Integration поддерживает оба из них.

В этом примере проекта используются прямой канал и нулевой канал. Прямой канал — это тип канала по умолчанию в Spring Integration и самый простой вариант двухточечного канала. Нулевой канал — это фиктивный канал сообщений, который используется в основном для тестирования и отладки. Он не отправляет сообщение от отправителя получателю, но его метод send всегда возвращает true, а метод receive возвращает нулевое значение. В дополнение к DirectChannel и NullChannel, Spring Integration предоставляет различные реализации каналов сообщений, такие как PublishSubscribeChannel, QueueChannel, PriorityChannel, RendezvousChannel, ExecutorChannel и ScopedChannel.

Конечная точка сообщения: конечная точка сообщения изолирует код приложения от инфраструктуры. Другими словами, это уровень абстракции между кодом приложения и структурой обмена сообщениями.

Конечные точки основного сообщения

Трансформатор: Трансформатор сообщений отвечает за преобразование содержимого или структуры сообщения и возврат измененного сообщения. Например: его можно использовать для преобразования полезной нагрузки сообщения из одного формата в другой или для изменения значений заголовка сообщения.

Фильтр: Фильтр сообщений определяет, следует ли передавать сообщение в канал сообщений.

Маршрутизатор: Маршрутизатор сообщений решает, какой канал (каналы) должен получить сообщение следующим, если оно доступно.

Разделитель: Разделитель разбивает входящее сообщение на несколько сообщений и отправляет их на соответствующий канал.

Агрегатор: Агрегатор объединяет несколько сообщений в одно сообщение.

Активатор службы: Активатор службы — это общая конечная точка для подключения экземпляра службы к системе обмена сообщениями.

Адаптер канала: Адаптер канала — это конечная точка, которая соединяет канал сообщений с внешней системой. Канальные адаптеры могут быть входящими или исходящими. Конечная точка адаптера входящего канала соединяет внешнюю систему с MessageChannel. Конечная точка адаптера исходящего канала соединяет MessageChannel с внешней системой.

Шлюз обмена сообщениями. Шлюз является точкой входа для системы обмена сообщениями и скрывает API обмена сообщениями от внешней системы. Он двунаправлен, покрывая каналы запроса и ответа.

Также Spring Integration предоставляет различные канальные адаптеры и шлюзы обмена сообщениями (для AMQP, File, Redis, Gemfire, Http, Jdbc, JPA, JMS, RMI, Stream и т. Д.) Для поддержки обмена сообщениями с внешними системами. Пожалуйста, посетите справочную документацию Spring Integration для получения подробной информации.

В следующем примере реализации обмена сообщениями о грузе показано поведение базовых оконечных точек для простоты понимания. Система обмена сообщениями о грузе прослушивает сообщения о грузе из внешней системы с помощью интерфейса CargoGateway. Полученные грузовые сообщения обрабатываются с использованием CargoSplitter, CargoFilter, CargoRouter, CargoTransformer MessageEndpoints. После этого обработанные успешные внутренние и международные грузовые сообщения отправляются в CargoServiceActivator.

Spring Integration системы Cargo Messaging выглядит следующим образом:

otv_si

Давайте рассмотрим пример внедрения системы обмена сообщениями.

Используемые технологии

  • JDK 1.8.0_25
  • Весна 4.1.2
  • Spring Integration 4.1.0
  • Maven 3.2.2
  • Убунту 14.04

Иерархия проекта выглядит следующим образом:

otv_si3

ШАГ 1: Зависимости

Зависимости добавляются в Maven pom.xml.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
<properties>
    <spring.version>4.1.2.RELEASE</spring.version>
    <spring.integration.version>4.1.0.RELEASE</spring.integration.version>
</properties>
 
<dependencies>
    <!-- Spring 4 dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
     
    <!-- Spring Integration dependencies -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>${spring.integration.version}</version>
    </dependency>
</dependencies>

ШАГ 2: Грузовой строитель

CargoBuilder создан для создания запросов на груз.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class Cargo {
 
    public enum ShippingType {
        DOMESTIC, INTERNATIONAL
    }
 
    private final long trackingId;
    private final String receiverName;
    private final String deliveryAddress;
    private final double weight;
    private final String description;
    private final ShippingType shippingType;
    private final int deliveryDayCommitment;
    private final int region;
 
    private Cargo(CargoBuilder cargoBuilder) {
        this.trackingId = cargoBuilder.trackingId;
        this.receiverName = cargoBuilder.receiverName;
        this.deliveryAddress = cargoBuilder.deliveryAddress;
        this.weight = cargoBuilder.weight;
        this.description = cargoBuilder.description;
        this.shippingType = cargoBuilder.shippingType;
        this.deliveryDayCommitment = cargoBuilder.deliveryDayCommitment;
        this.region = cargoBuilder.region;
    }
 
    // Getter methods...
     
    @Override
    public String toString() {
        return "Cargo [trackingId=" + trackingId + ", receiverName="
                + receiverName + ", deliveryAddress=" + deliveryAddress
                + ", weight=" + weight + ", description=" + description
                + ", shippingType=" + shippingType + ", deliveryDayCommitment="
                + deliveryDayCommitment + ", region=" + region + "]";
    }
 
    public static class CargoBuilder {
         
        private final long trackingId;
        private final String receiverName;
        private final String deliveryAddress;
        private final double weight;
        private final ShippingType shippingType;
        private int deliveryDayCommitment;
        private int region;
        private String description;
         
        public CargoBuilder(long trackingId, String receiverName,
                            String deliveryAddress, double weight,
                            ShippingType shippingType) {
            this.trackingId = trackingId;
            this.receiverName = receiverName;
            this.deliveryAddress = deliveryAddress;
            this.weight = weight;
            this.shippingType = shippingType;
        }
 
        public CargoBuilder setDeliveryDayCommitment(int deliveryDayCommitment) {
            this.deliveryDayCommitment = deliveryDayCommitment;
            return this;
        }
 
        public CargoBuilder setDescription(String description) {
            this.description = description;
            return this;
        }
         
        public CargoBuilder setRegion(int region) {
            this.region = region;
            return this;
        }
 
        public Cargo build() {
            Cargo cargo = new Cargo(this);
            if ((ShippingType.DOMESTIC == cargo.getShippingType()) && (cargo.getRegion() <= 0 || cargo.getRegion() > 4)) {
                throw new IllegalStateException("Region is invalid! Cargo Tracking Id : " + cargo.getTrackingId());
            }
             
            return cargo;
        }
         
    }

ШАГ 3: Грузовое сообщение

CargoMessage является родительским классом внутренних и международных грузовых сообщений.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
public class CargoMessage {
 
    private final Cargo cargo;
 
    public CargoMessage(Cargo cargo) {
        this.cargo = cargo;
    }
 
    public Cargo getCargo() {
        return cargo;
    }
 
    @Override
    public String toString() {
        return cargo.toString();
    }
}

ШАГ 4: Внутреннее грузовое сообщение

Класс DomesticCargoMessage моделирует сообщения о внутренних грузоперевозках.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class DomesticCargoMessage extends CargoMessage {
     
    public enum Region {
         
        NORTH(1), SOUTH(2), EAST(3), WEST(4);
         
        private int value;
 
        private Region(int value) {
            this.value = value;
        }
 
        public static Region fromValue(int value) {
            return Arrays.stream(Region.values())
                            .filter(region -> region.value == value)
                            .findFirst()
                            .get();
        }
    }
     
    private final Region region;
 
    public DomesticCargoMessage(Cargo cargo, Region region) {
        super(cargo);
        this.region = region;
    }
 
    public Region getRegion() {
        return region;
    }
 
    @Override
    public String toString() {
        return "DomesticCargoMessage [cargo=" + super.toString() + ", region=" + region + "]";
    }
 
}

ШАГ 5: Международное грузовое сообщение

InternationalCargoMessage Class моделирует международные грузовые сообщения.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class InternationalCargoMessage extends CargoMessage {
     
    public enum DeliveryOption {
        NEXT_FLIGHT, PRIORITY, ECONOMY, STANDART
    }
     
    private final DeliveryOption deliveryOption;
     
    public InternationalCargoMessage(Cargo cargo, DeliveryOption deliveryOption) {
        super(cargo);
        this.deliveryOption = deliveryOption;
    }
 
    public DeliveryOption getDeliveryOption() {
        return deliveryOption;
    }
 
    @Override
    public String toString() {
        return "InternationalCargoMessage [cargo=" + super.toString() + ", deliveryOption=" + deliveryOption + "]";
    }
 
}

ШАГ 6: Конфигурация приложения

AppConfiguration — это класс поставщика конфигурации для Spring Container. Он создает каналы сообщений и регистрируется в Spring BeanFactory. Также @EnableIntegration позволяет импортировать конфигурацию интеграции Spring, а @IntegrationComponentScan сканирует определенные компоненты Spring Integration. Оба они пришли с Spring Integration 4.0.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
 
@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {
 
    /**
     * Creates a new cargoGWDefaultRequest Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoGWDefaultRequestChannel() {
        return new DirectChannel();
    }
 
    /**
     * Creates a new cargoSplitterOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoSplitterOutputChannel() {
        return new DirectChannel();
    }
 
    /**
     * Creates a new cargoFilterOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoFilterOutputChannel() {
        return new DirectChannel();
    }
 
    /**
     * Creates a new cargoRouterDomesticOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoRouterDomesticOutputChannel() {
        return new DirectChannel();
    }
 
    /**
     * Creates a new cargoRouterInternationalOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoRouterInternationalOutputChannel() {
        return new DirectChannel();
    }
 
    /**
     * Creates a new cargoTransformerOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoTransformerOutputChannel() {
        return new DirectChannel();
    }
 
}

ШАГ 7: Шлюз обмена сообщениями

Интерфейс CargoGateway предоставляет приложению метод, относящийся к конкретному домену. Другими словами, он обеспечивает доступ приложения к системе обмена сообщениями. Также @MessagingGateway поставляется с Spring Integration 4.0 и упрощает создание шлюза в системе обмена сообщениями. Его канал запроса по умолчанию — cargoGWDefaultRequestChannel .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.List;
 
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;
 
import com.onlinetechvision.model.Cargo;
 
@MessagingGateway(name = "cargoGateway",
                    defaultRequestChannel = "cargoGWDefaultRequestChannel")
public interface ICargoGateway {
 
    /**
     * Processes Cargo Request
     *
     * @param message SI Message covering Cargo List payload and Batch Cargo Id header.
     * @return operation result
     */
    @Gateway
    void processCargoRequest(Message<List<Cargo>> message);
}

ШАГ 8: Разделитель сообщений

CargoSplitter прослушивает канал cargoGWDefaultRequestChannel и разбивает входящий список грузов на сообщения Cargo. Грузовые сообщения отправляются в cargoSplitterOutputChannel.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.List;
 
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;
 
import com.onlinetechvision.model.Cargo;
 
@MessageEndpoint
public class CargoSplitter {
 
    /**
     * Splits Cargo List to Cargo message(s)
     *
     * @param message SI Message covering Cargo List payload and Batch Cargo Id header.
     * @return cargo list
     */
    @Splitter(inputChannel = "cargoGWDefaultRequestChannel",
                outputChannel = "cargoSplitterOutputChannel")
    public List<Cargo> splitCargoList(Message<List<Cargo>> message) {
        return message.getPayload();
    }
}

ШАГ 9: Фильтр сообщений

CargoFilter определяет, следует ли передавать сообщение в канал сообщений. Он прослушивает канал cargoSplitterOutputChannel и фильтрует сообщения о грузе, превышающие весовой предел. Если сообщение Cargo ниже предела веса, оно отправляется на канал cargoFilterOutputChannel . Если сообщение Cargo превышает лимит веса, оно отправляется на канал cargoFilterDiscardChannel .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;
 
import com.onlinetechvision.model.Cargo;
 
 
@MessageEndpoint
public class CargoFilter {
 
    private static final long CARGO_WEIGHT_LIMIT = 1_000;
     
    /**
     * Checks weight of cargo and filters if it exceeds limit.
     *
     * @param Cargo message
     * @return check result
     */
    @Filter(inputChannel="cargoSplitterOutputChannel", outputChannel="cargoFilterOutputChannel", discardChannel="cargoFilterDiscardChannel")
    public boolean filterIfCargoWeightExceedsLimit(Cargo cargo) {
        return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;
    }
}

ШАГ 10: прослушиватель сброшенных грузовых сообщений

DiscardedCargoMessageListener прослушивает канал cargoFilterDiscard и обрабатывает сообщения Cargo, отклоненные CargoFilter.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;
 
import com.onlinetechvision.model.Cargo;
 
 
@MessageEndpoint
public class DiscardedCargoMessageListener {
 
    private final Logger logger = LoggerFactory.getLogger(DiscardedCargoMessageListener.class);
     
    /**
     * Handles discarded domestic and international cargo request(s) and logs.
     *
     * @param cargo domestic/international cargo message
     * @param batchId message header shows cargo batch id
     */
    @ServiceActivator(inputChannel = "cargoFilterDiscardChannel")
    public void handleDiscardedCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {
        logger.debug("Message in Batch[" + batchId + "] is received with Discarded payload : " + cargo);
    }
 
}

ШАГ 11: Маршрутизатор сообщений

CargoRouter определяет, какой канал (каналы) должен получить следующее сообщение, если оно доступно. Он прослушивает канал cargoFilterOutputChannel и возвращает название соответствующего канала в зависимости от типа доставки груза. Другими словами, он направляет входящие грузовые сообщения во внутренние ( cargoRouterDomesticOutputChannel ) или международные ( cargoRouterInternationalOutputChannel ) грузовые каналы. Также, если тип доставки не установлен, возвращается nullChannel . nullChannel — это фиктивный канал сообщений, который используется в основном для тестирования и отладки. Он не отправляет сообщение от отправителя получателю, но его метод send всегда возвращает true, а метод receive возвращает нулевое значение.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
 
import com.onlinetechvision.model.Cargo;
import com.onlinetechvision.model.Cargo.ShippingType;
 
@MessageEndpoint
public class CargoRouter {
     
    /**
     * Determines cargo request' s channel in the light of shipping type.
     *
     * @param Cargo message
     * @return channel name
     */
    @Router(inputChannel="cargoFilterOutputChannel")
    public String route(Cargo cargo) {
        if(cargo.getShippingType() == ShippingType.DOMESTIC) {
            return "cargoRouterDomesticOutputChannel";
        } else if(cargo.getShippingType() == ShippingType.INTERNATIONAL) {
            return "cargoRouterInternationalOutputChannel";
        }
         
        return "nullChannel";
    }
     
}

ШАГ 12: Обмен сообщениями

CargoTransformer прослушивает cargoRouterDomesticOutputChannel & cargoRouterInternationalOutputChannel и преобразовывает входящие запросы Cargo в сообщения внутренних и международных грузов. После этого он отправляет их на канал cargoTransformerOutputChannel .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
 
import com.onlinetechvision.model.Cargo;
import com.onlinetechvision.model.DomesticCargoMessage;
import com.onlinetechvision.model.DomesticCargoMessage.Region;
import com.onlinetechvision.model.InternationalCargoMessage;
import com.onlinetechvision.model.InternationalCargoMessage.DeliveryOption;
 
 
@MessageEndpoint
public class CargoTransformer {
 
    /**
     * Transforms Cargo request to Domestic Cargo obj.
     *
     * @param cargo
     *            request
     * @return Domestic Cargo obj
     */
    @Transformer(inputChannel = "cargoRouterDomesticOutputChannel",
                    outputChannel = "cargoTransformerOutputChannel")
    public DomesticCargoMessage transformDomesticCargo(Cargo cargo) {
        return new DomesticCargoMessage(cargo, Region.fromValue(cargo.getRegion()));
    }
 
    /**
     * Transforms Cargo request to International Cargo obj.
     *
     * @param cargo
     *            request
     * @return International Cargo obj
     */
    @Transformer(inputChannel = "cargoRouterInternationalOutputChannel",
                    outputChannel = "cargoTransformerOutputChannel")
    public InternationalCargoMessage transformInternationalCargo(Cargo cargo) {
        return new InternationalCargoMessage(cargo, getDeliveryOption(cargo.getDeliveryDayCommitment()));
    }
     
    /**
     * Get delivery option by delivery day commitment.
     *
     * @param deliveryDayCommitment delivery day commitment
     * @return delivery option
     */
    private DeliveryOption getDeliveryOption(int deliveryDayCommitment) {
        if (deliveryDayCommitment == 1) {
            return DeliveryOption.NEXT_FLIGHT;
        } else if (deliveryDayCommitment == 2) {
            return DeliveryOption.PRIORITY;
        } else if (deliveryDayCommitment > 2 && deliveryDayCommitment < 5) {
            return DeliveryOption.ECONOMY;
        } else {
            return DeliveryOption.STANDART;
        }
    }
 
}

ШАГ 13: Активатор службы сообщений

CargoServiceActivator — это общая конечная точка для подключения экземпляра службы к системе обмена сообщениями. Он прослушивает канал cargoTransformerOutputChannel и получает обработанные внутренние и международные грузовые сообщения и журналы.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;
 
import com.onlinetechvision.model.CargoMessage;
 
 
@MessageEndpoint
public class CargoServiceActivator {
 
    private final Logger logger = LoggerFactory.getLogger(CargoServiceActivator.class);
     
    /**
     * Gets processed domestic and international cargo request(s) and logs.
     *
     * @param cargoMessage domestic/international cargo message
     * @param batchId message header shows cargo batch id
     */
    @ServiceActivator(inputChannel = "cargoTransformerOutputChannel")
    public void getCargo(CargoMessage cargoMessage, @Header("CARGO_BATCH_ID") long batchId) {
        logger.debug("Message in Batch[" + batchId + "] is received with payload : " + cargoMessage);
    }
 
}

ШАГ 14: Применение

Класс приложения создан для запуска приложения. Он инициализирует контекст приложения и отправляет запросы на груз в систему обмена сообщениями.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;
 
import com.onlinetechvision.integration.ICargoGateway;
import com.onlinetechvision.model.Cargo;
import com.onlinetechvision.model.Cargo.ShippingType;
 
 
public class Application {
 
    public static void main(String[] args) {
        ApplicationContext ctx = new AnnotationConfigApplicationContext(AppConfiguration.class);
        ICargoGateway orderGateway = ctx.getBean(ICargoGateway.class);
         
        getCargoBatchMap().forEach(
            (batchId, cargoList) -> orderGateway.processCargoRequest(MessageBuilder
                                                                        .withPayload(cargoList)
                                                                        .setHeader("CARGO_BATCH_ID", batchId)
                                                                        .build()));
    }
     
    /**
     * Creates a sample cargo batch map covering multiple batches and returns.
     *
     * @return cargo batch map
     */
    private static Map<Integer, List<Cargo>> getCargoBatchMap() {
        Map<Integer, List<Cargo>> cargoBatchMap = new HashMap<>();
         
        cargoBatchMap.put(1, Arrays.asList(
                 
                new Cargo.CargoBuilder(1, "Receiver_Name1", "Address1", 0.5, ShippingType.DOMESTIC)
                            .setRegion(1).setDescription("Radio").build(),
                //Second cargo is filtered due to weight limit         
                new Cargo.CargoBuilder(2, "Receiver_Name2", "Address2", 2_000, ShippingType.INTERNATIONAL)
                            .setDeliveryDayCommitment(3).setDescription("Furniture").build(),
                new Cargo.CargoBuilder(3, "Receiver_Name3", "Address3", 5, ShippingType.INTERNATIONAL)
                            .setDeliveryDayCommitment(2).setDescription("TV").build(),
                //Fourth cargo is not processed due to no shipping type found          
                new Cargo.CargoBuilder(4, "Receiver_Name4", "Address4", 8, null)
                            .setDeliveryDayCommitment(2).setDescription("Chair").build()));
                                         
        cargoBatchMap.put(2, Arrays.asList(
                //Fifth cargo is filtered due to weight limit
                new Cargo.CargoBuilder(5, "Receiver_Name5", "Address5", 1_200, ShippingType.DOMESTIC)
                            .setRegion(2).setDescription("Refrigerator").build(),
                new Cargo.CargoBuilder(6, "Receiver_Name6", "Address6", 20, ShippingType.DOMESTIC)
                            .setRegion(3).setDescription("Table").build(),
                //Seventh cargo is not processed due to no shipping type found
                new Cargo.CargoBuilder(7, "Receiver_Name7", "Address7", 5, null)
                            .setDeliveryDayCommitment(1).setDescription("TV").build()));
                 
        cargoBatchMap.put(3, Arrays.asList(
                new Cargo.CargoBuilder(8, "Receiver_Name8", "Address8", 200, ShippingType.DOMESTIC)
                            .setRegion(2).setDescription("Washing Machine").build(),
                new Cargo.CargoBuilder(9, "Receiver_Name9", "Address9", 4.75, ShippingType.INTERNATIONAL)
                            .setDeliveryDayCommitment(1).setDescription("Document").build()));
         
        return Collections.unmodifiableMap(cargoBatchMap);
    }
     
}

ШАГ 15: Построить проект

Результаты работы с запросами на груз:

Груз 1: успешно отправлен активатору сервиса.
Груз 2: фильтруется по весу.
Карго 3: успешно отправлено активатору сервиса.
Груз 4: не обрабатывается из-за отсутствия типа доставки.
Груз 5: фильтруется по весу.
Груз 6: успешно отправлен активатору сервиса.
Груз 7: не обрабатывается из-за отсутствия типа доставки.
Карго 8: отправлено активатору сервиса успешно.
Груз 9: успешно отправлен активатору сервиса.

После того, как проект собран и запущен, будут видны следующие журналы вывода консоли:

1
2
3
4
5
6
7
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[1] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=1, receiverName=Receiver_Name1, deliveryAddress=Address1, weight=0.5, description=Radio, shippingType=DOMESTIC, deliveryDayCommitment=0, region=1], region=NORTH]
2014-12-09 23:43:51 [main] DEBUG c.o.i.DiscardedCargoMessageListener - Message in Batch[1] is received with Discarded payload : Cargo [trackingId=2, receiverName=Receiver_Name2, deliveryAddress=Address2, weight=2000.0, description=Furniture, shippingType=INTERNATIONAL, deliveryDayCommitment=3, region=0]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[1] is received with payload : InternationalCargoMessage [cargo=Cargo [trackingId=3, receiverName=Receiver_Name3, deliveryAddress=Address3, weight=5.0, description=TV, shippingType=INTERNATIONAL, deliveryDayCommitment=2, region=0], deliveryOption=PRIORITY]
2014-12-09 23:43:51 [main] DEBUG c.o.i.DiscardedCargoMessageListener - Message in Batch[2] is received with Discarded payload : Cargo [trackingId=5, receiverName=Receiver_Name5, deliveryAddress=Address5, weight=1200.0, description=Refrigerator, shippingType=DOMESTIC, deliveryDayCommitment=0, region=2]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[2] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=6, receiverName=Receiver_Name6, deliveryAddress=Address6, weight=20.0, description=Table, shippingType=DOMESTIC, deliveryDayCommitment=0, region=3], region=EAST]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[3] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=8, receiverName=Receiver_Name8, deliveryAddress=Address8, weight=200.0, description=Washing Machine, shippingType=DOMESTIC, deliveryDayCommitment=0, region=2], region=SOUTH]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[3] is received with payload : InternationalCargoMessage [cargo=Cargo [trackingId=9, receiverName=Receiver_Name9, deliveryAddress=Address9, weight=4.75, description=Document, shippingType=INTERNATIONAL, deliveryDayCommitment=1, region=0], deliveryOption=NEXT_FLIGHT]

Исходный код

Исходный код доступен на Github

использованная литература

Ссылка: Обработка сообщений с помощью Spring Integration от нашего партнера JCG Эрен Авсарогуллари в блоге Online Technology Vision .