В предыдущем посте я показал вам улучшения обработки фрагментированных запросов. К сожалению, мне пришлось иметь дело с uri.path.toString()
, что, я надеюсь, вы тоже нашли довольно неудовлетворительным. Позвольте мне улучшить API и добавить несколько тестов, пока я в нем!
Я хотел бы максимально использовать Spray Routing; к сожалению, я не могу просто взять маршрутизацию в том виде, в каком она есть, из-за наших фанковых HTTP-постов. К счастью, Spray-маршрутизация позволяет мне довольно хорошо комбинировать два подхода. Давайте разберем RecogService
и StreamingRecogService
из предыдущего поста на:
trait BasicRecogService extends Directives { ... } trait StreamingRecogService extends Directives { this: Actor => ... } class RecogServiceActor(coordinator: ActorRef) extends Actor with BasicRecogService with StreamingRecogService { ... } class StreamingRecogServiceActor[A]( coordinator: ActorRef, sessionId: String, message: (String, Array[Byte]) => A) extends Actor { ... }
Это дает мне общую структуру. BasicRecogService
Содержит routing.Route
S , которые обрабатывают не-фрагментированные запросы; StreamingRecogService
содержит routing.Route
S , которые имеют дела с блочными запросами. Наконец, я переименовал подтипы, Actor
чтобы закончить словом Actor
. Следующее, что нужно сделать, это перенести функциональность с
case HttpRequest(HttpMethods.POST, uri, _, entity, _) => val client = sender uri.path.toString() match { case RootUri => (coordinator ? Begin(1)).mapTo[String].onComplete { case Success(sessionId) => client ! HttpResponse(entity = sessionId) case Failure(ex) => client ! HttpResponse(entity = ex.getMessage, status = StatusCodes.InternalServerError) } case StaticUri(sessionId) => coordinator ! SingleImage(sessionId, entity.data.toByteArray) }
Я собираюсь использовать DSL маршрутизации Spray, чтобы определить, что у меня есть два URI, оба принимают запросы POST и выполняют их определенным образом. В Scala это тело BasicRecogService
trait BasicRecogService extends Directives { import scala.concurrent.duration._ import akka.pattern.ask import CoordinatorActor._ import RecogService._ implicit val timeout = akka.util.Timeout(2.seconds) def normalRoute(coordinator: ActorRef)(implicit ec: ExecutionContext) = path(Recog) { post { complete((coordinator ? Begin(1)).mapTo[String]) } } ~ path(Recog / Rest) { sessionId => post { entity(as[Array[Byte]]) { entity => coordinator ! SingleImage(sessionId, entity) complete("{}") } } } }
Далее, мне нужна похожая обработка, но для кусков. Я перевожу старый код
case ChunkedRequestStart( HttpRequest(HttpMethods.POST, uri, _, entity, _)) => val streamer = uri.path.toString() match { case MJPEGUri(sessionId) => context.actorOf(Props( new StreamingRecogService(coordinator, sessionId, SingleImage))) case H264Uri(sessionId) => context.actorOf(Props( new StreamingRecogService(coordinator, sessionId, FrameChunk))) } sender ! RegisterChunkHandler(streamer)
Маршрут аналогичен: при публикации для некоторых URI создайте соответствующие StreamingRecogServiceActor
экземпляры и попросите Spray отправить оставшуюся часть записи вновь созданным актерам.
trait StreamingRecogService extends Directives { this: Actor => import CoordinatorActor._ import RecogService._ def chunkedRoute(coordinator: ActorRef) = { def handleChunksWith(creator: => Actor): RequestContext => Unit = { val handler = context.actorOf(Props(creator)) sender ! RegisterChunkHandler(handler) {_ => ()} } path(Recog / MJPEG / Rest) { sessionId => post { handleChunksWith( new StreamingRecogServiceActor(coordinator, sessionId, SingleImage)) } } ~ path(Recog / H264 / Rest) { sessionId => post { handleChunksWith( new StreamingRecogServiceActor(coordinator, sessionId, FrameChunk)) } } } }
Теперь мне нужны черты, которые определяют маршруты Spray, которые можно применять к простым и чанкованным запросам. Все, что мне нужно сделать сейчас, это использовать их в RecogServiceActor
.
RecogServiceActor
Использование маршрутов, которые я создал ранее, действительно просто: когда актер получает равнину HttpRequest
, мы позволяем его normalRoute
обработать; когда актер получает чанк HttpRequest
, мы позволяем ему chunkedRoute
справиться.
class RecogServiceActor(coordinator: ActorRef) extends Actor with BasicRecogService with StreamingRecogService { import context.dispatcher val normal = normalRoute(coordinator) val chunked = chunkedRoute(coordinator) def receive: Receive = { case _: Http.Connected => sender ! Http.Register(self) case request: HttpRequest => normal(RequestContext(request, sender, request.uri.path). withDefaultSender(sender)) case ChunkedRequestStart(request) => chunked(RequestContext(request, sender, request.uri.path). withDefaultSender(sender)) } }
Обратите внимание, что я предварительно жую routing.Route
s, применяя normalRoute
к coordinator
(и неявно ExecutionContext
, сделанный доступным import context.dispatcher
), и применяя chunkedRoute
к тому же coordinator
. Хорошей новостью является то, что StreamingRecogServiceActor
остается неизменным:
class StreamingRecogServiceActor[A]( coordinator: ActorRef, sessionId: String, message: (String, Array[Byte]) => A) extends Actor { def receive = { case MessageChunk(data, _) => coordinator ! message(sessionId, data.toByteArray) case ChunkedMessageEnd(_, _) => sender ! HttpResponse(entity = "{}") context.stop(self) } }
Тестирование это
Позвольте мне закончить, показывая вам, как тривиально проверить маршруты. В этом тесте я проверю, что HTTP POST /recog
отвечает идентификатором сеанса, который он получает от coordinator
субъекта. Хорошей новостью является то, что теперь, когда я routing.Route
проложил маршруты, я могу использовать Spray’s Test Kit, чтобы упростить тестирование.
class BasicRecogServiceSpec extends Specification with Specs2RouteTest with BasicRecogService { class TestCoordinatorActor extends Actor { def receive: Receive = { case Begin(_) => sender ! "a10b2f45-87dd-4fe1-accf-3361763c1553" } } "Basic recog service" should { val coordinator = system.actorOf(Props(new TestCoordinatorActor)) "return the session ID on post" in { Post("/recog") ~> normalRoute(coordinator) ~> check { responseAs[String] mustEqual "a10b2f45-87dd-4fe1-accf-3361763c1553" } } } }
Как вы можете видеть, я использую routing.Route
вернулся, применяя normalRoute
к coordinator
ActorRef
I , созданным выше, а затем с помощью Test Kit спрей для построения тестового случая: Post(uri) ~> route ~> check { responseAs[...] mustEqual }
.
Код
Как обычно, полный код находится на GitHub по адресу https://github.com/eigengo/codemesh2013 для вашего удовольствия от клонирования!