Статьи

Новое и классное в Spray 1.2.0 (часть II)

В  предыдущем посте я показал вам улучшения обработки фрагментированных запросов. К сожалению, мне пришлось иметь дело с  uri.path.toString(), что, я надеюсь, вы тоже нашли довольно неудовлетворительным. Позвольте мне улучшить API и добавить несколько тестов, пока я в нем!

Я хотел бы максимально использовать Spray Routing; к сожалению, я не могу просто взять маршрутизацию в том виде, в каком она есть, из-за наших фанковых HTTP-постов. К счастью, Spray-маршрутизация позволяет мне довольно хорошо комбинировать два подхода. Давайте разберем  RecogService и  StreamingRecogService из предыдущего поста на:

trait BasicRecogService extends Directives { ... }
trait StreamingRecogService extends Directives { this: Actor => ... }
class RecogServiceActor(coordinator: ActorRef) 
  extends Actor with BasicRecogService with StreamingRecogService { ... }
class StreamingRecogServiceActor[A](
  coordinator: ActorRef, 
  sessionId: String, 
  message: (String, Array[Byte]) => A) 
  extends Actor { ... }

Это дает мне общую структуру. BasicRecogService Содержит  routing.RouteS , которые обрабатывают не-фрагментированные запросы; StreamingRecogService содержит  routing.RouteS , которые имеют дела с блочными запросами. Наконец, я переименовал подтипы,  Actor чтобы закончить словом  Actor. Следующее, что нужно сделать, это перенести функциональность с

case HttpRequest(HttpMethods.POST, uri, _, entity, _) =>
  val client = sender
  uri.path.toString() match {
    case RootUri =>
      (coordinator ? Begin(1)).mapTo[String].onComplete {
        case Success(sessionId) => 
          client ! HttpResponse(entity = sessionId)
        case Failure(ex) => 
          client ! HttpResponse(entity = ex.getMessage, 
                                status = StatusCodes.InternalServerError)
      }
    case StaticUri(sessionId) =>
      coordinator ! SingleImage(sessionId, entity.data.toByteArray)
  }

Я собираюсь использовать DSL маршрутизации Spray, чтобы определить, что у меня есть два URI, оба принимают запросы POST и выполняют их определенным образом. В Scala это тело BasicRecogService

trait BasicRecogService extends Directives {
  import scala.concurrent.duration._
  import akka.pattern.ask
  import CoordinatorActor._
  import RecogService._

  implicit val timeout = akka.util.Timeout(2.seconds)

  def normalRoute(coordinator: ActorRef)(implicit ec: ExecutionContext) =
    path(Recog) {
      post {
        complete((coordinator ? Begin(1)).mapTo[String])
      }
    } ~
    path(Recog / Rest) { sessionId =>
      post {
        entity(as[Array[Byte]]) { entity =>
          coordinator ! SingleImage(sessionId, entity)
          complete("{}")
        }
      }
    }
}

Далее, мне нужна похожая обработка, но для кусков. Я перевожу старый код

case ChunkedRequestStart(
  HttpRequest(HttpMethods.POST, uri, _, entity, _)) =>
  val streamer = uri.path.toString() match {
    case MJPEGUri(sessionId) => 
      context.actorOf(Props(
        new StreamingRecogService(coordinator, sessionId, SingleImage)))
    case H264Uri(sessionId)  => 
      context.actorOf(Props(
        new StreamingRecogService(coordinator, sessionId, FrameChunk)))
  }
  sender ! RegisterChunkHandler(streamer)

Маршрут аналогичен: при публикации для некоторых URI создайте соответствующие  StreamingRecogServiceActorэкземпляры и попросите Spray отправить оставшуюся часть записи вновь созданным актерам.

trait StreamingRecogService extends Directives {
  this: Actor =>

  import CoordinatorActor._
  import RecogService._

  def chunkedRoute(coordinator: ActorRef) = {
    def handleChunksWith(creator: => Actor): 
    	RequestContext => Unit = {
      val handler = context.actorOf(Props(creator))
      sender ! RegisterChunkHandler(handler)

      {_ => ()}
    }

    path(Recog / MJPEG / Rest) { sessionId =>
      post {
        handleChunksWith(
          new StreamingRecogServiceActor(coordinator, sessionId, SingleImage))
      }
    } ~
    path(Recog / H264 / Rest)  { sessionId =>
      post {
        handleChunksWith(
          new StreamingRecogServiceActor(coordinator, sessionId, FrameChunk))
      }
    }
  }

}

Теперь мне нужны черты, которые определяют маршруты Spray, которые можно применять к простым и чанкованным запросам. Все, что мне нужно сделать сейчас, это использовать их в  RecogServiceActor.

RecogServiceActor

Использование маршрутов, которые я создал ранее, действительно просто: когда актер получает равнину  HttpRequest, мы позволяем его  normalRoute обработать; когда актер получает чанк  HttpRequest, мы позволяем ему chunkedRoute справиться.

class RecogServiceActor(coordinator: ActorRef) 
  extends Actor with BasicRecogService with StreamingRecogService {
  import context.dispatcher
  val normal = normalRoute(coordinator)
  val chunked = chunkedRoute(coordinator)

  def receive: Receive =  {
    case _: Http.Connected => sender ! Http.Register(self)
    case request: HttpRequest => 
      normal(RequestContext(request, sender, request.uri.path).
             withDefaultSender(sender))
    case ChunkedRequestStart(request) => 
      chunked(RequestContext(request, sender, request.uri.path).
              withDefaultSender(sender))
  }

}

Обратите внимание, что я предварительно жую  routing.Routes, применяя  normalRoute к  coordinator (и неявно  ExecutionContext, сделанный доступным  import context.dispatcher), и применяя chunkedRoute к тому же  coordinator. Хорошей новостью является то, что  StreamingRecogServiceActorостается неизменным:

class StreamingRecogServiceActor[A](
  coordinator: ActorRef, 
  sessionId: String, 
  message: (String, Array[Byte]) => A) 
  extends Actor {

  def receive = {
    case MessageChunk(data, _) =>
      coordinator ! message(sessionId, data.toByteArray)
    case ChunkedMessageEnd(_, _) =>
      sender ! HttpResponse(entity = "{}")
      context.stop(self)
  }

}

Тестирование это

Позвольте мне закончить, показывая вам, как тривиально проверить маршруты. В этом тесте я проверю, что HTTP POST  /recog отвечает идентификатором сеанса, который он получает от  coordinator субъекта. Хорошей новостью является то, что теперь, когда я  routing.Route проложил маршруты, я могу использовать Spray’s Test Kit, чтобы упростить тестирование.

class BasicRecogServiceSpec 
  extends Specification 
  with Specs2RouteTest 
  with BasicRecogService {

  class TestCoordinatorActor extends Actor {
    def receive: Receive = {
      case Begin(_) => sender ! "a10b2f45-87dd-4fe1-accf-3361763c1553"
    }
  }

  "Basic recog service" should {
    val coordinator = system.actorOf(Props(new TestCoordinatorActor))

    "return the session ID on post" in {
      Post("/recog") ~> normalRoute(coordinator) ~> check {
        responseAs[String] mustEqual "a10b2f45-87dd-4fe1-accf-3361763c1553"
      }
    }

  }

}

Как вы можете видеть, я использую  routing.Route вернулся, применяя  normalRoute к  coordinatorActorRef I , созданным выше, а затем с помощью Test Kit спрей для построения тестового случая:  Post(uri) ~> route ~> check { responseAs[...] mustEqual }.

Код

Как обычно, полный код находится на GitHub по адресу  https://github.com/eigengo/codemesh2013  для вашего удовольствия от клонирования!