Статьи

Новый пакетный модуль Mule и опрос с водяным знаком


В выпуске Mule 3.5 появилось много новых функций.
В этом посте я кратко объясню, как можно использовать четыре из этих новых функций:

  • Пакетный модуль
  • Планировщик водяных знаков и cron в обработчике сообщений опроса
  • Новый коннектор базы данных

Затем я покажу, как применять эти новые функции для достижения функциональности, аналогичной описанной в предыдущем  сообщении в блоге .

Пакетный модуль

Пакетный модуль позволяет нам разрабатывать приложение Mule для выполнения пакетной обработки более простым и интуитивно понятным способом.

В новом пакетном модуле пакетное задание разделено на 4 части; Ввод, загрузка и отправка (выполняются автоматически Mule), обработка и завершение.

Это объясняется ниже.

Входная фаза

Как следует из названия, фаза ввода — это часть в пакетном задании, где данные загружаются и подготавливаются для обработки пакетным модулем. Поскольку мы можем вызвать пакет, используя команду «Выполнить пакет» (даже из потока), фаза ввода является необязательной. Это похоже на поведение (частных) потоков, где источником сообщения может быть либо другой поток, пакетное задание, либо входящая конечная точка.

Фаза загрузки и отправки

Фаза «загрузка и отправка» выполняется Mule автоматически; здесь Mule помещает каждое сообщение из полезной нагрузки в очередь для фазы процесса.

Фаза процесса

Этап процесса разделен на серию пакетных шагов, каждый пакетный шаг может иметь несколько обработчиков сообщений (исходящие конечные точки, ссылки на потоки, компоненты Java и т. Д.), А также каждый шаг может выполняться в зависимости от того, было ли сообщение успешным или неудачным на предыдущем шаге. ,

На завершающей фазе

На этапе «На завершении» есть результаты пакетного задания, которые можно использовать для составления отчетов.

Дополнительную информацию о пакетном модуле можно найти в следующем сообщении в блоге и документации Mule.

Обработчик сообщений опроса

Процессор опроса теперь улучшен, чтобы включить функциональность для определения выражения cron следующим образом:

<poll>
<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
</poll>

Также важно отметить, что если вы собираетесь использовать частоту опроса, теперь она должна быть определена в подэлементе (называемом планировщиком с фиксированной частотой) вместо атрибута в самом процессоре опроса.

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" startDelay="0"/>
</poll>

водяной знак

Водяной знак позволяет нам получить определенное значение (например, самый большой идентификатор) из опрашиваемых элементов и сохранить это значение в хранилище объектов (если хранилище объектов не определено, оно создается и используется водяным знаком автоматически).

Чтобы использовать эту функцию, нам нужно указать либо селектор и выражение селектора, либо мы можем использовать выражение обновления.

Атрибут Selector может принимать значения MIN, MAX, FIRST, LAST. Выражение селектора принимает выражение для получения требуемого элемента, например # [message.payload [‘Id’]].

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" />
<watermark variable="Id" default-expression="#[0]" selector="MAX" selector-expression="#[payload['id']]" />
</poll>

С другой стороны, выражение update позволяет нам определить пользовательское выражение, которое обновляет значение водяного знака (обычно это переменная, которую мы устанавливаем в потоке).

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" />
<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" />
</poll>...
<set-variable variableName="myId" value="#[payload[payload.size()-1]['id']]" />

Запись; при отладке в Anypoint Studio значение водяного знака сохраняется между различными выполнениями приложения, это связано с тем, что значение водяного знака по умолчанию сохраняется в постоянном хранилище объектов. Если во время разработки это нежелательное поведение, можно изменить хранилище объектов, чтобы использовать память, как описано в другом сообщении в блоге,  или изменить конфигурацию выполнения, чтобы очистить данные приложения.

Новый коннектор базы данных

Mule теперь имеет новый коннектор базы данных, он имеет большие улучшения по сравнению с предыдущим коннектором JDBC, включая потоковую передачу для выбора, поддержку массового режима для вставок, поддержку автоматически генерируемых ключей, поддержку Datasense, а также лучшую поддержку динамических запросов.

Пересмотр поиска большого набора данных в Mule с новыми функциями Mule.

В этом разделе я определю несколько способов выполнения пакетных заданий с новым соединителем базы данных и функцией водяных знаков.

В первом примере я показываю, как мы можем определить пакетное задание, которое опрашивает с заданными интервалами для следующих значений. Ограничением первого примера может быть то, что мы хотим, чтобы все доступные в базе данных значения опрашивались в указанное время (особенно при использовании cron). Это обрабатывается во втором примере.

Чтобы начать создавать пакетное задание, нам сначала нужно определить bean-компонент Spring для источника данных (в данном случае он использует источник данных derby).

<spring:beans>
	<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
		<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
		<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
		<spring:property name="username" value="user"/>
		<spring:property name="password" value="password"/>
	</spring:bean>
</spring:beans>

Затем нам нужно указать конфигурацию базы данных следующим образом:

<db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSource"/>

Определив конфигурацию базы данных, мы можем приступить к созданию пакетного задания.

Сначала нам нужно определить нашу входную фазу, в которой есть элемент опроса (который позволяет нам указать выражение Cron или фиксированную частоту для частоты опроса), который опрашивает выбор базы данных.

Чтобы использовать фиксированную частоту, укажите элемент опроса следующим образом:

<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" startDelay="0"/>
</poll>

Чтобы использовать планировщик cron, укажите элемент опроса следующим образом:

<poll>
<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
</poll>

Теперь мы можем определить запрос, используя новый соединитель базы данных. С новым соединителем базы данных есть три способа сделать запрос выбора:

  1. «Параметризованных»
  2. ‘Dynamic’
  3. «Из шаблона»

В этом случае мы используем запрос «Параметризованный», поскольку наш оператор выбора не должен быть динамическим, и поскольку мы не собираемся повторно использовать запрос в нашем приложении.

Теперь наш элемент Polling должен выглядеть примерно так:

<poll doc:name="Poll">
	<fixed-frequency-scheduler frequency="1" timeUnit="HOURS"/>
	<db:select config-ref="Generic_Database_Configuration">
	<db:parameterized-query><![CDATA[SELECT KEY1, KEY2 FROM table1 ]]></db:parameterized-query>
	</db:select>
</poll>

Предполагая, что мы хотим сделать запрос к базе данных, который получит нам только следующие элементы в базе данных, мы можем использовать водяной знак; в этом случае мы можем указать селектор и выражение-селектор.

Атрибут «селектор» может принимать значения MIN, MAX, FIRST, LAST. В этом примере мы используем MAX, поскольку автоматически сгенерированный номер в базе данных находится в порядке возрастания. Выражение селектора принимает выражение для получения необходимого элемента. В таком случае; # [Message.payload [ ‘key1’]].

Мы также должны определить выражение по умолчанию (выражение, которое будет вызываться при первом запуске), а также переменную, которую мы используем для хранения результата водяного знака (в этом примере мы называем его «Id»).

Это должно выглядеть примерно так:

<watermark variable="Id" default-expression="#[0]"
					selector="MAX" selector-expression="#[message.payload['key1']]" />

Кроме того, мы должны изменить наш запрос, чтобы выбрать только те элементы, которые больше, чем значение переменной «Id».

<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>

Элемент poll теперь должен выглядеть так:

			<poll doc:name="Poll">
				<fixed-frequency-scheduler frequency="1"
					timeUnit="HOURS" />
				<watermark variable="Id" default-expression="#[0]"
					selector="MAX" selector-expression="#[message.payload['key1']]" />
				<db:select config-ref="Generic_Database_Configuration"
					doc:name="Database">
					<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
				</db:select>
			</poll>

Теперь, когда у нас определена входная фаза, мы можем начать определять фазу обработки.

Как показано ранее, каждый элемент в списке преобразуется в запись на этапе «Загрузка и отправка», и каждая запись затем обрабатывается на этапе обработки в серии из одного или нескольких этапов пакета.

Сначала нам нужно сгруппировать эти элементы с помощью пакетного коммита, атрибут size определяет количество элементов, которые мы хотим зафиксировать в каждом пакете. После того, как мы определим пакетную фиксацию, мы можем определить конечную точку вставки базы данных с помощью параметризованного запроса. Так как мы хотим, чтобы результаты были вставлены массово, мы должны установить для атрибута массового режима в конечной точке базы данных значение true.

	<batch:commit doc:name="Batch Commit" size="10">
		<db:insert config-ref="Generic_Database_Configuration" bulkMode="true" doc:name="Copy_of_Database">
			<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
		</db:insert>
	</batch:commit>

В качестве дополнительного шага, после этого пакетного шага мы также можем определить пакетный шаг, который обрабатывает только ошибочные сообщения (обратите внимание на политику принятия), для простоты мы просто собираемся определить регистратор.

			<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
				<logger doc:name="Logger" level="ERROR"
					message="Record with the following payload has failed. Payload:: #[message.payload]" />
			</batch:step>

Поскольку фаза обработки теперь определена, единственная недостающая часть — это фаза «завершена».

Как описано ранее, этап завершения используется для сбора результатов партии, которую мы обработали. Здесь мы собираемся использовать регистратор для демонстрации этой функции, однако я уверен, что эти значения можно использовать более креативно для создания более потрясающих отчетов в реальных приложениях.

		<batch:on-complete>
			<logger message="Number of failed Records: #[payload.failedRecords] "
				level="INFO" doc:name="Failed Records" />
			<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
				level="INFO" doc:name="Sucessfull Records" />
			<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
				level="INFO" doc:name="Elapsed Time" />
		</batch:on-complete>

Ниже приведена полная последовательность действий для этого примера:

<spring:beans>
	<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
		<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
		<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
		<spring:property name="username" value="user"/>
		<spring:property name="password" value="password"/>
	</spring:bean>
</spring:beans>
	<db:generic-config name="Generic_Database_Configuration"
		dataSource-ref="dataSource" />

	<batch:job name="batch-example-1">
		<batch:threading-profile poolExhaustedAction="WAIT" />
		<batch:input>
			<poll doc:name="Poll">
				<fixed-frequency-scheduler frequency="1"
					timeUnit="HOURS" />
				<watermark variable="Id" default-expression="#[0]"
					selector="MAX" selector-expression="#[message.payload['key1']]" />
				<db:select config-ref="Generic_Database_Configuration"
					doc:name="Database">
                    <db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
				</db:select>
			</poll>
		</batch:input>
		<batch:process-records>
			<batch:step name="Batch_Step">
	<batch:commit doc:name="Batch Commit" size="10">
		<db:insert config-ref="Generic_Database_Configuration" bulkMode="true" >
			<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
		</db:insert>
	</batch:commit>
			</batch:step>
			<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
				<logger doc:name="Logger" level="ERROR"
					message="Record with the following payload has failed. Payload:: #[message.payload]" />
			</batch:step>
		</batch:process-records>
		<batch:on-complete>
			<logger message="Number of failed Records: #[payload.failedRecords] "
				level="INFO" doc:name="Failed Records" />
			<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
				level="INFO" doc:name="Sucessfull Records" />
			<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
				level="INFO" doc:name="Elapsed Time" />
		</batch:on-complete>
	</batch:job>

Теперь, когда мы увидели, как мы можем реализовать это очень легко, мы можем перейти к обсуждению того, как мы можем улучшить это, чтобы повторить то же функциональное поведение, что и в блоге «Извлечение большого набора данных в муле».

Существует три основных различия между реализацией публикации в блоге «Извлечение большого набора данных в муле» и приведенной выше реализацией:

  1. Идентификатор не сбрасывается в «0» после завершения всей партии.
  2. Мы не проводим опрос сразу после этого, но мы ждем следующей частоты опроса.
  3. Конечная точка HTTP используется для запуска пакета.

Для первого требования нам нужно изменить водяной знак, не используя выражение селектора, чтобы использовать вместо него выражение обновления.

Для второго требования нам нужно:

  • Чтобы убедиться, что планировщик запускается точно после друг друга
  • Чтобы убедиться, что никакой другой планировщик не запускается до завершения этого опроса.

Третье требование может быть достигнуто путем остановки и запуска фазы пакетного ввода, однако я считаю, что это выходит за рамки этого сообщения в блоге.

Первое требование довольно легко выполнить, нам просто нужно сделать пару простых изменений.

Первое изменение, которое нам нужно сделать, это изменить водяной знак следующим образом:

<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" />

Нам также нужно заполнить «myId» до «0», когда не осталось записей для обработки, и до наибольшего значения, когда существуют дополнительные записи, такие как:

			<choice>
				<when expression="#[payload.size() > 0]">
					<set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" />
				</when>
				<otherwise>
					<set-variable variableName="myId" value="#[0]" doc:name="set id" />
				</otherwise>
			</choice>

Нам также нужно изменить запрос к базе данных, который будет упорядочен по ключу 1 в порядке возрастания, а также изменить значения фильтра, чтобы они находились между текущим идентификатором и количеством элементов, которые мы хотим получить:

<db:select config-ref="Generic_Database_Configuration">
						<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query>
					</db:select>

Второе требование немного сложнее в реализации. Для этого требования нам нужно убедиться, что во время текущего опроса не будет другого опроса; чтобы сделать это, мы должны убедиться, что планировщик остановлен сразу после выбора базы данных.

Нам также нужно вручную опросить дальнейшие записи на случай, если у текущего результата есть записи для обработки. В случае, если дальнейшие записи не обрабатываются, мы должны убедиться, что планировщик работает, чтобы он автоматически запускался в течение указанного интервала.

Это требует, чтобы мы использовали немного пользовательского кода Java, который получает планировщик для нас, чтобы получить необходимые функциональные возможности для остановки, запуска и планирования программы опроса.

Это код Java, который я использовал для этого:

package com.ricston.blog.example;

import java.util.Collection;
import org.mule.api.MuleContext;
import org.mule.api.MuleEventContext;
import org.mule.api.MuleException;
import org.mule.api.schedule.Scheduler;
import org.mule.modules.schedulers.cron.CronScheduler;
import org.mule.transport.polling.schedule.FixedFrequencyScheduler;
import org.mule.util.Predicate;

public class SchedulerWrapper{


	protected Scheduler getScheduler(MuleEventContext eventContext) {
		MuleContext context = eventContext.getMuleContext();
		final String flowName = eventContext.getFlowConstruct().getName();

		// Get poll scheduler so as to stop and start and poll the endpoint.
		Collection<Scheduler> pollSchedulers = context.getRegistry()
				.lookupScheduler(new Predicate<String>() {
					@Override
					public boolean evaluate(String s) {
						// the scheduler name should be polling:// + flow
						// name/batchjob name + /..
						if (s.startsWith("polling://" + flowName + "/")) {

							return true;
						} else {
							return false;
						}
					}
				});

		// There should be only one scheduler
		if (pollSchedulers.size() == 1) {
			Scheduler pollScheduler = pollSchedulers.iterator().next();
			return pollScheduler;
		} else {
			throw new IllegalStateException("Was expecting one scheduler but there were:" + pollSchedulers.size());
		}

	}

	protected void stopScheduler(Scheduler scheduler) throws MuleException {
		if (scheduler instanceof FixedFrequencyScheduler) {
			@SuppressWarnings("rawtypes")
			FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
			fixedFrequencyScheduler.stop();
		} else if (scheduler instanceof CronScheduler) {
			CronScheduler cronScheduler = (CronScheduler) scheduler;
			cronScheduler.stop();
		} else {
			throw new IllegalArgumentException(
					"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was "
							+ scheduler.getClass());
		}
	}
	
	protected void startScheduler(Scheduler scheduler) throws MuleException {
		if (scheduler instanceof FixedFrequencyScheduler) {
			@SuppressWarnings("rawtypes")
			FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
			fixedFrequencyScheduler.start();
		} else if (scheduler instanceof CronScheduler) {
			CronScheduler cronScheduler = (CronScheduler) scheduler;
			cronScheduler.start();
		} else {
			throw new IllegalArgumentException(
					"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass());
		}
	}

	
	protected void scheduleScheduler(Scheduler scheduler) throws Exception{
		if (scheduler instanceof FixedFrequencyScheduler) {
			@SuppressWarnings("rawtypes")
			FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
			fixedFrequencyScheduler.schedule();
		} else if (scheduler instanceof CronScheduler) {
			CronScheduler cronScheduler = (CronScheduler) scheduler;
			cronScheduler.schedule();
		} else {
			throw new IllegalArgumentException(
					"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass());
		}
		
	}

}

Обратите внимание, что при этом предполагается, что существует только одно имя пакета, начинающееся с того же имени пакета, за которым следует ‘/’, однако это можно уточнить, используя регулярное выражение.

package com.ricston.blog.example;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;

public class StartScheduler extends SchedulerWrapper implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		
		Scheduler scheduler = this.getScheduler(eventContext);
		this.startScheduler(scheduler);
		return eventContext.getMessage().getPayload();
	}
}
package com.ricston.blog.example;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;

public class StopScheduler extends SchedulerWrapper implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		
		Scheduler scheduler = this.getScheduler(eventContext);
		this.stopScheduler(scheduler);
		return eventContext.getMessage().getPayload();
	}

}
package com.ricston.blog.example;

import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;

public class ScheduleScheduler extends SchedulerWrapper implements Callable {

	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		Scheduler scheduler = getScheduler(eventContext);
		this.scheduleScheduler(scheduler);
		return eventContext.getMessage().getPayload();
	}

}

The following is the whole batch job for the second example:

<spring:beans>
	<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
		<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
		<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
		<spring:property name="username" value="user"/>
		<spring:property name="password" value="password"/>
	</spring:bean>
</spring:beans>
	<db:generic-config name="Generic_Database_Configuration"
		dataSource-ref="dataSource" doc:name="Generic Database Configuration" />

	<batch:job name="batch-example-two">
		<batch:input>
			<poll>
				<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
				<watermark variable="Id" default-expression="#[0]"
					update-expression="#[flowVars['myId']]" />
					<db:select config-ref="Generic_Database_Configuration"
						doc:name="Database">
						<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query>
					</db:select>
			</poll>
			<component class="com.ricston.blog.examples.StopScheduler"
				doc:name="Stop scheduler" />
			<choice>
				<when expression="#[payload.size() > 0]">
					<set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" doc:name="set id" />
				</when>
				<otherwise>
					<set-variable variableName="myId" value="#[0]" doc:name="set id" />
				</otherwise>
			</choice>

		</batch:input>
		<batch:process-records>
			<batch:step name="Batch_Step">
				<batch:commit size="10">
					<db:insert config-ref="Generic_Database_Configuration"
						doc:name="Database" bulkMode="true">
						<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
					</db:insert>
				</batch:commit>
			</batch:step>
			<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
				<logger doc:name="Logger" level="ERROR"
					message="Record with the following payload has failed. Payload:: #[message.payload]" />
			</batch:step>
		</batch:process-records>
		<batch:on-complete>
			<logger level="INFO" doc:name="Logger" message="Stopping Inputblock" />
			<choice>
				<when expression="#[payload.getProcessedRecords() > 0]">
					<component class="com.ricston.blog.example.ScheduleScheduler"
						doc:name="Schedule Scheduler" />
					<component class="com.ricston.blog.example.StartScheduler"
						doc:name="Start Scheduler" />
				</when>
				<otherwise>
					<!-- get scheduler running -->
					<component class="com.ricston.blog.example.StartScheduler"
						doc:name="Start Scheduler" />
				</otherwise>

			</choice>
			<logger message="Number of failed Records: #[payload.failedRecords] "
				level="INFO" doc:name="Failed Records" />
			<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
				level="INFO" doc:name="Sucessfull Records" />
			<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
				level="INFO" doc:name="Elapsed Time" />
		</batch:on-complete>
	</batch:job>

Надеюсь, вам понравился этот пост.