Статьи

Лучше поздно, чем никогда: SSE или события, отправленные сервером, теперь в JAX-RS

Отправленные сервером события (или просто SSE ) — довольно полезный протокол, который позволяет серверам передавать данные клиентам через HTTP. Это то, что наши веб-браузеры поддерживают целую вечность, но, что удивительно, долгое время игнорировалось спецификацией JAX-RS . Хотя у Джерси было расширение, доступное для медиа-типа SSE , API никогда не был формализован и поэтому не был переносимым на другие реализации JAX-RS .

К счастью, JAX-RS 2.1 , также известный как JSR-370 , изменил это, сделав поддержку SSE , как на стороне клиента, так и на стороне сервера, частью официальной спецификации. В сегодняшнем посте мы рассмотрим, как интегрировать поддержку SSE в существующие веб-сервисы Java REST (ful) , используя недавно выпущенную версию 3.2.0 потрясающей платформы Apache CXF . Фактически, кроме начальной загрузки, на самом деле нет ничего специфичного для CXF , все примеры должны работать в любой другой среде, которая реализует спецификацию JAX-RS 2.1 .

Без дальнейших церемоний, давайте начнем. Поскольку в наши дни значительное количество Java-проектов построено на основе потрясающей Spring Framework , наше примерное приложение будет использовать Spring Boot и Apache CXF Spring Boot Integration, чтобы быстро начать работу. Старый добрый приятель Apache Maven также поможет нам, управляя зависимостями нашего проекта

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
org.springframework.boot
    spring-boot-starter
    1.5.8.RELEASE
 
 
 
    org.apache.cxf
    cxf-rt-frontend-jaxrs
    3.2.0
 
 
 
    org.apache.cxf
    cxf-spring-boot-starter-jaxrs
    3.2.0
 
 
 
    org.apache.cxf
    cxf-rt-rs-client
    3.2.0
 
 
 
     org.apache.cxf
     cxf-rt-rs-sse
     3.2.0

Под капотом Apache CXF использует среду Atmosphere для реализации транспорта SSE, так что это еще одна зависимость, которую мы должны включить.

1
2
3
org.atmosphere
    atmosphere-runtime
    2.4.14

Особенности использования инфраструктуры Atmosphere приводят к необходимости предоставления дополнительных параметров конфигурации, а именно transportId , чтобы гарантировать, что транспорт, поддерживающий SSE, будет выбран во время выполнения. Соответствующие данные могут быть добавлены в файл application.yml :

1
2
3
4
cxf:
  servlet:
    init:
      transportId: http://cxf.apache.org/transports/http/sse

Отлично, значит, основа есть, двигаясь дальше. Веб-сервис REST (ful), который мы собираемся создать, будет представлять мнимые средние нагрузки на процессор (для простоты, генерируемые случайным образом) в виде потоков SSE . Класс Stats будет представлять нашу модель данных.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
public class Stats {
    private long timestamp;
    private int load;
 
    public Stats() {
    }
 
    public Stats(long timestamp, int load) {
        this.timestamp = timestamp;
        this.load = load;
    }
 
    // Getters and setters are omitted
    ...
}

Говоря о потоках, спецификация Reactive Streams вошла в Java 9, и, надеюсь, мы увидим ускоренное принятие моделей реактивного программирования сообществом Java. Более того, разработка веб-сервисов REST (ful) с поддержкой SSE была бы намного проще и понятнее при поддержке Reactive Streams . Для этого давайте добавим RxJava 2 в наш пример приложения.

1
2
3
io.reactivex.rxjava2
    rxjava
    2.1.6

Это хороший момент, чтобы начать с нашего класса StatsRestService , типичной реализации ресурса JAX-RS . Ключевые возможности SSE в JAX-RS 2.1 сосредоточены вокруг контекстного объекта Sse, который может быть введен следующим образом.

1
2
3
4
5
6
7
@Service
@Path("/api/stats")
public class StatsRestService {
    @Context
    public void setSse(Sse sse) {
        // Access Sse context here
    }

Из контекста Sse мы можем получить доступ к двум очень полезным абстракциям: SseBroadcaster и OutboundSseEvent.Builder , например:

1
2
3
4
5
6
7
8
private SseBroadcaster broadcaster;
private Builder builder;
     
@Context
public void setSse(Sse sse) {
    this.broadcaster = sse.newBroadcaster();
    this.builder = sse.newEventBuilder();
}

Как вы уже могли догадаться, OutboundSseEvent.Builder создает экземпляры классов OutboundSseEvent, которые можно отправлять по проводам, в то время как SseBroadcaster передает один и тот же поток SSE всем подключенным клиентам. С учетом вышесказанного мы можем сгенерировать поток OutboundSseEvent и распространить его среди всех, кто заинтересован:

01
02
03
04
05
06
07
08
09
10
11
private static void subscribe(final SseBroadcaster broadcaster, final Builder builder) {
    Flowable
        .interval(1, TimeUnit.SECONDS)
        .zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id))
        .subscribeOn(Schedulers.single())
        .subscribe(broadcaster::broadcast);
}
 
private static Flowable<OutboundSseEvent.Builder> eventsStream(final Builder builder) {
    return Flowable.generate(emitter -> emitter.onNext(builder.name("stats")));
}

Если вы не знакомы с RxJava 2 , не беспокойтесь, это то, что здесь происходит. Метод eventsStream возвращает фактически бесконечный поток экземпляров OutboundSseEvent.Builder для событий SSE типа stats . Метод подписки немного сложнее. Мы начнем с создания потока, который генерирует последовательный номер каждую секунду, например 0,1,2,3,4,5,6,… и так далее. Позже мы объединяем этот поток с потоком, возвращаемым методом eventsStream , по сути объединяя оба потока в один, который генерирует кортеж (число, OutboundSseEvent.Builder ) каждую секунду. Честно говоря, этот кортеж не очень полезен для нас, поэтому мы преобразуем его в экземпляр класса OutboundSseEvent , рассматривая число как идентификатор события SSE :

1
2
3
4
5
6
7
8
9
private static final Random RANDOM = new Random();
 
private static OutboundSseEvent createSseEvent(OutboundSseEvent.Builder builder, long id) {
    return builder
        .id(Long.toString(id))
        .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .build();
}

OutboundSseEvent может нести любую полезную нагрузку в свойстве data, которая будет сериализована в соответствии с указанным mediaType , используя обычную стратегию разрешения MessageBodyWriter . Как только мы получим наш экземпляр OutboundSseEvent , мы отправим его с помощью метода SseBroadcaster :: broadcast . Обратите внимание, что мы передали поток управления другому потоку, используя оператор подписки , это обычно то, что вы будете делать все время.

Хорошо, надеюсь, часть потока теперь очищена, но как мы можем на самом деле подписаться на события SSE , генерируемые SseBroadcaster ? Это проще, чем вы думаете

1
2
3
4
5
6
@GET
@Path("broadcast")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void broadcast(@Context SseEventSink sink) {
    broadcaster.register(sink);
}

И у нас все готово. Наиболее важной частью здесь является создаваемый тип контента, который должен быть установлен в MediaType.SERVER_SENT_EVENTS . В этом случае контекстный экземпляр SseEventSink становится доступным и может быть зарегистрирован в экземпляре SseBroadcaster .

Чтобы увидеть наш ресурс JAX-RS в действии, нам нужно загрузить экземпляр сервера, используя, например, JAXRSServerFactoryBean , сконфигурировав на этом пути всех необходимых провайдеров. Обратите внимание, что мы также явно указываем транспорт, который будет использоваться, в данном случае SseHttpTransportFactory.TRANSPORT_ID .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
@EnableWebMvc
public class AppConfig extends WebMvcConfigurerAdapter {
    @Bean
    public Server rsServer(Bus bus, StatsRestService service) {
        JAXRSServerFactoryBean endpoint = new JAXRSServerFactoryBean();
        endpoint.setBus(bus);
        endpoint.setAddress("/");
        endpoint.setServiceBean(service);
        endpoint.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
        endpoint.setProvider(new JacksonJsonProvider());
        return endpoint.create();
    }
     
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry
          .addResourceHandler("/static/**")
          .addResourceLocations("classpath:/web-ui/");
    }
}

Чтобы замкнуть цикл, нам просто нужно предоставить бегунок для нашего приложения Spring Boot :

1
2
3
4
5
6
@SpringBootApplication
public class SseServerStarter {   
    public static void main(String[] args) {
        SpringApplication.run(SseServerStarter.class, args);
    }
}

Теперь, если мы запустим приложение и перейдем по адресу http: // localhost: 8080 / static / broadcast.html, используя несколько веб-браузеров или разные вкладки в одном и том же браузере, мы увидим одинаковый поток событий, отображенный во всех них:

Хорошо, широковещание, безусловно, является допустимым вариантом использования, но как насчет возврата независимого потока SSE при каждом вызове конечной точки? Легко, просто используйте методы SseEventSink , такие как send и close , чтобы напрямую управлять потоком SSE .

01
02
03
04
05
06
07
08
09
10
@GET
@Path("sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void stats(@Context SseEventSink sink) {
    Flowable
        .interval(1, TimeUnit.SECONDS)
        .zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id))
        .subscribeOn(Schedulers.single())
        .subscribe(sink::send, ex -> {}, sink::close);
}

На этот раз, если мы запустим приложение и перейдем по адресу http: // localhost: 8080 / static / index.html, используя несколько веб-браузеров или разные вкладки в одном и том же браузере, мы увидим абсолютно разные графики:

Отлично, серверные API действительно очень лаконичны и просты в использовании. Но как насчет клиентской стороны, можем ли мы использовать потоки SSE из приложений Java? Ответ — да, абсолютно. JAX-RS 2.1 также описывает API на стороне клиента, в основе которого лежит SseEventSource .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final WebTarget target = ClientBuilder
    .newClient()
    .register(JacksonJsonProvider.class)
         
try (final SseEventSource eventSource =
            SseEventSource
                .target(target)
                .reconnectingEvery(5, TimeUnit.SECONDS)
                .build()) {
 
    eventSource.register(event -> {
        final Stats stats = event.readData(Stats.class, MediaType.APPLICATION_JSON_TYPE);
        System.out.println("name: " + event.getName());
        System.out.println("id: " + event.getId());
        System.out.println("comment: " + event.getComment());
        System.out.println("data: " + stats.getLoad() + ", " + stats.getTimestamp());
        System.out.println("---------------");
    });
    eventSource.open();
 
    // Just consume SSE events for 10 seconds
    Thread.sleep(10000);
}

Если мы запустим этот фрагмент кода (при условии, что сервер также работает) мы увидим что-то подобное в консоли (как вы помните, данные генерируются случайным образом).

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
name: stats
id: 0
comment: null
data: 82, 1509376080027
---------------
name: stats
id: 1
comment: null
data: 68, 1509376081033
---------------
name: stats
id: 2
comment: null
data: 12, 1509376082028
---------------
name: stats
id: 3
comment: null
data: 5, 1509376083028
---------------
 
...

Как мы видим, OutboundSseEvent со стороны сервера становится InboundSseEvent для стороны клиента. Клиент может использовать любую полезную нагрузку из свойства данных, которую можно десериализовать, указав ожидаемый тип носителя , используя обычную стратегию разрешения MessageBodyReader .

В одном посте много сжатого материала. И все же, есть еще несколько вещей, касающихся SSE и JAX-RS 2.1, которые мы здесь не рассмотрели, например, например, использование HttpHeaders.LAST_EVENT_ID_HEADER или настройка задержек повторного подключения. Это может быть отличной темой для следующего поста, если будет интересная информация.

В заключение, поддержка SSE в JAX-RS — это то, чего многие из нас ждали так долго. Наконец, это там, пожалуйста, попробуйте!

Полные источники проекта доступны на Github .

Опубликовано на Java Code Geeks с разрешения Андрея Редько, партнера нашей программы JCG . См. Оригинальную статью здесь: Лучше поздно, чем никогда: SSE или события, отправленные сервером, теперь в JAX-RS

Мнения, высказанные участниками Java Code Geeks, являются их собственными.