Ранее я писал об использовании Rx-netty и Karyon2 для разработки готовых к работе в облаке микросервисов, хотя с образцом было несколько проблем, частично воспроизведенных здесь:
package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
this.healthCheckUri = healthCheckUri;
this.healthCheckEndpoint = healthCheckEndpoint;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
}
Вопросы:
- Логика маршрутизации не централизована, обработчик запросов имеет как логику маршрутизации, так и логику обработки
- Зависимости не вводятся чисто.
Глядя на образцы Karyon2 , обе эти проблемы на самом деле сейчас решены очень четко, и я хотел бы документировать их здесь.
Маршрутизация
Маршрутизация может быть централизована с использованием пользовательского RX-netty RequestHandler, называемого SimpleUriRouter
Маршруты можно зарегистрировать следующим образом, используя SimpleRouter, который создается здесь с помощью Guice Provider:
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.netty.buffer.ByteBuf;
import netflix.karyon.health.HealthCheckHandler;
import netflix.karyon.transport.http.SimpleUriRouter;
import netflix.karyon.transport.http.health.HealthCheckEndpoint;
import org.bk.samplepong.app.ApplicationMessageHandler;
import org.bk.samplepong.common.health.HealthCheck;
public class AppRouteProvider implements Provider<SimpleUriRouter<ByteBuf, ByteBuf>> {
@Inject
private HealthCheck healthCheck;
@Inject
private ApplicationMessageHandler applicationMessageHandler;
@Override
public SimpleUriRouter get() {
SimpleUriRouter simpleUriRouter = new SimpleUriRouter();
simpleUriRouter.addUri("/healthcheck", new HealthCheckEndpoint(healthCheck));
simpleUriRouter.addUri("/message", applicationMessageHandler);
return simpleUriRouter;
}
}
Этот маршрутизатор теперь может быть зарегистрирован через пользовательский модуль guice следующим образом:
public class KaryonAppModule extends KaryonHttpModule<ByteBuf, ByteBuf> {
public KaryonAppModule() {
super("routerModule", ByteBuf.class, ByteBuf.class);
}
@Override
protected void configureServer() {
bindRouter().toProvider(new AppRouteProvider());
interceptorSupport().forUri("/*").intercept(LoggingInterceptor.class);
server().port(8888);
}
}
По сути это и есть, теперь логика маршрутизации четко отделена от логики обработки.
Внедрение зависимости
Внедрение зависимостей осуществляется с помощью пользовательских модулей Guice . У меня есть служба, назовите ее MessageHandlerService, которая принимает сообщение и возвращает подтверждение, эта служба определяется следующим образом:
public class MessageHandlerServiceImpl implements MessageHandlerService {
private static final Logger logger = LoggerFactory.getLogger(MessageHandlerServiceImpl.class);
public Observable<MessageAcknowledgement> handleMessage(Message message) {
return Observable.<MessageAcknowledgement>create(s -> {
s.onNext(new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong"));
s.onCompleted();
});
}
}
Теперь у меня есть модуль guice, который определяет связь между интерфейсом MessageHandlerService и конкретным MessageHandlerServiceImpl:
public class AppModule extends AbstractModule {
@Override
protected void configure() {
bind(MessageHandlerService.class).to(MessageHandlerServiceImpl.class).in(Scopes.SINGLETON);
}
}
Благодаря этому MessageHandlerService может быть введен в:
public class ApplicationMessageHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final MessageHandlerService messageHandlerService;
@Inject
public ApplicationMessageHandler(MessageHandlerService messageHandlerService) {
this.messageHandlerService = messageHandlerService;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.flatMap(messageHandlerService::handleMessage)
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
}
}
Благодаря реализации обеих этих функций приложение, использующее Karyon2, также значительно упрощается, и у меня есть полное рабочее приложение в моем репозитории github здесь : https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample -pong