Моя последняя задача в BigPanda Engineering состояла в том, чтобы обновить существующий сервис с Elasticsearch версии 1.7 до более новой версии Elastic 6.8.1. В этой статье я расскажу, как мы перешли с эластичного 1.6 на эластичный 6.8 с жесткими ограничениями, такими как нулевое время простоя, отсутствие потери данных, ноль ошибок, и предоставлю вам сценарий, который выполнит миграцию за вас. Этот пост содержит 6 глав (одна необязательная): Что в этом для меня? Какие новые функции побудили нас обновить нашу версию? Ограничения — каковы были наши бизнес-требования Как мы можем устранить ограничения План [Необязательная глава] — Как мы справились с печально известной проблемой картографического взрыва? Как сделать миграцию данных между кластерами Глава 1 — Что это для меня? Какие преимущества мы ожидаем решить путем обновления нашего хранилища данных? Было несколько причин: Проблемы с производительностью и стабильностью. Мы столкнулись с огромным количеством отключений при длительном MTTR, что вызвало у нас много головной боли. Это отражается в частых высоких задержках, высокой загрузке процессора и многом другом. Несуществующая поддержка в старых версиях Elasticsearch — нам не хватало некоторых оперативных знаний в Elasticsearch, и когда мы искали сторонние консультации, нам было предложено перейти вперед, чтобы получить поддержку Динамические отображения в нашей схеме — наша текущая схема в Elasticsearch 1.7 использовала функцию, называемую динамическими отображениями, которая заставляла наш кластер взрываться несколько раз, и мы хотели решить эту проблему Плохая видимость нашего существующего кластера — мы хотели получить лучший обзор изнутри и увидели, что в более поздних версиях были отличные инструменты для экспорта метрик Глава 2 — Ограничения Нулевая миграция простоя — у нас есть активные пользователи в нашей системе, и мы не можем позволить системе не работать во время миграции План восстановления — мы не можем позволить себе «потерять» или «испортить» данные, независимо от их стоимости. Таким образом, мы должны подготовить план восстановления в случае сбоя нашей миграции Ноль ошибок — мы не должны изменять существующие функции поиска для конечных пользователей Глава 3 — Размышление о плане Давайте разберем ограничения от простых до самых сложных: Ноль ошибок Чтобы выполнить это требование, я изучил все возможные запросы, которые получает служба, и каковы были ее результаты, и добавил юнит-тесты, где это необходимо. Кроме того, я добавил несколько метрик (для Elasticsearch Indexer и new Elasticsearch Indexer ) для отслеживания задержки, пропускной способности и производительности, что позволило мне проверить, что мы только улучшили их. План восстановления Это означает, что мне нужно разрешить следующую ситуацию: я развернул новый код в производстве, и все работает не так, как ожидалось. Что я могу с этим поделать? Поскольку я работал в службе, использующей источник событий, я мог добавить еще одного прослушивателя (схема ниже) и начать запись в новый кластер Elasticsearch, не влияя на состояние производства. Нулевая миграция простоя Текущий сервис находится в режиме реального времени и не может быть «деактивирован» на периоды более 5–10 минут. Хитрость в получении этого права заключается в следующем: Храните журнал всех действий, которые выполняет ваш сервис (мы используем Kafka в производстве) Запустите процесс миграции в автономном режиме (и отслеживайте смещение до начала миграции). Когда миграция закончится, запустите новый сервис для журнала с записанным смещением и наверстайте упущенное Когда отставание закончится, измените свою внешнюю сторону, чтобы запросить новый сервис, и все готово Глава 4 — План Наш текущий сервис использует следующую архитектуру (основанную на передаче сообщений в Kafka): Event topic содержит события, созданные другими приложениями (например, UserId 3 created) Command topic содержит перевод этих событий в конкретные команды , используемые этим приложением (например: Add userId 3) Elasticsearch 1.7 — хранилище данных command Topic прочитанного Elasticsearch Indexer. Мы планировали добавить еще одного потребителя ( new Elasticsearch Indexer) command topic, который будет читать те же самые сообщения и записывать их параллельно с Elasticsearch 6.8 С чего начать Честно говоря, я считал себя новичком в Elasticsearch, и чтобы чувствовать себя уверенно, выполняя эту задачу, мне пришлось подумать о том, как лучше всего подойти к этой теме и изучить ее. Несколько вещей, которые помогли, были: Документация — это безумно полезный ресурс для всего Elasticsearch. Потратьте время, чтобы прочитать его и делать заметки (не пропустите: Mapping и QueryDsl ). HTTP API — все под CAT API. Это было супер полезно, чтобы отладить вещи локально и посмотреть, как Elastic реагирует. (не пропустите: здоровье кластера, индексы кошек, поиск, удаление индекса) Метрики (❤️). С первого дня мы настроили новую блестящую панель инструментов с множеством интересных метрик (взятых из эластичного поиска-экспортера для Prometheus ), которые помогли и подтолкнули нас к пониманию Elastic. Код Наш кодовое использовали библиотеку под названием elastic4s и использовали самую старую версию программы, доступную в библиотеке — Действительно хорошая причина , чтобы мигрировать дальше! Поэтому первым делом было просто перенести версии и посмотреть, что ломается Существует несколько тактик для того, как выполнить миграцию кода, и мы выбрали в первую очередь попытку восстановить существующую функциональность в новой версии Elastic без переписывания всего кода с самого начала. AKA, достигните существующей функциональности, но на более новой версии Elasticsearch К счастью для нас, код уже содержал почти полное покрытие тестирования, поэтому наша задача была намного проще, и это заняло около 2 недель времени разработки Важно отметить, что если бы это было не так, нам пришлось бы потратить некоторое время на заполнение этого покрытия и только затем перейти на новую версию (поскольку одним из наших ограничений было: не нарушать существующую функциональность) Глава 5 — Картографическая проблема взрыва Давайте опишем наш вариант использования более подробно, это наша модель class InsertMessageCommand(tags: Map[String,String]) И, например, экземпляр этого сообщения будет: new InsertMessageCommand(Map("name"->"dor","lastName"->"sever")) И учитывая эту модель, нам нужно было поддерживать следующие требования запросов: запрос по значению запрос по имени и значению тега То, как это моделировалось в нашей схеме Elasticsearch 1.7, использовало динамическую схему шаблона (поскольку ключи тегов являются динамическими и не могут быть смоделированы в расширенном режиме) Динамический шаблон вызвал у нас многократные сбои из-за проблемы взрыва Mapping, и схема выглядела так: Оболочка xxxxxxxxxx 1 55 1 curl -X PUT "localhost:9200/_template/my_template?pretty" -H 'Content-Type: application/json' -d ' 2 { 3 "index_patterns": [ 4 "your-index-names*" 5 ], 6 "mappings": { 7 "_doc": { 8 "dynamic_templates": [ 9 { 10 "tags": { 11 "mapping": { 12 "type": "text" 13 }, 14 "path_match": "actions.tags.*" 15 } 16 } 17 ] 18 } 19 }, 20 "aliases": {} 21 }' 22 23 curl -X PUT "localhost:9200/your-index-names-1/_doc/1?pretty" -H 'Content-Type: application/json' -d' 24 { 25 "actions": { 26 "tags" : { 27 "name": "John", 28 "lname" : "Smith" 29 } 30 } 31 } 32 ' 33 34 curl -X PUT "localhost:9200/your-index-names-1/_doc/2?pretty" -H 'Content-Type: application/json' -d' 35 { 36 "actions": { 37 "tags" : { 38 "name": "Dor", 39 "lname" : "Sever" 40 } 41 } 42 } 43 ' 44 45 curl -X PUT "localhost:9200/your-index-names-1/_doc/3?pretty" -H 'Content-Type: application/json' -d' 46 { 47 "actions": { 48 "tags" : { 49 "name": "AnotherName", 50 "lname" : "AnotherLastName" 51 } 52 } 53 } 54 ' 55 56 57 curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' 58 { 59 "query": { 60 "match" : { 61 "actions.tags.name" : { 62 "query" : "John" 63 } 64 } 65 } 66 } 67 ' 68 # returns 1 match(doc 1) 69 70 71 curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' 72 { 73 "query": { 74 "match" : { 75 "actions.tags.lname" : { 76 "query" : "John" 77 } 78 } 79 } 80 } 81 ' 82 # returns zero matches 83 84 # search by value 85 curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' 86 { 87 "query": { 88 "query_string" : { 89 "fields": ["actions.tags.*" ], 90 "query" : "Dor" 91 } 92 } 93 } 94 ' 95 Решение для вложенных документов Нашим первым инстинктом в решении проблемы взрыва картографии было использование вложенных документов. Мы прочитали учебник по вложенным типам данных в Elastic docs и определили следующую схему и запросы Оболочка xxxxxxxxxx 1 72 1 curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d' 2 { 3 "mappings": { 4 "_doc": { 5 "properties": { 6 "tags": { 7 "type": "nested" 8 } 9 } 10 } 11 } 12 } 13 ' 14 15 curl -X PUT "localhost:9200/my_index/_doc/1?pretty" -H 'Content-Type: application/json' -d' 16 { 17 "tags" : [ 18 { 19 "key" : "John", 20 "value" : "Smith" 21 }, 22 { 23 "key" : "Alice", 24 "value" : "White" 25 } 26 ] 27 } 28 ' 29 30 31 # Query by tag key and value 32 curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' 33 { 34 "query": { 35 "nested": { 36 "path": "tags", 37 "query": { 38 "bool": { 39 "must": [ 40 { "match": { "tags.key": "Alice" }}, 41 { "match": { "tags.value": "White" }} 42 ] 43 } 44 } 45 } 46 } 47 } 48 ' 49 50 # Returns 1 document 51 52 53 curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' 54 { 55 "query": { 56 "nested": { 57 "path": "tags", 58 "query": { 59 "bool": { 60 "must": [ 61 { "match": { "tags.value": "Smith" }} 62 ] 63 } 64 } 65 } 66 } 67 } 68 ' 69 70 # Query by tag value 71 # Returns 1 result 72 И это решение сработало. Однако, когда мы попытались вставить реальные данные о клиентах, мы увидели, что количество документов в нашем индексе увеличилось примерно на X 500 Мы подумали о следующих проблемах и пошли искать лучшее решение: Количество документов, которые мы имели в нашем кластере, составляло около 500 миллионов документов, и это означало, что в новой схеме мы собирались достичь двухсот пятидесяти миллиардов документов (это 250 000 000 000 документов) Мы читаем этот действительно хороший пост в блоге — https://blog.gojekengineering.com/elasticsearch-the-trouble-with-nested-documents-e97b33b46194, в котором подчеркивается, что вложенные документы могут вызывать высокую задержку в запросах и проблемы с использованием кучи. Тестирование — поскольку мы конвертировали 1 документ в старом кластере в неизвестное количество документов в новом кластере, будет намного сложнее отследить, работает ли процесс миграции без потери данных (если наше преобразование было 1: 1, мы Можно утверждать, что количество в старом кластере равно количеству в новом кластере) Избегать вложенных документов Реальная хитрость в этом заключалась в том, чтобы сосредоточиться на том, какие поддерживаемые запросы мы выполняли: поиск по значению тега и поиск по ключу и значению тега. Первый запрос не требует вложенных документов, так как он работает с одним полем, а для последнего мы сделали следующий трюк. Мы создали поле, которое содержит комбинацию ключа и значения, и всякий раз, когда пользователь запрашивает ключ, значение совпадает, мы переводим его запрос в соответствующий текст и запрашиваем это поле. Пример: Оболочка x 58 1 curl -X PUT "localhost:9200/my_index_2?pretty" -H 'Content-Type: application/json' -d' 2 { 3 "mappings": { 4 "_doc": { 5 "properties": { 6 "tags": { 7 "type": "object", 8 "properties": { 9 "keyToValue": { 10 "type": "keyword" 11 }, 12 "value": { 13 "type": "keyword" 14 } 15 } 16 } 17 } 18 } 19 } 20 } 21 ' 22 23 24 curl -X PUT "localhost:9200/my_index_2/_doc/1?pretty" -H 'Content-Type: application/json' -d' 25 { 26 "tags" : [ 27 { 28 "keyToValue" : "John:Smith", 29 "value" : "Smith" 30 }, 31 { 32 "keyToValue" : "Alice:White", 33 "value" : "White" 34 } 35 ] 36 } 37 ' 38 39 # Query by key,value 40 # User queries for key: Alice, and value : White , we then query elastic with this query: 41 42 curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' 43 { 44 "query": { 45 "bool": { 46 "must": [ { "match": { "tags.keyToValue": "Alice:White" }}] 47 }}} 48 ' 49 50 # Query by value only 51 curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' 52 { 53 "query": { 54 "bool": { 55 "must": [ { "match": { "tags.value": "White" }}] 56 }}} 57 ' 58 Глава 6 — Процесс миграции Мы планировали перенести около 500 миллионов документов без простоев. И для этого нам понадобилось: Стратегия переноса данных из старого Elastic в новый Elasticsearch Стратегия о том, как сократить разрыв между началом миграции и ее окончанием. И наши два варианта закрытия лага: Наша система обмена сообщениями основана на Kafka, и мы могли бы просто взять текущее смещение до начала миграции, а после окончания миграции начать использовать из этого конкретного смещения. Это решение требует ручной настройки смещений и некоторых других вещей, но будет работать Другой подход к решению этой проблемы состоял в том, чтобы начать использовать сообщения с начала темы в Kafka и сделать наши действия на Elasticsearch идемпотентными, то есть, если изменение уже было «применено», в магазине Elastic ничего не изменится Запросы, сделанные нашим сервисом против Elastic, были уже идемпотентными, поэтому мы выбираем вариант 2, потому что он требует нулевой ручной работы (не нужно принимать конкретные смещения, а затем устанавливать их впоследствии в новой группе потребителей) Как мы можем перенести данные? Это были варианты, о которых мы думали: Если бы наша Кафка содержала все сообщения с начала времени, мы могли бы просто играть с самого начала, и конечное состояние было бы равным. Но так как мы применяем сохранение к темам, это не было вариантом Сбрасывать сообщения на диск — и затем загружать их непосредственно в Elastic — это решение выглядело довольно странно, зачем хранить их на диске, а не просто записывать их непосредственно в Elastic? Перенос сообщений между старой упругой в новую упругую — это означало написание какого-то «сценария» (кто-нибудь сказал, python?), Который будет подключаться к старому кластеру Elasticsearch, запрашивать элементы, преобразовывать их в новую схему и индексировать их в кластер Мы выбрали последний вариант, и это были те варианты дизайна, которые мы имели в виду Давайте не будем думать об обработке ошибок, если нам не нужно. Давайте попробуем написать что-то очень простое, и если возникнут ошибки, попробуем их устранить. В конце концов, нам не нужно было решать эту проблему, так как во время миграции не было ошибок Это разовая операция, поэтому все, что работает в первую очередь / KISS Метрики — поскольку процессы миграции могут занимать долгие часы или дни, мы хотели, чтобы с первого дня была возможность отслеживать количество ошибок и отслеживать текущий прогресс и скорость копирования сценария. Мы думали долго и упорно и выбрать питон в качестве нашего оружия выбора. Окончательная версия кода прилагается ниже. dictor==0.1.2 - to copy and transform our Elasticsearch documents elasticsearch==1.9.0 - to connect to "old" Elasticsearch elasticsearch6==6.4.2 - to connect to the "new" Elasticsearch statsd==3.3.0 - to report metrics питон xxxxxxxxxx 1 69 1 from elasticsearch import Elasticsearch 2 from elasticsearch6 import Elasticsearch as Elasticsearch6 3 import sys 4 from elasticsearch.helpers import scan 5 from elasticsearch6.helpers import parallel_bulk 6 import statsd 7 8 ES_SOURCE = Elasticsearch(sys.argv[1]) 9 ES_TARGET = Elasticsearch6(sys.argv[2]) 10 INDEX_SOURCE = sys.argv[3] 11 INDEX_TARGET = sys.argv[4] 12 QUERY_MATCH_ALL = {"query": {"match_all": {}}} 13 SCAN_SIZE = 1000 14 SCAN_REQUEST_TIMEOUT = '3m' 15 REQUEST_TIMEOUT = 180 16 MAX_CHUNK_BYTES = 15 * 1024 * 1024 17 RAISE_ON_ERROR = False 18 19 20 def transform_item(item, index_target): 21 # implement your logic transformation here 22 transformed_source_doc = item.get("_source") 23 return {"_index": index_target, 24 "_type": "_doc", 25 "_id": item['_id'], 26 "_source": transformed_source_doc} 27 28 29 def transformedStream(es_source, match_query, index_source, index_target, transform_logic_func): 30 for item in scan(es_source, query=match_query, index=index_source, size=SCAN_SIZE, 31 timeout=SCAN_REQUEST_TIMEOUT): 32 yield transform_logic_func(item, index_target) 33 34 35 def index_source_to_target(es_source, es_target, match_query, index_source, index_target, bulk_size, statsd_client, 36 logger, transform_logic_func): 37 ok_count = 0 38 fail_count = 0 39 count_response = es_source.count(index=index_source, body=match_query) 40 count_result = count_response['count'] 41 statsd_client.gauge(stat='elastic_migration_document_total_count,index={0},type=success'.format(index_target), 42 value=count_result) 43 with statsd_client.timer('elastic_migration_time_ms,index={0}'.format(index_target)): 44 actions_stream = transformedStream(es_source, match_query, index_source, index_target, transform_logic_func) 45 for (ok, item) in parallel_bulk(es_target, 46 chunk_size=bulk_size, 47 max_chunk_bytes=MAX_CHUNK_BYTES, 48 actions=actions_stream, 49 request_timeout=REQUEST_TIMEOUT, 50 raise_on_error=RAISE_ON_ERROR): 51 if not ok: 52 logger.error("got error on index {} which is : {}".format(index_target, item)) 53 fail_count += 1 54 statsd_client.incr('elastic_migration_document_count,index={0},type=failure'.format(index_target), 55 1) 56 else: 57 ok_count += 1 58 statsd_client.incr('elastic_migration_document_count,index={0},type=success'.format(index_target), 59 1) 60 61 return ok_count, fail_count 62 63 64 statsd_client = statsd.StatsClient(host='localhost', port=8125) 65 66 if __name__ == "__main__": 67 index_source_to_target(ES_SOURCE, ES_TARGET, QUERY_MATCH_ALL, INDEX_SOURCE, INDEX_TARGET, BULK_SIZE, 68 statsd_client, transform_item) 69 Вывод Миграция данных в реальной производственной системе — сложная задача, требующая большого внимания и тщательного планирования. Я рекомендую потратить время и выполнить шаги, перечисленные выше, и выяснить, что лучше всего подходит для ваших нужд Как правило, всегда старайтесь максимально снизить ваши требования. Например, требуется ли миграция без простоев? Можете ли вы позволить себе потерю данных? Обновление хранилищ данных — это, как правило, марафон, а не спринт, поэтому сделайте глубокий вдох и попытайтесь насладиться поездкой Весь процесс, перечисленный выше, занял у меня около 4 месяцев работы. Все примеры Elasticsearch, приведенные в этом посте, были протестированы с версией 6.8.1. Share: