Анализ журналов очень полезен и позволяет организациям получать информацию из файлов журналов.
В этом посте (который является руководством для начинающих) я расскажу, как настроить «очередь сообщений» в Spring Boot, а затем использовать ее в качестве источника потока с помощью Flink. Этот шаблон учитывает системы с высокой степенью развязки, где один компонент делегирует ответственность за дальнейшую обработку другому компоненту асинхронным способом.
Весь код в этом примере доступен в GitHub .
Конвейер данных, используемый в этом примере:
1. Настройка Apache Log File Simulator
Для файлов журналов я использовал очень удобную утилиту для генерации поддельных файлов Apache . После его установки выполните следующую команду, чтобы запустить генератор файлов в бесконечном режиме
Оболочка
1
$ python apache-fake-log-gen.py -n 0 -o LOG
2. Настройка Logstash
Теперь, когда у нас есть заполняемые файлы, мы хотим, чтобы агент прочитал эти тексты журналов и отправил их в наше приложение Spring Boot . Logstash — это специальная утилита для таких случаев использования. Просто скачайте и распакуйте его; используйте следующий текст конфигурации и сохраните его как $ LOGSTASH / config / apache-log.yml .
* <app-hostname> — это IP-адрес или имя хоста, на котором вы запускаете приложение весенней загрузки.
YAML
xxxxxxxxxx
1
input
2
file
3
path => "/root/Fake-Apache-Log-Generator/*.log"
4
5
6
filter
8
grok
9
match => "message" => "%{COMBINEDAPACHELOG}"
10
11
12
output
14
http
15
url => "http://<app-hostname>:8080/logs"
16
http_method => "post"
17
18
Затем выполните следующую команду, чтобы запустить Logstash. Я протестировал версию 7.3.2.
Оболочка
x
1
$LOGSTASH/bin/logstash -f config/apache.yml
Вам также могут понравиться:
Анализ журнала приложений и визуализация данных .
3. Приложение Spring Boot для получения данных журнала из Logstash
Как и с другими функциями, настройка JMS очень проста в Spring Boot. Я следовал этой статье dzone и документации весенней загрузки . Я использую стандартную очередь сообщений ActiveMQ в памяти, как показано ниже, и выставляю ее как бин.
Джава
xxxxxxxxxx
1
2
3
4
public class Application
5
{
6
private static final String LOCAL_ACTIVE_MQ_URL = "vm://localhost?broker.persistent=false";
7
public static void main(String[] args)
9
{
10
SpringApplication.run(Application.class, args);
11
}
12
13
14
public Session mySession() throws JMSException
15
{
16
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(LOCAL_ACTIVE_MQ_URL);
17
factory.setTrustAllPackages( true );
18
Connection connection = factory.createConnection();
19
connection.start();
20
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
21
}
22
}
Затем я предоставляю конечную точку REST «http: // hostname: 8080 / logs» для получения сообщений журнала от Logstash и отправки их в очередь. Обратите внимание, что полезная нагрузка JSON из Logstash преобразуется в POJO (ApacheLogMessage.java) перед отправкой в очередь.
Джава
xxxxxxxxxx
1
2
"/logs") (
3
public class Controller
4
{
5
6
private JmsTemplate jmsTemplate;
7
8
public static final String QUEUE_NAME = "webserverlog";
9
10
11
public ResponseEntity<?> sendToQueue( ApacheLogMessage message)
12
{
13
jmsTemplate.convertAndSend(QUEUE_NAME, message );
14
15
return new ResponseEntity<>(HttpStatus.ACCEPTED);
16
}
17
}
4. Настройте Flink для использования очереди в качестве источника
AMQSource.java в пакете org.pd.streaming.application.queue расширяет Flink RichSourceFunction
и выступает в качестве источника. Он принимает consumer
объект типа MessageConsumer
и слушает сообщения. Перед отправкой во Flink я конвертирую его обратно в свой POJO для сообщения журнала Apache.
Джава
xxxxxxxxxx
1
2
public void run( SourceContext<ApacheLogMessage> ctx ) throws Exception
3
{
4
while( running )
5
{
6
Message m = consumer.receive();
7
8
ApacheLogMessage logMessage = (ApacheLogMessage)((ObjectMessage)m).getObject();
9
10
ctx.collect( logMessage );
11
}
12
}
Теперь, когда мы создали исходный код, давайте соберем их вместе и запустим среду исполнения Flink. В классе StreamProcess.java я сначала создаю Flink DataStream
из моего источника.
Джава
xxxxxxxxxx
1
Destination destination = mySession.createQueue(QUEUE_NAME);
2
MessageConsumer consumer = mySession.createConsumer( destination );
3
source = new AMQSource(consumer);
4
DataStream<ApacheLogMessage> dataStream = env.addSource( source );
Затем я строю конвейер данных Flink, как показано ниже:
Джава
xxxxxxxxxx
1
dataStream
2
.keyBy((KeySelector<ApacheLogMessage, String>) ApacheLogMessage::getClientip)
3
.timeWindow( Time.seconds( 10 ) )
4
.apply( new WindowFunction<ApacheLogMessage,Tuple2<String, Long>, String,TimeWindow>()
5
{
6
7
public void apply( String key, TimeWindow window,Iterable<ApacheLogMessage> input, Collector<Tuple2<String,Long>> out ) throws Exception
8
{
9
long count = 0;
10
for( ApacheLogMessage msg : input)
11
{
12
if ( HttpStatus.valueOf( msg.getResponse() ).is4xxClientError() )
13
{
14
count++;
15
}
16
}
17
out.collect( new Tuple2<>(key, count) );
18
}
19
})
20
.filter( new FilterFunction<Tuple2<String,Long>>()
21
{
22
23
public boolean filter( Tuple2<String,Long> value ) throws Exception
24
{
25
return value.f1 > 0;
26
}
27
})
28
.print();
Позвольте мне подробно объяснить, как работает приведенный выше код.
- Строка 2: разделите поток журнала, используя IP-адрес клиента в качестве ключа.
- Строка 3: создайте временное окно в 10 секунд.
- Строка 4-19: проверьте сообщения, в которых сервер сообщил код ответа 4xx (ошибки на стороне клиента). Верните Flink Tuple, содержащий IP-адрес клиента и количество.
- Строка 20-27: еще один Flink API для фильтрации тех кортежей, у которых не было ошибок 4xx.
Наконец, напечатайте эти кортежи (пример с консоли Eclipse).
Я надеюсь, что эта статья поможет вам начать анализ журналов с помощью Flink. В следующих публикациях я буду добавлять базу данных в качестве приемника и делиться некоторыми графиками Grafana, чтобы лучше визуализировать наши данные.