Статьи

Объединять и индексировать данные в Elasticsearch с помощью Logstash, JDBC

Вступление

В моих предыдущих публикациях здесь и здесь я показал вам, как индексировать данные в Elasticsearch из базы данных SQL с помощью библиотеки импорта JDBC JDBC и Elasticsearch. В первой статье здесь я упомянул некоторые недостатки использования библиотеки импортера, которые я скопировал здесь:

  • Нет поддержки ES версии 5 и выше
  • Существует возможность дублирования объектов в массиве вложенных объектов. Но дедупликация может быть обработана на прикладном уровне.
  • Возможна задержка поддержки последних версий ES.

Все вышеперечисленные недостатки можно преодолеть с помощью Logstash и его следующих плагинов:

  • Плагин JDBC Input — для чтения данных из базы данных SQL с использованием JDBC
  • Плагин Aggregate Filter — для объединения строк из базы данных SQL во вложенные объекты.

Создание индекса Elasticsearch

Я буду использовать последнюю версию ES, то есть 5.63, которую можно скачать с сайта Elasticsearch здесь . Мы создадим индекс world_v2, используя доступное здесь отображение.

1
2
$ curl -XPUT --header "Content-Type: application/json"
    http://localhost:9200/world_v2 -d @world-index.json

или используя REST-клиент Postman, как показано ниже:

Чтобы подтвердить, что индекс был успешно создан, откройте этот URL-адрес http: // localhost: 9200 / world_v2 в браузере, чтобы получить что-то похожее, как показано ниже:

Создание файла конфигурации Logstash

Мы должны выбрать эквивалентную версию logstash, которая будет 5.6.3, и ее можно скачать отсюда . Затем нам нужно установить плагин ввода JDBC, плагин Aggregate filter и плагин Elasticsearch output, используя следующие команды:

1
2
3
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-filter-aggregate
bin/logstash-plugin install logstash-output-elasticsearch

Нам нужно скопировать следующее в каталог bin, чтобы иметь возможность запустить нашу конфигурацию, которую мы определим следующим:

  1. Загрузите банку MySQL JDBC отсюда .
  2. Загрузите файл, содержащий запрос SQL для получения данных, отсюда .

Мы скопируем вышеупомянутое в каталог bin Logstash или любой каталог, где у вас будет файл конфигурации logstash, потому что мы ссылаемся на эти два файла в конфигурации, используя их относительные пути. Ниже приведен файл конфигурации Logstash:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/world"
    jdbc_user => "root"
    jdbc_password => "mohamed"
    # The path to downloaded jdbc driver
    jdbc_driver_library => "mysql-connector-java-5.1.6.jar"
    jdbc_driver_class => "Java::com.mysql.jdbc.Driver"
    # The path to the file containing the query
    statement_filepath => "world-logstash.sql"
  }
}
filter {
  aggregate {
    task_id => "%{code}"
    code => "
      map['code'] = event.get('code')
      map['name'] = event.get('name')
      map['continent'] = event.get('continent')
      map['region'] = event.get('region')
      map['surface_area'] = event.get('surface_area')
      map['year_of_independence'] = event.get('year_of_independence')
      map['population'] = event.get('population')
      map['life_expectancy'] = event.get('life_expectancy')
      map['government_form'] = event.get('government_form')
      map['iso_code'] = event.get('iso_code')
  
      map['capital'] = {
        'id' => event.get('capital_id'),
        'name' => event.get('capital_name'),
        'district' => event.get('capital_district'),
        'population' => event.get('capital_population')
      }
  
      map['cities_list'] ||= []
      map['cities'] ||= []
      if (event.get('cities_id') != nil)
        if !( map['cities_list'].include? event.get('cities_id') )
          map['cities_list'] << event.get('cities_id')
  
          map['cities'] << {
            'id' => event.get('cities_id'),
            'name' => event.get('cities_name'),
            'district' => event.get('cities_district'),
            'population' => event.get('cities_population')
          }
        end
      end
  
      map['languages_list'] ||= []
      map['languages'] ||= []
      if (event.get('languages_language') != nil)
        if !( map['languages_list'].include? event.get('languages_language') )
          map['languages_list'] << event.get('languages_language')
  
          map['languages'] << {
            'language' => event.get('languages_language'),
            'official' => event.get('languages_official'),
            'percentage' => event.get('languages_percentage')
          }
        end
      end
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
  mutate {
    remove_field => ["cities_list", "languages_list"]
  }
}
output {
  elasticsearch {
    document_id => "%{code}"
    document_type => "world"
    index => "world_v2"
    codec => "json"
    hosts => ["127.0.0.1:9200"]
  }
}

Мы помещаем файл конфигурации в каталог bin logstash. Мы запускаем конвейер logstash, используя следующую команду:

1
$ logstash -w 1 -f world-logstash.conf

Мы используем 1 работника, потому что несколько работников могут разбить агрегацию, поскольку агрегация происходит на основе последовательности событий, имеющих общий код страны. Мы увидим следующий вывод об успешном завершении конвейера logstash:

Откройте следующий URL-адрес http: // localhost: 9200 / world_v2 / world / IND в браузере, чтобы просмотреть информацию об Индии, проиндексированную в Elasticsearch, как показано ниже:

Опубликовано на Java Code Geeks с разрешения Мохамеда Санауллы, партнера нашей программы JCG . См. Оригинальную статью здесь: агрегирование и индексирование данных в Elasticsearch с использованием Logstash, JDBC

Мнения, высказанные участниками Java Code Geeks, являются их собственными.