Статьи

Простая и быстрая параллельная обработка (шаблон Fork-Join) для Mule

За время моей работы в качестве консультанта я узнал, что один из самых популярных шаблонов, которые запрашивают клиенты, — это шаблон fork-join. По сути, идея заключается в том, что в середине потока Мула нам нужно что-то сделать параллельно, дождаться завершения всех путей и продолжить.

Зачем вам нужен шаблон соединения вилки? Есть по крайней мере сто одна причина, но позвольте мне привести небольшой пример. Представьте, что у вас есть поток, в котором вам необходимо выполнить поиск в Интернете, возможно, позвонить в 2 различных внешних веб-сервиса. Поскольку внешние веб-службы, как правило, работают медленнее по сравнению с вызовом внутренних служб, вы можете решить выполнить этот поиск параллельно, чтобы сократить время отклика. Как только вы соберете данные поиска из обеих служб, вы продолжите свой поток. В основном здесь мы ищем сокращение времени отклика.

Очень хорошее объяснение того , как это в настоящее время может быть достигнуто в Mule можно найти здесь .

Это все хорошо, но я считаю, что это слишком сложно реализовать для простого шаблона. И сложность обычно также означает худшую производительность, более высокие затраты на внедрение и более высокие затраты на обслуживание. Я знал, что должен был что-то сделать, и я сделал.

Сначала я создал простой поток с шаблоном fork-join, как вы это обычно делаете в Mule, и провел стресс-тестирование. Я обнаружил, что в среднем, просто чтобы выполнить форк-соединение, Мул занимал около 100 мс при небольшой нагрузке на Mac с двухъядерным процессором HyperThreaded i7. В моих книгах это выглядело немного на высокой стороне.

Затем я решил внедрить специальный процессор сообщений (весь код доступен в GIST в конце этого блога). Этот обработчик сообщений довольно прост. В качестве конфигурации он принимает список обработчиков сообщений. Идея состоит в том, что каждый из этих обработчиков сообщений получит копию текущего сообщения, а затем они выполняются параллельно. Потоки для выполнения берутся из пула потоков, который также настраивается. Результатом обработчика сообщений является MuleMessageCollection, который содержит список MuleMessages с ответом от каждого выполнения в том же порядке, что и настроенный. Следующий GIST показывает пример того, как вы настроите этот процессор сообщений.

<flow name="parallelWsLookupFlow" doc:name="parallelWsLookupFlow">
	<enricher doc:name="Parallel Lookups">
		<processor-chain>
	        	<custom-processor class="com.ricston.processor.ParallelMessageProcessor" doc:name="ParallelMessageProcessor">
	        		<spring:property name="processors">
	        			<spring:list>
	        				<spring:ref bean="lookupWs1"/>
	        				<spring:ref bean="lookupWs2"/>
	        			</spring:list>
	        		</spring:property>
	        		<spring:property name="maxActive" value="100"/>
	        	</custom-processor>
	        	<combine-collections-transformer />
		</processor-chain>
	    <enrich target="#[flowVars.lookup0]" source="#[message.payload[0]]" />
	    <enrich target="#[flowVars.lookup1]" source="#[message.payload[1]]" />
	</enricher>
</flow>
    
<flow name="lookupWs1" doc:name="lookupWs1">
	<cxf:jaxws-client 
		clientClass="${lookupWs1.clientClass}" 
		operation="${lookupWs1.operation}" port="${lookupWs1.port}" >
	</cxf:jaxws-client>
	<http:outbound-endpoint exchange-pattern="request-response" 
		host="${lookupWs1.host}" port="${lookupWs1.port}" path="${lookupWs1.path}"
		method="POST" responseTimeout="${lookupWs1.responsetimeout}" doc:name="HTTP" />
</flow>

<flow name="lookupWs2" doc:name="lookupWs2">
	<cxf:jaxws-client 
		clientClass="${lookupWs2.clientClass}" 
		operation="${lookupWs2.operation}" port="${lookupWs2.port}" >
	</cxf:jaxws-client>
	<http:outbound-endpoint exchange-pattern="request-response" 
		host="${lookupWs2.host}" port="${lookupWs2.port}" path="${lookupWs2.path}"
		method="POST" responseTimeout="${lookupWs2.responsetimeout}" doc:name="HTTP" />
</flow>

Здесь мы вызываем два веб-сервиса параллельно, сохраняя результат каждого поиска в переменной потока (с помощью обогащения ). Как видите, ParallelMessageProcessor принимает список MessageProcessor, в этом случае они называются lookupWs1 и lookupWs2. Этими обработчиками сообщений могут быть любые виды обработчиков сообщений, но здесь мы вызываем другие потоки, которые внутренне вызывают внешние веб-службы.

Как только мы получим ответ от каждого отдельного веб-сервиса, мы используем объединение-коллекций-преобразователей для преобразования MuleMessageCollection в MuleMessage, а затем принимаем каждый отдельный результат, используя MEL. Максимальное количество активных потоков во внутреннем пуле потоков также настраивается.

Следующий GIST является кодом для этого обработчика сообщений. Как вы видите, это очень просто и легко следовать. Единственное ограничение заключается в том, что нам нужна Java 1.6 (или выше), поскольку мы используем ThreadPoolExecutor для параллельного выполнения обработчиков сообщений. Сказав это, на самом деле это не проблема, так как это стало требованием для более поздних версий Mule.

The process() method is where all the magic happens. For each message processor configured, we create a ProcessorRunner which clones the current event and uses it to execute the message processor. Once the ProcessorRunner is created, we use the invokeAll() available on the ThreadPoolExecutor.

The invokeAll() returns immediately with a list of Future objects, used to get the response, and of course, wait where necessary.

Using the exact same tests as before, the new ParallelMessageProcessor happily invoked the web services in parallel, and the fork-join pattern was taking less than 5ms under the same load.

Hope you find this ParallelMessageProcessor as useful as I did!

package com.ricston.processor;

/**
 * imports
 * /

/**
 * ParallelMessageProcessor is a message processor that will execute the inner message 
 * processors in parallel. Before returning, it will wait for all the processors to finish
 * execution. This implements the fork-join pattern.
 * 
 * The result will be a MuleMessageCollection with an entry for each inner 
 * message processor, in the same order they were listed in processors. 
 * In simple terms, this is very similar to a what a parallel ALL could look like.
 * 
 * The message that comes in is replicated and routed to each message processor. However
 * all the inner message processors are executed using a thread pool. The thread pool is
 * configurable.
 * 
 * Usages: need to do multiple lookups in parallel, like web service calls, jdbc calls...
 */
public class ParallelMessageProcessor implements MessageProcessor, MuleContextAware, Initialisable
{
    private Log logger = LogFactory.getLog(getClass());
    /**
     * List of MessageProcessors to be executed in parallel 
     */
    private List<MessageProcessor> processors;
    
    /**
     * MuleContext used to create events, messages ...
     */
    private MuleContext muleContext;
    
    /**
     * ThreadPoolExecutor to be used to run the procesors in parallel
     */
    protected ThreadPoolExecutor threadPool;
    
    /**
     * Max threads active in the pool
     */
    private int maxActive = 100;
    
    /**
     * The length of the queue used to queue the work for the thread pool
     */
    private int queueLength = 1000000;
    
    
    public ParallelMessageProcessor()
    {
        super();
    }
    
    /**
     * Initialse the thread pool
     */
    @Override
    public void initialise() throws InitialisationException
    {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueLength);
        threadPool = new ThreadPoolExecutor(maxActive, maxActive, 10000, TimeUnit.MILLISECONDS, queue);
    }

    /**
     * Send the message to all processors to be processed in parallel using the pool. Also
     * wait for all processors to finish processing, and return the result in a 
     * MuleMessageCollection, in the same order as the processors were configured.
     */
    @Override
    public MuleEvent process(MuleEvent event) throws MuleException
    {
        //create list of ProcessorRunner, each one will execute a message processor
        int noOfProcessors = processors.size();
        List<ProcessorRunner> threads = new ArrayList<ProcessorRunner>(noOfProcessors);
        
        //create a MuleMessageCollection, to be used to return the results
        MuleMessageCollection resultMessages = new DefaultMessageCollection(muleContext);

        try
        {
            //create a ProcessorRunner for each message processor, initialising it 
            //with the message processor to execute, and the current MuleEvent
            for (MessageProcessor mp : processors)
            {
                ProcessorRunner t = new ProcessorRunner(mp, event);
                threads.add(t);
            }
            
            logDebugStart();
            
            //invoke the message processors using the thread pool
            List<Future<MuleEvent>> future = threadPool.invokeAll(threads);
            
            //collect the results into a MuleMessageCollection, wait if necessary
            for (Future<MuleEvent> f : future)
            {
                resultMessages.addMessage(f.get().getMessage());
            }
            
            logDebugEnd(resultMessages);
        }
        catch (InterruptedException e)
        {
            throw new MessagingException(event, e);
        }
        catch (Exception e)
        {
            throw new MessagingException(event, e);
        }

        //return the MuleMessageCollection as a result 
        return new DefaultMuleEvent(resultMessages, event);
    }
    
    protected void logDebugStart()
    {
        if (logger.isDebugEnabled())
        {
            
            logger.debug("Firing parallel requests");
        }
    }
    
    protected void logDebugEnd(MuleMessageCollection resultMessages)
    {
        if (logger.isDebugEnabled())
        {
            logger.debug("Collected " + resultMessages.getMessagesAsArray().length + " messages");
        }
    }

    @Override
    public void setMuleContext(MuleContext context)
    {
        this.muleContext = context;
    }

    /**
     * getters and setters here
     * /

}
package com.ricston.processor;

/**
 * imports
 * /

/**
 * A simple class implementing Callable which will execute a MessageProcessor
 * given a MuleEvent as input. the Callable and MuleEvent are configured on 
 * creation of this class, or usings the setter methods.
 */
public class ProcessorRunner implements Callable<MuleEvent>
{
    /**
     * The MessageProcessor to be executed
     */
    private MessageProcessor processor;
    
    /**
     * The MuleEvent to be passed on to the MessageProcessor
     */
    private MuleEvent event;
    
    public ProcessorRunner()
    {
        super();
    }
    
    /**
     * Initialise the class with the given parameters.
     * 
     * @param processor The MessageProcessor to be executed
     * @param event The MuleEvent to be passed to the processor
     */
    public ProcessorRunner(MessageProcessor processor, MuleEvent event)
    {
        this();
        this.processor = processor;
        this.event = event;
    }
    
    /**
     * Create a clone of the MuleEvent passed in (no event can be shared with
     * multiple threads) and execute the processor using the cloned event.
     */
    @Override
    public MuleEvent call() throws Exception
    {
        try
        {
            MuleEvent clonedEvent = DefaultMuleEvent.copy(event);
            MuleEvent result = this.processor.process(clonedEvent);
            
            return result;
        }
        catch (MuleException e)
        {
            throw new RuntimeException(e);
        }
    }
    
    /**
     * getters and setters
     * /

}