Опрос пустого каталога (и отправка пустого сообщения с нулевым телом):
|
1
|
from('file://temp?sendEmptyMessageWhenIdle=true') |
Остановить маршрут:
|
1
2
3
4
5
|
.process(new Processor() { public void process(Exchange exchange) throws Exception { getContext().stopRoute('ROUTE_ID'); }}) |
Доступ к свойству объекта в теле:
признание объекта имеет метод с именем ‘getMydata ()’:
|
1
|
new ValueBuilder(simple('${body.mydata}')).isEqualTo(...) |
Определите агрегатор:
|
1
2
3
4
|
.aggregate(simple('${header.id}.substring(0,15)'), genericAggregationStrategy).completionPredicate(header(Exchange.BATCH_COMPLETE) .isEqualTo(Boolean.TRUE)) |
-
'${header.id}.substring(0,15)': флаг для различения сообщений (здесь возвращаемая строка является общей для всех сообщений, мы объединяем их все) -
Exchange.BATCH_COMPLETE: предикат, указывающий конец опроса (например, все файлы были проанализированы) -
genericAggregationStrategy: выше, пример агрегатора, группирующего все сообщения в списке:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class GenericAggregationStrategy implements AggregationStrategy { @SuppressWarnings('unchecked') public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { ArrayList<Object> list = new ArrayList<Object>(); list.add(newExchange.getIn().getBody()); newExchange.getIn().setBody(list); return newExchange; } else { Object oldIn = oldExchange.getIn().getBody(); ArrayList<Object> list = null; if(oldIn instanceof ArrayList) { list = (ArrayList<Object>) oldIn; } else { list = new ArrayList<Object>(); list.add(oldIn); } list.add(newExchange.getIn().getBody()); newExchange.getIn().setBody(list); return newExchange; } }} |
Вручную запустить завершение агрегации (что бы это ни было):
Отправьте сообщение с заголовком Exchange.AGGREGATION_COMPLETE_ALL_GROUPS = true
Это можно сделать from('bean:...') , зная, что bean-компонент будет опрашиваться постоянно (как в случае с ‘file’) и каждый раз повторяться. Измените тело сообщения на маршруте, используя:
|
1
|
.transform(myExpression) |
с помощью myExpression :
|
1
2
3
4
5
6
7
|
public class MyExpression implements Expression { public <T> T evaluate(Exchange exchange, Class<T> type) { MyBean newData = ...; return exchange.getContext().getTypeConverter() .convertTo(type, newData); }} |
Используя JaxB:
- по маршруту:
1
.[un]marshal().jaxb('my.business_classes.package') - с настраиваемым форматом данных:
1
.[un]marshal(jaxbDataFormat)с участием :
|
1
2
3
4
|
// indicate to Jaxb to not write XML prolog :JaxbDataFormat jaxbDataFormat = new JaxbDataFormat('my.business_classes.package');jaxb.setFragment(true); |
Общие понятия для управления потоками:
-
from(...)= поток - кроме
from('direct:...')создает «именованный маршрут» с уникальным идентификатором, вызываемым только другим маршрутом (в том же потоке, что и вызывающая сторона). - Компонент
.resequence().batch()создает новый поток для перебрасывания сообщений.
Определите стратегию выключения:
|
1
|
getContext().setShutdownStrategy(new MyShutdownStrategy(getContext())); |
С участием :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
public class MyShutdownStrategy extends DefaultShutdownStrategy { protected CamelContext camelContext; private long timeout = 1; private TimeUnit timeUnit = TimeUnit.SECONDS; public SpiralShutdownStrategy(CamelContext camelContext) { this.camelContext = camelContext; } @Override public long getTimeout() { return this.timeout; } @Override public TimeUnit getTimeUnit() { return this.timeUnit; } @Override public CamelContext getCamelContext() { return this.camelContext; } /** * To ensure shutdown * */ @Override public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception { doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, false); } /** * To ensure shutdown * */ @Override public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception { doShutdown(context, routes, this.timeout, this.timeUnit, false, false, false); } /** * To ensure shutdown * */ @Override public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { super.shutdown(context, route, this.timeout, this.timeUnit, false); return true; }} |
Остановить партию:
|
1
2
3
4
5
|
.process(new Processor() { public void process(Exchange exchange) throws Exception { context.stop(); }}); |
Вызов метода бина из маршрута:
- Возвращение метода всегда зависит от тела сообщения. Например :
-
public void myMethod(Exchange e):
Не будет изменять тело -
public boolean myMethod(Exchange e):
логическое значение (или любой другой тип примитива) будет установлено в теле -
public Object myMethod(Exchange e):
Объект будет помещен в тело (даже если ноль) -
public Message myMethod(Exchange e):
Сообщение будет помещено в тело (лучше избегайте этого) -
public List<Object> myMethod(Exchange e):
список будет установлен в теле: полезно использовать с.split(), каждый объект будет отправлен в новом сообщении -
public List<Message> myMethod(Exchange e):
список будет установлен в теле:.split()создаст новое сообщение для каждого элемента (лучше избегайте, см. выше)
-
- настраиваемые параметры метода:
-
public void myMethod(Exchange e):
полный обмен будет пройден -
public void myMethod(Object o):
Верблюд попытается преобразовать тело в класс требуемого параметра -
public void myMethod(@Body File o, @Header('myHeader') String myParamHeader):
Верблюд будет вводить каждый параметр, как указано
-
Управление исключениями на маршрутах:
- в глобальном масштабе (должен быть объявлен перед всеми маршрутами):
1
onException(MyException.class, RuntimeCamelException.class).to(...)... - чтобы действительно обрабатывать Exception, а не пузырить его в маршрутах (и журналах):
1
onException(...).handled(true).to(...)... - продолжить процесс в маршруте после исключения:
1
onException(...).continued(true).to(...)... - Исключение «обработано» или «продолжено»
- местный путь (по маршруту):
123
from(...).onException(...).to('manage_error').log('FAIL !!').end().to('continue_route')...
Для записи файла необходим только заголовок Exchange.FILE_NAME .
.resequence сообщений с компонентом .resequence :
- использует выражение для вычисления нового порядка сообщений из уникального сопоставимого «ключа» (число, строка или пользовательский компаратор)
- два пути :
- .batch (): пакетный режим. Ждет приема ВСЕХ сообщений для их переупорядочения. ВНИМАНИЕ : создается новая тема для перебрасывания сообщений.
- .stream (): потоковый режим. Использует обнаружение пробела между ключами сообщений для их повторной отправки. Можно настроить максимальную емкость и время ожидания.
Разделить тело токеном:
|
1
|
.split(body().tokenize('TOKEN')) |
Зная, что токен будет удален из контента. Например, при получении сообщения, содержащего: «data1TOKENdata2TOKENdata3», будут созданы следующие сообщения: «data1», «data2,« data3 ». Поэтому избегайте этого при обработке данных XML, предпочитайте tokenizeXML ().
Динамический доступ к данным тела:
- Легкий язык сценариев: простой язык выражений
- Чтение данных файлов: File Expression Language
Отправка почты:
|
1
2
3
4
5
|
from('direct:mail') .setHeader('To', constant(mailTo)) .setHeader('From', constant(mailFrom)) .setHeader('Subject', constant(mailSubject)) .to('smtp://${user}@${server}:${port}?password=${password}'); |
С вложением:
|
01
02
03
04
05
06
07
08
09
10
11
|
.beanRef(MAIL_ATTACHER, 'attachLog');//withpublic class MailAttacher { public void attachLog(Exchange exc) throws Exception { File toAttach = ...; exc.getIn().addAttachment(toAttach.getName(), new DataHandler(new FileDataSource(toAttach))); // if needed exc.setProperty(Exchange.CHARSET_NAME, 'UTF-8'); }} |
Полезные биржевые свойства:
- Exchange.AGGREGATED_ *: управление агрегатами
- Exchange.BATCH_ *: управление обработанными сообщениями
- Exchange.FILE_ *: управление файловыми сообщениями
- Exchange.HTTP_ *: управление веб-запросами
- Exchange.LOOP_ *: управление циклами
- Exchange.REDELIVERY_ *: управление исключениями
- Exchange.SPLIT_ *: управление разделенным содержимым
Зациклить маршрут:
|
1
2
3
4
|
from('direct:...').loop(countExpression).to('direct:insideLoop').end() |
Где ‘countExpression’ — это выражение, используемое для динамического вычисления количества циклов (вычисляется при входе в цикл). Предпочтительно переместить код цикла по новому маршруту, если процесс сложный.
Управление заголовками:
Заголовки сообщения определяются при его создании. При использовании .split () все последующие сообщения будут иметь те же заголовки, что и исходное сообщение (поэтому будьте осторожны при управлении файлами). В агрегации пользовательские заголовки должны управляться вручную, чтобы их можно было сохранить на оставшейся части маршрута.
Перехватить сообщение
и выполнить маршрут параллельно (должен быть объявлен перед маршрутами):
|
1
|
interceptSendToEndpoint('ENDPOINT_TO_INTERSEPT').to(...)... |
Ссылка: Apache Camel Cheatsheet от нашего партнера JCG Пола-Эммануэля в блоге Developpef .