Статьи

Кэширование транзакций для Camel с Infinispan

Некоторое время назад я создал разъем Redis для Camel. Redis — это отличное хранилище значений ключей (и многое другое), но затем мне понадобился кэш, работающий в той же JVM, что и Camel, и заметил Infinispan, который только что переключился на ASL v2 . В Camel уже есть другие соединители для кэширования на JVM, такие как Hazelcast и EHCache, но если вы уже используете Camel как часть других продуктов Red Hat или хотите посмотреть, как вытеснение LIRS превосходит LRU, стоит попробовать Infinispan.

Вкратце, Infinispan — это транзакционное хранилище ключей в памяти и сетка данных. При использовании во встроенном режиме Infinispan находится в той же JVM, что и Camel, и позволяет потребителю Camel получать уведомления об изменении кэша:

1
2
3
4
5
6
7
<route>
  <from uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders&eventTypes=CACHE_ENTRY_CREATED"/>
  <filter>
    <simple>${out.header.CamelInfinispanIsPre} == true</simple>
    <to uri="log:com.mycompany.order?showHeaders=true"/>
  </filter>
</route>

В приведенном выше примере, когда создается запись в кэше, Infinispan запустит два события — одно до и одно после создания записи в кэше. Также возможно получать события синхронно, то есть в том же потоке, который обрабатывает действие кэша, или асинхронно в отдельном потоке, не блокируя действие кэша.

Использовать Infinispan в качестве локального кэша просто, он предоставляет интерфейс ConcurrentMap и имеет обычные функции истечения срока действия, удаления, пассивации, постоянного хранения, запросов и т. Д. Что делает Infinispan также сеткой данных, так это способность узлов обнаруживать другие узлы и реплицировать или распределять данные между собой. Репликация позволяет совместно использовать данные в кластере, тогда как при распределении используется согласованный алгоритм хеширования для достижения лучшей масштабируемости

В режиме клиент-сервер Infinispan работает как автономное приложение, и производитель Camel может отправлять сообщения с помощью клиента Infinispan Hot Rod. Hot Rod — это двоичный, независимый от языка, интеллектуальный протокол, позволяющий взаимодействовать с серверами Infinisnap в топологии и с учетом распределения хэшей.

Производитель Infinispan в Camel в настоящее время предлагает операции GET , PUT , REMOVE и CLEAR . Вот пример того, как производитель помещает данные в кеш заказов:

01
02
03
04
05
06
07
08
09
10
11
12
13
<route>
  <from uri="direct:orderCache"/>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${in.header.orderId}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.orderTotal}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?caseName=orders"/>
</route>

Давайте создадим более интересный пример. Infinispan также соответствует JTA и может участвовать в транзакциях. Мы создадим REST API для регистрации пользователей, который сначала сохранит человека в реляционной базе данных с использованием компонента Camel sql, а затем поместит firstName в кэш Infinispan в той же транзакции. Мы сделаем это с использованием транзакционного маршрута Camel, поэтому, если во время маршрутизации произойдет ошибка, на любом этапе Camel обеспечит откат транзакции (для кеша и базы данных), чтобы база данных и кеш всегда были в последовательное состояние.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<route>
  <from uri="restlet:/persons?restletMethod=POST"/>
  <transacted/>
 
  <!-- PERSIST TO DB -->
  <to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/>
 
  <!-- DAMN EXCEPTION THROWER-->
  <filter>
    <simple>${in.header.lastName} == "damn"</simple>
    <throwException ref="damn"/>
  </filter>
 
  <!-- PUT TO CACHE -->
  <to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/>
  <setHeader headerName="personId">
    <simple>${body[0][ID]}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${headerAs(personId, String)}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.firstName}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders"/>
</route>

Как видите, в маршруте нет волшебства или дополнительной настройки, это стандартный маршрут. У нас есть небольшой кусочек кода, который выдает исключение, когда человек lastName чертовски симулирует ошибки в середине маршрута.

Приложение работает в автономном режиме с диспетчером транзакций atomikos JTA. Сначала мы создаем JtaTransactionManager, который будет использоваться транзакционным маршрутом:

1
2
3
4
5
6
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
  <constructor-arg ref="userTransaction"/>
  <constructor-arg ref="userTransactionManager"/>
</bean>

Затем оберните наш источник данных этим:

01
02
03
04
05
06
07
08
09
10
11
12
public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {
    EmbeddedXADataSource ds = new EmbeddedXADataSource();
    ds.setCreateDatabase("create");
    ds.setDatabaseName("target/testdb");
    ds.setUser("");
    ds.setPassword("");
  
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(ds);
    xaDataSource.setUniqueResourceName("xaDerby");
    return xaDataSource;
}

и используя TransactionManagerLookup сказать Infinispan участвовать в той же транзакции:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
public BasicCacheContainer basicCacheContainer() throws Throwable {
    GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();
 
    Configuration loc = new ConfigurationBuilder()
        .transaction().transactionMode(TransactionMode.TRANSACTIONAL)
        .transactionManagerLookup(new TransactionManagerLookup() {
            @Override
            public TransactionManager getTransactionManager() throws Exception {
            return jtaTransactionManager.getTransactionManager();
            }
        }).build();
 
    return new DefaultCacheManager(glob, loc, true);
}

После всего этого стандартного кода наш источник данных, кеш и маршрут Camel участвуют в одной транзакции. Чтобы увидеть полный пример REST с двухфазной фиксацией и откатом, получите исходный код от github и поиграйте с ним.

Кстати, компонент Camel-infinispan все еще не является частью магистрали Camel, для запуска примера вам это тоже понадобится.