В моем проекте на работе у нас есть несколько производственных систем, использующих MongoDB в качестве основной базы данных. Довольно много коллекций, которые у нас есть, можно назвать «огромными», и нам иногда приходится переносить данные из одной (для простоты, скажем, «схемы») в другую или выполнять некоторую статистику по всей коллекция.
Итак, первое, что вы попытаетесь сделать, это типичный foreach для мошеннического запроса, подобного следующему:
Person where(_.age > 18) foreach { p => /*...*/ }
… и делайте любую статистику и т. Д., Что вам нужно сделать для каждого из элементов. Оказывается, есть много причин, по которым это отстой (плохое время), и даже не приблизиться к вычислению всего этого. Некоторые проблемы :
- Курсор истечет, если используется так
- мы сопоставляем вещи с объектами Person, так что это требует времени и памяти
(PS: мы могли бы сделать хуже, чем в примере выше — не пытайтесь получить огромную коллекцию в памяти … ;-)). Переходя к решению, здесь мы должны сделать несколько вещей. Во-первых, не допускайте тайм-аута курсора , чего можно добиться, установив соответствующую опцию на курсоре:
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
Как вы уже заметили … мы подошли к низкоуровневому API здесь … А пока, допустим, я в порядке с этим (мы снова включим мошенничество чуть позже). Таким образом, помещая это в контекст, вы должны написать:
def withCursor[T](cursor: => DBCursor)(f: DBCursor => T) =
try { f(c) } finally { cursor.close() }
// somewhere...
MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
withCursor(coll.find()) { cursor =>
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
// ...
Хорошо, здесь вы можете использовать курсор и перебирать всю коллекцию. Это довольно немного кода, но мы дошли до того, что курсор можно использовать, и время ожидания не истечет… Следующий шаг — упаковать это в симпатичную функцию и разрешить передачу в посторонний объект запроса. Я вставлю конечный результат этих шагов, чтобы вы могли проанализировать его самостоятельно:
def stream[T <: MongoRecord[T]]
(meta: MongoMetaRecord[T], query: Option[DBObject] = None)
(callback: T => Unit) {
MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
withCursor(coll.find(query.getOrElse(null))) { cursor =>
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
while(cursor.hasNext) {
val next = cursor.next()
val obj = meta.fromDBObject(next)
callback(obj)
}
}
}
}
// can be used like this:
val meta = Person
stream(meta) { migrateIt(_) }
// or with the query supplied:
import scalaz.Scalaz._
import com.foursquare.rogue.Rogue._
val query = meta where(_.age > 18)
stream(meta, query.asDBObject.some) { migrateIt }
Есть небольшая неприятность с написанием query.asDBObject.some (который Scalaz эквивалентен Some (query.asDBObject) )… Давайте исправим это с помощью простой делегирующей функции:
def stream[T <: MongoRecord[T]]
(meta: MongoMetaRecord[T],
query: MongoMetaRecord[T] => BaseQuery[_, _, _, _, _, _, _])
(callback: T => Unit) {
stream(meta, query(meta).asDBObject.some)(callback)
}
// so we can call it like:
import com.foursquare.rogue.Rogue._
stream(meta, meta.where(_.age > 18)) { migrateIt }
Но я все еще не доволен этим. Как уже упоминалось, мы имеем дело с огромными коллекциями, поэтому обработка может иногда занимать один день. Давайте посмотрим, что мы можем отсечь здесь … О да, давайте не будем выбирать поля, которые мы не используем. Давайте пересмотрим наш код для реализации вышеуказанной версии streamSelect, которая будет выбирать только те поля, которые нас интересуют, из mongo:
/** So we don't have to manually extract field names */
def select[T](on: T, fields: Function1[T, BaseField]*): Seq[String] =
for(f <- fields) yield f(on).name
def streamSelect[T <: MongoRecord[T]]
(meta: MongoMetaRecord[T],
select: Seq[String] = Nil,
query: DBObject)(callback: List[Any] => Unit) {
MongoDB.useCollection(meta.mongoIdentifier, meta.collectionName) { coll =>
val selectFields = new BasicDBObject(select.map(_ -> 1).toMap)
withCursor(coll.find(query, selectFields)) { cursor =>
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
while(cursor.hasNext) {
val next = cursor.next()
val fieldValues = select map { next.get(_) match {
case l: BasicDBList => l.toList
case other => other
}
}
callback(fieldValues)
}
}
}
}
// which can be used as:
val m = Person
val above18 = m where (_.age > 18)
streamSelect(m, select[Person](m, _.name, _.age, _.height), query = above18) {
case (name: String) :: (age: Int) :: (h: Double) :: Nil =>
// ...
}
// instead of using the bellow helper you can pass names manually:
select[Person](m, _.name, _.age, _.height) == List(m.name.name, m.age.name, m.height.name)
Здесь мы выбираем только те поля, которые нам действительно нужны, что значительно повысило производительность. Это вполне читабельно, хотя я не смог избавиться от
[Person]
введите параметр в
select
помощник. Мы используем такие потоки везде, где мы знаем, что есть «много материала для обработки» или в так называемых «предварительных загрузках», где мы вычисляем набор значений из всей коллекции для повторного использования.
Возможно, вы заметили, что все это не было очень TypeSafe ( обратный вызов не). Итак … вы могли бы спросить, реализовали ли мы «безопасную версию» наших потоков? И на самом деле мы это сделали, хотя это основано на кортежах, поэтому нам приходилось реализовывать одно и то же несколько раз — для разных кортежей. Я вставлю здесь только использование версии TypeSafe (и, если вам интересно, я могу сделать пост в блоге о них):
val m = Person
streamTypesafe(m)(m.age, m.name, m.height)) {
(age, name, height) =>
// yes, age: Int, name: String, and height: Double! 🙂
}
Используя эти потоки, мы получили возможность легко записывать все миграции, которые нам нужны, и мы все еще достаточно гибки, чтобы, например, удалить поля из коллекции (с некоторой обработкой, поэтому простой запрос Монго не будет быть достаточным). В streamSelect мы открыты, чтобы иметь несколько операторов case, поэтому, даже если коллекция не является однородной, мы можем совпасть с нулем в некоторых полях и по-прежнему работать со всей коллекцией — при необходимости.
В любом случае, я надеюсь, что вы нашли эту кучу фрагментов кода интересной или полезной — мы, безусловно, делаем это в нашем повседневном кодировании ?