我之前曾 写过 关于使用 Rx-netty 和 Karyon2 开发云就绪微服务的文章,但该示例存在一些问题,部分内容转载于此:
package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
this.healthCheckUri = healthCheckUri;
this.healthCheckEndpoint = healthCheckEndpoint;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
}
问题是:
- 路由逻辑不集中,request handler既有路由逻辑也有处理逻辑
- 依赖项没有干净地注入。
查看 Karyon2 样本 ,这两个问题现在实际上都得到了非常明确的解决,我想在这里记录下来。
路由
可以使用称为 SimpleUriRouter 的自定义 Rx-netty RequestHandler 集中路由
可以使用 SimpleRouter 以下列方式注册路由,SimpleRouter 是使用 Guice Provider 在此处创建的:
package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
this.healthCheckUri = healthCheckUri;
this.healthCheckEndpoint = healthCheckEndpoint;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
}
现在可以通过自定义 guice 模块按以下方式注册此路由器:
package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
this.healthCheckUri = healthCheckUri;
this.healthCheckEndpoint = healthCheckEndpoint;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
}
本质上就是这样,现在路由逻辑与处理逻辑完全分离了。
依赖注入
依赖注入是通过自定义 guice 模块处理的。我有一个服务,称之为 MessageHandlerService,它接收一条消息并返回一个 Acknowledgement,该服务定义如下:
package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
this.healthCheckUri = healthCheckUri;
this.healthCheckEndpoint = healthCheckEndpoint;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
}
现在,我有一个 guice 模块,它指定 MessageHandlerService 接口和具体的 MessageHandlerServiceImpl 之间的绑定:
package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
this.healthCheckUri = healthCheckUri;
this.healthCheckEndpoint = healthCheckEndpoint;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
}
有了这个,就可以将 MessageHandlerService 注入:
package org.bk.samplepong.app;
.....
public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();
public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
this.healthCheckUri = healthCheckUri;
this.healthCheckEndpoint = healthCheckEndpoint;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
if (request.getUri().startsWith(healthCheckUri)) {
return healthCheckEndpoint.handle(request, response);
} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
.map(s -> {
try {
Message m = objectMapper.readValue(s, Message.class);
return m;
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
.flatMap(ack -> {
try {
return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
} catch (Exception e) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return response.close();
}
}
);
} else {
response.setStatus(HttpResponseStatus.NOT_FOUND);
return response.close();
}
}
}
实现了这两个功能后,使用 Karyon2 的应用程序也大大简化了,我的 github 存储库
中
有完整的工作应用程序:https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample -乒乓球