Статьи

Публикация доменных событий с помощью Spring Integration EventBus

Интеграции могут стать беспорядочными, если вы не относитесь к ним серьезно, особенно если у вас есть как входящие, так и исходящие интеграции в одной точке. Проблемы интеграции могут нежелательно загрязнять ваш домен, если у вас нет надлежащих основ, заложенных в первую очередь. Шестиугольная архитектура  помогает сохранить ваш домен в чистоте от аспектов безопасности, транзакций и других нефункциональных аспектов, передавая их на уровень обслуживания приложений. Но все же есть проблемы интеграции, которые мы должны решать за пределами уровня обслуживания приложений — в частности, те внешние интеграции, которые происходят из событий домена,  опубликованных доменом приложения.

Если вы уже используете Spring Framework в своем проекте, то наличие интеграции Spring значительно упрощает эти проблемы. Хотя Apache Camel также может быть хорошим кандидатом, я считаю, что весенняя интеграция более привлекательна из-за ее простоты и совместимости с другими весенними проектами.

В этой статье я покажу, как я использовал интеграцию Spring в одном из моих недавних проектов, чтобы домен не обращал внимания на проблемы интеграции и в то же время упрощался. Основной целью проекта было ускорение процесса эмиграции пассажира в очень загруженном аэропорту. В проекте было много входящих и исходящих интеграций с внешними системами, относящимися к авиакомпаниям, аэропортам, местным жителям и секторам иностранных дел и т. Д. Ниже представлена ​​диаграмма архитектуры программного приложения, которое стало результатом этого проекта.

Шестиугольная архитектура

On the left side of the diagram, these small blocks in the REST API layer represents workflow activities which are responsible for manipulating the state of underlying domain objects. REST API layer communicates with application API via these workflow activites. This also involves necessary back and forth translation between resource and domain objects. Different types of request which are shown on left side comes from front-end application. Requests which does not require any external input goes directly to application service via workflow activities. Whereas, some request that requires external system intervention, triggers integration flows to gather the necessary information from outside world and then send the enriched request to application service API layer. These external integrations can be greatly simplified by interacting with a Java interface backed up by a Messaging Gateway provided by spring integration. The bottom left REST API requests terminate at a workflow activity which triggers an integration flow with an outside system before calling the application service.

Whereas, domain events (shown with dotted arrows pointing outwards from the domain) are published by the domain in effect of certain business incidents. These events are published to outside world so they can also react onto them by subscribing to those events. You will be lucky enough, if all the other applications are in downstream and interpretation of event messages generated by your bounded context is their responsibility. Hence, in most of the cases, it is your responsibility to construct the message into their respective formats and put them into some messaging system, so that they can pick from it. Not only that, you may sometime require to inform multiple system for a single event. This requires transformation, service orchestration, routing and lots of other integration concerns to care about. Also, not to forget you are also required to persist all domain events into some event store as a snapshot for future reference.

To address these concerns, we will implement a custom Event Bus which we will be using to publish events from application domain. We then be able to consume these events directly into spring integration flows, just like shown in above diagram, to notify external systems into their respective formats. There you will be having full liberty of addressing integration concerns using EIP patterns in spring integration flows.

Below is the basic interface of Eventbus that will be used to publish and subscribe events. Complete source code is available over GitHub.

package org.springframework.integration.eventbus;

public interface EventBus{
public boolean publish(Event event);
    boolean subscribe(String topic, Object subscriber);
    boolean unsubscribe(String topic, Object subscriber);
}

Behind the scenes, this eventbus implementation uses publish-subscribe-channel to send the events to all subscribes. First argument in Eventbus subscribe/unscribe methods should be the name of that publish-subscribe-channel. Whereas for pulishing domain events, eventbus will be able to extract it from Event type. Below code snippet shows how a domain event has to be defined:

public interface Event {
public String forTopic();
}

public abstract class DomainEvent implements Event {

  public int eventVersion;
  private Date createdOn;
  private transient String topic;

  protected DomainEvent(Date createdOn) {
    this.setCreatedOn(createdOn);
    this.setTopic("domainEventsChannel");
    this.setEventVersion(1);
  }

  private void setTopic(String topic) {
  this.topic = topic;
  }

  private void setCreatedOn(Date createdOn) {
  this.createdOn = createdOn;
  }

  public Date occurredOn() {
  return createdOn;
  }

  public String forTopic() {
  return topic;
  }

  private void setEventVersion(int eventVersion) {
  this.eventVersion = eventVersion;
  }

  public int eventVersion() {
  return eventVersion;
  }
}

public class FooDomainEvent extends DomainEvent {
  public FooDomainEvent() {
  super(new Date());
  }
}

Here «domainEventsChannel» is the name of the publish-subscribe-channel that has to be configured for EventBus as mentioned above. Below is the example spring configuration that wires up all necessary components together.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">

  <context:annotation-config />
  <!-- Helper utility used by domain to publish event using Eventbus -->
  <bean class="com.foobar.domain.model.DomainEventPublisher"/>
  <bean id="eventBus" class="org.springframework.integration.eventbus.SpringEventBus" />  
  <!-- Publish subscriber channel used by EventBus -->
  <int:publish-subscribe-channel id="domainEventsChannel"/>
  <!-- Eventstore where all events has to be persisted -->
  <bean id="eventStore" class="com.foobar.infrastructure.eventstore.InMemoryEventStore" />
  <int:service-activator input-channel="domainEventsChannel" ref="eventStore" />

  <import resource="foo-bar-integration-flow.xml"/>
</beans>

DomainEventPublisher is a convenient class that delegates call to EventBus for publishing the event. It is an implementation of Ambient Context Pattern, that helps you in publishing events from anywhere deep down in hierarchy of domain objects. You can provide access to eventbus as per your own taste, if trade-offs of using that pattern are not acceptable to you. Rest of the configuration is quite explanatory. Hence, let us move to foo-bar-integration-flow.xml.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream
      http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

  <bean id="fooBarApplicationService" class="com.foobar.application.FooBarApplicationService"/>

  <int:payload-type-router input-channel="domainEventsChannel" default-output-channel="nullChannel">
  <int:mapping type="com.foobar.domain.model.BarDomainEvent" channel="barEvents"/>
  <int:mapping type="com.foobar.domain.model.FooDomainEvent" channel="fooEvents"/>
  </int:payload-type-router>

  <int:publish-subscribe-channel id="fooEvents"/>
  <int:publish-subscribe-channel id="barEvents"/>

  <int:chain input-channel="barEvents">
    <int:transformer expression="payload.getClass().getName()"/>
    <stream:stdout-channel-adapter append-newline="true"/>
  </int:chain>
  <int:service-activator input-channel="fooEvents" ref="fooBarApplicationService"/>

</beans>

In above integration flow, we are routing domain events into two different channels one that prints out barEvent at console and the other one calls application service method that is annotated with SubsribeEvent annotation, as shown in below code snippet.

import org.springframework.integration.eventbus.SubscribeEvent;
import org.springframework.stereotype.Service;
import com.foobar.domain.model.BarDomainEvent;
import com.foobar.domain.model.DomainEvent;

@Service
public class FooBarApplicationService extends BaseFooBarApplicationService{

  @SubscribeEvent
  public void when(BarDomainEvent event){
      System.out.println(this.getClass().getSimpleName()+"  received event "+event.getClass().getSimpleName());
  }

  @SubscribeEvent
  public void when(DomainEvent event) {
      System.out.println(this.getClass().getSimpleName()+"  received event "+event.getClass().getSimpleName());
  }
}

Although there is not much happening in above integration flow, but it is more than enough to illustrate how powerful that tool can be. You can now have the full liberty of routing, transforming, and publishing the event by any mean you want.

If you find above way of handing events bit verbose, in terms of notifying the subscriber beans, then you can use below configuration to make it more concise.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream
      http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

  <context:annotation-config />
  <context:component-scan base-package="com.foobar.application"/>

  <bean class="com.foobar.domain.model.DomainEventPublisher"/>
  <!-- Event subscriber bean processor -->
  <bean id="eventSubscriberBeanProcessor" class="org.springframework.integration.eventbus.EventSubscriberBeanProcessor" />
  <bean id="eventBus" class="org.springframework.integration.eventbus.SpringEventBus" />
  <bean id="eventStore" class="com.foobar.infrastructure.eventstore.InMemoryEventStore" />

  <int:publish-subscribe-channel id="domainEventsChannel"/>
</beans>

In above configuration «eventSubscriberBeanProcessor» automatically registers all the beans that have methods annotated with SubscribeEvent annotation. Therefore, you do not have to always explicitly register them like we had done for «eventStore» and «fooBarApplicationService». Although this will give more advantage over previous configuration, make sure you should not violate the transaction boundary constraint of aggregates. Which states, only one aggregate can be modified in one transaction. Finally, below test case reveals how domain event can be published.

@RunWith(JUnit4.class)
public class SpringEventbusTestcase{

  @Test
  public void springEventBusVerboseSubscribeTest() {
    ApplicationContext context = new ClassPathXmlApplicationContext("/META-INF/spring/integration/spring-eventbus-verbose-subscribe-ctxt.xml");

    FooBarApplicationService applicationService = context.getBean(FooBarApplicationService.class);
    applicationService.updateFoo();
    applicationService.updateBar();

    // Test sending arbitrary domain event to validate that it gets consumed by BaseFooBarApplicationService
    DomainEventPublisher.publish(new DomainEvent(new Date()){});
    EventStore eventStore = context.getBean(EventStore.class);
    //Verify all above domain events are logged in event store
    Assert.assertTrue(eventStore.allDomainEventsSince(-1l).size()==3);
  }
}

Notice how I have used DomainEventPublisher to publish an event. Whereas, rest of the events are being published as from inside domain when updateFoo and updateBar methods are called.

That is it for the day. Grab the source code from GitHub play with it and enlighten me with your feedback.