В предыдущем посте я показал вам улучшения обработки фрагментированных запросов. К сожалению, мне пришлось иметь дело с uri.path.toString(), что, я надеюсь, вы тоже нашли довольно неудовлетворительным. Позвольте мне улучшить API и добавить несколько тестов, пока я в нем!
Я хотел бы максимально использовать Spray Routing; к сожалению, я не могу просто взять маршрутизацию в том виде, в каком она есть, из-за наших фанковых HTTP-постов. К счастью, Spray-маршрутизация позволяет мне довольно хорошо комбинировать два подхода. Давайте разберем RecogService и StreamingRecogService из предыдущего поста на:
trait BasicRecogService extends Directives { ... }
trait StreamingRecogService extends Directives { this: Actor => ... }
class RecogServiceActor(coordinator: ActorRef)
extends Actor with BasicRecogService with StreamingRecogService { ... }
class StreamingRecogServiceActor[A](
coordinator: ActorRef,
sessionId: String,
message: (String, Array[Byte]) => A)
extends Actor { ... }
Это дает мне общую структуру. BasicRecogService Содержит routing.RouteS , которые обрабатывают не-фрагментированные запросы; StreamingRecogService содержит routing.RouteS , которые имеют дела с блочными запросами. Наконец, я переименовал подтипы, Actor чтобы закончить словом Actor. Следующее, что нужно сделать, это перенести функциональность с
case HttpRequest(HttpMethods.POST, uri, _, entity, _) =>
val client = sender
uri.path.toString() match {
case RootUri =>
(coordinator ? Begin(1)).mapTo[String].onComplete {
case Success(sessionId) =>
client ! HttpResponse(entity = sessionId)
case Failure(ex) =>
client ! HttpResponse(entity = ex.getMessage,
status = StatusCodes.InternalServerError)
}
case StaticUri(sessionId) =>
coordinator ! SingleImage(sessionId, entity.data.toByteArray)
}
Я собираюсь использовать DSL маршрутизации Spray, чтобы определить, что у меня есть два URI, оба принимают запросы POST и выполняют их определенным образом. В Scala это тело BasicRecogService
trait BasicRecogService extends Directives {
import scala.concurrent.duration._
import akka.pattern.ask
import CoordinatorActor._
import RecogService._
implicit val timeout = akka.util.Timeout(2.seconds)
def normalRoute(coordinator: ActorRef)(implicit ec: ExecutionContext) =
path(Recog) {
post {
complete((coordinator ? Begin(1)).mapTo[String])
}
} ~
path(Recog / Rest) { sessionId =>
post {
entity(as[Array[Byte]]) { entity =>
coordinator ! SingleImage(sessionId, entity)
complete("{}")
}
}
}
}
Далее, мне нужна похожая обработка, но для кусков. Я перевожу старый код
case ChunkedRequestStart(
HttpRequest(HttpMethods.POST, uri, _, entity, _)) =>
val streamer = uri.path.toString() match {
case MJPEGUri(sessionId) =>
context.actorOf(Props(
new StreamingRecogService(coordinator, sessionId, SingleImage)))
case H264Uri(sessionId) =>
context.actorOf(Props(
new StreamingRecogService(coordinator, sessionId, FrameChunk)))
}
sender ! RegisterChunkHandler(streamer)
Маршрут аналогичен: при публикации для некоторых URI создайте соответствующие StreamingRecogServiceActorэкземпляры и попросите Spray отправить оставшуюся часть записи вновь созданным актерам.
trait StreamingRecogService extends Directives {
this: Actor =>
import CoordinatorActor._
import RecogService._
def chunkedRoute(coordinator: ActorRef) = {
def handleChunksWith(creator: => Actor):
RequestContext => Unit = {
val handler = context.actorOf(Props(creator))
sender ! RegisterChunkHandler(handler)
{_ => ()}
}
path(Recog / MJPEG / Rest) { sessionId =>
post {
handleChunksWith(
new StreamingRecogServiceActor(coordinator, sessionId, SingleImage))
}
} ~
path(Recog / H264 / Rest) { sessionId =>
post {
handleChunksWith(
new StreamingRecogServiceActor(coordinator, sessionId, FrameChunk))
}
}
}
}
Теперь мне нужны черты, которые определяют маршруты Spray, которые можно применять к простым и чанкованным запросам. Все, что мне нужно сделать сейчас, это использовать их в RecogServiceActor.
RecogServiceActor
Использование маршрутов, которые я создал ранее, действительно просто: когда актер получает равнину HttpRequest, мы позволяем его normalRoute обработать; когда актер получает чанк HttpRequest, мы позволяем ему chunkedRoute справиться.
class RecogServiceActor(coordinator: ActorRef)
extends Actor with BasicRecogService with StreamingRecogService {
import context.dispatcher
val normal = normalRoute(coordinator)
val chunked = chunkedRoute(coordinator)
def receive: Receive = {
case _: Http.Connected => sender ! Http.Register(self)
case request: HttpRequest =>
normal(RequestContext(request, sender, request.uri.path).
withDefaultSender(sender))
case ChunkedRequestStart(request) =>
chunked(RequestContext(request, sender, request.uri.path).
withDefaultSender(sender))
}
}
Обратите внимание, что я предварительно жую routing.Routes, применяя normalRoute к coordinator (и неявно ExecutionContext, сделанный доступным import context.dispatcher), и применяя chunkedRoute к тому же coordinator. Хорошей новостью является то, что StreamingRecogServiceActorостается неизменным:
class StreamingRecogServiceActor[A](
coordinator: ActorRef,
sessionId: String,
message: (String, Array[Byte]) => A)
extends Actor {
def receive = {
case MessageChunk(data, _) =>
coordinator ! message(sessionId, data.toByteArray)
case ChunkedMessageEnd(_, _) =>
sender ! HttpResponse(entity = "{}")
context.stop(self)
}
}
Тестирование это
Позвольте мне закончить, показывая вам, как тривиально проверить маршруты. В этом тесте я проверю, что HTTP POST /recog отвечает идентификатором сеанса, который он получает от coordinator субъекта. Хорошей новостью является то, что теперь, когда я routing.Route проложил маршруты, я могу использовать Spray’s Test Kit, чтобы упростить тестирование.
class BasicRecogServiceSpec
extends Specification
with Specs2RouteTest
with BasicRecogService {
class TestCoordinatorActor extends Actor {
def receive: Receive = {
case Begin(_) => sender ! "a10b2f45-87dd-4fe1-accf-3361763c1553"
}
}
"Basic recog service" should {
val coordinator = system.actorOf(Props(new TestCoordinatorActor))
"return the session ID on post" in {
Post("/recog") ~> normalRoute(coordinator) ~> check {
responseAs[String] mustEqual "a10b2f45-87dd-4fe1-accf-3361763c1553"
}
}
}
}
Как вы можете видеть, я использую routing.Route вернулся, применяя normalRoute к coordinatorActorRef I , созданным выше, а затем с помощью Test Kit спрей для построения тестового случая: Post(uri) ~> route ~> check { responseAs[...] mustEqual }.
Код
Как обычно, полный код находится на GitHub по адресу https://github.com/eigengo/codemesh2013 для вашего удовольствия от клонирования!