Статьи

Создание значимых документов из журналов приложений, охватывающих несколько строк — часть 2

О части 1

В первой части статьи я описал сценарий по частям. В этой части я опишу, как можно запрашивать документы в Elasticsearch, а также представлю полный сценарий.

Получение данных из Elasticsearch

Как вы, возможно, знаете, Logstash обычно используется вместе с двумя другими инструментами, а именно Elasticsearch и Kibana, причем эти три обычно называют «стеком инструментов ELK». В большинстве случаев информация журнала, анализируемая Logstash, сохраняется как документы в Elasticsearch, который является высокопроизводительной поисковой системой.

В нашем случае мы также подключаем секцию ‘output’ скрипта к экземпляру Elasticsearch. При анализе ввода созданные документы будут сохранены в экземпляре Elasticsearch. Как только информация журнала сохраняется в Elasticsearch, доступ к информации становится более легким, поскольку мы можем использовать интерфейс «поиск» вместо необходимости обходить длинные текстовые файлы. Например, мы можем легко сгенерировать отчет обо всех сообщениях «ОШИБКА» за последний месяц, вызвав поисковый запрос по индексу Elasticsearch, используя атрибут документа «logLevel». Пример запроса представлен во фрагменте 12.

{
  "query": {
    "filtered": {
      "filter": {
        "query": {
          "range": {
            "logTime": {
              "gte": "2015-04-09 00:00:00",
              "lte": "2015-04-09 23:59:00"
            }
          }
        }
      },
      "query": {
        "match": {
          "logLevel": "ERROR"
        }
      }
    }
  }
}

Фрагмент 12: запрос Elasticsearch

Важным аспектом процесса клонирования является то, что, поскольку мы создаем новые записи из клонированной записи, новые записи будут иметь ту же метку времени, что и исходная запись. Таким образом, мы можем не только искать исходную запись, используя ее временную метку, мы также можем получать все записи, созданные из нее, используя ту же самую временную метку. Чтобы отличить исходную запись от последующих записей, мы добавили поле с именем «recordType». Значение ‘recordType’ равно ‘mater’ для исходной записи и ‘child’ для записей, созданных из клонированной записи. Запрос Elasticsearch, используемый для извлечения таких записей, представлен во фрагменте 13.

{
  "query": {
    "filtered": {
      "filter": {
        "query": {
          "range": {
            "logTime": {
              "gte": "2015-04-09 00:00:00",
              "lte": "2015-04-09 23:59:00"
            }
          }
        }
      },
      "query": {
        "bool": {
          "must": [
            { "match" : { "logLevel": "ERROR" } },
            { "match" : { "recordType": "child" } }
          ]
        }
      }
    }
  }
}

Фрагмент 13: запрос Elasticsearch, соответствующий нескольким полям

Полный сценарий

Полный сценарий, представленный в фрагменте 14, который выполняет следующее:

  1. Создает одну запись из нескольких физических строк трассировки стека.
  2. Клонирует запись для дальнейшей обработки.
  3. Клонированная запись разбивается на несколько записей, каждая из которых содержит детальную информацию.
input {
  file {
    path => "/input/foolproof/input2.log"
    type => "foolproof_log"
    sincedb_path => "/input/foolproof/input2.sincedb"
    start_position => "beginning"
  }
  tcp {
    type => "foolproof_log"
    port => 9999
  }
}

filter {
  if [type] == "foolproof_log" {
    if "cloned" not in [tags] {
      if [message] == "" {
        drop { }
      }

      # read 'sync completed' info message
      grok {
        patterns_dir => "./patterns"
        match => { "message" => "%{CUSTOM_LEADER_TEXT_INFO:logLevel}[,]%{SPACE}%{CUSTOM_TIMESTAMP:logTimeString}[,]%{SPACE}%{CUSTOM_LABEL_SYNC_OPERATION_COMPLETED}%{SPACE}%{NUMBER:moduleID}%{GREEDYDATA:extraText}" }
        add_field => { "subType" => "infoSyncCompleted" }
        remove_tag => [ "_grokparsefailure" ]
      }

      if "_grokparsefailure" not in [tags] {
        mutate {
          gsub => [
            "logTimeString", ",", ";"
          ]
        }
        date {
          locale => "en"
          match => [ "logTimeString", "YYYY-MM-dd HH:mm:ss" ]
          #timezone => "Asia/Calcutta"
          target => "logTime"
        }
      }

      if "_grokparsefailure" in [tags] {
        multiline {
          patterns_dir => "./patterns"
          pattern => "(%{CUSTOM_ERROR_LABEL_1},%{SPACE}%{TIMESTAMP_ISO8601},%{SPACE}%{GREEDYDATA})|(%{CUSTOM_ERROR_LABEL_2}%{SPACE}%{TIMESTAMP_ISO8601}%{SPACE}%{GREEDYDATA})|(%{TIMESTAMP_ISO8601}%{SPACE}%{CUSTOM_MAIN_LABEL}%{SPACE}%{GREEDYDATA})"
          negate => true
          what => previous
        }

        grok {
          patterns_dir => "./patterns"
          match => [ "message", "%{CUSTOM_ERROR_LABEL_1:logLevel},%{SPACE}%{TIMESTAMP_ISO8601:logTimeString},%{SPACE}%{GREEDYDATA:mergedText}" ]
          remove_tag => [ "_grokparsefailure" ]
          add_field => { "recordType" => "master" }
          add_field => { "inputType" => "foolproof_log" }
        }

        # mergedText can be removed as we will be reading from the 'message' field
        mutate {
          remove_field => [ "mergedText" ]
        }

        mutate {
          gsub => [ "logTimeString", ",", ";" ]
        }
        date {
          locale => "en"
          match => [ "logTimeString", "YYYY-MM-dd HH:mm:ss" ]
          #timezone => "Asia/Calcutta"
          target => "logTime"
        }

        ## should we remove \n, \r and \t from 'message' field as well?
        ## removal can make it easy for search. These characters need
        ## to be replaced by some other character like # that does not occur in the message

        # clone the multiline, so that we can split in into constituent parts later
        clone {
          clones => [ "cloned" ]
        }
      }
    }
  }

  # split the 'cloned' multiline into separate parts for detailed processing
  if [type] == "cloned" {
    if [message] == "" {
      drop { }
    }

    if [logLevel] == "ERROR" {
      # this pattern patches the pattern of the multiline
      grok {
        patterns_dir => "./patterns"
        match => [ "message", "%{CUSTOM_ERROR_LABEL_1},%{SPACE}%{TIMESTAMP_ISO8601},%{SPACE}%{GREEDYDATA:mergedMessage}" ]
        remove_tag => [ "_grokparsefailure" ]
        remove_field => [ "message" ]
        remove_tag => [ "multiline" ]
        remove_field => [ "recordType" ]
      }

      mutate {
        add_field => { "recordType" => "child" }
      }

      ## we remove \n, \r and \t from 'mergedMessage' field.
      ## We remove these characters by replacing them by suitable alternative.
      ## we will then use the substituted chracter for splitting the multiline message into
      ## constituent messages
      mutate {
        gsub => [
          "mergedMessage", "\r\n", "#",
          "mergedMessage", "\n\n", "#",
          "mergedMessage", "\r\n\t", "#",
          "mergedMessage", "\n\t", "#",
          "mergedMessage", "\n    ", "#",
          "mergedMessage", "\n   ", "#",
          "mergedMessage", "\n", "#"
        ]
      }

      split {
        field => "mergedMessage"
        target => "splitText"
        terminator => "#"
      }

      # parse business unit id line
      grok {
        patterns_dir => "./patterns"
        match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_OCCURRED_FOR}[:]%{SPACE}%{CUSTOM_TEXT_2:messge1}[.][.]%{SPACE}%{CUSTOM_LABEL_BUSINESS_UNIT_ID}%{SPACE}[=]%{SPACE}%{NUMBER:businessUnitID}%{SPACE}%{CUSTOM_LABEL_SDM_SOURCE}%{SPACE}[=]%{SPACE}%{IP:sourceIPAddress}%{GREEDYDATA:errorText}" }
        add_field => { "subType" => "errorOccurredFor" }
        remove_tag => [ "_grokparsefailure" ]
      }
      if "_grokparsefailure" not in [tags] {
        mutate {
          convert => [ "businessUnitID", "integer" ]
        }
      }

      # read first error line
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{SPACE}%{CUSTOM_ERROR}%{GREEDYDATA:messageText}" }
          add_field => { "subType" => "errorReason" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }

      # read error stack details
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_DETAILS_STACK}[:]%{SPACE}%{GREEDYDATA:errorText}" }
          add_field => { "subType" => "stackTraceStart" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }
      # read error source details
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{CUSTOM_LEADER_TEXT_ERROR_SOURCE}[:]%{SPACE}%{GREEDYDATA:errorSource}" }
          add_field => { "subType" => "errorSource" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }
      # read application specific stack line
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{SPACE}%{CUSTOM_LEADER_TEXT_APPLICATION_PACKAGE_APP}%{GREEDYDATA:messageText}" }
          add_field => { "subType" => "stackTraceApp" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }

      # read other stack lines
      if "_grokparsefailure" in [tags] {
        grok {
          patterns_dir => "./patterns"
          match => { "splitText" => "%{SPACE}%{CUSTOM_LEADER_TEXT_APPLICATION_PACKAGE_AT}%{SPACE}%{GREEDYDATA:messageText}" }
          add_field => { "subType" => "stackTraceOther" }
          remove_tag => [ "_grokparsefailure" ]
        }
      }
    }
  }
}

output {
  stdout {
    codec => "rubydebug"
  }

  file {
    codec => "json"
    flush_interval => 0 # 0 - every message; 2 - default
    path => "/output/out-foolproof-json.txt"
  }

  elasticsearch {
    host => "localhost"
    index => "foolproof"
  }
}

Фрагмент 14: полный сценарий

Вывод

Если несколько строк данных из файла журнала хранятся в виде одного блока текста, как это делается плагином «multiline», извлечение информации из блока текста становится затруднительным. Любая бизнес-логика, аналитические алгоритмы или отчеты, которые нуждаются в информации, должны будут реализовывать логику обработки текста, что затрудняет обслуживание приложений.

Чтобы преодолеть эту проблему, я представил метод, использующий конструкции Logstash, который позволяет нам не только сохранять исходную запись, созданную «multiline», как она есть, но и создавать из нее более значимые и детализированные записи. Такие детальные записи могут быть легко обработаны с помощью алгоритмов аналитики или инструментов отчетности.

об авторе

Он является частью группы Future and Emerging Technologies. Группа исследует новые и появляющиеся технологии, включая 3D-печать, носимые вычисления, аналитику и дополненную реальность. Группа создает сервисные линии для предприятий, использующих эти технологии. Ранее он возглавлял Центр высокопроизводительных вычислений (CoE) и CoE по большим данным в группе.