В моем проекте на работе у нас есть несколько производственных систем, использующих MongoDB в качестве основной базы данных. Довольно много коллекций, которые у нас есть, можно назвать «огромными», и нам иногда приходится переносить данные из одной (для простоты, скажем, «схемы») в другую или выполнять некоторую статистику по всей коллекция.
Итак, первое, что вы попытаетесь сделать, это типичный foreach для мошеннического запроса, подобного следующему:
Person where(_.age > 18) foreach { p => /*...*/ }
… и делайте любую статистику и т. Д., Что вам нужно сделать для каждого из элементов. Оказывается, есть много причин, по которым это отстой (плохое время), и даже не приблизиться к вычислению всего этого. Некоторые проблемы :
- Курсор истечет, если используется так
- мы сопоставляем вещи с объектами Person, так что это требует времени и памяти
(PS: мы могли бы сделать хуже, чем в примере выше — не пытайтесь получить огромную коллекцию в памяти … ;-)). Переходя к решению, здесь мы должны сделать несколько вещей. Во-первых, не допускайте тайм-аута курсора , чего можно добиться, установив соответствующую опцию на курсоре:
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
Как вы уже заметили … мы подошли к низкоуровневому API здесь … А пока, допустим, я в порядке с этим (мы снова включим мошенничество чуть позже). Таким образом, помещая это в контекст, вы должны написать:
def withCursor[T](cursor: => DBCursor)(f: DBCursor => T) = try { f(c) } finally { cursor.close() } // somewhere... MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll => withCursor(coll.find()) { cursor => cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT) // ...
Хорошо, здесь вы можете использовать курсор и перебирать всю коллекцию. Это довольно немного кода, но мы дошли до того, что курсор можно использовать, и время ожидания не истечет… Следующий шаг — упаковать это в симпатичную функцию и разрешить передачу в посторонний объект запроса. Я вставлю конечный результат этих шагов, чтобы вы могли проанализировать его самостоятельно:
def stream[T <: MongoRecord[T]] (meta: MongoMetaRecord[T], query: Option[DBObject] = None) (callback: T => Unit) { MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll => withCursor(coll.find(query.getOrElse(null))) { cursor => cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT) while(cursor.hasNext) { val next = cursor.next() val obj = meta.fromDBObject(next) callback(obj) } } } } // can be used like this: val meta = Person stream(meta) { migrateIt(_) } // or with the query supplied: import scalaz.Scalaz._ import com.foursquare.rogue.Rogue._ val query = meta where(_.age > 18) stream(meta, query.asDBObject.some) { migrateIt }
Есть небольшая неприятность с написанием query.asDBObject.some (который Scalaz эквивалентен Some (query.asDBObject) )… Давайте исправим это с помощью простой делегирующей функции:
def stream[T <: MongoRecord[T]] (meta: MongoMetaRecord[T], query: MongoMetaRecord[T] => BaseQuery[_, _, _, _, _, _, _]) (callback: T => Unit) { stream(meta, query(meta).asDBObject.some)(callback) } // so we can call it like: import com.foursquare.rogue.Rogue._ stream(meta, meta.where(_.age > 18)) { migrateIt }
Но я все еще не доволен этим. Как уже упоминалось, мы имеем дело с огромными коллекциями, поэтому обработка может иногда занимать один день. Давайте посмотрим, что мы можем отсечь здесь … О да, давайте не будем выбирать поля, которые мы не используем. Давайте пересмотрим наш код для реализации вышеуказанной версии streamSelect, которая будет выбирать только те поля, которые нас интересуют, из mongo:
/** So we don't have to manually extract field names */ def select[T](on: T, fields: Function1[T, BaseField]*): Seq[String] = for(f <- fields) yield f(on).name def streamSelect[T <: MongoRecord[T]] (meta: MongoMetaRecord[T], select: Seq[String] = Nil, query: DBObject)(callback: List[Any] => Unit) { MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll => val selectFields = new BasicDBObject(select.map(_ -> 1).toMap) withCursor(coll.find(query, selectFields)) { cursor => cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT) while(cursor.hasNext) { val next = cursor.next() val fieldValues = select map { next.get(_) match { case l: BasicDBList => l.toList case other => other } } callback(fieldValues) } } } } // which can be used as: val m = Person val above18 = m where (_.age > 18) streamSelect(m, select[Person](m, _.name, _.age, _.height), query = above18) { case (name: String) :: (age: Int) :: (h: Double) :: Nil => // ... } // instead of using the bellow helper you can pass names manually: select[Person](m, _.name, _.age, _.height) == List(m.name.name, m.age.name, m.height.name)
Здесь мы выбираем только те поля, которые нам действительно нужны, что значительно повысило производительность. Это вполне читабельно, хотя я не смог избавиться от
[Person]
введите параметр в
select
помощник. Мы используем такие потоки везде, где мы знаем, что есть «много материала для обработки» или в так называемых «предварительных загрузках», где мы вычисляем набор значений из всей коллекции для повторного использования.
Возможно, вы заметили, что все это не было очень TypeSafe ( обратный вызов не). Итак … вы могли бы спросить, реализовали ли мы «безопасную версию» наших потоков? И на самом деле мы это сделали, хотя это основано на кортежах, поэтому нам приходилось реализовывать одно и то же несколько раз — для разных кортежей. Я вставлю здесь только использование версии TypeSafe (и, если вам интересно, я могу сделать пост в блоге о них):
val m = Person streamTypesafe(m)(m.age, m.name, m.height)) { (age, name, height) => // yes, age: Int, name: String, and height: Double! 🙂 }
Используя эти потоки, мы получили возможность легко записывать все миграции, которые нам нужны, и мы все еще достаточно гибки, чтобы, например, удалить поля из коллекции (с некоторой обработкой, поэтому простой запрос Монго не будет быть достаточным). В streamSelect мы открыты, чтобы иметь несколько операторов case, поэтому, даже если коллекция не является однородной, мы можем совпасть с нулем в некоторых полях и по-прежнему работать со всей коллекцией — при необходимости.
В любом случае, я надеюсь, что вы нашли эту кучу фрагментов кода интересной или полезной — мы, безусловно, делаем это в нашем повседневном кодировании ?