Статьи

Реализация автоматического выключателя для определения ваших собственных шаблонов интеграции


Один из моих любимых
шаблонов из отличного релиза Майкла Найгарда
!  это
выключатель цепи .
Автоматический выключатель представляет собой автоматический выключатель , который останавливает поток электроэнергии в случае отказа. Такое поведение также полезно при интеграции с удаленными системами.

Возможно, мы захотим остановить доставку сообщений на исходящую конечную точку после возникновения определенного исключения. Хорошим примером является удаленная система под нагрузкой или цель подключения типа «отказ в обслуживании». В этом случае было бы неплохо автоматически остановить доставку на определенный период времени, чтобы не усугубить ситуацию.

Конфигурирование выключателя

Мы будем использовать  DevKit  для реализации автоматического выключателя и запуска с помощью Maven для создания модуля Mule для процессоров автоматического выключателя.

mvn archetype:generate -DarchetypeGroupId=org.mule.tools.devkit -DarchetypeArtifactId=mule-devkit-archetype-generic -DarchetypeVersion=3.0.1 -DarchetypeRepository=http://repository.mulesoft.org/releases/ -DgroupId=com.acmesoft -DartifactId=circuit-breaker-module -Dversion=1.0-SNAPSHOT -DmuleVersion=3.2.0 -DmuleModuleName=CircuitBreaker -DmuleModulePackage=com.acmesoft.integration

Как только это будет сделано, мы можем определить, как автоматический выключатель будет настроен в потоке. Посмотрите на следующую конфигурацию:

<circuitbreaker:config breakerName="testingBreaker" tripThreshold="5" tripTimeout="300000"/>

<flow name="testFlow">
    <vm:inbound-endpoint path="vm.in"/>
    <circuitbreaker:filter/>
    <test:component throwException="true"/>
    <vm:outbound-endpoint path="vm.out"/>
    <default-exception-strategy>
        <circuitbreaker:trip tripOnException="org.mule.tck.exceptions.FunctionalTestException"/>
    </default-exception-strategy>
</flow>

В верхней части файла мы указываем имя выключателя, сколько «отключений» до тех пор, пока выключатель не будет открыт, и тайм-аут для автоматического отключения отключения. Мы будем использовать искусственный поток для целей тестирования, используя компонент test: для создания исключения.

Фильтр настраивается перед компонентом. Если автоматический выключатель «разомкнут», то сообщения не будут отправлены компоненту test :. Отключающий процессор отвечает за размыкание выключателя, если выброшено достаточное количество исключений FunctionalTestException. Эта конфигурация останавливает передачу сообщений в тестовый компонент: в течение 5 минут после того, как было сгенерировано 5 FunctionalTestException.

Внедрение выключателя

Теперь, когда мы знаем, как будет выглядеть наша конфигурация, мы можем сосредоточиться на реализации. Полная реализация модуля здесь , но мы будем рассматривать его по одному. Важно отметить, что эти реализации — все, что требуется для поддержки приведенной выше конфигурации. Пользовательские пространства имен или реализации Spring NamespaceHandler не требуются. Мы просто соответствующим образом комментируем наши методы, а Mule и mvn делают все остальное.

Методы trip () и filter ()

Методы trip () и filter () в CircuitBreakerModule.java предоставляют реализации для обработчиков сообщений, которые мы только что видели.

Давайте сначала взглянем на trip (), метод, который открывает прерыватель:

 /**
     * Custom processor
     * <p/>
     * {@sample.xml ../../../doc/CircuitBreaker-connector.xml.sample circuitbreaker:trip}
     *
     * @param exceptionMessage The exception.
     * @param tripOnException  The exception type we should trip on.
     * @return Some string
     */
    @Processor
    public Object trip(String tripOnException, @Payload ExceptionMessage exceptionMessage) {

        if (exceptionMessage.getException().getCause().getClass().getCanonicalName().equals(tripOnException)) {
            incrementFailureCount();
            if (getFailureCount() == tripThreshold) {
                breakerTrippedOn = new Date();
            }
        }
        return exceptionMessage;
    }

Этот процессор вызывается в стратегии исключений и будет увеличивать счетчик ошибок при возникновении указанного исключения. Если faultCount равен tripThreshold, мы также устанавливаем breakerTrippedOn на текущее время. getFailureCount () получает свое значение из  ObjectStore  . Мы посмотрим, как это будет реализовано в ближайшее время. Сначала давайте посмотрим, как реализован фильтр:

/**
     * Custom processor
     * <p/>
     * {@sample.xml ../../../doc/CircuitBreaker-connector.xml.sample circuitbreaker:filter}
     *
     * @param payload The message payload
     * @return Some string
     */
    @Processor
    public Object filter(@Payload Object payload) {
        if (getFailureCount() < tripThreshold) {
            return payload;
        } else if (breakerTrippedOn != null && System.currentTimeMillis() - breakerTrippedOn.getTime() > tripTimeout) {
            breakerTrippedOn = null;
            resetFailureCount();
            return payload;
        } else {
            throw new CircuitOpenException();
        }
    }

Процессор фильтра сгенерирует исключение CircuitOpenException, если failCount превысит tripThreshold . Если значение tripTimeout превышено, то failCount сбрасывается, и сообщения пропускаются. Бросив CircuitOpenException дает нам гибкость в обработке сообщений , которые были заблокированы. Мы могли бы, например, поместить эти сообщения в очередь повторов, чтобы попытаться доставить их позже, после того как прерыватель был открыт.

Сохранение состояния выключателя в ObjectStore

Эта реализация использует постоянный Mule ObjectStore по умолчанию для хранения   failCount.  Следующие вспомогательные методы позволяют процессорам отслеживать это состояние.

Integer getFailureCount() {
        try {
            objectStoreMutex.acquire();
        } catch (InterruptedException e) {
            logger.error("Could not acquire mutex", e);
        }

        ObjectStore objectStore = objectStoreManager.getObjectStore(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME);

        String key = String.format("%s.failureCount", breakerName);

        Integer failureCount = 0;
        try {
            if (objectStore.contains(key)) {
                failureCount = (Integer) objectStore.retrieve(key);
            }
        } catch (Exception e) {
            logger.error("Could not retrieve key from object-store: " + key, e);
        }

        objectStoreMutex.release();

        return failureCount;

    }

    void incrementFailureCount() {
        try {
            objectStoreMutex.acquire();
        } catch (InterruptedException e) {
            logger.error("Could not acquire mutex", e);
        }

        ObjectStore objectStore = objectStoreManager.getObjectStore(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME);


        String key = String.format("%s.failureCount", breakerName);

        Integer failureCount = 0;
        try {
            if (objectStore.contains(key)) {
                failureCount = (Integer) objectStore.retrieve(key);
                objectStore.remove(key);
            }
            objectStore.store(key, failureCount + 1);
        } catch (Exception e) {
            logger.error("Could not retrieve key from object-store: " + key, e);
        }

        objectStoreMutex.release();
    }

    void resetFailureCount() {
        try {
            objectStoreMutex.acquire();
        } catch (InterruptedException e) {
            logger.error("Could not acquire mutex", e);
        }

        ObjectStore objectStore = objectStoreManager.getObjectStore(MuleProperties.OBJECT_STORE_DEFAULT_PERSISTENT_NAME);


        String key = String.format("%s.failureCount", breakerName);

        Integer failureCount = 0;
        try {
            if (objectStore.contains(key)) {
                failureCount = (Integer) objectStore.retrieve(key);
                objectStore.remove(key);
            }
            objectStore.store(key, 0);
        } catch (Exception e) {
            logger.error("Could not retrieve key from object-store: " + key, e);
        }

        objectStoreMutex.release();
    }

objectStoreMutex — это семафор с одним разрешением. Это сделано для того, чтобы избежать чтения и записи состояния гонки в хранилище объектов при одновременном доступе к автоматическому выключателю. Использование хранилища объектов дает нам возможность управлять прерывателем, возможно, используя JMX. Тот факт, что мы получаем постоянное хранилище по умолчанию, также означает, что состояние нашего автоматического выключателя будет сохраняться при перезапуске экземпляра Mule. Если ваше приложение работает в   кластере Mule HA,  состояние автоматического выключателя будет распределено по кластеризованным узлам.

Надеемся, что эта запись в блоге продемонстрировала, что Mule DevKit позволяет легко внедрять сложные приложения с поддержкой схем в ваши приложения Mule. Это позволяет вам определять свои собственные шаблоны интеграции, возможно, специфичные для вашей организации, и позволять им мирно сосуществовать с остальной частью экосистемы Мул.