Это еще один поток краткой публикации, где я пытаюсь выяснить, как лучше всего решить данную проблему.
Обновление: я решил эту проблему совершенно по-другому. Я опубликую об этом позже. Я держу это здесь, потому что это хороший пост о том, как я думаю о проблеме.
Некоторое время назад я опубликовал визуальное описание того, как Map / Reduce должен работать . Это было в то время, когда я работал над реализацией карты / сокращения RavenDB, но на самом деле это не то, как работает текущая карта / уменьшения RavenDB.
Вместо этого это работает так:
Фаза карты:
for item in docs: for result in reduce(map(item)): Persist(item.id, result.key, result)
И фаза сокращения:
for result in reduce(results): WriteToIndex(result)
Здесь есть несколько интересных вещей. Во-первых, у нас есть и карта, и сокращенный пробег на этапе карты, почему это так?
Ну, чтобы сделать немедленное снижение ценностей, конечно. Это имеет два основных преимущества.
- Это означает, что в фазе редукции редуктор принимает выходные данные редуцирования — это важно, потому что он готовит нас к следующему шагу, который мы должны были бы сделать, многократным шагам редуцирования.
- Во-вторых, это означает, что если у вас есть несколько результатов из одних и тех же документов с одним и тем же ключом, они будут немедленно уменьшены, а не будут иметь несколько результатов с одним и тем же ключом.
Вы можете заметить проблему с кодом выше, хотя. Кажется, мы работаем только один раз и только один раз.
Действительно, так ведет себя RavenDB 1.0. Он запускает уменьшение только один раз, независимо от того, сколько записей у вас есть на ключ.
Подождите! Мне, вероятно, нужно объяснить вещи. Давайте на секунду поговорим о следующей карте / уменьшенном индексе:
//map from order in docs.Orders from line in order.OrderLines select new { line.Product, line.Qty } //reduce from result in results group result by result.Product into g select new { Product = g.Key, Qty = g.Sum(x=>x.Qty) }
Теперь, если у нас есть заказ с двумя позициями для одного и того же продукта, они будут немедленно уменьшены (в одном документе) и сохранены в качестве результатов карты.
Теперь ключ сокращения в этом случае — это продукт позиции. Поэтому, когда мы выполняем уменьшение, мы загружаем все результаты карты, которые имеют один и тот же ключ уменьшения, и вместе выполняем их через функцию сокращения.
Как я уже сказал, так ведет себя RavenDB 1.0. И он работает очень хорошо, за исключением того, что он ведет себя не так хорошо, если у вас много результатов для одного и того же ключа сокращения. Что произойдет, если у нас будет действительно популярный продукт, который был куплен миллионным заказом?
Каждый раз, когда мы получим новый заказ на этот продукт, нам придется заново сокращать весь набор. Это означает, что мы должны были бы повторно уменьшить 1 миллион пунктов.
Недавно у нас возникла проблема, когда у одного из наших клиентов возникла проблема с запуском картографических / сокращающих индексов по всем данным переписи населения США. Одним из проблемных индексов было что-то вроде:
//map from person in docs.CensusPeople select new { person.State, Count = 1 } //reduce from result in results group result by result.State into g select new { State = g.Key, Count = g.Sum(x=>x.Count) }
Как вы можете себе представить, у такого рода вещей будет много предметов для одного и того же ключа.
Например, для Калифорнии нам понадобится сократить примерно на 38 миллионов единиц, а для Техаса — более 25 миллионов единиц. Это действительно не то, что мы имели в виду, поэтому мы обратились к давней ошибке в RavenDB и начали реализовывать многошаговое сокращение.
Вопрос в том, как это сделать. В идеале нам не нужно вносить какие-либо изменения между индексами map / Reduce, которые имеют большое количество элементов на ключ, и индексами map / Reduce, которые имеют небольшое количество индексов на ключ.
Вопрос в том, как это сделать и как убедиться, что мы не влияем на общую производительность системы. Как я упоминал ранее, очень легко изменить вещи, чтобы они соответствовали одному конкретному сценарию, забывая при этом все о других сценариях.
После этого все становится интересным, и здесь вы можете потеряться, потому что эта часть написана в основном с точки зрения разработчиков.
Реальное поведение системы на этапе отображения является более сложным, поскольку нам нужно сделать недействительными старые элементы, это выглядит примерно так:
for item in docs: keys = DeleteMapResultsFor(item.id) for result in reduce(map(item)): keys.Remove(result.key) Persist(item.id, result.key, result) ReReduceRemovedKeys(keys) Instead of this, we will have this: for item in docs: result = DeleteMapResultsFor(item.id) keys = new HashSet<string>(result.Select(x=>x.Key)) lookups = result.ToLookup(x=>new {x.id, x.key}) for result in reduce(map(item)): keys.Remove(result.key) int bucketId if not lookups.TryFind(new { item.Id, result.key}, out bucketId): bucketId = -1 Persist(item.id, result.key, bucketId, result) ReReduceRemovedKeys(keys)
Обратите внимание, что теперь у нас есть понятие сегмента, и по умолчанию этот сегмент установлен на –1. Но обратите внимание, что мы сохраняем тот же идентификатор bucketId, если он уже есть, это будет важно позже.
Интересная вещь происходит в фазе сокращения:
def Reduce(string key, int level): bool hasMore = true bool hadMore = false while hasMore: results = GetMappedResults(key, level, out hasMore) hadMore |= hasMore if hasMore: newBucketId = GetNewBucketId(key, level) UpdateBucketId(results, newBucketId) for result in reduce(results): Persist(key, level +1, result) if not hadMore: WriteToIndex(key, result) if hadMore: ScheduleReduce(key, level +1)
Здесь происходят важные вещи. Если у нас есть менее 1024 наименований для одного и того же ключа, мы просто действуем нормально, там ничего не видно.
Если у нас есть что-то большее, тогда мы создадим корзину для всех этих результатов и запланируем повторное снижение для следующего уровня.
Другими словами, это выглядит так, здесь это фаза карты, обратите внимание, что мы начинаем со всех идентификаторов сегментов, равных –1.
При запуске фазы сокращения с уровнем = 0 мы получаем три сегмента, например:
То, что нам теперь нужно снова уменьшить, это то место, где нас вызывают с level = 1. Предположим, что результаты групп 1 и 2 все еще превышают 1024, поэтому мы будем иметь:
И наконец:
Пока это выглядит красиво, но есть несколько практических проблем, которые нам еще предстоит решить.
Для начала, когда это закончится? У нас есть пользователи, которые пишут карту / уменьшают запросы следующим образом:
//map #1 from customer in docs.Customers select new { CustomerId = customer.Id, CustomerName = customer.Name, OrderId = (string)null, } // map #2 from order in docs.Orders select new { order.CustomerId, CustomerName = (string)null, OrderId = order.Id } //reduce from result in results group result by result.CustomerId into g let name = g.FirstOrDefault(x=>x.CustomerName!=null).CustomerName from item in g select new { CustomerId = g.Key, CustomerName = name, item.OrderId }
Это не одобренный (но работающий) способ, позволяющий вам запрашивать и сортировать по имени клиента при поиске индексов. Проблема этого метода заключается в том, что если у нас будет 15 000 заказов на одного клиента, мы получим такое же число и на этапе сокращения.
Теперь, почему это осуждается? Потому что в то время как это использует карту / уменьшение, на самом деле это не … вы знаете … сокращение данных. Чтобы решить эту проблему, мы собираемся убедиться, что все элементы, сгенерированные за один шаг сокращения, всегда будут попадать в один и тот же сегмент. Это будет означать, что мы ведем почти то же самое поведение, что и сейчас, это будет неэффективно, но так было всегда.
Мы также собираемся ограничить количество уровней до трех, что по-прежнему дает нам возможность обрабатывать более миллиарда результатов, прежде чем на этапе сокращения потребуется увидеть более 1024 элементов одновременно.
Возьмите пример Калифорнии, у нас было бы 37 691 912 человек, каждый из которых сократил бы до 37 691 912 результатов карты в ведре -1. Тогда у нас есть 36 809 ведер для второго уровня. И наконец 35 уровней на третьем уровне. Все из которых рассчитаны на конечный результат.
Следующим шагом здесь является фактическая обработка обновлений, что означает, что мы должны следить за идентификаторами сегментов в будущем, поэтому мы начнем с удаления человека, что означает, что нам нужно удалить результат его карты. Это означает, что нам необходимо повторно уменьшить объем корзины, к которой они принадлежат, на соответствующем уровне, а затем вверх и т. Д. В общей сложности нам придется вычислить 1024 + 1024 + 35 элементов вместо 37 691 912.
Ладно, хватит разговоров, давайте посмотрим, достаточно ли я выправлю вещи, чтобы я смог это реализовать.