Статьи

Как создать пакетный облачный коннектор

Когда мы объявили о выпуске в декабре 2013 года , захватывающая новая функция также увидела дневной свет: пакетный модуль . Если вы еще не читали статью, в которой описаны основные особенности функции, вам следует это сделать, но сегодня я хотел бы сосредоточиться на том, как блок <batch: commit> взаимодействует с коннекторами Anypoint ™, и, в частности, на том, как вы можете использовать свои собственные коннекторы для воспользоваться этой функцией.

Обзор <batch: commit>

В двух словах, вы можете использовать блок Batch Commit для сбора подмножества записей для массовой загрузки во внешний источник или службу. Например, вместо того, чтобы загружать каждый отдельный контакт (т. Е. Запись) в контакты Google , вы можете настроить пакетную фиксацию для сбора, скажем, 100 записей, а затем поместить их все в контакты Google в одном блоке. На шаге пакета — единственном месте, где вы можете его применить — вы можете использовать пакетный коммит, чтобы обернуть обработчик исходящих сообщений. Смотрите пример ниже:

Смешивание в разъемах

Это все здорово, но при чем тут разъемы? Ну, единственная причина, почему приведенный выше пример вообще имеет какой-либо смысл, заключается в том, что Google Connector способен выполнять массовые операции. Если соединитель поддерживает только обновление записей по одной, тогда не будет причин для существования <batch: commit>.

Но ждать! Пакетный модуль был выпущен только два месяца назад, но коннекторы, такие как Google Contacts, Salesforce , Netsuite и т. Д., Много лет работали с большими объемами ! Правда что. Но то, чего у нас не было, пока не появился Batch, — это конструкция, позволяющая нам обрабатывать ошибки на уровне записей.

Предположим, вы загружаете 200 записей в Salesforce. В прошлом, если 100 из них выходили из строя, а остальные 100 были успешными, вам нужно было проанализировать ответ соединителя, вытащить отказавший из успешного завершения и предпринять соответствующие действия. Если вы захотите сделать то же самое с контактами Google, вам снова придется делать все заново, с дополнительной сложностью, с которой вы не сможете повторно использовать свой код, потому что API Google и Salesforce используют совершенно разные представления для уведомления о результате операции.

Наша цель с пакетным модулем ясна: упростить этот процесс . Мы больше не хотим, чтобы вы пытались выяснить представление каждого API для массового результата и обрабатывать каждую неудачную запись независимо — теперь вы можете положиться на <batch: commit>, чтобы сделать это автоматически.

Это не волшебство

«Вид магии» — одна из моих любимых песен Queen, особенно живое выступление на стадионе Уэмбли в 86 году. Хотя магия, описанная в этой песне, не относится к механизмам пакетной обработки и коннектора, в этой песне есть одна фраза, которая точно описывает проблему: «Может быть только одна».

Если мы хотим, чтобы пакетный модуль понимал все типы результатов массовых операций, нам нужно начать с определения канонического способа его представления. Мы сделали это в классе BulkOperationResult, который определяет следующий контакт:

/**
 * This class is used to provide item level information about a bulk operation. This
 * master entity represents the bulk operation as a whole, while the detail entity
 * {@link BulkItem} represents the operation status for each individual data piece.
 * The {@link #items} list defines a contract in which the ordering of those items
 * needs to match the ordering of the original objects. For example, if the bulk
 * operation consisted of 10 person objects in which number X corresponded to the
 * person 'John Doe', then the Xth item in the {@link #items} list must reference to
 * the result of procesing the same 'John Doe'
 */
public final class BulkOperationResult<T> implements Serializable
{
    /**
     * The operation id
     */
    public Serializable getId();
    
    /**
     * Whether or not the operation was successful. Should be true if
     * and only if all the child {@link BulkItem} entities were also successful
     */
    public boolean isSuccessful();
    
    /**
     * An ordered list of {@link BulkItem}, one per each item in the original
     * operation, no matter if the record was successful or not
     */
    public List<BulkItem<T>> getItems();
    
    /**
     * A custom property stored under the given key
     * 
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}

По сути, вышеуказанный класс является отношением Master-Detail, в котором:

  • BulkOperationResult представляет операцию в целом, играющую роль мастера
  • BulkItem представляет результат для каждой отдельной записи, играя роль детализации
  • Оба класса неизменны
  • Между мастером и деталями есть упорядоченность. Первый элемент в списке BulkItem должен соответствовать первой записи в исходном массиве. Второе должно соответствовать второму и так далее.

Если вам интересно, вот так выглядит контакт BulkItem:

/**
 * This class represents an individual data piece in the context of a bulk operation
 */
public final class BulkItem<T> implements Serializable
{
    /**
     * The item id
     */
    public Serializable getId();

    /**
     * Wether or not it was successful. Notice that this should be false
     * if {@link #exception} is not null, however there might not be an
     * exception but the item could still not be successful for other reasons.
     */
    public boolean isSuccessful();

    /**
     * Message to add context on this item. Could be an error description, a warning
     * or simply some info related to the operation
     */
    public String getMessage();

    /**
     * An optional status code
     */
    public String getStatusCode();
    
    /**
     * An exception if the item was failed
     */
    public Exception getException();

    /**
     * The actual data this entity represents
     */
    public T getPayload();

    /**
     * A custom property stored under the given key
     * 
     * @param key the key of the custom property
     * @return a {@link Serializable} value
     */
    public Serializable getCustomProperty(String key);
}

Итак, это все? Мы просто модифицируем все коннекторы, чтобы они возвращали объект BulkOperationResult для всех массовых операций, и все готово? Не совсем. Это было бы рекомендуемой практикой для новых соединителей, движущихся вперед, но для существующих соединителей мы бы нарушили обратную совместимость с любым существующим приложением Mule, написанным до выпуска модуля Batch, которые вручную обрабатывают вывод массовых операций.

В этих случаях мы сделали так, чтобы эти разъемы регистрировали трансформатор. Поскольку каждый соединитель обязан понимать домен каждого API, имеет смысл попросить каждый соединитель преобразовать свое собственное представление массовых операций в объект BulkOperationResult.

Давайте посмотрим на пример. Это подпись для операции в соединителе контактов Google, которая выполняет массовую операцию:

public List<BatchResult> batchContacts(String batchId, List<NestedProcessor> operations) throws Exception;

Давайте забудем о реализации метода прямо сейчас. Вывод из вышеприведенного фрагмента заключается в том, что операция вернет список объектов BatchResult. Давайте посмотрим, как зарегистрировать преобразователь, который переходит от этого к BulkOperationResult:

@Start
public void init() {
	this.muleContext.getRegistry().registerTransformer(new BatchResultToBulkOperationTransformer());
}

И для большого финала, код самого преобразователя:

public class BatchResultToBulkOperationTransformer extends AbstractDiscoverableTransformer {

	public BatchResultToBulkOperationTransformer() {
		this.registerSourceType(DataTypeFactory.create(List.class, BatchResult.class, null));
		this.setReturnDataType(DataTypeFactory.create(BulkOperationResult.class));
	}
	
	@Override
	protected Object doTransform(Object src, String enc) throws TransformerException {
		List<BatchResult> results = (List<BatchResult>) src;
		
		BulkOperationResultBuilder<BaseEntry<?>> builder = BulkOperationResult.<BaseEntry<?>>builder();
		
		if (results != null) {
			for (BatchResult result : results) {
				BatchStatus status = result.getStatus();
				int code = status.getCode();
				
				builder.addItem(BulkItem.<BaseEntry<?>>builder()
						.setRecordId(result.getId())
						.setPayload(result.getEntry())
						.setMessage(status.getContent())
						.setStatusCode(String.format("%d - %s", code, status.getReason()))
						.setSuccessful(code == 200 || code == 201 || code == 204)
					);
			}
		}
		
		return builder.build();
	}

}

Важные вещи, на которые следует обратить внимание в отношении вышеуказанного трансформатора:

  • Это расширяет AbstractDiscoverableTransformer. Это сделано для того, чтобы пакетный модуль мог динамически находить его во время выполнения.
  • Он определяет исходный и целевой типы данных в своем конструкторе
  • Метод doTransform () делает «магию»
  • Обратите внимание, как классы BulkOperationResult и BulkItem предоставляют удобные объекты Builder для отделения их внутренних представлений от кода вашего коннектора

И это в значительной степени это! Последнее, что нужно принять во внимание: что произойдет, если я использую массовую операцию в <batch: commit> с использованием соединителя, который не поддерживает сообщение BulkOperationResult? Ну, в этом случае у вас есть два варианта:

  • Напишите преобразователь и зарегистрируйте его самостоятельно на уровне приложения
  • Просто пусть так и будет, и в случае исключения, пакет не выполнит все записи

Завершение

In this article we discussed why it’s important for connectors to support bulk operations whenever possible (some APIs just can’t do it, that’s not your fault). For new connectors, we advice to always return instances of the canonical BulkOperationResult class. If you want to add batch support to an existing connector without breaking backwards compatibility, we covered how to register discoverable transformers to do the trick.