Статьи

Реализация повторных попыток с помощью MDB или пакетного задания MQ? (БЫЛО 7, MQ 6)

Нам нужно прослушивать сообщения, распространяемые через Websphere MQ, чтобы получать информацию, когда сотрудник присоединяется к IBM или покидает ее. А поскольку ресурсы, используемые при обработке (база данных, веб-служба), могут быть временно недоступны, мы должны иметь возможность справляться с такими сбоями, которые могут варьироваться от минут до часов, многократно повторяя попытку обработки после некоторой задержки. И мы также должны иметь возможность иметь дело с «ядовитыми сообщениями», это означает, что сообщения, обработка которых всегда терпит неудачу, либо потому, что их содержимое недопустимо, либо потому, что их данные не согласуются с базой данных.

Вопрос в том, будет ли это лучше реализовано как компонент, управляемый сообщениями (MDB), или как пакетное задание, регулярно проверяющее его очередь, учитывая, что у нас есть Websphere Application Server 7 (и, следовательно, Java EE 5) и Websphere MQ 6, которые оба имеют некоторые важные изменения по сравнению с предыдущими версиями. Оказывается, это зависит — оба подхода имеют свои преимущества и недостатки, и поэтому речь идет о вероятности возникновения особых проблем, а также требований и приоритетов бизнеса.

Настройка сцены

MDB против пакетной работы: факторы решения

Должны ли мы выбрать MDB или подход пакетной работы, зависит от ряда факторов, некоторые из них:

  • Требования

    • Количество: Какое количество сообщений нам нужно обработать?
    • Вероятность ошибки: какова вероятность того, что ресурс будет временно недоступен или что сообщение будет содержать данные, которые не могут быть правильно обработаны, и как скоро такая проблема должна быть решена? Т.е. мы можем подождать до другого дня или мы начнем работать, как только ресурс снова заработает? Это скажет нам, как часто нам нужно повторять попытки и достаточно ли ручного решения проблемы.
  • Поддержка обработки ошибок / логики повторных попыток
  • Простота использования / разработки / обслуживания
  • Производительность: нам нужно обрабатывать все входящие сообщения и оказывать минимальное негативное влияние на производительность целевой системы.
  • Скорость обработки входящих сообщений (немедленное против одного или нескольких раз в день)
  • Интеграция с нашим мониторингом операций (ведение журнала, наше веб-приложение Operations Dashboard)

Проблемы для решения

Существует три типа проблем:

  1. Сбой связи с MQ, например, из-за прерывания сетевого подключения
  2. Невозможность обработать сообщение из-за временной недоступности ресурса (БД или WS)
  3. Ядовитое (недействительное) сообщение (неправильный тип данных, неожиданное содержимое), приводящее к исключению при его обработке

Два подхода

Подход 1: Бин, управляемый сообщениями (MDB)

MDB размещается на сервере приложений, который выполняет большую часть работы от имени компонента (например, управление транзакциями и параллелизмом), что упрощает его разработку и настройку. Это может быть так же просто, как написание

@javax.ejb.MessageDrivenpublic class SimpleMessageBean implements javax.jms.MessageListener {    public void onMessage(javax.jms.Message inMessage) {    final javax.jms.TextMessage msg = (javax.jms.TextMessage) inMessage;    final String msgBody = msg.getText();    // handle the msgBody ...   }}

и настройку ActivationSpecification для MDB в JNDI через приложение. Пользовательский интерфейс администрирования сервера.

Вопрос, конечно, в том, насколько хорошо он может обрабатывать вредоносные сообщения и повторные попытки, когда ресурсы временно недоступны.

Обработка ошибок MDB и конфигурация в Websphere 7

Давайте посмотрим, как Websphere обрабатывает различные виды ошибок, связанных с MDB, и как мы конфигурируем MDB на этом сервере приложений, особенно в отношении обработки ошибок.

Обработка ошибок MDB в Websphere

Что происходит при возникновении ошибки?

Обычно сервер приложений запускает MQ- транзакцию до того, как он вызывает MDB, и либо фиксирует ее, когда MDB завершает работу, либо откатывает ее, когда выдает исключение. Если транзакция завершается успешно, сообщение удаляется из очереди MDB, в противном случае оно будет возвращено и обработано снова в будущем. Это поведение по умолчанию, соответствующее настройке управляемых контейнером транзакций (tx) с типом ‘required’. Обратите внимание, что в этой транзакции могут участвовать и операции с БД, и, таким образом, они также фиксируются / откатываются, что может быть полезно.

  1. В случае сбоя связи / соединения MQ WAS регистрирует исключение и позже повторяет попытку подключения на основе его конфигурации. Также возможно установить ExceptionListener , который будет вызываться с исключением в качестве параметра в таком случае.
  2. В случае исключения во время обработки сообщения (или из-за ручного вызова setRollbackOnly) текущая транзакция откатывается, сообщение помещается обратно в очередь и MDB перезапускается. Когда очередь перепроверяется, сообщение снова обнаруживается и передается другому MDB. Если причина проблемы не исчезнет, ​​это снова не удастся — и так далее.

У нас есть два способа справиться с ошибочным сообщением:

  1. Удалите сообщение из очереди, либо отбросив его, либо переместив в «очередь возврата» очереди. Это уместно, когда само сообщение является проблемой (например, содержит данные, несовместимые с БД …).
  2. Остановите обработку сообщений из очереди (приостановите спецификацию активации) и перезапустите ее позже, когда проблема будет решена. Это целесообразно, когда необходимый ресурс временно недоступен.

Конфигурация поставщика сообщений и ресурсов JMS, связанная с обработкой ошибок

Мы будем использовать спецификацию активации JCA 1.5 (а не порты слушателя, которые устарели после WAS 7) с Websphere MQ в качестве поставщика, что ограничивает наши параметры конфигурации описанными ниже.

  • Отключение (временно) всей очереди, точнее отключение MDB

    • «Количество последовательных сбоев доставки до приостановки конечной точки» — в спецификации активации MQ
    • «Остановить конечную точку, если доставка сообщения не удалась» — если истина, доставка сообщения приостанавливается, если количество последовательных сбоев доставки… превышено

      • До Fix Pack 7.0.1.0 это применимо только в том случае, если MDB выдает исключение или возникает внутренняя ошибка, а не когда транзакция помечена для отката ( ошибка IC60714 ).
      • Когда конечная точка остановлена, может быть отправлено уведомление JMX, чтобы кто-то был проинформирован о том, что конечную точку необходимо будет повторно включить. Возможно, мы могли бы автоматизировать эту повторную активацию с помощью запланированного EJB без сохранения состояния, используя Websphere JMX для повторной активации после задержки.
  • Удаление проблемных сообщений (это делает сам Websphere MQ, а не WAS)

    • «Порог возврата» (BOTHRESH; свойство очереди, настроенное с помощью MQ) указывает максимальное количество раз, когда сообщение может быть помещено в очередь до его перемещения в указанную очередь запроса на возврат. По умолчанию: 0 или не установлено ?! => никогда не доставляется
    • «Очередь возврата заданий» (BOQNAME; свойство очереди, настроенной через MQ) — очередь, в которую следует помещать сообщения с ошибками; по умолчанию: SYSTEM.DEAD.LETTER.QUEUE
    • ВНИМАНИЕ: Это относится только к очереди, а не к теме. Но под темами в любом случае используются (вероятно, динамические) очереди, поэтому это должно быть как-то возможно.

  • Другие связанные настройки

    • Нет способа настроить, как часто WAS должен проверять наличие новых сообщений , по крайней мере, я не смог найти. Существует « интервал повторного сканирования », который сообщает WAS, как долго ждать, прежде чем проверять другую очередь, но, кажется, применяется только тогда, когда очередей больше, чем потоков сканирования. Значение по умолчанию — 5 с, и, согласно некоторым сообщениям, оно не может быть увеличено (хотя это может быть не так на нашем уровне версии / пакета исправлений).

Интересные ресурсы:

Дизайн подхода MDB

Дизайн обработки ошибок

Основная проблема с подходом MDB заключается в том, что он не поддерживает повторную попытку операции после задержки (либо для одного сообщения с ошибкой, либо для всей очереди, если ресурс временно недоступен). Есть некоторые обходные пути, но не очень хорошие.

  • Для одного сообщения я не смог найти способ осуществить повторные попытки после некоторой, предпочтительно увеличивающейся задержки; единственное, что мы можем сделать, — это повторить его несколько раз с задержкой по умолчанию Websphere, которая составляет 5 с, и, если она все еще не удалась, переместить ее в специальную очередь, которая будет обрабатываться вручную, возможно, также отправляя электронное письмо. уведомление.
  • Если есть какая-то глобальная проблема, такая как недоступный ресурс, обозначенный несколькими последовательными сбоями одного или нескольких сообщений (в зависимости от содержимого очереди), мы могли бы позволить WAS остановить MDB и повторно включить его позже либо автоматически после задержать или вручную, когда проблема будет решена.

MDB дизайн

  • При обнаружении недоступности ресурса позвольте WAS автоматически остановить MDB с помощью параметра «Количество последовательных сбоев доставки до приостановки конечной точки». Его нужно будет повторно включить вручную или автоматически.

    • Повторная активация MDB вручную: мы должны каким-то образом обнаружить, что MDB был отключен (возможно, только путем просмотра журнала), выяснить причину его сбоя и повторно включить его через консоль администрирования Websphere.
    • Automated re-activation: Implement a scheduled stateless EJB, which re-enables the MDB after few tens of minutes – preferably do this few times with increasing delay, if still failing, give up and notify an admin.
      • To find out when reactivation is necessary, the re-activating EJB can either regularly check the status of the MDB (which is feasible) or listen for JMX notifications issued by the logging system and watch for a deactivation
      • The re-activation itself is performed via JMX by invoking resume() on the appropriate J2CMessageEndpoint MBean (see the link above for how to get a handle to it).
      • In any case the application would need the permission to perform some WAS administration operations, namely to re-activate the MDB, and perhaps also to access JMX or the AdminServiceFactory/AdminService, which might be a problem if corporate security rules do not allow that.
  • When there is a poison message, move it to the backout queue and notify somebody to handle it (e.g. via email)
    • If the queue contains only one message there is no generic way how to find out whether the problem is within the message or in some resource, the MDB would need to find this out and communicate it. If there are multiple messages and only one fails, we know it’s a poison message and it could be automatically removed by means of the “Backout threshold”. (Beware of the interaction between the message’s redelivery count/backout threashold and the “Number of sequential delivery failures…” – the letter is reset when either a message processing succeeds or when the MDB is stopped/restarted.)
  • (Perhaps we could use JMS selectorsJMS header propertiesrewrite of the selectors

MDB approach evaluation

Key MDB issues

  • Permissions to perform a WAS administration operation required.
  • Difficult to distinguish a poison message from a resource outage without additional logic when the queue contains only one element.
    • But see the batch job design below, which also requires to be able to distinguish these two types of problems.
  • Inefficient determination of MDB’s status for the delay reactivation logic: either polling its status regularly or watching the log with many unrelated messages that can’t be filtered out.

Key MDB advantages and disadvantages

  • Advantages
    • The data is processed and reaches the destination system soon after it is published
    • Key characteristics are configurable via UI (Number of sequential delivery failures, Backout threshold, MQ connection data, MQ security/SSL, processing concurrency, DataSource configuration/injection, …). Actually this is also a disadvantage due to needing an admin, see below
    • Logging configurable at the runtime (Java logging, levels can be set in WAS console)
  • Disadvantages
    • Any configuration requires a WAS administrator and thus lot of time due to the IBM bureaucracy and ceremony
    • Difficult to collect and communicate statistics for our Operations Dashboard because (a) there are frequent fine-grained changes instead of 1/day batch changes and (b) there is no support for the Job logging framework of ours (a possible but laborious solution is to gather statistics in an instance variable and log them in regular intervals into the job tables using JDBC and some code extracted from the job framework)
    • Necessary to somehow configure the reactivation EJB (the reactivation delay, number of attempts before giving up)

MDB design questions:

  • Do we need automated reactivation of a disabled MDB? Perhaps not if: (1) a resource outage happens rarely and/or (2) the administration team spots this automatically and can handle it automatically without any bureaucracy and consumption of our resources.

MDB resources

Essential docs

Other (not all docs available for our version, namely MQ v6 and WAs v7)

Approach 2: A batch job checking MQ regularly

A batch job is a command-line application that is regularly, for example once a day, run by cron and actively scans its incoming queue/topic for new messages and processes them all at once. All the JMS communication and management and configuration must be implemented from scratch. (Though utilities such as Spring JMS template may simplify it.)

Job error handling and configuration

Error handling

The problems and ways to deal with errors are the same as when running in an application server, only we have to do everything ourselves. That means we need to manually start a transaction, catch exception and commit/roll back and configure the topic/queue for poison messages.

We would need to implement a problem cause detection logic to distinguish whether there is a temporary resource outage or whether there is a problem with the message itself (either incorrect data type or data inconsistent with the target DB ). There is no other good way to deal with these distinct kinds of problems without really knowing which of them it is.

We would deal with the problems as follows:

  • For a resource outage, retry few times with an increasing delay, then quit and postpone the processing till the next scheduled execution
  • For a poison message, move it to the backout queue and notify an administrator

JMS configuration in a batch job

We have two ways to configure the JMS resources (a Topic and a (Topic)ConnectionFactory) and their options related to error handling:

  1. Use MQ provider-specific APIs to create the objects and configure them. See this JMS + MQ API example.
  2. Configure the provider-specific resources in JNDI and use only the standardized JNDI and JMS APIs. This is very easy with Sun’s filesystem-based JNDI provider (fscontext.jar and providerutil.jar) and vendor-supplied tools for generating the JNDI .bindings file for existing JMS resources . In the case of MQ you can do it in the MQ Explorer GUI or with the command-line utility JMSAdmin (a JMSAdmin example, another one).

You can create the JNDI configuration via the MQ Explorer wizards – after having added a new JNDI “context” using fscontext and a local directory – either by first creating the JMS resource and then letting the wizard generate an MQ resources for it and adjusting it as needed:

Or by creating the JMS resource from an existing MQ resource:

Provided that the FsContext configuration file .bindings produced by JMSAdmin/MQ Explorer is in the folder /etc/mqJndiconfig, we would connect to the MQ as follows:

final Hashtable<String, String> env = new Hashtable<String, String>();env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");env.put(Context.PROVIDER_URL, "file:/etc/mqJndiconfig");final InitialContext context = new InitialContext(env);ConnectionFactory qcf = (javax.jms.ConnectionFactory) context.lookup("MyConnectionFactory"); // Note: I set the channel property MCAUSER so it actually isn't necessary to supply credentials below:final Connection connection = qcf.createConnection("anna", "password ignored");// Client ID is necessary for a durable subscr.// We could alternatively set in on the ConnectionFactory - the CLIENTID  propertyconnection.setClientID("MyCoolApplication");final Session session = connection.createSession(true, -1); // the param 2 is ignored for durable subscr.final Topic destination = (Topic) context.lookup(JNDI_PREFIX + TOPIC);final MessageConsumer receiver = subscribe(session, destination);try {connection.start();} catch (javax.jms.JMSException e) {throw new RuntimeException("Failed to start the JMS connection", e);}

We would then read the messages via:

while ((msg= receiver.receiveNoWait()) != null) {handleMessage(msg);} 

The dependencies include jms.jar, fscontext.jar and providerutil.jar. You can find them either in the WMQ installation or on the web.

You may want to have a look at Writing a simple publish/subscribe application connecting through WebSphere MQ in WMQ help, which discusses some of the code above in more detail.

Job design

  • Set a reasonable backout threshold and a suitable backout queue on the queue used by the Topic so that problematic messages are removed automatically after few failed attempts
    • Some monitoring of the backout queue would be necessary. If the MQ infrastructure doesn’t provide it then we can implement another MQ/JMS reader that would send an email when there are some new messages in the queue.
    • Regarding the type of the topic queue:
      • Either we can use the shared JMS queue (SYSTEM.JMS.D.SUBSCRIBER.QUEUE) for the topic
      • Or we can use a non-shared queue unique for our application, which would be actually better and more aligned with IBM standards. It’s achieved by setting a broker durable subscription queue pattern in the form “SYSTEM.JMS.D..*” (notice the trailing *) on the JMS ConnectionFactory when defining it in JNDI or in the Java code. Upon the first connection a permanent dynamic queue is generated for the client. We can then set the backout options on it (or the administrators may define a model queue for these dynamic queues with this setting already applied).
  • Read and process each message in a new MQ transaction so that if the processing fails it will be put back into the queue (and its delivery count will be increased, thus making it perhaps eligible for the backout queue)
    • ISSUE: The failed message would be obtained again on the next read (because it goes to the front, not to the end of the subscriber’s queue) and thus we can’t process any other message before dealing with it. Possible solutions:
      1. Fail immediately, try again during the next regular run. If this happens several times in a row for a particular message then it will be moved to the backout queue by MQ (w.r.t. the settings above).
      2. Wait for a while such as 10-20m and try it again. If it still fails, continue as in #1.
    • Notice that DB transactions are not a part of the MQ transaction (unless we use some external transaction manager and XA resource managers, which would require more work) but that shouldn’t be a problem for us. If the DB tx fails then we will manually roll back the MQ tx. If the DB tx succeeds but later we fail to communicate the success to MQ then the message will stay in the queue and be processed again, which isn’t a big problem in our particular case. (

      Global transactions
      with DB operations being a part of the MQ tx are supported only either (1) for a “server application”(2) with an external XA tx manager, such as in WAS.)

Batch job approach evaluation

  • Advantages
    • Simple implementation offlexibledelayed retrials – upon a resource outage, end the job and try again during the next scheduled run or, perhaps, retry first after a manual delay (Thread.sleep(); beware connection timeouts).
    • Simple integration into our monitoring/logging framework incl. the Operations Dashboard.
  • Disadvantages
    • More coding to set up/clean the resources and handle errors, which is not trivial, and thus also more bug prone.
    • Concurrent processing of the messages would be considerably more difficult to implement correctly if possible at all (if MQ JMS does implement the necessary optional methods). We would need to use the advanced methods targeted at app. server vendors – there is a non-MQ example of a multithreaded (non-durable subscription) consumer. Hopefully it could be modified for a durable one using Connection.createDurableConnectionConsumer with the same simple implementation of ServerSessionPool.
      • Impl. details: The “pool” would always create a new custom ServerSession implementation. instance, whose getSession() would simply create a new transacted TopicSession, set its MessageListener, and run the session in a new Thread when start() called. Notice that (Topic)Session implements Runnable, whose run() invokes the provided listener sequentially for available messages. The listener would process a message and call commit/rollback on its owning session (see this transactional listener example
      • Important: Handling of failed messages would need to be considered carefully as returning a message to the queue would lead to its immediate re-processing and likely rejection by one of the other threads, exceeding its backout treshold in a few seconds and thus subverting the delayed retrial processing. On the other hand, as mentioned above, we should anyway be able to distinguish resource outage, in which case we would stop processing immediately, and a problematic message, which would anyway end up in the backout queue so this is perhaps not a real issue.
      • Note: When using MessageListeners, it’s important to set an ExceptionListener on the connection because some errors can only be communicated this way.

Summary and conclusion

The version and fixpack level of WMQ/WAS is very important.

Both the approaches have some pros and cons and it depends on the requirements and their priority which one would be more suitable.

MDB: It’s more difficult to implement delayed retrial if it is required – it may be implemented via a scheduled EJB automatically re-enabling the MDB stopped by WAS after a number of failures (one issue is that we’d need WAS admin rights for the EJB to do that; another is performance due to the need to either monitor logs or check the MDB’s status regularly). On the other hand, concurrent processing is available out of the box and also implementing a bean notifying about problematic messages in the backout queue is simpler thanks to the injection of the mail API resources. This solution may thus require some JMX and Java EE (scheduled EJB) magic and there may be unexpected problems with that.

JOB: Concurrency: it’s more difficult to implement processing of the messages in parallel, there is even a slight chance that it’s impossible. Also  more coding is required and thus there will be more bugs. On the other hand we can implement delayed retrials as we want. Thus if concurrent processing isn’t critical while the automatic delayed retrial may be then this would be the best approach.

 

From http://theholyjava.wordpress.com/2010/09/13/implementing-retrial-with-a-mdb-or-an-mq-batch-job-was-7-mq-6/