Компонент seda
в Apache Camel очень похож на direct
компонент, который я представил в предыдущем блоге, но асинхронно. Для этого он использует java.util.concurrent.BlockingQueue
качестве реализации по умолчанию, чтобы ставить сообщения в очередь и отключаться от основного потока Route
а затем обрабатывать сообщения в отдельном потоке. Из-за этого BlockingQueue
вам необходимо знать о параметрах использования и конфигурации.
Один из вариантов должен знать об асинхронной обработке — это то, что по умолчанию размер очереди не связан, то есть он будет увеличиваться настолько, насколько позволяет ваша память. Чтобы ограничить это, установите size=1000
. Давайте посмотрим на пример.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package camelcoredemo; import org.slf4j.*; import org.apache.camel.*; import org.apache.camel.builder.*; import org.apache.camel.main.Main; import java.io.*; public class SedaDemoCamel extends Main { static Logger LOG = LoggerFactory.getLogger(SedaDemoCamel. class ); public static void main(String[] args) throws Exception { SedaDemoCamel main = new SedaDemoCamel(); main.enableHangupSupport(); main.addRouteBuilder(createRouteBuilder1()); main.addRouteBuilder(createRouteBuilder2()); main.addRouteBuilder(createRouteBuilder3()); main.run(args); } // The file poller route static RouteBuilder createRouteBuilder1() { return new RouteBuilder() { public void configure() { from( "file://target/input?preMove=staging&move=.processed" ) .process( new Processor() { public void process(Exchange msg) { CamelContext camelContext = msg.getContext(); ProducerTemplate producer = camelContext.createProducerTemplate(); String text = msg.getIn().getBody(String. class ); String fileName = (String)msg.getIn().getHeader( "CamelFileName" ); boolean specialFile = fileName.endsWith( "_SPECIAL.dat" ); if (specialFile) producer.sendBody( "seda:specialRoute" , text); else producer.sendBody( "seda:normalRoute" , text); } }); } }; } // The special file processing route static RouteBuilder createRouteBuilder2() { return new RouteBuilder() { public void configure() { from( "seda:specialRoute" ) .process( new Processor() { public void process(Exchange msg) { LOG.info( "Processing special file: " + msg); } }); } }; } // The normal file processing route static RouteBuilder createRouteBuilder3() { return new RouteBuilder() { public void configure() { from( "seda:normalRoute" ) .process( new Processor() { public void process(Exchange msg) { LOG.info( "Processing normal file: " + msg); } }); } }; } } |
Вы заметите, что этот демонстрационный код очень похож на демонстрационный пример с direct
компонентом, с небольшими отличиями. Во-первых, мы используем seda
точки seda
. Во-вторых, в файле poller мы читаем весь текст содержимого файла. Мы делаем это, потому что теперь мы переходим к асинхронному Route
который будет работать в отдельных потоках. Опросчик настроен на перемещение обработанного файла в другую папку сразу после окончания первого Route
. Поэтому мы должны убедиться, что Route
обработки не зависит от пути к файлу, поэтому вместо этого мы загрузим весь текст.
Еще одна интересная опция seda
— вы можете установить количество одновременных потоков для получения сообщений для их обработки! Допустим, если у ваших обычных файлов большой трафик, вы можете настроить использование большего количества потоков в этой части (по умолчанию это всего один поток).
1
2
3
4
5
6
|
from( "seda:normalRoute?concurrentConsumers=10" ) .process( new Processor() { public void process(Exchange msg) { LOG.info( "Processing normal file: " + msg); } }); |
Чтобы убедиться, что вы работаете одновременно, вы можете легко настроить регистратор на отображение имени потока. Например, с log4j
, вы можете использовать этот шаблон:
1
2
3
4
|
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p %t [%c] - %m%n |
В компоненте Seda доступны дополнительные опции, которые вы можете изучить. Попробуйте это с маршрутом и убедитесь сами.