Статьи

Игра с фьючерсами Scala

Во время собеседования мы часто даем разработчикам Scala простую задачу проектирования: моделировать двоичное дерево. Самая простая, но не обязательно лучшая реализация включает в себя  Option идиому:

case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]])

Бонусные баллы за неизменность, используя  case класс и ковариацию. Гораздо лучше, но более сложная реализация включает два  case класса, но, по крайней мере, позволяет моделировать пустые деревья:

sealed trait Tree[+T]
case object Empty extends Tree[Nothing]
case class Node[+T](value: T, left: Tree[T], right: Tree[T]) extends Tree[T]

Давайте придерживаться первой идеи. Теперь реализуем построение дерева с произвольной высотой:

def apply[T](n: Int)(block: => T): Tree[T] = n match {
    case 1 => Tree(block, None, None)
    case _ =>
        Tree(
            block,
            Some(Tree(n - 1)(block)),
            Some(Tree(n - 1)(block))
        )
}

Чтобы построить дерево из 1024 листьев и всех случайных величин, достаточно сказать:

val randomTree: Tree[Double] = Tree(1 + 10)(math.random)

Это открытый вопрос, следующим требованием может быть написание  map метода, эквивалентного  Seq.map() или Option.map(). Понимание того, что это значит, является частью вопроса. Реализация на удивление проста:

case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) = {
 
def map[R](f: T => R): Tree[R] =
    Tree(
        f(value), 
         left.map{_.map(f)}, 
        right.map{_.map(f)})    
}

ОК …  .map{_.map(f)}ты что, шутишь? Помните , что  left и  right являются  Options и  Option.map(f) повороты Option[T] в  Option[R]. Итак, сначала  map приходит от  Option. Второе  _.map(f) — это рекурсивный вызов Tree.map(). Теперь мы можем, например, создать второе дерево (неизменность!) С каждым увеличенным значением, но сохраняющим структуру:

val tree: Tree[Int] = //...
val incremented = tree.map(1 +)

… или с  toString() вызванным на каждое значение:

val stringified = tree.map(_.toString) 

Давайте пойдем немного дальше. Если  f функция отнимает много времени и не имеет побочных эффектов (что часто бывает при выполнении  map()), или наше дерево значительно больше, как насчет распараллеливания  Tree.map() ? Есть несколько способов добиться этого и довольно много ловушек. Самый простой подход — использовать пул потоков, поддерживаемый  ExecutionContext:

case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {
    def pmap[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[R] = {
        val transformed: Future[R] = Future { f(value)}
        val  leftFuture: Option[Future[Tree[R]]] =  left.map { l => Future { l.pmap(f)}}
        val rightFuture: Option[Future[Tree[R]]] = right.map { r => Future { r.pmap(f)}}
 
        Tree(
            Await.result(transformed, timeout),
             leftFuture.map(Await.result(_, timeout)),
            rightFuture.map(Await.result(_, timeout)))
    }
}

Использовать  pmap (имя не  случайно ) довольно просто, если разобраться с несколькими из них  implicits:

import scala.concurrent.{Await, Future, ExecutionContext}
import java.util.concurrent.Executors
import scala.concurrent.duration._
 
val pool = Executors newFixedThreadPool 10
implicit val ec: ExecutionContext = ExecutionContext fromExecutor pool
implicit val timeout = 10.second
 
val tree = Tree("alpha",
    None,
    Some(
        Tree("beta",
            None,
            None)))
 
println(tree.pmap{_.toUpperCase})

В приведенном выше примере кода будет использовано простое дерево с корнем « альфа » и правым потомком « бета », а также значения в верхнем регистре всех значений в нескольких потоках. Вызов  Future { ... } — это простая идиома для отправки асинхронной задачи в пул потоков и получения  Future[T]взамен. 

Есть как минимум пара проблем с этим кодом. Прежде всего это в основном … ждет. Несколько потоков будут простаивать, ожидая, пока дети завершат. Но это не худший вариант развития событий. Представьте, что наш пул потоков ограничен одним потоком (проблема остается для больших, но все еще ограниченных пулов). Мы порождаем подзадачи для детей и ждем их окончания. Но эти подзадачи никогда не запускаются, потому что они не могут получить поток из пула потоков. Зачем? Потому что в пуле только один поток, и мы его уже потребляем! Этот единственный поток блокируется в ожидании простоя задач, которые никогда не могут быть завершены. Это называется тупик . На самом деле код истекает по истечении заданного промежутка времени, но это не меняет того факта, что реализация выше с треском проваливается. ForkJoinPool решил бы эту проблему, но есть более продвинутые и полезные решения.

входящий Tree[Future[R]]

Удивительно, но есть еще лучший, более функциональный и чистый подход. Реактивное программирование препятствует ожиданию. Вместо того, чтобы скрывать асинхронный характер дерева обработки, давайте подчеркнем это! Поскольку обработка уже основана на  Futures, поместите их явно в API:

case class Tree[+T](value: T, left: Option[Tree[T]], right: Option[Tree[T]]) {
    def mapf[R](f: T => R)(implicit ec: ExecutionContext, timeout: Duration): Tree[Future[R]] = {
        Tree(
            Future { f(value) },
             left.map {_.mapf(f)},
            right.map {_.mapf(f)}
        )
    }
}

Tree.mapf() возвращается немедленно, но вместо возвращения  Tree[R] мы теперь получаем  Tree[Future[R]]. Таким образом, у нас есть дерево, где каждый узел содержит независимый  Future. Как мы можем вернуться к знакомым  Future[R]? Используется один подход Tree.map(), который мы уже реализовали:

val treeOfFutures: Tree[Future[R]] = ...
 
val tree = treeOfFutures.map(Await.result(_, 10.seconds))

Бьюсь об заклад, это не ясно, но в принципе это просто — для каждого узла ждать независимого будущего объекта, пока все они не будут решены. Нет риска тупика, потому что фьючерсы не зависят друг от друга. 

Превращаясь  Tree[Future[R]] в Future[Tree[R]]

Но мы хотим пойти глубже. Зачем работать с кучей будущих, если у нас может быть только  одно будущее, чтобы управлять ими всеми ? Подумайте о Future.sequence() том, что превращается  Seq[Future[T]] в  Future[Seq[T]]. Реализация такого метода Tree[Future[T]] сама по себе является хорошей задачей. Идея состоит в том, чтобы иметь счетчик всех нерешенных задач, и как только все они будут выполнены, разыменуйте все фьючерсы без блокировки (так как они уже завершены):

object Tree {
 
    def sequence[T](tree: Tree[Future[T]])(implicit ec: ExecutionContext, timeout: Duration): Future[Tree[T]] = {
        val promise = Promise[Tree[T]]()
        val pending = new AtomicInteger(tree.size)
        for {
            future <- tree
            value <- future
        } if(pending.decrementAndGet() == 0) {
            promise.success(
                tree.map(Await.result(_, 0.seconds))    //will never block
            )
        }
        promise.future
    }
}

Код выше является немного обязательным и не обрабатывает исключения должным образом — но может быть хорошей отправной точкой.
Мы перебираем все фьючерсы и уменьшаем счетчик после завершения каждого из них. Если все фьючерсы сделаны, мы выполняем наше обещание. Приведенный выше код требует два дополнительных метода: 
Tree.size и 
Tree.foreach() (используется неявно внутри для понимания) — который я оставляю для вас в качестве упражнения.