Статьи

Извлеченные уроки: ActiveMQ, Apache Camel и пул соединений

Время от времени я сталкиваюсь с интересной проблемой, связанной с соединениями и пулами с ActiveMQ , и сегодня я хотел бы обсудить кое-что, что не всегда очень ясно и может потенциально заставить вас сильно пить при использовании ActiveMQ и Camel JMS . Не говоря уже о том, что вы не захотите много пить при использовании ActiveMQ и Camel в любом случае … в честь того, насколько восхитительной становится интеграция и обмен сообщениями при их использовании, конечно.

Итак, сначала. Пул подключений.

Конечно, вы всегда слышали, чтобы объединить свои связи. Что это на самом деле означает, и почему вы хотите это сделать?

Открытие соединения с брокером ActiveMQ является относительно дорогой операцией по сравнению с другими действиями, такими как создание сеанса или потребителя. Поэтому, отправляя или получая сообщения и обычно взаимодействуя с брокером, вы хотели бы повторно использовать существующие подключения, если это возможно. Чего вы не хотите делать, так это полагаться на библиотеку JMS (например, Spring JmsTemplate), которая открывает и закрывает соединения для каждой отправки или получения сообщения … если только вы не можете объединять / кэшировать свои соединения.

Так что, если мы можем согласиться с тем, что пул соединений является хорошей идеей, взглянем на пример конфигурации:

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        
        <property name="maxConnections" value="10" />
        <property name="maximumActiveSessionPerConnection" value="10" />

        <property name="connectionFactory" >
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://127.0.0.1:61616" />
            </bean>
        </property>
    </bean>

Возможно, вы даже захотите использовать Apache Camel и его замечательный компонент camel-jms, потому что иначе было бы глупо. Поэтому, возможно, вы захотите настроить конфигурацию JMS, аналогичную следующей:

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="transacted" value="true" />
        <property name="concurrentConsumers" value="15" />
        <property name="deliveryPersistent" value="true" />
        <property name="requestTimeout" value="10000" />
        <property name="cacheLevelName"  value="CACHE_CONSUMER" />
    </bean>

Эта конфигурация в основном означает для потребителей, настроить 15 одновременных потребителей, использовать транзакции (локальные), использовать сообщения PERSISTENT для производителей, установить тайм-аут на 10000 для запроса-ответа и т. Д., И т. Д.

Огромное примечание : если вы хотите более подробно ознакомиться с конфигами для компонента jms, особенно в том, что касается кэширования потребителей, транзакций и многого другого, ознакомьтесь с отличным блогом Торстена о Camel JMS с транзакциями — урок усвоен . Может быть, вам стоит потратить некоторое время на изучение его блога, потому что у него тоже много хороших вещей из Camel / ActiveMQ:)

Круто пока. У нас есть пул соединений из 10 соединений, мы ожидаем 10 сеансов на соединение (всего 100 сеансов, если нам это нужно …) и 15 одновременных потребителей. Мы должны быть в состоянии справиться с серьезной нагрузкой, верно?

Посмотрите на этот маршрут здесь. Это достаточно просто, предоставляет компонент activemq (который будет использовать jmsConfig сверху, так что 15 одновременных потребителей) и просто делает некоторые записи:

from("activemq:test.queue")
          .routeId("test.queue.routeId")
          .to("log:org.apache.camel.blog?groupSize=100");

Попробуйте и запустите это. Вы сразу обнаружите, что ваши потребители заблокированы, и следы стека покажут эту красоту:

"Camel (camel-1) thread #1 - JmsConsumer[test.queue]" daemon prio=5 tid=7f81eb4bc000 nid=0x10abbb000 in Object.wait() [10abba000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch)
at java.lang.Object.wait(Object.java:485)
at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151)
- locked <7f40e9070> (a org.apache.commons.pool.impl.GenericKeyedObjectPool$Latch)
at org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:146)
at org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:173)
at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
....

Как это может быть? У нас есть пул соединений … у нас есть сеансы на соединение, настроенные на 10 на соединение, так как мы все заблокированы при создании новых сеансов?

Ответ в том, что вы исчерпали количество сессий, как вы можете ожидать по трассировке стека. Но как? И сколько мне нужно выпить, чтобы решить эту проблему?

Хорошо держись сейчас. Хватай пиво и выслушай меня.

Сначала поймите это. Реализация пула ActiveMQ использует commons-pool, а атрибут maxActiveSessionsPerConnection фактически сопоставляется со свойством maxActive базового пула. Из документов это означает:

maxActive controls the maximum number of objects (per key) that can allocated by the pool (checked out to client threads, or idle in the pool) at one time.

Ключ здесь — это «ключ» (буквально… пункт «на ключ» в документации). Таким образом, в реализации ActiveMQ ключ представляет собой объект, который представляет 1) является ли режим сеанса транзакцией и 2) что такое режим подтверждения (), как показано здесь . Таким образом, проще говоря, вы получите «maxActive» сеансы для каждого ключа, который используется в этом соединении … так что если у вас есть клиенты, использующие транзакции, без транзакций, client-ack, auto-ack, transacted-session, dups-ok, и т. д. вы можете начать видеть, что в итоге вы получите сеансы «maxActive» для каждой перестановки. Так что, если для maxActiveSesssionsPerConnection установлено значение 10, вы можете в итоге получить 10 x 2 x 4 == 80 сеансов. Это то, что нужно спрятать в глубине души.

Второй ключ здесь заключается в том, что, когда компонент camel-jms устанавливает потребителей, он в конечном итоге разделяет одно соединение между всеми потребителями, указанными в сеансе concurrentConsumers. Это интересный момент, потому что camel-jms использует базовую среду Spring DefaultMessageListenerContainer и, к сожалению, это ограничение исходит из этой библиотеки. Таким образом, если у вас есть 15 одновременных потребителей, все они будут совместно использовать одно соединение (даже если объединение в пул… оно будет захватывать одно соединение из пула и удерживать его). Таким образом, если у вас есть 15 потребителей, каждый из которых использует одно и то же соединение, каждый из которых использует транзакционный режим, каждый из которых имеет совместный режим подтверждения, тогда вы в конечном итоге попытаетесь создать 15 сеансов для этого одного соединения. И вы в конечном итоге с вышеупомянутым.

Так что мое эмпирическое правило, чтобы избежать этих сценариев:

  • Понять точно, что делают каждый из ваших производителей и потребителей, каковы их режимы TX и ACK
  • Всегда настраивайте максимальное количество сеансов, когда вам НУЖНО (слишком много потоков сеансов? Я не знаю ..), но всегда выполняйте concurrentConsumers + 1 в качестве значения по крайней мере
  • Если производители и потребители производят / потребляют одно и то же место, РАЗДЕЛИТЕ БАССЕЙН ПОДКЛЮЧЕНИЯ: один пул для потребителей, один пул для производителей

Не знаю, насколько ценна эта информация, но я хотел записать ее для себя. Если кто-то считает это ценным или у него есть вопросы, дайте мне знать в комментариях.