Вступление
В моих предыдущих публикациях здесь и здесь я показал вам, как индексировать данные в Elasticsearch из базы данных SQL с помощью библиотеки импорта JDBC JDBC и Elasticsearch. В первой статье здесь я упомянул некоторые недостатки использования библиотеки импортера, которые я скопировал здесь:
- Нет поддержки ES версии 5 и выше
- Существует возможность дублирования объектов в массиве вложенных объектов. Но дедупликация может быть обработана на прикладном уровне.
- Возможна задержка поддержки последних версий ES.
Все вышеперечисленные недостатки можно преодолеть с помощью Logstash и его следующих плагинов:
- Плагин JDBC Input — для чтения данных из базы данных SQL с использованием JDBC
- Плагин Aggregate Filter — для объединения строк из базы данных SQL во вложенные объекты.
Создание индекса Elasticsearch
Я буду использовать последнюю версию ES, то есть 5.63, которую можно скачать с сайта Elasticsearch здесь . Мы создадим индекс world_v2, используя доступное здесь отображение.
1
2
|
$ curl -XPUT --header "Content-Type: application/json" http: //localhost:9200/world_v2 -d @world-index.json |
или используя REST-клиент Postman, как показано ниже:
Чтобы подтвердить, что индекс был успешно создан, откройте этот URL-адрес http: // localhost: 9200 / world_v2 в браузере, чтобы получить что-то похожее, как показано ниже:
Создание файла конфигурации Logstash
Мы должны выбрать эквивалентную версию logstash, которая будет 5.6.3, и ее можно скачать отсюда . Затем нам нужно установить плагин ввода JDBC, плагин Aggregate filter и плагин Elasticsearch output, используя следующие команды:
1
2
3
|
bin/logstash-plugin install logstash-input-jdbc bin/logstash-plugin install logstash-filter-aggregate bin/logstash-plugin install logstash-output-elasticsearch |
Нам нужно скопировать следующее в каталог bin, чтобы иметь возможность запустить нашу конфигурацию, которую мы определим следующим:
- Загрузите банку MySQL JDBC отсюда .
- Загрузите файл, содержащий запрос SQL для получения данных, отсюда .
Мы скопируем вышеупомянутое в каталог bin Logstash или любой каталог, где у вас будет файл конфигурации logstash, потому что мы ссылаемся на эти два файла в конфигурации, используя их относительные пути. Ниже приведен файл конфигурации Logstash:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
input { jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/world" jdbc_user => "root" jdbc_password => "mohamed" # The path to downloaded jdbc driver jdbc_driver_library => "mysql-connector-java-5.1.6.jar" jdbc_driver_class => "Java::com.mysql.jdbc.Driver" # The path to the file containing the query statement_filepath => "world-logstash.sql" } } filter { aggregate { task_id => "%{code}" code => " map[ 'code' ] = event.get( 'code' ) map[ 'name' ] = event.get( 'name' ) map[ 'continent' ] = event.get( 'continent' ) map[ 'region' ] = event.get( 'region' ) map[ 'surface_area' ] = event.get( 'surface_area' ) map[ 'year_of_independence' ] = event.get( 'year_of_independence' ) map[ 'population' ] = event.get( 'population' ) map[ 'life_expectancy' ] = event.get( 'life_expectancy' ) map[ 'government_form' ] = event.get( 'government_form' ) map[ 'iso_code' ] = event.get( 'iso_code' ) map[ 'capital' ] = { 'id' => event.get( 'capital_id' ), 'name' => event.get( 'capital_name' ), 'district' => event.get( 'capital_district' ), 'population' => event.get( 'capital_population' ) } map[ 'cities_list' ] ||= [] map[ 'cities' ] ||= [] if (event.get( 'cities_id' ) != nil) if !( map[ 'cities_list' ].include? event.get( 'cities_id' ) ) map[ 'cities_list' ] << event.get( 'cities_id' ) map[ 'cities' ] << { 'id' => event.get( 'cities_id' ), 'name' => event.get( 'cities_name' ), 'district' => event.get( 'cities_district' ), 'population' => event.get( 'cities_population' ) } end end map[ 'languages_list' ] ||= [] map[ 'languages' ] ||= [] if (event.get( 'languages_language' ) != nil) if !( map[ 'languages_list' ].include? event.get( 'languages_language' ) ) map[ 'languages_list' ] << event.get( 'languages_language' ) map[ 'languages' ] << { 'language' => event.get( 'languages_language' ), 'official' => event.get( 'languages_official' ), 'percentage' => event.get( 'languages_percentage' ) } end end event.cancel() " push_previous_map_as_event => true timeout => 5 } mutate { remove_field => [ "cities_list" , "languages_list" ] } } output { elasticsearch { document_id => "%{code}" document_type => "world" index => "world_v2" codec => "json" hosts => [ "127.0.0.1:9200" ] } } |
Мы помещаем файл конфигурации в каталог bin logstash. Мы запускаем конвейер logstash, используя следующую команду:
1
|
$ logstash -w 1 -f world-logstash.conf |
Мы используем 1 работника, потому что несколько работников могут разбить агрегацию, поскольку агрегация происходит на основе последовательности событий, имеющих общий код страны. Мы увидим следующий вывод об успешном завершении конвейера logstash:
Откройте следующий URL-адрес http: // localhost: 9200 / world_v2 / world / IND в браузере, чтобы просмотреть информацию об Индии, проиндексированную в Elasticsearch, как показано ниже:
Опубликовано на Java Code Geeks с разрешения Мохамеда Санауллы, партнера нашей программы JCG . См. Оригинальную статью здесь: агрегирование и индексирование данных в Elasticsearch с использованием Logstash, JDBC
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |