Время от времени я сталкиваюсь с интересной проблемой, связанной с соединениями и пулами с ActiveMQ , и сегодня я хотел бы обсудить кое-что, что не всегда очень ясно и может потенциально заставить вас сильно пить при использовании ActiveMQ и Camel JMS . Не говоря уже о том, что вы не захотите много пить при использовании ActiveMQ и Camel в любом случае… в честь того, насколько восхитительной становится интеграция и обмен сообщениями при их использовании, конечно.
Итак, сначала. Пул подключений.
Конечно, вы всегда слышали, чтобы объединить свои связи. Что это на самом деле означает, и почему вы хотите это сделать?
Открытие соединения с брокером ActiveMQ является относительно дорогой операцией по сравнению с другими действиями, такими как создание сеанса или потребителя. Поэтому, отправляя или получая сообщения и обычно взаимодействуя с брокером, вы хотели бы повторно использовать существующие подключения, если это возможно. Чего вы не хотите делать, так это полагаться на библиотеку JMS (например, Spring JmsTemplate), которая открывает и закрывает соединения для каждой отправки или получения сообщения … если только вы не можете объединять / кэшировать свои соединения.
Так что, если мы можем согласиться с тем, что пул соединений является хорошей идеей, взглянем на пример конфигурации:
01
02
03
04
05
06
07
08
09
10
11
|
< bean id = "pooledConnectionFactory" class = "org.apache.activemq.pool.PooledConnectionFactory" init-method = "start" destroy-method = "stop" > < property name = "maxConnections" value = "10" /> < property name = "maximumActiveSessionPerConnection" value = "10" /> < property name = "connectionFactory" > < bean class = "org.apache.activemq.ActiveMQConnectionFactory" > < property name = "brokerURL" value = "tcp://127.0.0.1:61616" /> </ bean > </ property > </ bean > |
Возможно, вы даже захотите использовать Apache Camel и его замечательный компонент camel-jms, потому что иначе было бы глупо. Поэтому, возможно, вы захотите настроить конфигурацию JMS, аналогичную следующей:
1
2
3
4
5
6
7
8
|
< bean id = "jmsConfig" class = "org.apache.camel.component.jms.JmsConfiguration" > < property name = "connectionFactory" ref = "pooledConnectionFactory" /> < property name = "transacted" value = "true" /> < property name = "concurrentConsumers" value = "15" /> < property name = "deliveryPersistent" value = "true" /> < property name = "requestTimeout" value = "10000" /> < property name = "cacheLevelName" value = "CACHE_CONSUMER" /> </ bean > |
Эта конфигурация в основном означает для потребителей, настроить 15 одновременных потребителей, использовать транзакции (локальные), использовать сообщения PERSISTENT для производителей, установить тайм-аут на 10000 для запроса-ответа и т. Д., И т. Д.
Огромное примечание : если вы хотите более подробно ознакомиться с конфигами для компонента jms, особенно в том, что касается кэширования потребителей, транзакций и многого другого, ознакомьтесь с отличным блогом Торстена о Camel JMS с транзакциями — урок усвоен . Может быть, вам стоит потратить некоторое время на изучение его блога, потому что у него тоже много хороших вещей из Camel / ActiveMQ!
Круто пока. У нас есть пул соединений из 10 соединений, мы ожидаем 10 сеансов на соединение (всего 100 сеансов, если нам это нужно …) и 15 одновременных потребителей. Мы должны быть в состоянии справиться с серьезной нагрузкой, верно?
Посмотрите на этот маршрут здесь. Это достаточно просто, предоставляет компонент activemq (который будет использовать jmsConfig сверху, так что 15 одновременных потребителей) и просто делает некоторые записи:
1
2
3
|
from( "activemq:test.queue" ) .routeId( "test.queue.routeId" ) .to( "log:org.apache.camel.blog?groupSize=100" ); |
Попробуйте и запустите это. Вы сразу обнаружите, что ваши потребители заблокированы, и следы стека покажут эту красоту:
01
02
03
04
05
06
07
08
09
10
11
|
"Camel (camel-1) thread #1 - JmsConsumer[test.queue]" daemon prio=5 tid=7f81eb4bc000 nid=0x10abbb000 in Object.wait() [10abba000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch) at java.lang.Object.wait(Object.java:485) at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) - locked <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch) at org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:146) at org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:173) at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196) .... |
Как это может быть? У нас есть пул соединений … у нас есть сеансы на соединение, настроенные на 10 на соединение, так как мы все заблокированы при создании новых сеансов?
Ответ в том, что вы исчерпали количество сессий, как вы можете ожидать по трассировке стека. Но как? И сколько мне нужно выпить, чтобы решить эту проблему?
Хорошо держись сейчас. Хватай пиво и выслушай меня.
Сначала поймите это. Реализация пула ActiveMQ использует commons-pool, а атрибут maxActiveSessionsPerConnection фактически сопоставляется со свойством maxActive базового пула. Из документов это означает:
1
|
maxActive controls the maximum number of objects (per key) that can allocated by the pool (checked out to client threads, or idle in the pool) at one time. |
Ключ здесь — это «ключ» (буквально… пункт «на ключ» в документации). Таким образом, в реализации ActiveMQ ключ представляет собой объект, который представляет 1) является ли режим сеанса транзакцией и 2) что такое режим подтверждения (), как показано здесь . Таким образом, проще говоря, вы получите сеансы «maxActive» для каждого ключа, который используется в этом соединении … так что если у вас есть клиенты, которые используют транзакции, нет транзакций, client-ack, auto-ack, transacted-session, dups-ok, и т. д. вы можете начать видеть, что в итоге вы получите сеансы «maxActive» для каждой перестановки. Так что, если для maxActiveSesssionsPerConnection установлено значение 10, вы можете в итоге получить 10 x 2 x 4 == 80 сеансов. Это то, что нужно спрятать в глубине души.
Второй ключ здесь заключается в том, что, когда компонент camel-jms устанавливает потребителей, он в конечном итоге разделяет одно соединение между всеми потребителями, указанными в сеансе concurrentConsumers. Это интересный момент, потому что camel-jms использует базовую среду Spring DefaultMessageListenerContainer и, к сожалению, это ограничение исходит из этой библиотеки. Таким образом, если у вас есть 15 одновременных потребителей, все они будут совместно использовать одно соединение (даже если объединение в пул… оно будет захватывать одно соединение из пула и удерживать его). Таким образом, если у вас есть 15 потребителей, каждый из которых использует одно и то же соединение, каждый из которых использует транзакционный режим, каждый из которых имеет совместный режим подтверждения, тогда вы в конечном итоге попытаетесь создать 15 сеансов для этого одного соединения. И вы в конечном итоге с вышеупомянутым.
Так что мое эмпирическое правило, чтобы избежать этих сценариев:
- Понять точно, что делают каждый из ваших производителей и потребителей, каковы их режимы TX и ACK
- Всегда настраивайте максимальное количество сеансов, когда вам НУЖНО (слишком много потоков сеансов? Я не знаю ..), но всегда выполняйте concurrentConsumers + 1 в качестве значения по крайней мере
- Если производители и потребители производят / потребляют одно и то же место, РАЗДЕЛИТЕ БАССЕЙН ПОДКЛЮЧЕНИЯ: один пул для потребителей, один пул для производителей
Не знаю, насколько ценна эта информация, но я хотел записать ее для себя. Если кто-то считает это ценным или у него есть вопросы, дайте мне знать в комментариях.