Моя команда и я создаем платформу сервисов на основе набора сервисов RESTful JSON, где каждый сервис вносит свой вклад в платформу, предоставляя различные функции и / или данные. Поскольку журналы создавались повсеместно, мы подумали, что было бы неплохо централизовать ведение журналов и, возможно, также предоставить элементарный просмотрщик журналов, который позволял нам просматривать, фильтровать, сортировать и искать наши журналы. Мы также хотели, чтобы наша запись в журнале была асинхронной, поскольку мы не хотели, чтобы наши службы удерживались при попытке записи журналов, скажем, возможно, непосредственно в базу данных.
Стратегия достижения этого была простой.
- Настройка ActiveMQ
- Создайте приложение log4j, которое записывает журналы в очередь (log4j поставляется с одним таким приложением, но позволяет писать наше собственное.
- Напишите прослушиватель сообщений, который читает журналы из настройки очереди JMS на сервере MQ и сохраняет их
Давайте посмотрим один за другим.
Настройка ActiveMQ
Настройка внешнего сервера ActiveMQ достаточно проста. Отличное руководство доступно по адресу http://servicebus.blogspot.com/2011/02/install-apache-active-mq-on-ubuntu.html, чтобы настроить его в Ubuntu. Вы также можете встроить брокер сообщений в свое приложение. Весна делает это легко. Посмотрим как позже.
Создание приложения JMS Lo4j
Сначала мы создаем JMS-приложение log4j. log4j поставляется с одним таким appender (который пишет в тему JMS вместо очереди)
import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Appender; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import org.apache.log4j.spi.LoggingEvent; /** * JMSQueue appender is a log4j appender that writes LoggingEvent to a queue. * @author faheem * */ public class JMSQueueAppender extends AppenderSkeleton implements Appender{ private static Logger logger = Logger.getLogger("JMSQueueAppender"); private String brokerUri; private String queueName; @Override public void close() { } @Override public boolean requiresLayout() { return false; } @Override protected synchronized void append(LoggingEvent event) { try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( this.brokerUri); // Create a Connection javax.jms.Connection connection = connectionFactory.createConnection(); connection.start();np // Create a Session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue(this.queueName); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); ObjectMessage message = session.createObjectMessage(new LoggingEventWrapper(event)); // Tell the producer to send the message producer.send(message); // Clean up session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } public void setBrokerUri(String brokerUri) { this.brokerUri = brokerUri; } public String getBrokerUri() { return brokerUri; } public void setQueueName(String queueName) { this.queueName = queueName; } public String getQueueName() { return queueName; } }
Давайте посмотрим, что здесь происходит.
Строка 19: Мы реализуем интерфейс приложения Log4J, который просит нас реализовать три метода. Требуется выложить , закрыть и добавить . На данный момент мы будем упрощать и реализовывать метод append, который вызывается при каждом вызове метода в логгере.
Строка 37: log4j вызывает метод append и передает объект LoggingEvent в качестве параметра, который представляет вызов для регистратора. Объект LoggingEvent инкапсулирует всю информацию о каждом элементе журнала.
Строки 41 и 42: создайте новую фабрику соединений, предоставив ей URI JMS, в нашем случае activemq, server
Строки 45, 46 и 49: мы устанавливаем соединение и сеанс с сервером JMS. Сессия может быть открыта в нескольких режимах. Auto_Acknowledge сессия один , в котором подтверждение сообщения происходит автоматически. Другие режимы включают Client_Acknowledge, в котором клиент должен явно подтвердить получение и / или обработку сообщения, и два других режима. Подробнее см. В документации по адресу http://download.oracle.com/javaee/1.4/api/javax/jms/Session.html.
Строка 52: создать очередь. Отправьте имя очереди для подключения в качестве параметра.
Строка 56: мы устанавливаем режим доставки Non_Persistent. Другой вариант — Постоянный, когда сообщение сохраняется в постоянном хранилище. Постоянный режим замедляется, но повышает надежность передачи сообщений.
Строка 58: мы делаем несколько вещей. Прежде всего я оборачиваю объект LoggingEvent в LoggingEventWrapper. Это связано с тем, что в объекте LoggingEvent есть некоторые свойства , которые нельзя сериализовать, а также потому, что я хочу получить некоторую дополнительную информацию, такую как IP-адрес и имя хоста. Затем, используя объект сеанса JMS, я подготавливаю объект (оболочку) для транспорта.
Строка 61: я отправляю объект в очередь.
Ниже приведен код для оболочки.
import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; import org.apache.log4j.EnhancedPatternLayout; import org.apache.log4j.spi.LoggingEvent; /** * Logging Event Wraps a log4j LoggingEvent object. Wrapping is required by some information is lost * when the LoggingEvent is serialized. The idea is to extract all information required from the LoggingEvent * object, place it in the wrapper and then serialize the LoggingEventWrapper. This way all required data remains * available to us. * @author faheem * */ public class LoggingEventWrapper implements Serializable{ private static final String ENHANCED_PATTERN_LAYOUT = "%throwable"; private static final long serialVersionUID = 3281981073249085474L; private LoggingEvent loggingEvent; private Long timeStamp; private String level; private String logger; private String message; private String detail; private String ipAddress; private String hostName; public LoggingEventWrapper(LoggingEvent loggingEvent){ this.loggingEvent = loggingEvent; //Format event and set detail field EnhancedPatternLayout layout = new EnhancedPatternLayout(); layout.setConversionPattern(ENHANCED_PATTERN_LAYOUT); this.detail = layout.format(this.loggingEvent); } public Long getTimeStamp() { return this.loggingEvent.timeStamp; } public String getLevel() { return this.loggingEvent.getLevel().toString(); } public String getLogger() { return this.loggingEvent.getLoggerName(); } public String getMessage() { return this.loggingEvent.getRenderedMessage(); } public String getDetail() { return this.detail; } public LoggingEvent getLoggingEvent() { return loggingEvent; } public String getIpAddress() { try { return InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { return "Could not determine IP"; } } public String getHostName() { try { return InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { return "Could not determine Host Name"; } } }
Слушатель сообщений
Слушатель сообщений «слушает» очередь (или тему). Всякий раз, когда новое сообщение добавляется в очередь, вызывается метод onMessage .
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class LogQueueListener implements MessageListener { public static Logger logger = Logger.getLogger(LogQueueListener.class); @Autowired private ILoggingService loggingService; public void onMessage( final Message message ) { if ( message instanceof ObjectMessage ) { try{ final LoggingEventWrapper loggingEventWrapper = (LoggingEventWrapper)((ObjectMessage) message).getObject(); loggingService.saveLog(loggingEventWrapper); } catch (final JMSException e) { logger.error(e.getMessage(), e); } catch (Exception e) { logger.error(e.getMessage(),e); } } } }
Строка 23: проверка, является ли объект, выбранный из очереди, экземпляром ObjectMessage
Строка 26: извлечение LoggingEventWrapper из
строки сообщения 27: вызов метода службы для сохранения журнала
Проводка весной
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd"> <!-- lets create an embedded ActiveMQ Broker --> <!-- uncomment the tag below only if you need to create an embedded broker --> <!-- amq:broker useJmx="false" persistent="false"> <amq:transportConnectors> <amq:transportConnector uri="tcp://localhost:61616" /> </amq:transportConnectors> </amq:broker--> <!-- ActiveMQ destinations to use --> <amq:queue id="destination" physicalName="logQueue" /> <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML --> <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:61616" /> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="jmsFactory" /> <property name="exceptionListener" ref="JMSExceptionListener" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- Spring JMS Template --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> </bean> <!-- listener container definition using the jms namespace, concurrency is the max number of concurrent listeners that can be started --> <jms:listener-container concurrency="10"> <jms:listener id="QueueListener" destination="logQueue" ref="logQueueListener" /> </jms:listener-container> </beans>
Строки 5-9: Используйте тег broker для настройки встроенного брокера сообщений. Поскольку я пользуюсь внешним, он мне не нужен.
Строка 12: Укажите имя очереди, к которой вы хотите подключиться.
Строка 14: URI Брокерского сервера.
Строка 15-19: настройка фабрики соединений
Строка 26-28: настройка прослушивателя сообщений, где мы указываем количество одновременных потоков, которые могут получать сообщения из очереди.
Конечно, приведенный выше пример не будет работать из коробки. Вам все еще нужно включить все зависимости JMS и реализовать службу, которая сохраняет журналы. Но я надеюсь, что это даст вам достойную идею.