Компонент seda в Apache Camel очень похож на direct компонент, который я представил в предыдущем блоге, но асинхронно. Для этого он использует java.util.concurrent.BlockingQueue качестве реализации по умолчанию, чтобы ставить сообщения в очередь и отключаться от основного потока Route а затем обрабатывать сообщения в отдельном потоке. Из-за этого BlockingQueue вам необходимо знать о параметрах использования и конфигурации.
Один из вариантов должен знать об асинхронной обработке — это то, что по умолчанию размер очереди не связан, то есть он будет увеличиваться настолько, насколько позволяет ваша память. Чтобы ограничить это, установите size=1000 . Давайте посмотрим на пример.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package camelcoredemo;import org.slf4j.*;import org.apache.camel.*;import org.apache.camel.builder.*;import org.apache.camel.main.Main;import java.io.*;public class SedaDemoCamel extends Main { static Logger LOG = LoggerFactory.getLogger(SedaDemoCamel.class); public static void main(String[] args) throws Exception { SedaDemoCamel main = new SedaDemoCamel(); main.enableHangupSupport(); main.addRouteBuilder(createRouteBuilder1()); main.addRouteBuilder(createRouteBuilder2()); main.addRouteBuilder(createRouteBuilder3()); main.run(args); } // The file poller route static RouteBuilder createRouteBuilder1() { return new RouteBuilder() { public void configure() { from("file://target/input?preMove=staging&move=.processed") .process(new Processor() { public void process(Exchange msg) { CamelContext camelContext = msg.getContext(); ProducerTemplate producer = camelContext.createProducerTemplate(); String text = msg.getIn().getBody(String.class); String fileName = (String)msg.getIn().getHeader("CamelFileName"); boolean specialFile = fileName.endsWith("_SPECIAL.dat"); if (specialFile) producer.sendBody("seda:specialRoute", text); else producer.sendBody("seda:normalRoute", text); } }); } }; } // The special file processing route static RouteBuilder createRouteBuilder2() { return new RouteBuilder() { public void configure() { from("seda:specialRoute") .process(new Processor() { public void process(Exchange msg) { LOG.info("Processing special file: " + msg); } }); } }; } // The normal file processing route static RouteBuilder createRouteBuilder3() { return new RouteBuilder() { public void configure() { from("seda:normalRoute") .process(new Processor() { public void process(Exchange msg) { LOG.info("Processing normal file: " + msg); } }); } }; }} |
Вы заметите, что этот демонстрационный код очень похож на демонстрационный пример с direct компонентом, с небольшими отличиями. Во-первых, мы используем seda точки seda . Во-вторых, в файле poller мы читаем весь текст содержимого файла. Мы делаем это, потому что теперь мы переходим к асинхронному Route который будет работать в отдельных потоках. Опросчик настроен на перемещение обработанного файла в другую папку сразу после окончания первого Route . Поэтому мы должны убедиться, что Route обработки не зависит от пути к файлу, поэтому вместо этого мы загрузим весь текст.
Еще одна интересная опция seda — вы можете установить количество одновременных потоков для получения сообщений для их обработки! Допустим, если у ваших обычных файлов большой трафик, вы можете настроить использование большего количества потоков в этой части (по умолчанию это всего один поток).
|
1
2
3
4
5
6
|
from("seda:normalRoute?concurrentConsumers=10").process(new Processor() { public void process(Exchange msg) { LOG.info("Processing normal file: " + msg); }}); |
Чтобы убедиться, что вы работаете одновременно, вы можете легко настроить регистратор на отображение имени потока. Например, с log4j , вы можете использовать этот шаблон:
|
1
2
3
4
|
log4j.rootLogger=INFO, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d %p %t [%c] - %m%n |
В компоненте Seda доступны дополнительные опции, которые вы можете изучить. Попробуйте это с маршрутом и убедитесь сами.