Статьи

Игра с фьючерсами

Во время собеседования мы часто даем разработчикам Scala простую задачу проектирования: моделировать двоичное дерево. Самая простая, но не обязательно лучшая реализация включает в себя Option :

1
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]])

Бонусные баллы за неизменность, используя case и ковариацию. Гораздо лучше, но более сложная реализация включает в себя два класса case но по крайней мере позволяет моделировать пустые деревья

1
2
3
sealed trait Tree[+T]
case object Empty extends Tree[Nothing]
case class Node[+T](value: T, left: Tree[T], right: Tree[T]) extends Tree[T]

Давайте придерживаться первой идеи. Теперь реализуем построение дерева с произвольной высотой:

1
2
3
4
5
6
7
8
9
def apply[T](n: Int)(block: => T): Tree[T] = n match {
    case 1 => Tree(block, None, None)
    case _ =>
        Tree(
            block,
            Some(Tree(n - 1)(block)),
            Some(Tree(n - 1)(block))
        )
}

Чтобы построить дерево из 1024 листьев и всех случайных величин, достаточно сказать:

1
val randomTree: Tree[Double] = Tree(1 + 10)(math.random)

Это открытый вопрос, следующим требованием может быть написание метода map эквивалентного Seq.map() или Option.map() . Понимание того, что это значит, является частью вопроса. Реализация на удивление проста:

1
2
3
4
5
6
7
8
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) = {
  
def map[R](f: T => R): Tree[R] =
    Tree(
        f(value),
         left.map{_.map(f)},
        right.map{_.map(f)})  
}

ОК… .map{_.map(f)} , ты шутишь? Помните, что left и right являются Option s, а Option.map(f) превращает Option[T] в Option[R] . Итак, первая map приходит из Option . Второй _.map(f) на самом деле является рекурсивным вызовом Tree.map() . Теперь мы можем, например, создать второе дерево (неизменность!) С каждым увеличенным значением, но сохраняющим структуру:

1
2
val tree: Tree[Int] = //...
val incremented = tree.map(1 +)

… или с toString() вызываемой для каждого значения:

1
val stringified = tree.map(_.toString)

Давай пойдем немного дальше. Если функция f отнимает много времени и не имеет побочных эффектов (что является частым требованием при выполнении map() ) или наше дерево значительно больше, как насчет распараллеливания Tree.map() каким-либо образом? Есть несколько способов добиться этого и довольно много ловушек. Самый простой подход — использовать пул потоков, поддерживаемый ExecutionContext :

01
02
03
04
05
06
07
08
09
10
11
12
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {
    def pmap[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[R] = {
        val transformed: Future[R] = Future { f(value)}
        val  leftFuture: Option[Future[Tree[R]]] =  left.map { l => Future { l.pmap(f)}}
        val rightFuture: Option[Future[Tree[R]]] = right.map { r => Future { r.pmap(f)}}
  
        Tree(
            Await.result(transformed, timeout),
             leftFuture.map(Await.result(_, timeout)),
            rightFuture.map(Await.result(_, timeout)))
    }
}

Использовать pmap (имя не является совпадением ) довольно просто, если разобраться с несколькими implicits :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
import scala.concurrent.{Await, Future, ExecutionContext}
import java.util.concurrent.Executors
import scala.concurrent.duration._
  
val pool = Executors newFixedThreadPool 10
implicit val ec: ExecutionContext = ExecutionContext fromExecutor pool
implicit val timeout = 10.second
  
val tree = Tree("alpha",
    None,
    Some(
        Tree("beta",
            None,
            None)))
  
println(tree.pmap{_.toUpperCase})

В приведенном выше примере кода будет использовано простое дерево с корнем « альфа » и правым потомком « бета », а также значения в верхнем регистре всех значений в нескольких потоках. Вызов Future { ... } — это простая идиома для отправки асинхронной задачи в пул потоков и получения взамен Future[T] .

Есть как минимум пара проблем с этим кодом. Прежде всего это в основном … ждет. Несколько потоков будут простаивать, ожидая, пока дети завершат. Но это не худший вариант развития событий. Представьте, что наш пул потоков ограничен одним потоком (проблема остается для больших, но все еще ограниченных пулов). Мы порождаем подзадачи для детей и ждем их окончания. Но эти подзадачи никогда не запускаются, потому что они не могут получить поток из пула потоков. Почему? Потому что в пуле только один поток, и мы его уже потребляем! Этот единственный поток блокируется в ожидании простоя задач, которые никогда не могут быть завершены. Это называется тупик . На самом деле код истекает по истечении заданного промежутка времени, но это не меняет того факта, что реализация выше с треском проваливается. ForkJoinPool бы эту проблему, но есть более продвинутые и полезные решения.

Ввод Tree[Future[R]]

Удивительно, но есть еще лучший, более функциональный и чистый подход. Реактивное программирование препятствует ожиданию. Вместо того, чтобы скрывать асинхронный характер дерева обработки, давайте подчеркнем это! Поскольку обработка уже основана на Future s, поместите их явно в API:

1
2
3
4
5
6
7
8
9
case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {
    def mapf[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[Future[R]] = {
        Tree(
            Future { f(value) },
             left.map {_.mapf(f)},
            right.map {_.mapf(f)}
        )
    }
}

Tree.mapf() возвращается немедленно, но вместо возврата Tree[R] мы теперь получаем Tree[Future[R]] . Таким образом, у нас есть дерево, где каждый узел содержит независимое Future . Как мы можем вернуться к знакомому Tree[R] ? Один подход использует Tree.map() , который мы уже реализовали:

1
2
3
val treeOfFutures: Tree[Future[R]] = ...
  
val tree = treeOfFutures.map(Await.result(_, 10.seconds))

Бьюсь об заклад, это не ясно, но в принципе это просто — для каждого узла ждать независимого будущего объекта, пока все они не будут решены. Нет риска тупика, потому что фьючерсы не зависят друг от друга.

Превращая Tree[Future[R]] в Future[Tree[R]]

Но мы хотим пойти глубже. Зачем работать с кучей будущих, если у нас может быть только одно будущее, чтобы управлять ими всеми ? Подумайте о Future.sequence() который превращает Seq[Future[T]] в Future[Seq[T]] . Реализация такого метода для Tree[Future[T]] сама по себе является хорошей задачей. Идея состоит в том, чтобы иметь счетчик всех нерешенных задач, и как только все они будут выполнены, разыменуйте все фьючерсы без блокировки (так как они уже завершены):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
object Tree {
  
    def sequence[T](tree: Tree[Future[T]])(implicit ec: ExecutionContext, timeout: Duration): Future[Tree[T]] = {
        val promise = Promise[Tree[T]]()
        val pending = new AtomicInteger(tree.size)
        for {
            future <- tree
            value <- future
        } if(pending.decrementAndGet() == 0) {
            promise.success(
                tree.map(Await.result(_, 0.seconds))    //will never block
            )
        }
        promise.future
    }
}

Код выше является немного обязательным и не обрабатывает исключения должным образом — но может быть хорошей отправной точкой. Мы перебираем все фьючерсы и уменьшаем счетчик после завершения каждого из них. Если все фьючерсы сделаны, мы выполняем наше обещание. Tree.size выше код требует двух дополнительных методов: Tree.size и Tree.foreach() (неявно используемых внутри для понимания), которые я оставляю для вас в качестве упражнения.

Ссылка: игра с фьючерсами от нашего партнера JCG Томаша Нуркевича в блоге Java и соседстве .