Статьи

Ответные вызовы WebSockets в Play Framework

 Я получил вопрос от пользователя Play о реализации WebSockets для вызова / ответа в Play Framework. Это не то, что встречается так часто, поскольку это означает, что использование WebSockets в основном делает то, что AJAX делает для вас, так какой в ​​этом смысл? Но вот несколько вариантов использования, о которых я подумал:

  • У вас есть некоторое преобразование потока, которое может быть сделано только на стороне сервера. Например, возможно преобразование требует некоторой тяжелой работы с базой данных, или слишком дорого для мобильного клиента, или, возможно, вы хотите зашифровать поток с помощью ключа, который является частным для сервера.
  • Вы уже обрабатываете поток событий с сервера, используя WebSockets, и ответы на вызовы — это просто больше событий в этом потоке, поэтому вы хотите использовать один и тот же механизм транспорта для этих событий.
  • Ваше приложение особенно болтливое, и вам не нужны служебные данные протокола HTTP при каждом вызове / ответе.

Возможно, есть и другие варианты использования — WebSockets — это довольно новая технология, и как отрасль мы еще не определились с ее наилучшими вариантами использования.

Простая реализация эха

Play WebSocket реализуется путем предоставления итератора, который принимает сообщения от клиента, и счетчика, который создает сообщения для клиента. Если бы мы просто хотели отобразить каждое сообщение, которое отправил нам клиент, мы бы хотели вернуть итератора, чьи входные данные становятся выходными данными возвращаемого нами перечислителя. В Play нет ничего из коробки, чтобы сделать это, но мы, вероятно, добавим что-то из коробки, что сделает это в будущем выпуске. Сейчас я собираюсь написать метод с именем join, который возвращает объединенную пару iteratee / enumerator:

/**
 * Create a joined iteratee enumerator pair.
 *
 * When the enumerator is applied to an iteratee, the iteratee subsequently consumes whatever the iteratee in the pair
 * is applied to.  Consequently the enumerator is "one shot", applying it to subsequent iteratees will throw an
 * exception.
 */
def joined[A]: (Iteratee[A, Unit], Enumerator[A]) = {
  val promisedIteratee = Promise[Iteratee[A, Unit]]()
  val enumerator = new Enumerator[A] {
    def apply[B](i: Iteratee[A, B]) = {
      val doneIteratee = Promise[Iteratee[A, B]]()
 
      // Equivalent to map, but allows us to handle failures
      def wrap(delegate: Iteratee[A, B]): Iteratee[A, B] = new Iteratee[A, B] {
        def fold[C](folder: (Step[A, B]) => Future[C]) = {
          val toReturn = delegate.fold {
            case done @ Step.Done(a, in) => {
              doneIteratee.success(done.it)
              folder(done)
            }
            case Step.Cont(k) => {
              folder(Step.Cont(k.andThen(wrap)))
            }
            case err => folder(err)
          }
          toReturn.onFailure {
            case e => doneIteratee.failure(e)
          }
          toReturn
        }
      }
 
      if (promisedIteratee.trySuccess(wrap(i).map(_ => ()))) {
        doneIteratee.future
      } else {
        throw new IllegalStateException("Joined enumerator may only be applied once")
      }
    }
  }
  (Iteratee.flatten(promisedIteratee.future), enumerator)
}

Этот код может быть немного страшным, если вы не понимаете итераторов, но, как я сказал, мы, вероятно, добавим это в сам Play в будущем. Остальная часть кода в этом посте будет простой.

Теперь, когда у нас есть присоединенный итератор / перечислитель, давайте реализуем эхо WebSocket. В оставшейся части этого поста мы будем предполагать, что все наши WebSockets отправляют / получают сообщения JSON.

def echo = WebSocket.using[JsValue] { req =>
  joined[JsValue]
}

Итак, теперь у нас есть эхо-вызов / ответ WebSocket. Но это не очень полезно, мы хотим что-то делать с входящими сообщениями и создавать новые исходящие сообщения в качестве ответов.

Обработка сообщений

Итак, теперь, когда мы выразили наш вызов / ответ в терминах присоединенного итератора / перечислителя, как мы можем преобразовать сообщения вызова в различные сообщения ответа? Ответ перечисляется. Перечисляемые могут быть использованы для преобразования итераторов и перечислителей. Мы возвращаем как перечислитель, так и итератор, так какой же мы преобразовываем? Ответ не имеет значения, я собираюсь использовать его для преобразования итерируемого. Перечисляемый объект, который мы собираемся использовать, это перечисляемый элемент карты:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]
 
  (Enumeratee.map[JsValue] { json =>
    Json.obj(
      "status" -> "received",
      "msg" -> json
    )
  } &> iter, enum)
}

Перечисляемые пользователи — одна из самых мощных функций конечных пользователей для конечных пользователей. Вы можете использовать любого перечисляемого здесь, но давайте посмотрим на некоторые примеры других распространенных случаев использования.

Что если мы не хотим возвращать ответ на каждое сообщение? Есть множество способов сделать это, но самый простой — это использовать collectenumeratee, который выполняет частичную функцию:

def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]
 
  (Enumeratee.collect[JsValue] { 
    case json if (json \ "foo").asOpt[JsValue].isDefined =>
      Json.obj(
        "status" -> "received",
        "msg" -> json
      )
  } &> iter, enum)
}

Что делать, если мы хотим сделать некоторые операции блокировки? В Play 2.2 это можно будет сделать, просто предоставив контекст выполнения, подходящий для блокировки вызовов, для того, кого вы решите использовать, но Play 2.1 пока не поддерживает это, поэтому нам приходится отправлять обратный вызов в другой контекст выполнения самостоятельно. , Это можно сделать с помощью перечисления mapM:

val ec: ExecutionContext = ...
 
def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]
 
  (Enumeratee.mapM[JsValue] { json =>
    Future {
      // Some expensive computation, eg a database call, that returns JsValue
    }(ec)
  } &> iter, enum)
}


Что делать, если мы хотим сделать некоторые операции блокировки? В Play 2.2 это можно будет сделать, просто предоставив контекст выполнения, подходящий для блокировки вызовов, для того, кого вы решите использовать, но Play 2.1 пока не поддерживает это, поэтому нам приходится отправлять обратный вызов в другой контекст выполнения самостоятельно. , Это можно сделать с помощью перечисления mapM:

val ec: ExecutionContext = ...
 
def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]
 
  (Enumeratee.mapM[JsValue] { json =>
    Future {
      // Some expensive computation, eg a database call, that returns JsValue
    }(ec)
  } &> iter, enum)
}

Отталкивание от внешнего счетчика

Возможно, вы захотите объединить ваши сообщения о вызове / ответе с сообщениями от другого перечислителя, который самопроизвольно отправляет сообщения клиенту, например, перечислитель для всех клиентов. Это может быть сделано путем чередования вашего присоединенного перечислителя с внешним перечислителем:

val globalEvents: Enumerator[JsValue] = ...
 
def process = WebSocket.using[JsValue] { req =>
  val (iter, enum) = joined[JsValue]
 
  (Enumeratee.map[JsValue] { json =>
    ...
  } &> iter, Enumerator.interleave(enum, globalEvents))
}

Вывод

Использование WebSockets в стиле ответа на вызов может потребоваться вашему приложению. Если это так, то использование перечислимых для отображения потока сообщений, поступающих на выходящие сообщения, является наиболее естественным и идиоматическим способом сделать это в Play. Он позволяет вам вызывать большое количество составных перечислимых, которые Play предоставляет из коробки, и делает ваш код простым и понятным.