О части 1
В первой части статьи я описал сценарий по частям. В этой части я опишу, как можно запрашивать документы в Elasticsearch, а также представлю полный сценарий.
Получение данных из Elasticsearch
Как вы, возможно, знаете, Logstash обычно используется вместе с двумя другими инструментами, а именно Elasticsearch и Kibana, причем эти три обычно называют «стеком инструментов ELK». В большинстве случаев информация журнала, анализируемая Logstash, сохраняется как документы в Elasticsearch, который является высокопроизводительной поисковой системой.
В нашем случае мы также подключаем секцию ‘output’ скрипта к экземпляру Elasticsearch. При анализе ввода созданные документы будут сохранены в экземпляре Elasticsearch. Как только информация журнала сохраняется в Elasticsearch, доступ к информации становится более легким, поскольку мы можем использовать интерфейс «поиск» вместо необходимости обходить длинные текстовые файлы. Например, мы можем легко сгенерировать отчет обо всех сообщениях «ОШИБКА» за последний месяц, вызвав поисковый запрос по индексу Elasticsearch, используя атрибут документа «logLevel». Пример запроса представлен во фрагменте 12.
{
"query": {
"filtered": {
"filter": {
"query": {
"range": {
"logTime": {
"gte": "2015-04-09 00:00:00",
"lte": "2015-04-09 23:59:00"
}
}
}
},
"query": {
"match": {
"logLevel": "ERROR"
}
}
}
}
}
Фрагмент 12: запрос Elasticsearch
Важным аспектом процесса клонирования является то, что, поскольку мы создаем новые записи из клонированной записи, новые записи будут иметь ту же метку времени, что и исходная запись. Таким образом, мы можем не только искать исходную запись, используя ее временную метку, мы также можем получать все записи, созданные из нее, используя ту же самую временную метку. Чтобы отличить исходную запись от последующих записей, мы добавили поле с именем «recordType». Значение ‘recordType’ равно ‘mater’ для исходной записи и ‘child’ для записей, созданных из клонированной записи. Запрос Elasticsearch, используемый для извлечения таких записей, представлен во фрагменте 13.
{
"query": {
"filtered": {
"filter": {
"query": {
"range": {
"logTime": {
"gte": "2015-04-09 00:00:00",
"lte": "2015-04-09 23:59:00"
}
}
}
},
"query": {
"bool": {
"must": [
{ "match" : { "logLevel": "ERROR" } },
{ "match" : { "recordType": "child" } }
]
}
}
}
}
}
Фрагмент 13: запрос Elasticsearch, соответствующий нескольким полям
Полный сценарий
Полный сценарий, представленный в фрагменте 14, который выполняет следующее:
- Создает одну запись из нескольких физических строк трассировки стека.
- Клонирует запись для дальнейшей обработки.
- Клонированная запись разбивается на несколько записей, каждая из которых содержит детальную информацию.
input {
file {
path => "/input/foolproof/input2.log"
type => "foolproof_log"
sincedb_path => "/input/foolproof/input2.sincedb"
start_position => "beginning"
}
tcp {
type => "foolproof_log"
port => 9999
}
}
filter {
if [type] == "foolproof_log" {
if "cloned" not in [tags] {
if [message] == "" {
drop { }
}
# read 'sync completed' info message
grok {
patterns_dir => "./patterns"
match => { "message" => "%{CUSTOM_LEADER_TEXT_INFO:logLevel}[,]%{SPACE}%{CUSTOM_TIMESTAMP:logTimeString}[,]%{SPACE}%{CUSTOM_LABEL_SYNC_OPERATION_COMPLETED}%{SPACE}%{NUMBER:moduleID}%{GREEDYDATA:extraText}" }
add_field => { "subType" => "infoSyncCompleted" }
remove_tag => [ "_grokparsefailure" ]
}
if "_grokparsefailure" not in [tags] {
mutate {
gsub => [
"logTimeString", ",", ";"
]
}
date {
locale => "en"
match => [ "logTimeString", "YYYY-MM-dd HH:mm:ss" ]
#timezone => "Asia/Calcutta"
target => "logTime"
}
}
if "_grokparsefailure" in [tags] {
multiline {
patterns_dir => "./patterns"
pattern => "(%{CUSTOM_ERROR_LABEL_1},%{SPACE}%{TIMESTAMP_ISO8601},%{SPACE}%{GREEDYDATA})|(%{CUSTOM_ERROR_LABEL_2}%{SPACE}%{TIMESTAMP_ISO8601}%{SPACE}%{GREEDYDATA})|(%{TIMESTAMP_ISO8601}%{SPACE}%{CUSTOM_MAIN_LABEL}%{SPACE}%{GREEDYDATA})"
negate => true
what => previous
}
grok {
patterns_dir => "./patterns"
match => [ "message", "%{CUSTOM_ERROR_LABEL_1:logLevel},%{SPACE}%{TIMESTAMP_ISO8601:logTimeString},%{SPACE}%{GREEDYDATA:mergedText}" ]
remove_tag => [ "_grokparsefailure" ]
add_field => { "recordType" => "master" }
add_field => { "inputType" => "foolproof_log" }
}
# mergedText can be removed as we will be reading from the 'message' field
mutate {
remove_field => [ "mergedText" ]
}
mutate {
gsub => [ "logTimeString", ",", ";" ]
}
date {
locale => "en"
match => [ "logTimeString", "YYYY-MM-dd HH:mm:ss" ]
#timezone => "Asia/Calcutta"
target => "logTime"
}
## should we remove \n, \r and \t from 'message' field as well?
## removal can make it easy for search. These characters need
## to be replaced by some other character like # that does not occur in the message
# clone the multiline, so that we can split in into constituent parts later
clone {
clones => [ "cloned" ]
}
}
}
}
# split the 'cloned' multiline into separate parts for detailed processing
if [type] == "cloned" {
if [message] == "" {
drop { }
}
if [logLevel] == "ERROR" {
# this pattern patches the pattern of the multiline
grok {
patterns_dir => "./patterns"
match => [ "message", "%{CUSTOM_ERROR_LABEL_1},%{SPACE}%{TIMESTAMP_ISO8601},%{SPACE}%{GREEDYDATA:mergedMessage}" ]
remove_tag => [ "_grokparsefailure" ]
remove_field => [ "message" ]
remove_tag => [ "multiline" ]
remove_field => [ "recordType" ]
}
mutate {
add_field => { "recordType" => "child" }
}
## we remove \n, \r and \t from 'mergedMessage' field.
## We remove these characters by replacing them by suitable alternative.
## we will then use the substituted chracter for splitting the multiline message into
## constituent messages
mutate {
gsub => [
"mergedMessage", "\r\n", "#",
"mergedMessage", "\n\n", "#",
"mergedMessage", "\r\n\t", "#",
"mergedMessage", "\n\t", "#",
"mergedMessage", "\n ", "#",
"mergedMessage", "\n ", "#",
"mergedMessage", "\n", "#"
]
}
split {
field => "mergedMessage"
target => "splitText"
terminator => "#"
}
# parse business unit id line
grok {
patterns_dir => "./patterns"
match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_OCCURRED_FOR}[:]%{SPACE}%{CUSTOM_TEXT_2:messge1}[.][.]%{SPACE}%{CUSTOM_LABEL_BUSINESS_UNIT_ID}%{SPACE}[=]%{SPACE}%{NUMBER:businessUnitID}%{SPACE}%{CUSTOM_LABEL_SDM_SOURCE}%{SPACE}[=]%{SPACE}%{IP:sourceIPAddress}%{GREEDYDATA:errorText}" }
add_field => { "subType" => "errorOccurredFor" }
remove_tag => [ "_grokparsefailure" ]
}
if "_grokparsefailure" not in [tags] {
mutate {
convert => [ "businessUnitID", "integer" ]
}
}
# read first error line
if "_grokparsefailure" in [tags] {
grok {
patterns_dir => "./patterns"
match => { "splitText" => "%{SPACE}%{CUSTOM_ERROR}%{GREEDYDATA:messageText}" }
add_field => { "subType" => "errorReason" }
remove_tag => [ "_grokparsefailure" ]
}
}
# read error stack details
if "_grokparsefailure" in [tags] {
grok {
patterns_dir => "./patterns"
match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_DETAILS_STACK}[:]%{SPACE}%{GREEDYDATA:errorText}" }
add_field => { "subType" => "stackTraceStart" }
remove_tag => [ "_grokparsefailure" ]
}
}
# read error source details
if "_grokparsefailure" in [tags] {
grok {
patterns_dir => "./patterns"
match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_SOURCE}[:]%{SPACE}%{GREEDYDATA:errorSource}" }
add_field => { "subType" => "errorSource" }
remove_tag => [ "_grokparsefailure" ]
}
}
# read application specific stack line
if "_grokparsefailure" in [tags] {
grok {
patterns_dir => "./patterns"
match => { "splitText" => "%{SPACE}%{CUSTOM_LEADER_TEXT_APPLICATION_PACKAGE_APP}%{GREEDYDATA:messageText}" }
add_field => { "subType" => "stackTraceApp" }
remove_tag => [ "_grokparsefailure" ]
}
}
# read other stack lines
if "_grokparsefailure" in [tags] {
grok {
patterns_dir => "./patterns"
match => { "splitText" => "%{SPACE}%{CUSTOM_LEADER_TEXT_APPLICATION_PACKAGE_AT}%{SPACE}%{GREEDYDATA:messageText}" }
add_field => { "subType" => "stackTraceOther" }
remove_tag => [ "_grokparsefailure" ]
}
}
}
}
}
output {
stdout {
codec => "rubydebug"
}
file {
codec => "json"
flush_interval => 0 # 0 - every message; 2 - default
path => "/output/out-foolproof-json.txt"
}
elasticsearch {
host => "localhost"
index => "foolproof"
}
}
Фрагмент 14: полный сценарий
Вывод
Если несколько строк данных из файла журнала хранятся в виде одного блока текста, как это делается плагином «multiline», извлечение информации из блока текста становится затруднительным. Любая бизнес-логика, аналитические алгоритмы или отчеты, которые нуждаются в информации, должны будут реализовывать логику обработки текста, что затрудняет обслуживание приложений.
Чтобы преодолеть эту проблему, я представил метод, использующий конструкции Logstash, который позволяет нам не только сохранять исходную запись, созданную «multiline», как она есть, но и создавать из нее более значимые и детализированные записи. Такие детальные записи могут быть легко обработаны с помощью алгоритмов аналитики или инструментов отчетности.
об авторе
Он является частью группы Future and Emerging Technologies. Группа исследует новые и появляющиеся технологии, включая 3D-печать, носимые вычисления, аналитику и дополненную реальность. Группа создает сервисные линии для предприятий, использующих эти технологии. Ранее он возглавлял Центр высокопроизводительных вычислений (CoE) и CoE по большим данным в группе.