Статьи

Большой набор данных в Mule

Недавно клиент задал вопрос о том, как выполнить поиск большого набора данных в Mule. На странице документации кратко объясняется, как это может быть достигнуто, однако, насколько я могу судить, нет рабочего примера, как это сделать Цель этого блога — подробно объяснить, как работает поиск больших наборов данных в Mule, на примере.

Клиент хотел перенести элементы из одной базы данных в другую, выполнив пакетный выбор, а затем пакетную вставку. Часть «пакетной вставки» довольно проста и выполняется автоматически Mule, когда полезная нагрузка имеет тип List. Однако выбор партии выполняется по-другому.

Чтобы извлечь все записи, мы будем использовать диспетчер пакетов, чтобы вычислить диапазоны идентификаторов для следующей партии записей, которую нужно извлечь. Это предоставляется из коробки с Mule EE.

Мы начнем с определения базы данных, которая будет использоваться в этом примере для извлечения и вставки записей. Для простоты мы будем использовать базу данных Derby в памяти.

ПРИМЕЧАНИЕ: записи должны быть идентифицированы по уникальному ключу в последовательном числовом порядке.

CREATE TABLE table1(KEY1 INTEGER GENERATED BY DEFAULT AS IDENTITY(START WITH 1)  NOT NULL PRIMARY KEY, KEY2 VARCHAR(255));
CREATE TABLE table2(KEY1 VARCHAR(255), KEY2 VARCHAR(255));

INSERT INTO table1(KEY2) VALUES ('TEST1');
INSERT INTO table1(KEY2) VALUES ('TEST2');
INSERT INTO table1(KEY2) VALUES ('TEST3');
INSERT INTO table1(KEY2) VALUES ('TEST4');
INSERT INTO table1(KEY2) VALUES ('TEST5');
INSERT INTO table1(KEY2) VALUES ('TEST6');
INSERT INTO table1(KEY2) VALUES ('TEST7');
INSERT INTO table1(KEY2) VALUES ('TEST8');
INSERT INTO table1(KEY2) VALUES ('TEST9');
INSERT INTO table1(KEY2) VALUES ('TEST10');

Как объяснялось ранее, запрос на выборку основан на диапазонах идентификаторов, которые вычисляются диспетчером пакетов при вызове nextBatch () . Это вернет карту с нижним и верхним идентификаторами, которые будут выбраны. В нашем случае мы храним эту карту в переменную потока с именем «Границы».

<!-- Defining the JDBC connector and queries -->

<jdbc-ee:connector name="jdbcConnectorOne"
	dataSource-ref="Derby_Data_Source" validateConnections="true"
	doc:name="Database">
	<!-- in the query "selectSample" KEY1 is an autoincremented 
	column, whilst the map boundaries variable contains the current lowestID 
	and upperID to be proccessed (accessed in the query through ( #[flowVars.boundaries.lowerId] 
	and #[flowVars.boundaries.upperId) these values are automatically increased by the batch manager. -->

	<jdbc-ee:query key="selectSample"
		value="select KEY1,KEY2 from table1 where KEY1 between #[flowVars.boundaries.lowerId] and 
#[flowVars.boundaries.upperId] order by KEY1" />
</jdbc-ee:connector>

После настройки базы данных и JDBC-коннектора нам нужно настроить Batch Manager. Это состоит из указания idStore (который является текстовым файлом), который BatchManager использует для хранения начальной точки для следующего пакета. Более того, в Batch Manager нам нужно настроить размер пакета и начальную точку.

<!-- Setting up the Batch Manager -->
<spring:bean id="idStore"
	class="com.mulesoft.mule.transport.jdbc.util.IdStore">
        <spring:property name="fileName" value="/tmp/large-dataset.txt" />
</spring:bean>

<spring:bean id="seqBatchManager"
	class="com.mulesoft.mule.transport.jdbc.components.BatchManager">
	<spring:property name="idStore" ref="idStore" />
	<spring:property name="batchSize" value="2" />
	<spring:property name="startingPointForNextBatch" value="0" />
</spring:bean>

В документации вы найдете ссылку на noArgsWrapper. Его задача — вызывать метод nextBatch () в диспетчере пакетов . Однако мы находим это очень запутанным и вводящим в заблуждение, поэтому вместо этого мы используем простое выражение MEL, которое напрямую вызывает nextBatch () .

Теперь нам нужно настроить основной поток, в котором мы выполняем пакетный выбор. Учитывая, что записи извлекаются партиями, поток должен вызываться несколько раз, пока не будут получены все записи. Чтобы решить эту проблему, мы создали составной источник, чтобы в конце потока, если мы не получили все записи, мы повторно запустили тот же поток, используя очередь ВМ.

<composite-source doc:name="Composite Source">
	<http:inbound-endpoint address="http://localhost:8081/batch" doc:name="HTTP"/>
	<vm:inbound-endpoint path="batch" doc:name="VM"/>
</composite-source>

<flow-ref name="startBatch" doc:name="Flow Reference"/>

<logger level="ERROR" message="after component: #[payload]" doc:name="Logger"/>

<!-- Batch Select -->

<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
	queryKey="selectSample" connector-ref="jdbcConnectorOne" doc:name="Database" />
<sub-flow name="startBatch" doc:name="startBatch" doc:description="When invoked a map consisting of the lowerid and the upperid will be returned from the batch manager">
	<set-variable variableName="boundaries"
	value="#[app.registry.seqBatchManager.nextBatch()]" doc:name="Variable"/>
</sub-flow>

Как только текущий пакет завершен, нам нужно вызвать функцию CompetteBatch (), чтобы сообщить менеджеру пакета, что мы закончили с текущим пакетом и готовы обработать следующий. Если этого не сделать, диспетчер пакетов все равно будет рассматривать предыдущую партию как «обработку». Кроме того, мы должны проверить, извлекли ли мы все записи, чтобы мы могли остановить обработку. Мы делаем это, проверяя размер полезной нагрузки, возвращаемой из исходящей конечной точки JDBC. Если размер полезной нагрузки равен «0» (больше записей не нужно извлекать), мы должны вызвать completeBatch ()метод с ‘-1’, инструктируя Batch Manager, что вся партия завершена. Мы также должны установить начальную точку для следующей партии в «0». Это необходимо для того, чтобы при повторном запуске потока из входящей конечной точки HTTP поток начал обработку с первой записи.

Если пакет не завершен, мы вызываем метод completeBatch () (из класса BatchManager) с текущим upperId. Это устанавливает новую начальную точку для следующей партии, которая будет обработана. Наконец, мы заканчиваем поток исходящей виртуальной машиной в «пакете», который запускает основной поток для обработки следующего пакета записей.

<flow-ref name="saveSizeOfBatch" doc:name="Flow Reference"/>

<!-- processing here -->

<flow-ref name="completeBatch" doc:name="Flow Reference"/>

<logger level="ERROR" message="after jdbc batch select: #[payload]" doc:name="Logger"/>
<sub-flow name="saveSizeOfBatch" doc:name="saveSizeOfBatch" doc:description="Get the size of the current batch">
	<set-variable variableName="size" value="#[payload.size()]" doc:name="Variable"/>
</sub-flow>
<sub-flow name="completeBatch" doc:name="completeBatch">
    <choice doc:name="Choice">
	<when expression="flowVars.size == 0">
		<expression-component doc:name="Expression">
			app.registry.seqBatchManager.completeBatch(-1);
			app.registry.seqBatchManager.setStartingPointForNextBatch(0);
		</expression-component>
	</when>
	<otherwise>
		<expression-component doc:name="Expression">
			app.registry.seqBatchManager.completeBatch(flowVars.boundaries.upperId);
		</expression-component>
		<vm:outbound-endpoint path="batch" doc:name="VM"/>
	</otherwise>
    </choice>
</sub-flow>

Полная конфигурация мула основного потока показана здесь ниже.

<!-- Batch Select flow -->
	<flow name="BatchSelect" 
	doc:description="
	1. Get the first trigger from the http inbound endpoint
	2. Invoke the startBatch sub-flow to start the batch manager
	3. Invoke the JDBC batch select
	4. Insert the list of records (this will be automatically done as a batch insert)
	5. Set the batch size
	6. Call the completeBatch sub-flow to either stop processing or invoking the next batch by invoking the vm:inbound endpoint." 
	doc:name="BatchSelect">

		<composite-source doc:name="Composite Source">
			<http:inbound-endpoint address="http://localhost:8081/batch" doc:name="HTTP"/>
			<vm:inbound-endpoint path="batch" doc:name="VM"/>
		</composite-source>

		<flow-ref name="startBatch" doc:name="Flow Reference"/>

		<logger level="ERROR" message="after component: #[payload]" doc:name="Logger"/>

		<!-- Batch Select -->

		<jdbc-ee:outbound-endpoint exchange-pattern="request-response"
			queryKey="selectSample" connector-ref="jdbcConnectorOne" doc:name="Database" />
		
		<flow-ref name="saveSizeOfBatch" doc:name="Flow Reference"/>

		<!-- processing here -->

		<flow-ref name="completeBatch" doc:name="Flow Reference"/>

		<logger level="ERROR" message="after jdbc batch select: #[payload]" doc:name="Logger"/>

	</flow>