Статьи

Обработка сообщений с помощью Spring Integration

Spring Integration предоставляет расширение среды Spring для поддержки хорошо известных шаблонов корпоративной интеграции. Он обеспечивает легкий обмен сообщениями в приложениях на базе Spring и поддерживает интеграцию с внешними системами. Одна из важнейших целей Spring Integration — предоставить простую модель для построения поддерживаемых и тестируемых решений для интеграции предприятия.

Основные компоненты :

Сообщение:  это универсальная оболочка для любого объекта Java в сочетании с метаданными, используемыми платформой при обработке этого объекта. Он состоит из полезной нагрузки и заголовка (ов). Полезная нагрузка сообщения может быть любым объектом Java, а заголовок сообщения является строкой / картой объектов, содержащей имя и значение заголовка. MessageBuilder используется для создания сообщений, охватывающих полезную нагрузку и заголовки, следующим образом:

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

Message<String> message = MessageBuilder.withPayload("Message Payload")
                .setHeader("Message_Header1", "Message_Header1_Value")
                .setHeader("Message_Header2", "Message_Header2_Value")
                .build();

Канал сообщений. Канал  сообщений — это компонент, через который передаются сообщения, поэтому его можно рассматривать как канал между производителем сообщения и потребителем. Производитель отправляет сообщение на канал, а потребитель получает сообщение от канала. Канал сообщений может следовать семантике «точка-точка» или «публикация / подписка». С каналом «точка-точка» самое большее один потребитель может получить каждое сообщение, отправленное на канал. С помощью каналов публикации / подписки несколько подписчиков могут получать каждое сообщение, отправленное на канал. Spring Integration поддерживает оба из них.

В этом примере проекта используются прямой канал и нулевой канал. Прямой канал — это тип канала по умолчанию в Spring Integration и самый простой вариант двухточечного канала. Нулевой канал — это фиктивный канал сообщений, который используется в основном для тестирования и отладки. Он не отправляет сообщение от отправителя получателю, но его метод send всегда возвращает true, а метод receive возвращает нулевое значение. В дополнение к DirectChannel и NullChannel, Spring Integration предоставляет различные реализации каналов сообщений, такие как PublishSubscribeChannel, QueueChannel, PriorityChannel, RendezvousChannel, ExecutorChannel и ScopedChannel.

Конечная точка сообщения:  конечная точка сообщения изолирует код приложения от инфраструктуры. Другими словами, это уровень абстракции между кодом приложения и структурой обмена сообщениями.

Главные конечные точки сообщения:

Трансформатор: Трансформатор  сообщений отвечает за преобразование содержимого или структуры сообщения и возврат измененного сообщения. Например: его можно использовать для преобразования полезной нагрузки сообщения из одного формата в другой или для изменения значений заголовка сообщения.

Фильтр: Фильтр  сообщений определяет, следует ли передавать сообщение в канал сообщений.

Маршрутизатор: Маршрутизатор  сообщений решает, какой канал (каналы) должен получить сообщение следующим, если оно доступно.

Разделитель:  Разделитель разбивает входящее сообщение на несколько сообщений и отправляет их на соответствующий канал.

Агрегатор:  Агрегатор объединяет несколько сообщений в одно сообщение.

Активатор службы: Активатор  службы — это общая конечная точка для подключения экземпляра службы к системе обмена сообщениями.

Адаптер канала: Адаптер  канала — это конечная точка, которая соединяет канал сообщений с внешней системой. Канальные адаптеры могут быть входящими или исходящими. Конечная точка адаптера входящего канала соединяет внешнюю систему с MessageChannel. Конечная точка адаптера исходящего канала соединяет MessageChannel с внешней системой.

Шлюз обмена сообщениями.  Шлюз является точкой входа для системы обмена сообщениями и скрывает API обмена сообщениями от внешней системы. Он двунаправлен, покрывая каналы запроса и ответа.

Также Spring Integration предоставляет различные канальные адаптеры и шлюзы обмена сообщениями (для AMQP, File, Redis, Gemfire, Http, Jdbc, JPA, JMS, RMI, Stream и т. Д.) Для поддержки обмена сообщениями с внешними системами. Пожалуйста, посетите справочную документацию Spring Integration для получения подробной информации.

В следующем примере реализации обмена сообщениями о грузе показано поведение базовых оконечных точек для простоты понимания. Система обмена сообщениями о грузе прослушивает сообщения о грузе из внешней системы с помощью интерфейса CargoGateway. Полученные грузовые сообщения обрабатываются с использованием CargoSplitter, CargoFilter, CargoRouter, CargoTransformer MessageEndpoints. После этого обработанные успешные внутренние и международные грузовые сообщения отправляются в CargoServiceActivator.

Spring Integration системы Cargo Messaging выглядит следующим образом:

Давайте рассмотрим пример внедрения системы обмена сообщениями.

Используемые технологии:

JDK 1.8.0_25
Spring 4.1.2
Spring Integration 4.1.0
Maven 3.2.2
Ubuntu 14.04

Иерархия проекта выглядит следующим образом:

ШАГ 1: Зависимости

Зависимости добавляются в Maven pom.xml.

<properties>
        <spring.version>4.1.2.RELEASE</spring.version>
        <spring.integration.version>4.1.0.RELEASE</spring.integration.version>
    </properties>

    <dependencies>
        <!-- Spring 4 dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- Spring Integration dependencies -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>
</dependencies>

ШАГ 2: Грузовой строитель

CargoBuilder создан для создания запросов на груз.

public class Cargo {

    public enum ShippingType {
        DOMESTIC, INTERNATIONAL
    }

    private final long trackingId;
    private final String receiverName;
    private final String deliveryAddress;
    private final double weight;
    private final String description;
    private final ShippingType shippingType;
    private final int deliveryDayCommitment;
    private final int region;

    private Cargo(CargoBuilder cargoBuilder) {
        this.trackingId = cargoBuilder.trackingId;
        this.receiverName = cargoBuilder.receiverName;
        this.deliveryAddress = cargoBuilder.deliveryAddress;
        this.weight = cargoBuilder.weight;
        this.description = cargoBuilder.description;
        this.shippingType = cargoBuilder.shippingType;
        this.deliveryDayCommitment = cargoBuilder.deliveryDayCommitment;
        this.region = cargoBuilder.region;
    }

    // Getter methods...

    @Override
    public String toString() {
        return "Cargo [trackingId=" + trackingId + ", receiverName="
                + receiverName + ", deliveryAddress=" + deliveryAddress
                + ", weight=" + weight + ", description=" + description
                + ", shippingType=" + shippingType + ", deliveryDayCommitment="
                + deliveryDayCommitment + ", region=" + region + "]";
    }

    public static class CargoBuilder {

        private final long trackingId;
        private final String receiverName;
        private final String deliveryAddress;
        private final double weight;
        private final ShippingType shippingType;
        private int deliveryDayCommitment;
        private int region;
        private String description;

        public CargoBuilder(long trackingId, String receiverName,
                            String deliveryAddress, double weight, 
                            ShippingType shippingType) {
            this.trackingId = trackingId;
            this.receiverName = receiverName;
            this.deliveryAddress = deliveryAddress;
            this.weight = weight;
            this.shippingType = shippingType;
        }

        public CargoBuilder setDeliveryDayCommitment(int deliveryDayCommitment) {
            this.deliveryDayCommitment = deliveryDayCommitment;
            return this;
        }

        public CargoBuilder setDescription(String description) {
            this.description = description;
            return this;
        }

        public CargoBuilder setRegion(int region) {
            this.region = region;
            return this;
        }

        public Cargo build() {
            Cargo cargo = new Cargo(this);
            if ((ShippingType.DOMESTIC == cargo.getShippingType()) && (cargo.getRegion() <= 0 || cargo.getRegion() > 4)) {
                throw new IllegalStateException("Region is invalid! Cargo Tracking Id : " + cargo.getTrackingId());
            }

            return cargo;
        }

    }

ШАГ 3: Грузовое сообщение

CargoMessage является родительским классом внутренних и международных грузовых сообщений.

public class CargoMessage {

    private final Cargo cargo;

    public CargoMessage(Cargo cargo) {
        this.cargo = cargo;
    }

    public Cargo getCargo() {
        return cargo;
    }

    @Override
    public String toString() {
        return cargo.toString();
    }
}

ШАГ 4: Внутреннее грузовое сообщение

Класс DomesticCargoMessage моделирует сообщения о внутренних грузоперевозках.

public class DomesticCargoMessage extends CargoMessage {

    public enum Region {

        NORTH(1), SOUTH(2), EAST(3), WEST(4);

        private int value;

        private Region(int value) {
            this.value = value;
        }

        public static Region fromValue(int value) {
            return Arrays.stream(Region.values())
                            .filter(region -> region.value == value)
                            .findFirst()
                            .get();
        }
    }

    private final Region region; 

    public DomesticCargoMessage(Cargo cargo, Region region) {
        super(cargo);
        this.region = region;
    }

    public Region getRegion() {
        return region;
    }

    @Override
    public String toString() {
        return "DomesticCargoMessage [cargo=" + super.toString() + ", region=" + region + "]";
    }

}

ШАГ 5: Международное грузовое сообщение

InternationalCargoMessage Class моделирует международные грузовые сообщения.

public class InternationalCargoMessage extends CargoMessage {

    public enum DeliveryOption {
        NEXT_FLIGHT, PRIORITY, ECONOMY, STANDART
    }

    private final DeliveryOption deliveryOption;

    public InternationalCargoMessage(Cargo cargo, DeliveryOption deliveryOption) {
        super(cargo);
        this.deliveryOption = deliveryOption;
    }

    public DeliveryOption getDeliveryOption() {
        return deliveryOption;
    }

    @Override
    public String toString() {
        return "InternationalCargoMessage [cargo=" + super.toString() + ", deliveryOption=" + deliveryOption + "]";
    }

}

ШАГ 6: Конфигурация приложения

AppConfiguration — это класс поставщика конфигурации для Spring Container. Он создает каналы сообщений и регистрируется в Spring BeanFactory. Также  @EnableIntegration  позволяет импортировать конфигурацию интеграции Spring, а  @IntegrationComponentScan  сканирует определенные компоненты Spring Integration. Оба они пришли с Spring Integration 4.0.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {

    /**
     * Creates a new cargoGWDefaultRequest Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoGWDefaultRequestChannel() {
        return new DirectChannel();
    }

    /**
     * Creates a new cargoSplitterOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoSplitterOutputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates a new cargoFilterOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoFilterOutputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates a new cargoRouterDomesticOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoRouterDomesticOutputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates a new cargoRouterInternationalOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoRouterInternationalOutputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates a new cargoTransformerOutput Channel and registers to BeanFactory.
     *
     * @return direct channel
     */
    @Bean
    public MessageChannel cargoTransformerOutputChannel() {
        return new DirectChannel();
    }

}

ШАГ 7: Шлюз обмена сообщениями

Интерфейс CargoGateway предоставляет приложению метод, зависящий от домена. Другими словами, он обеспечивает доступ приложения к системе обмена сообщениями. Также  @MessagingGateway  поставляется с Spring Integration 4.0 и упрощает создание шлюза в системе обмена сообщениями. Его канал запроса по умолчанию —  cargoGWDefaultRequestChannel .

import java.util.List;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

import com.onlinetechvision.model.Cargo;

@MessagingGateway(name = "cargoGateway", 
                    defaultRequestChannel = "cargoGWDefaultRequestChannel")
public interface ICargoGateway {

    /**
     * Processes Cargo Request
     *
     * @param message SI Message covering Cargo List payload and Batch Cargo Id header.
     * @return operation result
     */
    @Gateway
    void processCargoRequest(Message<List<Cargo>> message);
}

ШАГ 8: Разделитель сообщений

CargoSplitter прослушивает   канал cargoGWDefaultRequestChannel и разбивает входящий список грузов на сообщения Cargo. Грузовые сообщения отправляются в  cargoSplitterOutputChannel.

import java.util.List;

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;

import com.onlinetechvision.model.Cargo;

@MessageEndpoint
public class CargoSplitter {

    /**
     * Splits Cargo List to Cargo message(s)
     *
     * @param message SI Message covering Cargo List payload and Batch Cargo Id header.
     * @return cargo list
     */
    @Splitter(inputChannel = "cargoGWDefaultRequestChannel", 
                outputChannel = "cargoSplitterOutputChannel")
    public List<Cargo> splitCargoList(Message<List<Cargo>> message) {
        return message.getPayload();
    }
}

ШАГ 9: Фильтр сообщений

CargoFilter определяет, следует ли передавать сообщение в канал сообщений. Он прослушивает   канал cargoSplitterOutputChannel и фильтрует сообщения о грузе, превышающие ограничение по весу. Если сообщение Cargo ниже предела веса, оно отправляется на  канал cargoFilterOutputChannel . Если сообщение Cargo превышает лимит веса, оно отправляется на  канал cargoFilterDiscardChannel .

import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;

import com.onlinetechvision.model.Cargo;


@MessageEndpoint
public class CargoFilter {

    private static final long CARGO_WEIGHT_LIMIT = 1_000;

    /**
     * Checks weight of cargo and filters if it exceeds limit.
     *
     * @param Cargo message
     * @return check result
     */
    @Filter(inputChannel="cargoSplitterOutputChannel", outputChannel="cargoFilterOutputChannel", discardChannel="cargoFilterDiscardChannel")
    public boolean filterIfCargoWeightExceedsLimit(Cargo cargo) {
        return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;
    }
}

ШАГ 10: прослушиватель сброшенных грузовых сообщений

DiscardedCargoMessageListener прослушивает канал cargoFilterDiscard и обрабатывает сообщения Cargo, отклоненные CargoFilter.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;

import com.onlinetechvision.model.Cargo;


@MessageEndpoint
public class DiscardedCargoMessageListener {

private final Logger logger = LoggerFactory.getLogger(DiscardedCargoMessageListener.class);

/**
     * Handles discarded domestic and international cargo request(s) and logs.
     *
     * @param cargo domestic/international cargo message
     * @param batchId message header shows cargo batch id
     */
@ServiceActivator(inputChannel = "cargoFilterDiscardChannel")
public void handleDiscardedCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {
logger.debug("Message in Batch[" + batchId + "] is received with Discarded payload : " + cargo);
}

}

ШАГ 11: Маршрутизатор сообщений

CargoRouter определяет, какой канал (каналы) должен получить следующее сообщение, если оно доступно. Он прослушивает   канал cargoFilterOutputChannel и возвращает название соответствующего канала в зависимости от типа доставки груза. Другими словами, он направляет входящие грузовые сообщения на внутренние ( cargoRouterDomesticOutputChannel ) или международные ( cargoRouterInternationalOutputChannel ) грузовые каналы. Также, если тип доставки не установлен,   возвращается nullChannelnullChannel  — это фиктивный канал сообщений, который используется в основном для тестирования и отладки. Он не отправляет сообщение от отправителя получателю, но его метод send всегда возвращает true, а метод receive возвращает нулевое значение.

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;

import com.onlinetechvision.model.Cargo;
import com.onlinetechvision.model.Cargo.ShippingType;

@MessageEndpoint
public class CargoRouter {

    /**
     * Determines cargo request' s channel in the light of shipping type.
     *
     * @param Cargo message
     * @return channel name
     */
    @Router(inputChannel="cargoFilterOutputChannel")
    public String route(Cargo cargo) {
        if(cargo.getShippingType() == ShippingType.DOMESTIC) {
            return "cargoRouterDomesticOutputChannel";
        } else if(cargo.getShippingType() == ShippingType.INTERNATIONAL) {
            return "cargoRouterInternationalOutputChannel";
        } 

        return "nullChannel"; 
    }

}

ШАГ 12: Обмен сообщениями

CargoTransformer прослушивает  cargoRouterDomesticOutputChannel  & cargoRouterInternationalOutputChannel  и преобразует входящие запросы Cargo в сообщения внутренних и международных грузов. После этого он отправляет их на  канал cargoTransformerOutputChannel .

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;

import com.onlinetechvision.model.Cargo;
import com.onlinetechvision.model.DomesticCargoMessage;
import com.onlinetechvision.model.DomesticCargoMessage.Region;
import com.onlinetechvision.model.InternationalCargoMessage;
import com.onlinetechvision.model.InternationalCargoMessage.DeliveryOption;


@MessageEndpoint
public class CargoTransformer {

    /**
     * Transforms Cargo request to Domestic Cargo obj.
     *
     * @param cargo
     *            request
     * @return Domestic Cargo obj
     */
    @Transformer(inputChannel = "cargoRouterDomesticOutputChannel", 
                    outputChannel = "cargoTransformerOutputChannel")
    public DomesticCargoMessage transformDomesticCargo(Cargo cargo) {
        return new DomesticCargoMessage(cargo, Region.fromValue(cargo.getRegion()));
    }

    /**
     * Transforms Cargo request to International Cargo obj.
     *
     * @param cargo
     *            request
     * @return International Cargo obj
     */
    @Transformer(inputChannel = "cargoRouterInternationalOutputChannel", 
                    outputChannel = "cargoTransformerOutputChannel")
    public InternationalCargoMessage transformInternationalCargo(Cargo cargo) {
        return new InternationalCargoMessage(cargo, getDeliveryOption(cargo.getDeliveryDayCommitment()));
    }

    /**
     * Get delivery option by delivery day commitment.
     *
     * @param deliveryDayCommitment delivery day commitment
     * @return delivery option
     */
    private DeliveryOption getDeliveryOption(int deliveryDayCommitment) {
        if (deliveryDayCommitment == 1) {
            return DeliveryOption.NEXT_FLIGHT;
        } else if (deliveryDayCommitment == 2) {
            return DeliveryOption.PRIORITY;
        } else if (deliveryDayCommitment > 2 && deliveryDayCommitment < 5) {
            return DeliveryOption.ECONOMY;
        } else {
            return DeliveryOption.STANDART;
        }
    }

}

ШАГ 13: Активатор службы сообщений

CargoServiceActivator — это общая конечная точка для подключения экземпляра службы к системе обмена сообщениями. Он прослушивает   канал cargoTransformerOutputChannel и получает обработанные внутренние и международные грузовые сообщения и журналы.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;

import com.onlinetechvision.model.CargoMessage;


@MessageEndpoint
public class CargoServiceActivator {

    private final Logger logger = LoggerFactory.getLogger(CargoServiceActivator.class);

    /**
     * Gets processed domestic and international cargo request(s) and logs.
     *
     * @param cargoMessage domestic/international cargo message
     * @param batchId message header shows cargo batch id
     */
    @ServiceActivator(inputChannel = "cargoTransformerOutputChannel")
    public void getCargo(CargoMessage cargoMessage, @Header("CARGO_BATCH_ID") long batchId) {
        logger.debug("Message in Batch[" + batchId + "] is received with payload : " + cargoMessage);
    }

}

ШАГ 14: Применение

Класс приложения создан для запуска приложения. Он инициализирует контекст приложения и отправляет запросы на груз в систему обмена сообщениями.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;

import com.onlinetechvision.integration.ICargoGateway;
import com.onlinetechvision.model.Cargo;
import com.onlinetechvision.model.Cargo.ShippingType;


public class Application {

    public static void main(String[] args) {
        ApplicationContext ctx = new AnnotationConfigApplicationContext(AppConfiguration.class);
        ICargoGateway orderGateway = ctx.getBean(ICargoGateway.class);

        getCargoBatchMap().forEach(
            (batchId, cargoList) -> orderGateway.processCargoRequest(MessageBuilder
                                                                        .withPayload(cargoList)
                                                                        .setHeader("CARGO_BATCH_ID", batchId)
                                                                        .build()));
    }

    /**
     * Creates a sample cargo batch map covering multiple batches and returns.
     *
     * @return cargo batch map
     */
    private static Map<Integer, List<Cargo>> getCargoBatchMap() {
        Map<Integer, List<Cargo>> cargoBatchMap = new HashMap<>();

        cargoBatchMap.put(1, Arrays.asList(

                new Cargo.CargoBuilder(1, "Receiver_Name1", "Address1", 0.5, ShippingType.DOMESTIC)
                            .setRegion(1).setDescription("Radio").build(),
                //Second cargo is filtered due to weight limit          
                new Cargo.CargoBuilder(2, "Receiver_Name2", "Address2", 2_000, ShippingType.INTERNATIONAL)
                            .setDeliveryDayCommitment(3).setDescription("Furniture").build(),
                new Cargo.CargoBuilder(3, "Receiver_Name3", "Address3", 5, ShippingType.INTERNATIONAL)
                            .setDeliveryDayCommitment(2).setDescription("TV").build(),
                //Fourth cargo is not processed due to no shipping type found           
                new Cargo.CargoBuilder(4, "Receiver_Name4", "Address4", 8, null)
                            .setDeliveryDayCommitment(2).setDescription("Chair").build()));

        cargoBatchMap.put(2, Arrays.asList(
                //Fifth cargo is filtered due to weight limit
                new Cargo.CargoBuilder(5, "Receiver_Name5", "Address5", 1_200, ShippingType.DOMESTIC)
                            .setRegion(2).setDescription("Refrigerator").build(),
                new Cargo.CargoBuilder(6, "Receiver_Name6", "Address6", 20, ShippingType.DOMESTIC)
                            .setRegion(3).setDescription("Table").build(),
                //Seventh cargo is not processed due to no shipping type found
                new Cargo.CargoBuilder(7, "Receiver_Name7", "Address7", 5, null)
                            .setDeliveryDayCommitment(1).setDescription("TV").build()));

        cargoBatchMap.put(3, Arrays.asList(
                new Cargo.CargoBuilder(8, "Receiver_Name8", "Address8", 200, ShippingType.DOMESTIC)
                            .setRegion(2).setDescription("Washing Machine").build(),
                new Cargo.CargoBuilder(9, "Receiver_Name9", "Address9", 4.75, ShippingType.INTERNATIONAL)
                            .setDeliveryDayCommitment(1).setDescription("Document").build()));

        return Collections.unmodifiableMap(cargoBatchMap);
    }

}

ШАГ 15: Построить проект

Результаты работы с запросами на груз:

Груз 1:  успешно отправлен активатору сервиса.
Груз 2:  фильтруется по весу.
Карго 3:  успешно отправлено активатору сервиса.
Груз 4:  не обрабатывается из-за отсутствия типа доставки.
Груз 5:  фильтруется по весу.
Груз 6:  успешно отправлен активатору сервиса.
Груз 7:  не обрабатывается из-за отсутствия типа доставки.
Карго 8:  отправлено активатору сервиса успешно.
Груз 9:  успешно отправлен активатору сервиса.

После того, как проект собран и запущен, будут видны следующие журналы вывода консоли:

2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[1] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=1, receiverName=Receiver_Name1, deliveryAddress=Address1, weight=0.5, description=Radio, shippingType=DOMESTIC, deliveryDayCommitment=0, region=1], region=NORTH]
2014-12-09 23:43:51 [main] DEBUG c.o.i.DiscardedCargoMessageListener - Message in Batch[1] is received with Discarded payload : Cargo [trackingId=2, receiverName=Receiver_Name2, deliveryAddress=Address2, weight=2000.0, description=Furniture, shippingType=INTERNATIONAL, deliveryDayCommitment=3, region=0]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[1] is received with payload : InternationalCargoMessage [cargo=Cargo [trackingId=3, receiverName=Receiver_Name3, deliveryAddress=Address3, weight=5.0, description=TV, shippingType=INTERNATIONAL, deliveryDayCommitment=2, region=0], deliveryOption=PRIORITY]
2014-12-09 23:43:51 [main] DEBUG c.o.i.DiscardedCargoMessageListener - Message in Batch[2] is received with Discarded payload : Cargo [trackingId=5, receiverName=Receiver_Name5, deliveryAddress=Address5, weight=1200.0, description=Refrigerator, shippingType=DOMESTIC, deliveryDayCommitment=0, region=2]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[2] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=6, receiverName=Receiver_Name6, deliveryAddress=Address6, weight=20.0, description=Table, shippingType=DOMESTIC, deliveryDayCommitment=0, region=3], region=EAST]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[3] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=8, receiverName=Receiver_Name8, deliveryAddress=Address8, weight=200.0, description=Washing Machine, shippingType=DOMESTIC, deliveryDayCommitment=0, region=2], region=SOUTH]
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[3] is received with payload : InternationalCargoMessage [cargo=Cargo [trackingId=9, receiverName=Receiver_Name9, deliveryAddress=Address9, weight=4.75, description=Document, shippingType=INTERNATIONAL, deliveryDayCommitment=1, region=0], deliveryOption=NEXT_FLIGHT]

Исходный код
Исходный код доступен на Github

Ссылки

Шаблоны корпоративной интеграции
Spring Integration Справочное руководство
Spring Integration 4.1.0.RELEASE API
Pro Spring Integration Выпуск
Spring Integration 3.0.2 и 4.0 Milestone 4