Статьи

Изучение Apache Camel Core — компонент Seda

Компонент seda в Apache Camel очень похож на direct компонент, который я представил в предыдущем блоге, но асинхронно. Для этого он использует java.util.concurrent.BlockingQueue качестве реализации по умолчанию, чтобы ставить сообщения в очередь и отключаться от основного потока Route а затем обрабатывать сообщения в отдельном потоке. Из-за этого BlockingQueue вам необходимо знать о параметрах использования и конфигурации.

Один из вариантов должен знать об асинхронной обработке — это то, что по умолчанию размер очереди не связан, то есть он будет увеличиваться настолько, насколько позволяет ваша память. Чтобы ограничить это, установите size=1000 . Давайте посмотрим на пример.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package camelcoredemo;
 
import org.slf4j.*;
import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.main.Main;
import java.io.*;
 
public class SedaDemoCamel extends Main {
    static Logger LOG = LoggerFactory.getLogger(SedaDemoCamel.class);
    public static void main(String[] args) throws Exception {
        SedaDemoCamel main = new SedaDemoCamel();
        main.enableHangupSupport();
        main.addRouteBuilder(createRouteBuilder1());
        main.addRouteBuilder(createRouteBuilder2());
        main.addRouteBuilder(createRouteBuilder3());
        main.run(args);
    }
    // The file poller route
    static RouteBuilder createRouteBuilder1() {
        return new RouteBuilder() {
            public void configure() {
                from("file://target/input?preMove=staging&move=.processed")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        CamelContext camelContext = msg.getContext();
                        ProducerTemplate producer = camelContext.createProducerTemplate();
                        String text = msg.getIn().getBody(String.class);
                        String fileName = (String)msg.getIn().getHeader("CamelFileName");
                        boolean specialFile = fileName.endsWith("_SPECIAL.dat");
                        if (specialFile)
                            producer.sendBody("seda:specialRoute", text);
                        else
                            producer.sendBody("seda:normalRoute", text);
                    }
                });
            }
        };
    }
    // The special file processing route
    static RouteBuilder createRouteBuilder2() {
        return new RouteBuilder() {
            public void configure() {
                from("seda:specialRoute")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        LOG.info("Processing special file: " + msg);
                    }
                });
            }
        };
    }
    // The normal file processing route
    static RouteBuilder createRouteBuilder3() {
        return new RouteBuilder() {
            public void configure() {
                from("seda:normalRoute")
                .process(new Processor() {
                    public void process(Exchange msg) {
                        LOG.info("Processing normal file: " + msg);
                    }
                });
            }
        };
    }
}

Вы заметите, что этот демонстрационный код очень похож на демонстрационный пример с direct компонентом, с небольшими отличиями. Во-первых, мы используем seda точки seda . Во-вторых, в файле poller мы читаем весь текст содержимого файла. Мы делаем это, потому что теперь мы переходим к асинхронному Route который будет работать в отдельных потоках. Опросчик настроен на перемещение обработанного файла в другую папку сразу после окончания первого Route . Поэтому мы должны убедиться, что Route обработки не зависит от пути к файлу, поэтому вместо этого мы загрузим весь текст.

Еще одна интересная опция seda — вы можете установить количество одновременных потоков для получения сообщений для их обработки! Допустим, если у ваших обычных файлов большой трафик, вы можете настроить использование большего количества потоков в этой части (по умолчанию это всего один поток).

1
2
3
4
5
6
from("seda:normalRoute?concurrentConsumers=10")
.process(new Processor() {
    public void process(Exchange msg) {
        LOG.info("Processing normal file: " + msg);
    }
});

Чтобы убедиться, что вы работаете одновременно, вы можете легко настроить регистратор на отображение имени потока. Например, с log4j , вы можете использовать этот шаблон:

1
2
3
4
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p %t [%c] - %m%n

В компоненте Seda доступны дополнительные опции, которые вы можете изучить. Попробуйте это с маршрутом и убедитесь сами.

Ссылка: Изучение Apache Camel Core — компонента Seda от нашего партнера JCG Земьяна Дена в блоге A Programmer’s Journal .