Статьи

Синхронизация данных между приложениями Mule с MongoDB

Одна из многих вещей, которые Мул делает очень хорошо, это прозрачное сохранение состояния между сообщениями. Возьмите идемпотентный фильтр сообщений, например; Idempotent-message-filter отслеживает, какие сообщения были обработаны ранее, чтобы гарантировать, что повторяющиеся сообщения не будут обработаны. Это все достигается с помощью одного элемента XML. Или возьмите облачные соединители с поддержкой OAuth, которые должны отслеживать токены доступа, чтобы вы не перенаправлялись на страницу авторизации каждый раз. Все это происходит за кулисами, и вам не о чем беспокоиться.

Способность Мула хранить это состояние реализуется через объектные хранилища. Объектные хранилища предоставляют простой общий способ хранения данных в Mule. Многие обработчики сообщений, такие как idempotent-meesage-filter, обработчики сообщений OAuth, маршрутизаторы до успешного завершения и многие другие, все прозрачно используют хранилища объектов за кулисами для хранения состояния между сообщениями.

По умолчанию Mule использует скрытые хранилища объектов в памяти. Однако все становится более интересным, когда ваше приложение Mule распределено по нескольким узлам Mule. В какой-то момент вам понадобится синхронизировать хранилища объектов в нескольких приложениях. Если вам повезло с использованием корпоративной версии Mule, вы можете воспользоваться встроенной поддержкой кластеризации для синхронизации состояния по всему кластеру. Но даже тогда, как упомянуто в этом сообщении в блоге Джона Демика: Синхронизация приложений мулов между центрами данных с помощью Apache Cassandra , это может быть не идеальным, если ваши узлы Mule географически распределены по нескольким центрам обработки данных.

В вышеупомянутом посте Джон приводит отличный пример использования Apache Cassandra в качестве общего хранилища объектов. Но многие другие традиционные и NoSql хранилища данных также могут быть использованы. В этом посте я покажу простой пример того, как вы можете добиться того же, используя MongoDB. Если это достаточно хорошо для Hipster Hacker, то это достаточно хорошо для меня!

Настройка хранилища объектов MongoDB

MongoDB — это база данных документов с открытым исходным кодом и ведущая база данных NoSQL. Модуль MongoDB может использоваться для чтения / записи / запроса и выполнения всей загрузки другой операции с базой данных MongoDB как часть ваших потоков Mule. Он также предоставляет дополнительный подмодуль для использования базы данных MondogDB в качестве реализации хранилища объектов в Mule. Вот пример конфигурации из одного из тестов:

<?xml version="1.0" encoding="UTF-8"?>
<!--

    Mule Mongo Connector

    Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com

    The software in this package is published under the terms of the CPAL v1.0
    license, a copy of which has been included with this distribution in the
    LICENSE.txt file.

-->

<mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mos="http://www.mulesoft.org/schema/mule/mongo-object-store"
    xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:script="http://www.mulesoft.org/schema/mule/scripting"
    xmlns:test="http://www.mulesoft.org/schema/mule/test" xmlns:spring="http://www.springframework.org/schema/beans"
    xmlns:util="http://www.springframework.org/schema/util" xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="
          http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
          http://www.mulesoft.org/schema/mule/mongo-object-store http://www.mulesoft.org/schema/mule/mongo-object-store/current/mule-mongo-object-store.xsd
          http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
          http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
          http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <spring:beans>
        <spring:bean class="org.mule.module.mongo.FakeObjectStoreUser"
            p:objectStore-ref="mos" />
    </spring:beans>

    <mos:config name="mos" database="mongo-connector-test" />
</mule> 

Как видите, он классифицируется как собственный модуль с собственным пространством имен и настраивается просто с помощью элемента mos: config name = «mos» database = «mongo-connector-test» .

Тем не менее, это не работает для текущей версии разъема. Из-за следующей ошибки . Вы можете просто применить исправление в вышеупомянутом запросе на извлечение или создать хранилище объектов самостоятельно с помощью Spring — аналогично тому, как это делается в конфигурации Apache Cassandra.

Класс хранилища объектов можно найти здесь: MongoObjectStore.java . Как видите, он имеет различные аннотации для значений по умолчанию и инициализации класса. Однако, поскольку мы не используем это как полноценный модуль Mule Devkit, эти аннотации не имеют никакого эффекта. Поэтому мы должны сами передать значения по умолчанию и вызвать метод initialise при создании экземпляра нашего хранилища объектов.

Это довольно просто с Spring. Вот полный рабочий пример использования экземпляра объекта-хранилища Spring в качестве хранилища для idempotent-message-filter:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:mongo="http://www.mulesoft.org/schema/mule/mongo" xmlns:objectstore="http://www.mulesoft.org/schema/mule/objectstore"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.mulesoft.org/schema/mule/mongo http://www.mulesoft.org/schema/mule/mongo/2.0/mule-mongo.xsd
		http://www.mulesoft.org/schema/mule/objectstore http://www.mulesoft.org/schema/mule/objectstore/1.0/mule-objectstore.xsd
		http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd">
    
	<spring:beans>
		<spring:bean id="mongoObjectStore" class="org.mule.module.mongo.MongoObjectStore" 
			init-method="initialize" scope="singleton">
			<spring:property name="host" value="localhost"/>
			<spring:property name="port" value="27017"/>
			<spring:property name="database" value="test"/>
			<spring:property name="username" value=""/>
			<spring:property name="password" value=""/>
			<spring:property name="writeConcern" value="DATABASE_DEFAULT"/>
		</spring:bean>
	</spring:beans>
  
	<flow name="mongo-filter">
		<poll frequency="10000">
			<set-payload value="1" doc:name="Set Payload" />
		</poll>
		
		<idempotent-message-filter storePrefix="ids" idExpression="#[payload]">
			<spring-object-store ref="mongoObjectStore" />
		</idempotent-message-filter>
		
		<logger level="INFO" message="Passed Filter" />
	</flow>
	
</mule>

Если вы запустите эту конфигурацию, Mule будет каждые 10 секунд опрашивать одно и то же сообщение: «1». При первом опросе вы должны увидеть «Пропущенный фильтр» в выводе журнала. Последующие опросы не должны ничего регистрировать, поскольку фильтр идемпотентных сообщений отфильтрует их. Обычно это выполняется с помощью стандартного хранилища объектов в памяти. Чтобы доказать это, вы должны проверить значение в коллекции базы данных Mongo: db.getCollection (‘mule.objectstore._default’). Find () . Также вы можете остановить свое приложение Mule и перезапустить его, чтобы доказать, что оно работает между перезапусками и отказоустойчивостью и т. Д.