Статьи

Транзакционное кэширование для верблюда с Infinispan

Некоторое время назад я создал разъем Redis для Camel. Redis — это отличное хранилище значений ключей (и многое другое), но затем мне понадобился кэш, работающий в той же JVM, что и Camel, и заметил Infinispan, который только что переключился на ASL v2 . В Camel уже есть другие соединители для кэширования на JVM, такие как Hazelcast и EHCache, но если вы уже используете Camel как часть других продуктов Red Hat или хотите посмотреть, как вытеснение LIRS превосходит LRU, стоит попробовать Infinispan.

Вкратце, Infinispan — это транзакционное хранилище ключей в памяти и сетка данных. При использовании во встроенном режиме Infinispan находится в той же JVM, что и Camel, и позволяет потребителю Camel получать уведомления об изменении кэша:

<route>
  <from uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders&eventTypes=CACHE_ENTRY_CREATED"/>
  <filter>
    <simple>${out.header.CamelInfinispanIsPre} == true</simple>
    <to uri="log:com.mycompany.order?showHeaders=true"/>
  </filter>
</route>

В приведенном выше примере, когда создается запись в кэше, Infinispan запустит два события — одно до и одно после создания записи в кэше. Также возможно получать события синхронно, то есть в том же потоке, который обрабатывает действие кэша, или асинхронно в отдельном потоке, не блокируя действие кэша.

Использовать Infinispan в качестве локального кэша очень просто, он предоставляет интерфейс ConcurrentMap и обладает обычными функциями истечения срока действия, удаления, пассивации, постоянного хранения, запросов и т. Д. Что делает Infinispan также сеткой данных, так это способность узлов обнаруживать другие узлы и реплицировать или распределять данные между собой. Репликация позволяет совместно использовать данные в кластере, тогда как при распределении используется согласованный алгоритм хеширования для достижения лучшей масштабируемости.
В режиме клиент-сервер Infinispan работает как автономное приложение, и производитель Camel может отправлять сообщения с помощью клиента Infinispan Hot Rod. Hot Rod — это двоичный, независимый от языка, интеллектуальный протокол, позволяющий взаимодействовать с серверами Infinisnap в топологии и с учетом распределения хэшей.

Производитель Infinispan в Camel в настоящее время предлагает операции GET , PUT , REMOVE и CLEAR . Вот пример того, как производитель помещает данные в кеш заказов:

<route>
  <from uri="direct:orderCache"/>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${in.header.orderId}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.orderTotal}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?caseName=orders"/>
</route>

Давайте создадим более интересный пример. Infinispan также соответствует JTA и может участвовать в транзакциях. Мы создадим REST API для регистрации пользователей, который сначала сохранит человека в реляционной базе данных с использованием компонента Camel sql, а затем поместит firstName в кэш Infinispan в той же транзакции. Мы сделаем это с использованием транзакционного маршрута Camel, поэтому, если во время маршрутизации произойдет ошибка, на любом этапе Camel обеспечит откат транзакции (для кеша и базы данных), чтобы база данных и кеш всегда были в последовательное состояние.

<route>
  <from uri="restlet:/persons?restletMethod=POST"/>
  <transacted/>

  <!-- PERSIST TO DB -->
  <to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/>

  <!-- DAMN EXCEPTION THROWER-->
  <filter>
    <simple>${in.header.lastName} == "damn"</simple>
    <throwException ref="damn"/>
  </filter>

  <!-- PUT TO CACHE -->
  <to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/>
  <setHeader headerName="personId">
    <simple>${body[0][ID]}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanKey">
    <simple>${headerAs(personId, String)}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanValue">
    <simple>${in.header.firstName}</simple>
  </setHeader>
  <setHeader headerName="CamelInfinispanOperation">
    <simple>CamelInfinispanOperationPut</simple>
  </setHeader>
  <to uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders"/>
</route>

Как видите, в маршруте нет волшебства или дополнительной настройки, это стандартный маршрут. У нас есть небольшой кусочек кода, который выдает исключение, когда человек lastName чертовски симулирует ошибки в середине маршрута.

Приложение работает в автономном режиме с диспетчером транзакций atomikos JTA. Сначала мы создаем JtaTransactionManager

<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> 
  <constructor-arg ref="userTransaction"/>
  <constructor-arg ref="userTransactionManager"/>
</bean>

Затем обернуть наши DataSource с ним:

public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {
    EmbeddedXADataSource ds = new EmbeddedXADataSource();
    ds.setCreateDatabase("create");
    ds.setDatabaseName("target/testdb");
    ds.setUser("");
    ds.setPassword("");
 
    AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
    xaDataSource.setXaDataSource(ds);
    xaDataSource.setUniqueResourceName("xaDerby");
    return xaDataSource;
}

и используя TransactionManagerLookup сказать Infinispan участвовать в той же транзакции:

public BasicCacheContainer basicCacheContainer() throws Throwable {
    GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();

    Configuration loc = new ConfigurationBuilder()
        .transaction().transactionMode(TransactionMode.TRANSACTIONAL)
        .transactionManagerLookup(new TransactionManagerLookup() {
            @Override
            public TransactionManager getTransactionManager() throws Exception {
            return jtaTransactionManager.getTransactionManager();
            }
        }).build();

    return new DefaultCacheManager(glob, loc, true);
}

После всего этого стандартного кода наш источник данных, кеш и маршрут Camel участвуют в одной транзакции. Чтобы увидеть полный пример REST с двухфазной фиксацией и откатом, получите исходный код от github и поиграйте с ним.

Кстати, верблюд-infinispan компонент еще не является частью ствола Camel, чтобы запустить пример вам нужно , что тоже.