Опрос пустого каталога (и отправка пустого сообщения с нулевым телом):
1
|
from( 'file://temp?sendEmptyMessageWhenIdle=true' ) |
Остановить маршрут:
1
2
3
4
5
|
.process( new Processor() { public void process(Exchange exchange) throws Exception { getContext().stopRoute( 'ROUTE_ID' ); } }) |
Доступ к свойству объекта в теле:
признание объекта имеет метод с именем ‘getMydata ()’:
1
|
new ValueBuilder(simple( '${body.mydata}' )).isEqualTo(...) |
Определите агрегатор:
1
2
3
4
|
.aggregate(simple( '${header.id}.substring(0,15)' ), genericAggregationStrategy) .completionPredicate(header(Exchange.BATCH_COMPLETE) .isEqualTo(Boolean.TRUE)) |
-
'${header.id}.substring(0,15)'
: флаг для различения сообщений (здесь возвращаемая строка является общей для всех сообщений, мы объединяем их все) -
Exchange.BATCH_COMPLETE
: предикат, указывающий конец опроса (например, все файлы были проанализированы) -
genericAggregationStrategy
: выше, пример агрегатора, группирующего все сообщения в списке:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class GenericAggregationStrategy implements AggregationStrategy { @SuppressWarnings ( 'unchecked' ) public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null ) { ArrayList<Object> list = new ArrayList<Object>(); list.add(newExchange.getIn().getBody()); newExchange.getIn().setBody(list); return newExchange; } else { Object oldIn = oldExchange.getIn().getBody(); ArrayList<Object> list = null ; if (oldIn instanceof ArrayList) { list = (ArrayList<Object>) oldIn; } else { list = new ArrayList<Object>(); list.add(oldIn); } list.add(newExchange.getIn().getBody()); newExchange.getIn().setBody(list); return newExchange; } } } |
Вручную запустить завершение агрегации (что бы это ни было):
Отправьте сообщение с заголовком Exchange.AGGREGATION_COMPLETE_ALL_GROUPS = true
Это можно сделать from('bean:...')
, зная, что bean-компонент будет опрашиваться постоянно (как в случае с ‘file’) и каждый раз повторяться. Измените тело сообщения на маршруте, используя:
1
|
.transform(myExpression) |
с помощью myExpression
:
1
2
3
4
5
6
7
|
public class MyExpression implements Expression { public <T> T evaluate(Exchange exchange, Class<T> type) { MyBean newData = ...; return exchange.getContext().getTypeConverter() .convertTo(type, newData); } } |
Используя JaxB:
- по маршруту:
1
.[un]marshal().jaxb(
'my.business_classes.package'
)
- с настраиваемым форматом данных:
1
.[un]marshal(jaxbDataFormat)
с участием :
1
2
3
4
|
// indicate to Jaxb to not write XML prolog : JaxbDataFormat jaxbDataFormat = new JaxbDataFormat( 'my.business_classes.package' ); jaxb.setFragment( true ); |
Общие понятия для управления потоками:
-
from(...)
= поток - кроме
from('direct:...')
создает «именованный маршрут» с уникальным идентификатором, вызываемым только другим маршрутом (в том же потоке, что и вызывающая сторона). - Компонент
.resequence().batch()
создает новый поток для перебрасывания сообщений.
Определите стратегию выключения:
1
|
getContext().setShutdownStrategy( new MyShutdownStrategy(getContext())); |
С участием :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
public class MyShutdownStrategy extends DefaultShutdownStrategy { protected CamelContext camelContext; private long timeout = 1 ; private TimeUnit timeUnit = TimeUnit.SECONDS; public SpiralShutdownStrategy(CamelContext camelContext) { this .camelContext = camelContext; } @Override public long getTimeout() { return this .timeout; } @Override public TimeUnit getTimeUnit() { return this .timeUnit; } @Override public CamelContext getCamelContext() { return this .camelContext; } /** * To ensure shutdown * */ @Override public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception { doShutdown(context, routes, getTimeout(), getTimeUnit(), false , false , false ); } /** * To ensure shutdown * */ @Override public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception { doShutdown(context, routes, this .timeout, this .timeUnit, false , false , false ); } /** * To ensure shutdown * */ @Override public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { super .shutdown(context, route, this .timeout, this .timeUnit, false ); return true ; } } |
Остановить партию:
1
2
3
4
5
|
.process( new Processor() { public void process(Exchange exchange) throws Exception { context.stop(); } }); |
Вызов метода бина из маршрута:
- Возвращение метода всегда зависит от тела сообщения. Например :
-
public void myMethod(Exchange e)
:
Не будет изменять тело -
public boolean myMethod(Exchange e)
:
логическое значение (или любой другой тип примитива) будет установлено в теле -
public Object myMethod(Exchange e)
:
Объект будет помещен в тело (даже если ноль) -
public Message myMethod(Exchange e)
:
Сообщение будет помещено в тело (лучше избегайте этого) -
public List<Object> myMethod(Exchange e)
:
список будет установлен в теле: полезно использовать с.split()
, каждый объект будет отправлен в новом сообщении -
public List<Message> myMethod(Exchange e)
:
список будет установлен в теле:.split()
создаст новое сообщение для каждого элемента (лучше избегайте, см. выше)
-
- настраиваемые параметры метода:
-
public void myMethod(Exchange e)
:
полный обмен будет пройден -
public void myMethod(Object o)
:
Верблюд попытается преобразовать тело в класс требуемого параметра -
public void myMethod(@Body File o, @Header('myHeader') String myParamHeader)
:
Верблюд будет вводить каждый параметр, как указано
-
Управление исключениями на маршрутах:
- в глобальном масштабе (должен быть объявлен перед всеми маршрутами):
1
onException(MyException.
class
, RuntimeCamelException.
class
).to(...)...
- чтобы действительно обрабатывать Exception, а не пузырить его в маршрутах (и журналах):
1
onException(...).handled(
true
).to(...)...
- продолжить процесс в маршруте после исключения:
1
onException(...).continued(
true
).to(...)...
- Исключение «обработано» или «продолжено»
- местный путь (по маршруту):
123
from(...)
.onException(...).to(
'manage_error'
).log(
'FAIL !!'
).end()
.to(
'continue_route'
)...
Для записи файла необходим только заголовок Exchange.FILE_NAME
.
.resequence
сообщений с компонентом .resequence
:
- использует выражение для вычисления нового порядка сообщений из уникального сопоставимого «ключа» (число, строка или пользовательский компаратор)
- два пути :
- .batch (): пакетный режим. Ждет приема ВСЕХ сообщений для их переупорядочения. ВНИМАНИЕ : создается новая тема для перебрасывания сообщений.
- .stream (): потоковый режим. Использует обнаружение пробела между ключами сообщений для их повторной отправки. Можно настроить максимальную емкость и время ожидания.
Разделить тело токеном:
1
|
.split(body().tokenize( 'TOKEN' )) |
Зная, что токен будет удален из контента. Например, при получении сообщения, содержащего: «data1TOKENdata2TOKENdata3», будут созданы следующие сообщения: «data1», «data2,« data3 ». Поэтому избегайте этого при обработке данных XML, предпочитайте tokenizeXML ().
Динамический доступ к данным тела:
- Легкий язык сценариев: простой язык выражений
- Чтение данных файлов: File Expression Language
Отправка почты:
1
2
3
4
5
|
from( 'direct:mail' ) .setHeader( 'To' , constant(mailTo)) .setHeader( 'From' , constant(mailFrom)) .setHeader( 'Subject' , constant(mailSubject)) .to( 'smtp://${user}@${server}:${port}?password=${password}' ); |
С вложением:
01
02
03
04
05
06
07
08
09
10
11
|
.beanRef(MAIL_ATTACHER, 'attachLog' ); //with public class MailAttacher { public void attachLog(Exchange exc) throws Exception { File toAttach = ...; exc.getIn().addAttachment(toAttach.getName(), new DataHandler( new FileDataSource(toAttach))); // if needed exc.setProperty(Exchange.CHARSET_NAME, 'UTF-8' ); } } |
Полезные биржевые свойства:
- Exchange.AGGREGATED_ *: управление агрегатами
- Exchange.BATCH_ *: управление обработанными сообщениями
- Exchange.FILE_ *: управление файловыми сообщениями
- Exchange.HTTP_ *: управление веб-запросами
- Exchange.LOOP_ *: управление циклами
- Exchange.REDELIVERY_ *: управление исключениями
- Exchange.SPLIT_ *: управление разделенным содержимым
Зациклить маршрут:
1
2
3
4
|
from( 'direct:...' ) .loop(countExpression) .to( 'direct:insideLoop' ) .end() |
Где ‘countExpression’ — это выражение, используемое для динамического вычисления количества циклов (вычисляется при входе в цикл). Предпочтительно переместить код цикла по новому маршруту, если процесс сложный.
Управление заголовками:
Заголовки сообщения определяются при его создании. При использовании .split () все последующие сообщения будут иметь те же заголовки, что и исходное сообщение (поэтому будьте осторожны при управлении файлами). В агрегации пользовательские заголовки должны управляться вручную, чтобы их можно было сохранить на оставшейся части маршрута.
Перехватить сообщение
и выполнить маршрут параллельно (должен быть объявлен перед маршрутами):
1
|
interceptSendToEndpoint( 'ENDPOINT_TO_INTERSEPT' ).to(...)... |
Ссылка: Apache Camel Cheatsheet от нашего партнера JCG Пола-Эммануэля в блоге Developpef .