Статьи

Потоковые миграции с поддержкой Scala в MongoDB на миллионах записей

 

В моем проекте на работе у нас есть несколько производственных систем, использующих MongoDB в качестве основной базы данных. Довольно много коллекций, которые у нас есть, можно назвать «огромными», и нам иногда приходится переносить данные из одной (для простоты, скажем, «схемы») в другую или выполнять некоторую статистику по всей коллекция.

Итак, первое, что вы попытаетесь сделать, это типичный foreach для мошеннического запроса, подобного следующему:

Person where(_.age > 18) foreach { p => /*...*/ }

… и делайте любую статистику и т. Д., Что вам нужно сделать для каждого из элементов. Оказывается, есть много причин, по которым это отстой (плохое время), и даже не приблизиться к вычислению всего этого. Некоторые проблемы :

  • Курсор истечет, если используется так
  • мы сопоставляем вещи с объектами Person, так что это требует времени и памяти

(PS: мы могли бы сделать хуже, чем в примере выше — не пытайтесь получить огромную коллекцию в памяти … ;-)). Переходя к решению, здесь мы должны сделать несколько вещей. Во-первых, не допускайте тайм-аута курсора , чего можно добиться, установив соответствующую опцию на курсоре:

cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

Как вы уже заметили … мы подошли к низкоуровневому API здесь … А пока, допустим, я в порядке с этим (мы снова включим мошенничество чуть позже). Таким образом, помещая это в контекст, вы должны написать:

def withCursor[T](cursor: => DBCursor)(f: DBCursor => T) =
  try { f(c) } finally { cursor.close() }

// somewhere...
MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
  withCursor(coll.find()) { cursor =>
    cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
    // ...

Хорошо, здесь вы можете использовать курсор и перебирать всю коллекцию. Это довольно немного кода, но мы дошли до того, что курсор можно использовать, и время ожидания не истечет… Следующий шаг — упаковать это в симпатичную функцию и разрешить передачу в посторонний объект запроса. Я вставлю конечный результат этих шагов, чтобы вы могли проанализировать его самостоятельно:

  def stream[T <: MongoRecord[T]]
            (meta: MongoMetaRecord[T], query: Option[DBObject] = None)
            (callback: T => Unit) {
    MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
      withCursor(coll.find(query.getOrElse(null))) { cursor =>
        cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

        while(cursor.hasNext) {
          val next = cursor.next()
          val obj = meta.fromDBObject(next)

          callback(obj)
        }
      }
    }
  }

// can be used like this:
val meta = Person
stream(meta) { migrateIt(_) }

// or with the query supplied:
import scalaz.Scalaz._
import com.foursquare.rogue.Rogue._
val query = meta where(_.age > 18)
stream(meta, query.asDBObject.some) { migrateIt }

Есть небольшая неприятность с написанием query.asDBObject.some (который Scalaz эквивалентен Some (query.asDBObject) )… Давайте исправим это с помощью простой делегирующей функции:

def stream[T <: MongoRecord[T]]
          (meta: MongoMetaRecord[T], 
           query: MongoMetaRecord[T] => BaseQuery[_, _, _, _, _, _, _])
          (callback: T => Unit) {
  stream(meta, query(meta).asDBObject.some)(callback)
}

// so we can call it like:
import com.foursquare.rogue.Rogue._
stream(meta, meta.where(_.age > 18)) { migrateIt }

Но я все еще не доволен этим. Как уже упоминалось, мы имеем дело с огромными коллекциями, поэтому обработка может иногда занимать один день. Давайте посмотрим, что мы можем отсечь здесь … О да, давайте не будем выбирать поля, которые мы не используем. Давайте пересмотрим наш код для реализации вышеуказанной версии streamSelect, которая будет выбирать только те поля, которые нас интересуют, из mongo:

  /** So we don't have to manually extract field names */
  def select[T](on: T, fields: Function1[T, BaseField]*): Seq[String] =
    for(f <- fields) yield f(on).name

  def streamSelect[T <: MongoRecord[T]]
                  (meta: MongoMetaRecord[T], 
                   select: Seq[String] = Nil, 
                   query: DBObject)(callback: List[Any] => Unit) {
    MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
      val selectFields = new BasicDBObject(select.map(_ -> 1).toMap)

      withCursor(coll.find(query, selectFields)) { cursor =>
        cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

        while(cursor.hasNext) {
          val next = cursor.next()

          val fieldValues = select map { next.get(_) match {
              case l: BasicDBList => l.toList
              case other => other
            }
          }

          callback(fieldValues)
        }
      }
    }
  }


// which can be used as:
val m = Person
val above18 = m where (_.age > 18)
streamSelect(m, select[Person](m, _.name, _.age, _.height), query = above18) { 
  case (name: String) :: (age: Int) :: (h: Double) :: Nil =>
   // ...
}

// instead of using the bellow helper you can pass names manually:
select[Person](m, _.name, _.age, _.height) == List(m.name.name, m.age.name, m.height.name)

Здесь мы выбираем только те поля, которые нам действительно нужны, что значительно повысило производительность. Это вполне читабельно, хотя я не смог избавиться от

[Person]

введите параметр в

select

помощник. Мы используем такие потоки везде, где мы знаем, что есть «много материала для обработки» или в так называемых «предварительных загрузках», где мы вычисляем набор значений из всей коллекции для повторного использования.

Возможно, вы заметили, что все это не было очень TypeSafe ( обратный вызов не). Итак … вы могли бы спросить, реализовали ли мы «безопасную версию» наших потоков? И на самом деле мы это сделали, хотя это основано на кортежах, поэтому нам приходилось реализовывать одно и то же несколько раз — для разных кортежей. Я вставлю здесь только использование версии TypeSafe (и, если вам интересно, я могу сделать пост в блоге о них):

val m = Person
streamTypesafe(m)(m.age, m.name, m.height)) {
  (age, name, height) =>
  // yes, age: Int, name: String, and height: Double! 🙂  
}

Используя эти потоки, мы получили возможность легко записывать все миграции, которые нам нужны, и мы все еще достаточно гибки, чтобы, например, удалить поля из коллекции (с некоторой обработкой, поэтому простой запрос Монго не будет быть достаточным). В streamSelect мы открыты, чтобы иметь несколько операторов case, поэтому, даже если коллекция не является однородной, мы можем совпасть с нулем в некоторых полях и по-прежнему работать со всей коллекцией — при необходимости.

В любом случае, я надеюсь, что вы нашли эту кучу фрагментов кода интересной или полезной — мы, безусловно, делаем это в нашем повседневном кодировании ?