基于 Rx-netty 和 Karyon2 的云就绪微服务

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

我之前曾 写过 关于使用 Rx-netty 和 Karyon2 开发云就绪微服务的文章,但该示例存在一些问题,部分内容转载于此:



 package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();

public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
    this.healthCheckUri = healthCheckUri;
    this.healthCheckEndpoint = healthCheckEndpoint;
}

@Override
public Observable&lt;Void&gt; handle(HttpServerRequest&lt;ByteBuf&gt; request, HttpServerResponse&lt;ByteBuf&gt; response) {
    if (request.getUri().startsWith(healthCheckUri)) {
        return healthCheckEndpoint.handle(request, response);
    } else if (request.getUri().startsWith("/message") &amp;&amp; request.getHttpMethod().equals(HttpMethod.POST)) {
        return request.getContent().map(byteBuf -&gt; byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -&gt; {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(m -&gt; new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                .flatMap(ack -&gt; {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    } else {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
}

}


问题是:


  1. 路由逻辑不集中,request handler既有路由逻辑也有处理逻辑
  2. 依赖项没有干净地注入。


查看 Karyon2 样本 ,这两个问题现在实际上都得到了非常明确的解决,我想在这里记录下来。

路由


可以使用称为 SimpleUriRouter 的自定义 Rx-netty RequestHandler 集中路由

可以使用 SimpleRouter 以下列方式注册路由,SimpleRouter 是使用 Guice Provider 在此处创建的:



 package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();

public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
    this.healthCheckUri = healthCheckUri;
    this.healthCheckEndpoint = healthCheckEndpoint;
}

@Override
public Observable&lt;Void&gt; handle(HttpServerRequest&lt;ByteBuf&gt; request, HttpServerResponse&lt;ByteBuf&gt; response) {
    if (request.getUri().startsWith(healthCheckUri)) {
        return healthCheckEndpoint.handle(request, response);
    } else if (request.getUri().startsWith("/message") &amp;&amp; request.getHttpMethod().equals(HttpMethod.POST)) {
        return request.getContent().map(byteBuf -&gt; byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -&gt; {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(m -&gt; new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                .flatMap(ack -&gt; {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    } else {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
}

}

现在可以通过自定义 guice 模块按以下方式注册此路由器:


 package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();

public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
    this.healthCheckUri = healthCheckUri;
    this.healthCheckEndpoint = healthCheckEndpoint;
}

@Override
public Observable&lt;Void&gt; handle(HttpServerRequest&lt;ByteBuf&gt; request, HttpServerResponse&lt;ByteBuf&gt; response) {
    if (request.getUri().startsWith(healthCheckUri)) {
        return healthCheckEndpoint.handle(request, response);
    } else if (request.getUri().startsWith("/message") &amp;&amp; request.getHttpMethod().equals(HttpMethod.POST)) {
        return request.getContent().map(byteBuf -&gt; byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -&gt; {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(m -&gt; new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                .flatMap(ack -&gt; {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    } else {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
}

}

本质上就是这样,现在路由逻辑与处理逻辑完全分离了。


依赖注入


依赖注入是通过自定义 guice 模块处理的。我有一个服务,称之为 MessageHandlerService,它接收一条消息并返回一个 Acknowledgement,该服务定义如下:


 package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();

public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
    this.healthCheckUri = healthCheckUri;
    this.healthCheckEndpoint = healthCheckEndpoint;
}

@Override
public Observable&lt;Void&gt; handle(HttpServerRequest&lt;ByteBuf&gt; request, HttpServerResponse&lt;ByteBuf&gt; response) {
    if (request.getUri().startsWith(healthCheckUri)) {
        return healthCheckEndpoint.handle(request, response);
    } else if (request.getUri().startsWith("/message") &amp;&amp; request.getHttpMethod().equals(HttpMethod.POST)) {
        return request.getContent().map(byteBuf -&gt; byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -&gt; {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(m -&gt; new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                .flatMap(ack -&gt; {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    } else {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
}

}

现在,我有一个 guice 模块,它指定 MessageHandlerService 接口和具体的 MessageHandlerServiceImpl 之间的绑定:


 package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();

public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
    this.healthCheckUri = healthCheckUri;
    this.healthCheckEndpoint = healthCheckEndpoint;
}

@Override
public Observable&lt;Void&gt; handle(HttpServerRequest&lt;ByteBuf&gt; request, HttpServerResponse&lt;ByteBuf&gt; response) {
    if (request.getUri().startsWith(healthCheckUri)) {
        return healthCheckEndpoint.handle(request, response);
    } else if (request.getUri().startsWith("/message") &amp;&amp; request.getHttpMethod().equals(HttpMethod.POST)) {
        return request.getContent().map(byteBuf -&gt; byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -&gt; {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(m -&gt; new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                .flatMap(ack -&gt; {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    } else {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
}

}

有了这个,就可以将 MessageHandlerService 注入:


 package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

private final String healthCheckUri;
private final HealthCheckEndpoint healthCheckEndpoint;
private final ObjectMapper objectMapper = new ObjectMapper();

public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
    this.healthCheckUri = healthCheckUri;
    this.healthCheckEndpoint = healthCheckEndpoint;
}

@Override
public Observable&lt;Void&gt; handle(HttpServerRequest&lt;ByteBuf&gt; request, HttpServerResponse&lt;ByteBuf&gt; response) {
    if (request.getUri().startsWith(healthCheckUri)) {
        return healthCheckEndpoint.handle(request, response);
    } else if (request.getUri().startsWith("/message") &amp;&amp; request.getHttpMethod().equals(HttpMethod.POST)) {
        return request.getContent().map(byteBuf -&gt; byteBuf.toString(Charset.forName("UTF-8")))
                .map(s -&gt; {
                    try {
                        Message m = objectMapper.readValue(s, Message.class);
                        return m;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(m -&gt; new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                .flatMap(ack -&gt; {
                            try {
                                return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                            } catch (Exception e) {
                                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                                return response.close();
                            }
                        }
                );
    } else {
        response.setStatus(HttpResponseStatus.NOT_FOUND);
        return response.close();
    }
}

}

实现了这两个功能后,使用 Karyon2 的应用程序也大大简化了,我的 github 存储库 有完整的工作应用程序:https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample -乒乓球




相关文章