В этой статье я расскажу вам о приложении на основе Spring Framework 4, которое максимально использует WebSockets, RabbitMQ и OpenCV для реализации простого приложения компьютерного зрения. Он считает монеты, которые приложение iOS отправляет в видеопотоке через двоичный WebSocket.
Результат
Возможно, странно, я начну с того, что покажу вам законченный результат. Вы видите, как приложение iOS общается с веб-приложением Spring через двоичный WebSocket; декодирование видео, обработка каждого кадра в коде компьютерного зрения за RabbitMQ, затем получение ответов и отправка результатов в приложение AngularJS.
SpringOne2GX от Cake Solutions Ltd. на Vimeo .
Начиная
Давайте начнем с компонентов RabbitMQ / компьютерного зрения. Это типичная задача Spring AMQP. На самом высоком уровне мы будем создавать RecogService
и RecogServiceActivator
. Это RecogService
точка входа в нашу систему. Мы отправляем фрагменты видеопотока (или полные кадры), отправляем их через Spring Integration, затем Spring AMQP, затем RabbitMQ и приложение C ++, затем обратно в Spring Integration, и, наконец, ответ получен в реализации RecogServiceActivator
.
Вот сколько строк кода мы получаем:
class RecogService(recogChannel: MessageChannel) { private def sendWithContentType(contentType: String, correlationId: CorrelationId, chunk: ChunkData): Unit = { val message = MessageBuilder. withPayload(chunk). setCorrelationId(correlationId). setHeader("content-type", contentType). build() recogChannel.send(message) } def imageChunk(correlationId: CorrelationId)(chunk: ChunkData) = sendWithContentType(ContentTypes.`image/*`, correlationId, chunk) def mjpegChunk(correlationId: CorrelationId)(chunk: ChunkData) = sendWithContentType(ContentTypes.`video/mjpeg`, correlationId, chunk) }
Весенняя интеграция
Я уже отдал часть ядра единорога . Это зависит от того MessageChannel
, что он использует для отправки кусков входных данных. Канал отправляет сообщения по * цепочке * (показано ниже):
На первом этапе мы декодируем фрагмент, что может привести к нескольким кадрам. В коде Scala мы превращаем сингл ChunkData
в Collection[FrameData]
.
class ChunkDecoder(mjpegDecoder: MJPEGDecoder) { def decodeFrame(@Header correlationId: CorrelationId, @Header("content-type") contentType: String, @Payload chunk: ChunkData): util.Collection[ImageData] = contentType match { case `video/mjpeg` => decodeMJPEGFrames(correlationId, chunk) case `image/*` => decodeSingleImage(correlationId, chunk) } private def decodeSingleImage(correlationId: CorrelationId, chunk: ChunkData): util.Collection[ImageData] = Collections.singletonList(chunk) private def decodeMJPEGFrames(correlationId: CorrelationId, chunk: ChunkData): util.Collection[ImageData] = mjpegDecoder.decodeFrames(correlationId, chunk) }
К настоящему времени мы расшифровали столько кадров, сколько могли дать новый фрагмент данных. Однако следующие компоненты работают не с наборами кадров, а с одним кадром. Таким образом, мы должны расколоть Message
из Collection[ImageData]
на столько Message
с о ImageData
сколько элементов в коллекции. * Splitter * Spring Integration Message
по умолчанию разбивает Collection[A]-
полезную нагрузку типа на несколько Message
s , каждый из которых содержит элемент исходной коллекции. Затем мы берем каждый декодированный кадр и передаем его исходящей конечной точке AMQP, не забывая отобразить все исходящие и входящие заголовки. Мы также установили максимальное время ожидания.
Когда мы получаем ответ от собственного кода в конце очередей RabbitMQ, мы имеемArray[Byte]
что мы должны быть преобразованы в String
. Наконец, мы выполняем onCoinResponse
метод recogServiceActivator
бина. Мы представляем цепочку в XML как:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="..."> <int:chain input-channel="recogRequest"> <int:service-activator method="decodeFrame" ref="chunkDecoder"/> <int:splitter apply-sequence="false"/> <int-amqp:outbound-gateway exchange-name="sogx.exchange" routing-key="sogx.recog.key" reply-timeout="250" mapped-reply-headers="*" mapped-request-headers="*" amqp-template="amqpTemplate"/> <int:object-to-string-transformer/> <int:service-activator ref="recogServiceActivator" method="onCoinResponse"/> </int:chain> <rabbit:connection-factory id="connectionFactory" host="localhost" channel-cache-size="10" /> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/> <rabbit:queue name="sogx.recog.queue" declared-by="rabbitAdmin"/> <rabbit:direct-exchange name="sogx.exchange" declared-by="rabbitAdmin"> <rabbit:bindings> <rabbit:binding queue="sogx.recog.queue" key="sogx.recog.key" /> </rabbit:bindings> </rabbit:direct-exchange> </beans>
Я включил минимальную конфигурацию RabbitMQ, предполагая, что сервер RabbitMQ работает localhost
и не требует аутентификации.
СЛЕДУЮЩАЯ СТРАНИЦА >> Приложение командной строки
Приложение командной строки
Теперь пришло время соединить приложение командной строки. Для этого нам необходимо предоставить конфигурацию Spring Framework для создания всех компонентов, составляющих наше приложение. Задача усложняется тем, что мы хотим использовать ту же структуру для веб-приложения. Мы определим основные bean-компоненты в CoreConfig
признаке, которые обеспечат реализацию по умолчанию всех @Bean
s .
trait CoreConfig { // "boring" beans @Bean def mjpegDecoder() = new MJPEGDecoder() @Bean def chunkDecoder() = new ChunkDecoder(mjpegDecoder()) @Bean def recogService(): RecogService = new RecogService(recogRequest()) // abstract beans @Bean def asyncExecutor(): Executor @Bean def recogServiceActivator(): RecogServiceActivator // SI channel @Bean def recogRequest() = new DirectChannel() // the message converter for the payloads @Bean def messageConverter() = new DelegatingJsonMessageConverter( new MappingJackson2MessageConverter()) // the channel that connects to the WS clients @Bean def dispatchChannel() = new ExecutorSubscribableChannel( asyncExecutor()) // MessagingTemplate (and MessageChannel) to dispatch messages to // for further processing // All MessageHandler beans above subscribe to this channel @Bean def dispatchMessagingTemplate(): SimpMessageSendingOperations = { val template = new SimpMessagingTemplate(dispatchChannel()) template.setMessageConverter(messageConverter()) template } }
К сожалению, этой CoreConfig
черты недостаточно для загрузки приложения Spring. Нам нужно создать class
что смеси в этой черте и реализуют asyncExecutor()
и recogServiceActivator()
методу. Затем мы добавим еще несколько аннотаций.
@Configuration @ImportResource( Array("classpath:/META-INF/spring/integration/module-context.xml")) class App extends CoreConfig { @Bean def asyncExecutor(): Executor = new SyncTaskExecutor @Bean def recogServiceActivator() = new RecogServiceActivator { def onCoinResponse(@Header correlationId: CorrelationId, @Payload coins: CoinResponse): Unit = println(">>> " + correlationId + ": " + coins) } }
class App
достаточно использовать в качестве конфигурации для AnnotationConfigApplicationContext
, и это дает нам полноценное приложение Spring.
// Create the Spring ApplicationContext implementation, // register the @Configuration class and load it val ctx = new AnnotationConfigApplicationContext() ctx.register(classOf[App]) ctx.refresh() // Grab the created RecogService implementation val recogService = ctx.getBean(classOf[RecogService]) recogService.mjpegChunk(UUID.randomUUID().toString)(...) // clean up ctx.close()
Это ядро объекта CLI, который содержит удобный командный цикл.
object Cli extends App { import Commands._ import Utils.reader._ @Configuration @ImportResource( Array("classpath:/META-INF/spring/integration/module-context.xml")) class App extends CoreConfig { ... } @tailrec def commandLoop(): Unit = { Console.readLine() match { case QuitCommand => return case ImageCommand(fileName) => readAll(fileName) (recogService.imageChunk(UUID.randomUUID().toString)) case MJPEGCommand(fileName, fps) => readChunks(fileName, fps) (recogService.mjpegChunk(UUID.randomUUID().toString)) case null => // do nothing case _ => println("wtf??") } // in tail position commandLoop() } // Create the Spring ApplicationContext implementation, // register the @Configuration class and load it val ctx = new AnnotationConfigApplicationContext() ctx.register(classOf[App]) ctx.refresh() // Grab the created RecogService implementation val recogService = ctx.getBean(classOf[RecogService]) // start processing the user input commandLoop() // clean up ctx.close() }
Нативные компоненты
Предполагая, что у вас работает брокер RabbitMQ, вы можете запустить это приложение и выполнить mjpeg:/coins2.mjpeg
команду. К сожалению, это закончится сообщением об ошибке в обработчике цепочки. Это потому, что у нас нет собственных компонентов.
Нативный компонент реализован на C ++ с использованием OpenCV. Нам нужно внедрить его RCP-сервер RabbitMQ, где мы используем код компьютерного зрения (in coins.cpp
).
class Main : public RabbitRpcServer { private: CoinCounter coinCounter; protected: virtual std::string handleMessage( const AmqpClient::BasicMessage::ptr_t message, const AmqpClient::Channel::ptr_t channel); public: Main(const std::string queue, const std::string exchange, const std::string routingKey); }; Main::Main(const std::string queue, const std::string exchange, const std::string routingKey) : RabbitRpcServer::RabbitRpcServer(queue, exchange, routingKey) { } std::string Main::handleMessage(const AmqpClient::BasicMessage::ptr_t message, const AmqpClient::Channel::ptr_t channel) { // return a std::string that represents the result return "{\"succeeded\":false}"; } int main(int argc, char** argv) { Main main("sogx.recog.queue", "sogx.exchange", "sogx.recog.key"); main.runAndJoin(8); return 0; }
Код показывает основные понятия: мы наследуем RabbitRpcServer
и реализуем handleMessage
метод. Приведенный выше код демонстрирует минимальную реализацию, которой недостаточно. Полная реализация использует CoinCounter
для выполнения кода компьютерного зрения.
std::string Main::handleMessage(const AmqpClient::BasicMessage::ptr_t message, const AmqpClient::Channel::ptr_t channel) { Json::Object responseJson; try { // get the message, read the image ImageMessage imageMessage(message); auto imageData = imageMessage.headImage(); auto imageMat = cv::imdecode(cv::Mat(imageData), 1); // ponies & unicorns Jzon::Array coinsJson; auto result = coinCounter.count(imageMat); for (auto i = result.coins.begin(); i != result.coins.end(); ++i) { Jzon::Object coinJson; Jzon::Object centerJson; centerJson.Add("x", i->center.x); centerJson.Add("y", i->center.y); coinJson.Add("center", centerJson); coinJson.Add("radius", i->radius); coinsJson.Add(coinJson); } #ifdef WITH_RINGS responseJson.Add("hasRing", result.hasRing); #endif responseJson.Add("coins", coinsJson); responseJson.Add("succeeded", true); } catch (std::exception &e) { // bantha poodoo! std::cerr << e.what() << std::endl; responseJson.Add("succeeded", false); } catch (...) { // more bantha fodder! responseJson.Add("succeeded", false); } Jzon::Writer writer(responseJson, Jzon::NoFormat); writer.Write(); return writer.GetResult(); }
Теперь вы запустите native/recog
приложение и выполните ту же mjpeg:/coins2.mjpeg
команду, и приложение выполнит успешно и распечатает ответы монет.
СЛЕДУЮЩАЯ СТРАНИЦА >> Веб-приложение
Веб приложение
К сожалению, увидеть какой-то текст на стандартном выводе не совсем то, что пользователи ожидают в 2013 году. (Когда они стали такими мягкими? Я помню, когда мне было 20 лет… О, неважно!) Мы хотели бы предоставить хороший отзывчивый веб приложение, которое позволяет коду * отправлять * сообщения в него, а не запрашивать изменения. WebSocket — идеальное решение для этого. Чтобы сделать его еще проще для нас, мы собираемся использовать библиотеку SockJS и использовать протокол STOMP поверх WebSocket!
Мы просто собираемся «спереди» RecogService
с помощью a RecogController
, и мы собираемся предоставить специальную реализацию. RecogServiceActivator.
Эта реализация будет помнить ответы для всех сессий Recog . И, как мы увидим позже, каждое изменение сеансов будет выдвигать сообщение через WebSocket.
Реализация RecogController
использует, в дополнение к знакомым аннотациям Spring MVC, новые аннотации Spring для обмена сообщениями.
@Controller class RecogController @Autowired()(recogService: RecogService, recogSessions: RecogSessions) { @MessageMapping(Array("/app/recog/image")) def image(@SessionId sessionId: RecogSessionId, @MessageBody body: ChunkData): Unit = { recogService.imageChunk(sessionId.value)(body) } @MessageMapping(Array("/app/recog/mjpeg")) def mjpeg(@SessionId sessionId: RecogSessionId, @MessageBody body: ChunkData): Unit = { recogService.mjpegChunk(sessionId.value)(body) } @RequestMapping(Array("/app/predef/image")) @ResponseBody def foo(): String = { val id = UUID.randomUUID().toString Utils.reader.readAll("/coins2.png")(recogService.imageChunk(id)) recogSessions.sessionEnded(RecogSessionId(id)) "image" } @RequestMapping(Array("/app/predef/coins")) @ResponseBody def bar(@RequestParam(defaultValue = "10") fps: Int): String = { val id = UUID.randomUUID().toString Utils.reader.readChunks("/coins2.mjpeg", fps)(recogService.mjpegChunk(id)) recogSessions.sessionEnded(RecogSessionId(id)) "coins" } }
image
И mjpeg
методы будут выполняться , когда мы получим сообщение над WebSocket на данном URL из нашего приложения IOS. Затем мы передаем тело сообщения RecogService
и затем по цепочке, которую мы уже исследовали. Реализация RecogSessions
зависит от того SimpMessageSendingOperations
, что является частью ядра обмена сообщениями Spring. Он обеспечивает точку входа в базовую шину сообщений.
class RecogSessions(messageSender: SimpMessageSendingOperations) { val sessions = new util.HashMap[RecogSessionId, CoinResponse]() def onCoinResponse(correlationId: CorrelationId, coins: CoinResponse): Unit = { sessions.put(RecogSessionId(correlationId.value), coins) sendSessions() } def sessionEnded(sessionId: RecogSessionId): Unit = { sessions.remove(sessionId) sendSessions() } private def sendSessions(): Unit = messageSender.convertAndSend("/topic/recog/sessions", sessions.values().toString) }
обмен сообщениями
Это подводит нас к сообщениям автобусов. Наше приложение будет использовать две основные шины: одну специально для сообщений WebSocket, а другую для общих сообщений.
Давайте рассмотрим жизнь входящего чанка MJPEG из клиента iOS. Сообщение попадает MessagingWebSocketHandler
первым через свой SimpleUrlHandlerMapping
. MessagingWebSocketHandler
Строит сообщение из входящего кадра WebSocket и посылает его к dispatchChannel
. Оттуда оба подписчика видят сообщение, но только тот AnnotationMessageHandler
может его обработать. Он AnnotationMessageHandler
знает обо всех @Controller
аннотированных компонентах и может выполнять методы в этих контроллерах.
В результате наш RecogController#image
метод выполняется, вызывая mjpegChunk
метод RecogService
. Это активирует нашу цепочку через RabbitMQ, наше приложение OpenCV и обратно RecogServiceActivator
, который затем вызывает onCoinResponse
метод RecogSessions
bean-компонента. Этот бин обновляет свою sessions
карту и помещает сообщение в dispatchChannel
.
Сообщения отправляются как подписчикам, так SimpleBrokerMessageHandler
и AnnotationMessageHandler
подписчикам, но на этот раз только SimpleBrokerMessageHandler
сообщения могут обработать. Обработка включает в себя размещение его в месте webSocketHandlerChannel
, где он будет выбран, SubProtocolWebSocketHandler
и направляется в наше приложение, работающее в браузере.
В коде нам понадобится WebConfig
признак, который определяет зависимость типа от себя CoreConfig
. Это выражает то, что конкретные реализации WebConfig
должны также смешиваться с CoreConfig
.
trait WebConfig { // require instances to be mixed in with CoreConfig this: CoreConfig => // Channel for sending STOMP messages to connected WebSocket // sessions (mostly for internal use) @Bean def webSocketHandlerChannel(): SubscribableChannel = new ExecutorSubscribableChannel(asyncExecutor()) @Bean def taskScheduler(): TaskScheduler = { val taskScheduler = new ThreadPoolTaskScheduler() taskScheduler.setThreadNamePrefix("SockJS-") taskScheduler.setPoolSize(4) taskScheduler.afterPropertiesSet() taskScheduler } // MessageHandler that acts as a "simple" message broker @Bean def simpleBrokerMessageHandler(): SimpleBrokerMessageHandler = { val handler = new SimpleBrokerMessageHandler( webSocketHandlerChannel(), util.Arrays.asList("/topic/", "/queue/")) dispatchChannel().subscribe(handler) handler } // WS -[SockJS]-> /sockjs/** ~> sockJsSocketHandler // SockJS WS handler mapping @Bean def sockJsHandlerMapping(): SimpleUrlHandlerMapping = { val handler = new SubProtocolWebSocketHandler(dispatchChannel()) handler.setDefaultProtocolHandler(new StompProtocolHandler()) webSocketHandlerChannel().subscribe(handler) val sockJsService = new DefaultSockJsService(taskScheduler()) val requestHandler = new SockJsHttpRequestHandler(sockJsService, handler) val hm = new SimpleUrlHandlerMapping() hm.setOrder(-2) hm.setUrlMap(Collections.singletonMap("/sockjs/**", requestHandler)) hm } // WS -[Raw]-> /websocket/** ~> websocketSocketHandler // Raw WS handler mapping @Bean def webSocketHandlerMapping(): SimpleUrlHandlerMapping = { val handler = new MessagingWebSocketHandler(dispatchChannel()) { override def afterConnectionClosed(session: WebSocketSession, closeStatus: CloseStatus) { recogSessions().sessionEnded(RecogSessionId(session.getId)) } } handler.setUriPrefix("/websocket/") val requestHandler = new WebSocketHttpRequestHandler(handler) val hm = new SimpleUrlHandlerMapping() hm.setOrder(-1) hm.setUrlMap(Collections.singletonMap("/websocket/**", requestHandler)) hm } // MessageHandler for processing messages by delegating to // @Controller annotated methods @Bean def annotationMethodMessageHandler(): AnnotationMethodMessageHandler = { val handler = new AnnotationMethodMessageHandler( dispatchMessagingTemplate(), webSocketHandlerChannel()) handler.setCustomArgumentResolvers( util.Arrays.asList(new SessionIdMehtodArgumentResolver)) handler.setDestinationPrefixes(util.Arrays.asList("/app/")) handler.setMessageConverter(messageConverter()) dispatchChannel().subscribe(handler) handler } }
Единственное , что остается, чтобы обеспечить реализацию , что смеси в WebConfig
и CoreConfig
черт, а также некоторые конфигурации для XML-менее веб — приложений. Давайте начнем с Webapp
класса.
@Configuration @EnableWebMvc @ComponentScan(basePackages=Array("org.eigengo.sogx")) class Webapp extends WebMvcConfigurerAdapter with WebConfig with CoreConfig { @Bean def asyncExecutor() = { val executor = new ThreadPoolTaskExecutor executor.setCorePoolSize(4) executor.setCorePoolSize(8) executor.setThreadNamePrefix("MessageChannel-") executor } @Bean def recogServiceActivator() = new RecogServiceActivator { def onCoinResponse(@Header correlationId: CorrelationId, @Payload coins: CoinResponse): Unit = recogSessions().onCoinResponse(correlationId, coins) } // Allow serving HTML files through the default Servlet override def configureDefaultServletHandling( configurer: DefaultServletHandlerConfigurer) = { configurer.enable() } }
Чтобы завершить картину, мы реализуем DispatcherServletInitializer
веб-приложение для XML без использования, которое мы развернем в Jetty.
class DispatcherServletInitializer extends AbstractAnnotationConfigDispatcherServletInitializer { protected def getRootConfigClasses: Array[Class[_]] = { Array[Class[_]](classOf[Webapp]) } protected def getServletConfigClasses: Array[Class[_]] = { Array[Class[_]](classOf[Webapp]) } protected def getServletMappings: Array[String] = { Array[String]("/") } protected override def customizeRegistration( registration: ServletRegistration.Dynamic): Unit = { registration.setInitParameter("dispatchOptionsRequest", "true") } }
Приложение AngularJS
Чтобы завершить картину, мы предоставляем пользовательский интерфейс в приложении AngularJS. Основной компонент SessionsCtrl
использует библиотеку SockJS для установления соединения на основе STOMP через WebSocket.
function SessionsCtrl($scope) { // initialization $scope.sessions = []; // Connect to the server on path /sockjs and then create // the STOMP protocol client var socket = new SockJS('/sockjs'); var stompClient = Stomp.over(socket); stompClient.connect('', '', function(frame) { // receive notifications on the recog/sessions topic stompClient.subscribe("/topic/recog/sessions", function(message) { $scope.$apply(function() { $scope.sessions = angular.fromJson(message.body); }); }); }, function(error) { console.log("STOMP protocol error " + error); } ); }
Фактический HTML-код настолько близок, насколько это возможно к примеру приложения AngularJS.
< !doctype html> <html ng-app="coins"> <head> <title>Coin counter</title> <meta http-equiv="Cache-Control" content="no-store, no-cache, must-revalidate, max-age=0"/> <!-- jQuery --> <script src="assets/js/jquery-2.0.3.min.js"></script> <!-- Bootstrap --> <link href="assets/css/bootstrap.min.css" rel="stylesheet"/> <script src="assets/js/bootstrap.min.js"></script> <!-- WS --> <script src="assets/js/sockjs-0.3.4.js"></script> <script src="assets/js/stomp.js"></script> <!-- Application & AngularJS --> <link href="assets/css/coins.css" rel="stylesheet"/> <script src="assets/js/angular.min.js"></script> <script src="assets/js/sessions.js"></script> <script src="assets/js/components.js"></script> </head> <body> <div ng-controller="SessionsCtrl"> <tabs> <pane title="Raw"> <h3>Raw data</h3> <pre>{{sessions}}</pre> </pane> <pane title="Canvas"> <h3>Visual representation</h3> <div ng-repeat="coins in sessions"> <canvas display="{{coins}}" fill="red" scale="0.4" width="500" height="386"></canvas> </div> </pane> </tabs> </div> </body> </html>
Это завершает заявку. Вы можете использовать приложение iOS для отправки изображений через двоичные WebSockets в приложение на основе Spring. Код Spring принимает сообщения и направляет их брокеру AMQP для обработки нашим собственным (компьютерным или графическим) кодом. Затем ответы направляются обратно в приложения, запущенные в браузерах с помощью WebSockets.
Симуляция сообщений
Прежде чем я позволю вам увидеть исходный код, я покажу вам симулятор RabbitMQ, показывающий поток сообщений. Видео показывает, что приложение Spring создает сообщения с байтами, составляющими кадры, отправляя их с sogx.exchange
использованием sogx.recog.key
ключа маршрутизации. Это означает, что они прибывают sogx.recog.queue
и потребляются потребителями (на видео показаны 2 потребителя, но у нас фактически есть 8). Поскольку приложению Spring необходимо получать ответы, оно создает временную очередь с сгенерированным именем и ожидает поступления ответов в эту очередь. И поэтому, когда recog
приложение отправляет ответы, они поступают во временную очередь, где они используются получателями Spring AMQP.
RabbitMQ Симулятор от Cake Solutions Ltd. на Vimeo .
Резюме и код
Вы можете следить за кодом, groll
используя коммиты на https://github.com/eigengo/springone2gx2013 . README.md содержит полезную информацию для сборки приложения. Самая важная вещь, которую нужно помнить (по состоянию на 14 сентября 2013 года), — вам понадобится ночная сборка Spring Framework.