Отправленные сервером события (или просто SSE ) — довольно полезный протокол, который позволяет серверам передавать данные клиентам через HTTP. Это то, что наши веб-браузеры поддерживают целую вечность, но, что удивительно, долгое время игнорировалось спецификацией JAX-RS . Хотя у Джерси было расширение, доступное для медиа-типа SSE , API никогда не был формализован и поэтому не был переносимым на другие реализации JAX-RS .
К счастью, JAX-RS 2.1 , также известный как JSR-370 , изменил это, сделав поддержку SSE , как на стороне клиента, так и на стороне сервера, частью официальной спецификации. В сегодняшнем посте мы рассмотрим, как интегрировать поддержку SSE в существующие веб-сервисы Java REST (ful) , используя недавно выпущенную версию 3.2.0 потрясающей платформы Apache CXF . Фактически, кроме начальной загрузки, на самом деле нет ничего специфичного для CXF , все примеры должны работать в любой другой среде, которая реализует спецификацию JAX-RS 2.1 .
Без дальнейших церемоний, давайте начнем. Поскольку в наши дни значительное количество Java-проектов построено на основе потрясающей Spring Framework , наше примерное приложение будет использовать Spring Boot и Apache CXF Spring Boot Integration, чтобы быстро начать работу. Старый добрый приятель Apache Maven также поможет нам, управляя зависимостями нашего проекта
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
org.springframework.boot spring-boot-starter 1.5.8.RELEASE org.apache.cxf cxf-rt-frontend-jaxrs 3.2.0 org.apache.cxf cxf-spring-boot-starter-jaxrs 3.2.0 org.apache.cxf cxf-rt-rs-client 3.2.0 org.apache.cxf cxf-rt-rs-sse 3.2.0 |
Под капотом Apache CXF использует среду Atmosphere для реализации транспорта SSE, так что это еще одна зависимость, которую мы должны включить.
|
1
2
3
|
org.atmosphere atmosphere-runtime 2.4.14 |
Особенности использования инфраструктуры Atmosphere приводят к необходимости предоставления дополнительных параметров конфигурации, а именно transportId , чтобы гарантировать, что транспорт, поддерживающий SSE, будет выбран во время выполнения. Соответствующие данные могут быть добавлены в файл application.yml :
|
1
2
3
4
|
cxf: servlet: init: transportId: http://cxf.apache.org/transports/http/sse |
Отлично, значит, основа есть, двигаясь дальше. Веб-сервис REST (ful), который мы собираемся создать, будет представлять мнимые средние нагрузки на процессор (для простоты, генерируемые случайным образом) в виде потоков SSE . Класс Stats будет представлять нашу модель данных.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
public class Stats { private long timestamp; private int load; public Stats() { } public Stats(long timestamp, int load) { this.timestamp = timestamp; this.load = load; } // Getters and setters are omitted ...} |
Говоря о потоках, спецификация Reactive Streams вошла в Java 9, и, надеюсь, мы увидим ускоренное принятие моделей реактивного программирования сообществом Java. Более того, разработка веб-сервисов REST (ful) с поддержкой SSE была бы намного проще и понятнее при поддержке Reactive Streams . Для этого давайте добавим RxJava 2 в наш пример приложения.
|
1
2
3
|
io.reactivex.rxjava2 rxjava 2.1.6 |
Это хороший момент, чтобы начать с нашего класса StatsRestService , типичной реализации ресурса JAX-RS . Ключевые возможности SSE в JAX-RS 2.1 сосредоточены вокруг контекстного объекта Sse, который может быть введен следующим образом.
|
1
2
3
4
5
6
7
|
@Service@Path("/api/stats")public class StatsRestService { @Context public void setSse(Sse sse) { // Access Sse context here } |
Из контекста Sse мы можем получить доступ к двум очень полезным абстракциям: SseBroadcaster и OutboundSseEvent.Builder , например:
|
1
2
3
4
5
6
7
8
|
private SseBroadcaster broadcaster;private Builder builder; @Contextpublic void setSse(Sse sse) { this.broadcaster = sse.newBroadcaster(); this.builder = sse.newEventBuilder();} |
Как вы уже могли догадаться, OutboundSseEvent.Builder создает экземпляры классов OutboundSseEvent, которые можно отправлять по проводам, в то время как SseBroadcaster передает один и тот же поток SSE всем подключенным клиентам. С учетом вышесказанного мы можем сгенерировать поток OutboundSseEvent и распространить его среди всех, кто заинтересован:
|
01
02
03
04
05
06
07
08
09
10
11
|
private static void subscribe(final SseBroadcaster broadcaster, final Builder builder) { Flowable .interval(1, TimeUnit.SECONDS) .zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id)) .subscribeOn(Schedulers.single()) .subscribe(broadcaster::broadcast);}private static Flowable<OutboundSseEvent.Builder> eventsStream(final Builder builder) { return Flowable.generate(emitter -> emitter.onNext(builder.name("stats")));} |
Если вы не знакомы с RxJava 2 , не беспокойтесь, это то, что здесь происходит. Метод eventsStream возвращает фактически бесконечный поток экземпляров OutboundSseEvent.Builder для событий SSE типа stats . Метод подписки немного сложнее. Мы начнем с создания потока, который генерирует последовательный номер каждую секунду, например 0,1,2,3,4,5,6,… и так далее. Позже мы объединяем этот поток с потоком, возвращаемым методом eventsStream , по сути объединяя оба потока в один, который генерирует кортеж (число, OutboundSseEvent.Builder ) каждую секунду. Честно говоря, этот кортеж не очень полезен для нас, поэтому мы преобразуем его в экземпляр класса OutboundSseEvent , рассматривая число как идентификатор события SSE :
|
1
2
3
4
5
6
7
8
9
|
private static final Random RANDOM = new Random();private static OutboundSseEvent createSseEvent(OutboundSseEvent.Builder builder, long id) { return builder .id(Long.toString(id)) .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100))) .mediaType(MediaType.APPLICATION_JSON_TYPE) .build();} |
OutboundSseEvent может нести любую полезную нагрузку в свойстве data, которая будет сериализована в соответствии с указанным mediaType , используя обычную стратегию разрешения MessageBodyWriter . Как только мы получим наш экземпляр OutboundSseEvent , мы отправим его с помощью метода SseBroadcaster :: broadcast . Обратите внимание, что мы передали поток управления другому потоку, используя оператор подписки , это обычно то, что вы будете делать все время.
Хорошо, надеюсь, часть потока теперь очищена, но как мы можем на самом деле подписаться на события SSE , генерируемые SseBroadcaster ? Это проще, чем вы думаете
|
1
2
3
4
5
6
|
@GET@Path("broadcast")@Produces(MediaType.SERVER_SENT_EVENTS)public void broadcast(@Context SseEventSink sink) { broadcaster.register(sink);} |
И у нас все готово. Наиболее важной частью здесь является создаваемый тип контента, который должен быть установлен в MediaType.SERVER_SENT_EVENTS . В этом случае контекстный экземпляр SseEventSink становится доступным и может быть зарегистрирован в экземпляре SseBroadcaster .
Чтобы увидеть наш ресурс JAX-RS в действии, нам нужно загрузить экземпляр сервера, используя, например, JAXRSServerFactoryBean , сконфигурировав на этом пути всех необходимых провайдеров. Обратите внимание, что мы также явно указываем транспорт, который будет использоваться, в данном случае SseHttpTransportFactory.TRANSPORT_ID .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Configuration@EnableWebMvcpublic class AppConfig extends WebMvcConfigurerAdapter { @Bean public Server rsServer(Bus bus, StatsRestService service) { JAXRSServerFactoryBean endpoint = new JAXRSServerFactoryBean(); endpoint.setBus(bus); endpoint.setAddress("/"); endpoint.setServiceBean(service); endpoint.setTransportId(SseHttpTransportFactory.TRANSPORT_ID); endpoint.setProvider(new JacksonJsonProvider()); return endpoint.create(); } @Override public void addResourceHandlers(ResourceHandlerRegistry registry) { registry .addResourceHandler("/static/**") .addResourceLocations("classpath:/web-ui/"); }} |
Чтобы замкнуть цикл, нам просто нужно предоставить бегунок для нашего приложения Spring Boot :
|
1
2
3
4
5
6
|
@SpringBootApplicationpublic class SseServerStarter { public static void main(String[] args) { SpringApplication.run(SseServerStarter.class, args); }} |
Теперь, если мы запустим приложение и перейдем по адресу http: // localhost: 8080 / static / broadcast.html, используя несколько веб-браузеров или разные вкладки в одном и том же браузере, мы увидим одинаковый поток событий, отображенный во всех них:
Хорошо, широковещание, безусловно, является допустимым вариантом использования, но как насчет возврата независимого потока SSE при каждом вызове конечной точки? Легко, просто используйте методы SseEventSink , такие как send и close , чтобы напрямую управлять потоком SSE .
|
01
02
03
04
05
06
07
08
09
10
|
@GET@Path("sse")@Produces(MediaType.SERVER_SENT_EVENTS)public void stats(@Context SseEventSink sink) { Flowable .interval(1, TimeUnit.SECONDS) .zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id)) .subscribeOn(Schedulers.single()) .subscribe(sink::send, ex -> {}, sink::close);} |
На этот раз, если мы запустим приложение и перейдем по адресу http: // localhost: 8080 / static / index.html, используя несколько веб-браузеров или разные вкладки в одном и том же браузере, мы увидим абсолютно разные графики:
Отлично, серверные API действительно очень лаконичны и просты в использовании. Но как насчет клиентской стороны, можем ли мы использовать потоки SSE из приложений Java? Ответ — да, абсолютно. JAX-RS 2.1 также описывает API на стороне клиента, в основе которого лежит SseEventSource .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
final WebTarget target = ClientBuilder .newClient() .register(JacksonJsonProvider.class) try (final SseEventSource eventSource = SseEventSource .target(target) .reconnectingEvery(5, TimeUnit.SECONDS) .build()) { eventSource.register(event -> { final Stats stats = event.readData(Stats.class, MediaType.APPLICATION_JSON_TYPE); System.out.println("name: " + event.getName()); System.out.println("id: " + event.getId()); System.out.println("comment: " + event.getComment()); System.out.println("data: " + stats.getLoad() + ", " + stats.getTimestamp()); System.out.println("---------------"); }); eventSource.open(); // Just consume SSE events for 10 seconds Thread.sleep(10000); } |
Если мы запустим этот фрагмент кода (при условии, что сервер также работает) мы увидим что-то подобное в консоли (как вы помните, данные генерируются случайным образом).
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
name: statsid: 0comment: nulldata: 82, 1509376080027---------------name: statsid: 1comment: nulldata: 68, 1509376081033---------------name: statsid: 2comment: nulldata: 12, 1509376082028---------------name: statsid: 3comment: nulldata: 5, 1509376083028---------------... |
Как мы видим, OutboundSseEvent со стороны сервера становится InboundSseEvent для стороны клиента. Клиент может использовать любую полезную нагрузку из свойства данных, которую можно десериализовать, указав ожидаемый тип носителя , используя обычную стратегию разрешения MessageBodyReader .
В одном посте много сжатого материала. И все же, есть еще несколько вещей, касающихся SSE и JAX-RS 2.1, которые мы здесь не рассмотрели, например, например, использование HttpHeaders.LAST_EVENT_ID_HEADER или настройка задержек повторного подключения. Это может быть отличной темой для следующего поста, если будет интересная информация.
В заключение, поддержка SSE в JAX-RS — это то, чего многие из нас ждали так долго. Наконец, это там, пожалуйста, попробуйте!
Полные источники проекта доступны на Github .
| Опубликовано на Java Code Geeks с разрешения Андрея Редько, партнера нашей программы JCG . См. Оригинальную статью здесь: Лучше поздно, чем никогда: SSE или события, отправленные сервером, теперь в JAX-RS
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |



