Статьи

Облачный готовый микросервис на основе Rx-netty и Karyon2

Netflix Karyon предоставляет чистую платформу для создания облачных микросервисов. В вашей организации, если вы используете стек Netflix OSS, состоящий из Eureka для регистрации и обнаружения услуг , Archaius для управления имуществом, то весьма вероятно, что вы используете Karyon для создания своих микросервисов.

В последнее время Кэрион претерпел довольно много изменений, и моя цель здесь состоит в том, чтобы задокументировать хороший образец с использованием более новой версии Кэриона. Старый Karyon (назовите его Karyon1) был основан на спецификациях JAX-RS 1.0 с Джерси в качестве реализации, более новая версия Karyon (Karyon2) все еще поддерживает Джерси, но также поощряет использование RX-Netty, который является настроенной версией Netty с поддержка Rx-Java .

С учетом сказанного, позвольте мне перейти к образцу. Моя цель в этом примере — создать микросервис «pong», который принимает сообщение «POST» и возвращает «Подтверждение».

Ниже приведен пример запроса:

1
2
3
4
{
"id": "id",
"payload":"Ping"
}

И ожидаемый ответ:

1
{"id":"id","received":"Ping","payload":"Pong"}

Первым шагом является создание RequestHandler, который, как следует из названия, является компонентом RX-Netty, отвечающим за маршрутизацию входящего запроса:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package org.bk.samplepong.app;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.domain.Message;
import org.bk.samplepong.domain.MessageAcknowledgement;
import rx.Observable;
 
import java.io.IOException;
import java.nio.charset.Charset;
 
 
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
 
    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();
 
    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }
 
    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            Message m = objectMapper.readValue(s, Message.class);
                            return m;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                                try {
                                    return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                                } catch (Exception e) {
                                    response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                    return response.close();
                                }
                            }
                    );
        } else {
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }
}

Этот поток полностью асинхронный и внутренне управляется библиотеками RX-java, лямбда-выражения Java 8 также помогают сделать код лаконичным. Единственная проблема, которую вы видите здесь, заключается в том, что логика маршрутизации (от того, к какому контроллеру) смешана с реальной логикой контроллера, и я считаю, что это решается .

Принимая во внимание этот RequestHandler, сервер может быть запущен в автономной Java-программе, используя сырую RX-Netty таким образом, это по сути, конечная точка будет установлена ​​на порту 8080 для обработки запросов:

1
2
3
4
5
6
7
8
9
public final class RxNettyExample {
 
    public static void main(String... args) throws Exception {
        final ObjectMapper objectMapper = new ObjectMapper();
        RxNettyHandler handler = new RxNettyHandler();
 
        HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);
 
        server.start();

Это, однако, является родным способом Rx-netty: для готового к работе в облаке микросервиса должно произойти несколько вещей: служба должна зарегистрироваться в Eureka и должна отвечать на проверки работоспособности обратно из Eureka и должна иметь возможность загружать свойства с помощью Archaius. ,

Так что с Karyon2 запуск в основной программе выглядит немного иначе:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package org.bk.samplepong.app;
 
import netflix.adminresources.resources.KaryonWebAdminModule;
import netflix.karyon.Karyon;
import netflix.karyon.KaryonBootstrapModule;
import netflix.karyon.ShutdownModule;
import netflix.karyon.archaius.ArchaiusBootstrapModule;
import netflix.karyon.eureka.KaryonEurekaModule;
import netflix.karyon.servo.KaryonServoModule;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.resource.HealthCheck;
 
public class SamplePongApp {
 
    public static void main(String[] args) {
        HealthCheck healthCheckHandler = new HealthCheck();
        Karyon.forRequestHandler(8888,
                new RxNettyHandler("/healthcheck",
                        new HealthCheckEndpoint(healthCheckHandler)),
                new KaryonBootstrapModule(healthCheckHandler),
                new ArchaiusBootstrapModule("sample-pong"),
                KaryonEurekaModule.asBootstrapModule(),
                Karyon.toBootstrapModule(KaryonWebAdminModule.class),
                ShutdownModule.asBootstrapModule(),
                KaryonServoModule.asBootstrapModule()
        ).startAndWaitTillShutdown();
    }
}

Теперь она по существу готова к работе в облаке, эта версия программы при запуске будет корректно регистрироваться в Eureka и предоставлять конечную точку проверки работоспособности. Это дополнительно предоставляет аккуратный набор оконечных точек администратора в порту 8077.

Вывод

Я надеюсь, что это дает хорошее представление об использовании Karyon2 для разработки на основе Netflix OSS. Весь образец доступен в моем репозитории github здесь : https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong. В качестве продолжения я покажу, как можно разработать один и тот же сервис с использованием Spring -Cloud, который является способом Spring для создания микро-сервисов.

Ссылка: Микросервис на основе Rx-netty и Karyon2 от нашего партнера JCG Биджу Кунджуммена из блога all and sundry.