Cloud-ready microservices with Rx-netty and Karyon2

I previously wrote about developing cloud-ready microservices with Rx-netty and Karyon2, but that sample had a few problems. Part of it is reproduced here:



package org.bk.samplepong.app;

.....

public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final String healthCheckUri;
    private final HealthCheckEndpoint healthCheckEndpoint;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {
        this.healthCheckUri = healthCheckUri;
        this.healthCheckEndpoint = healthCheckEndpoint;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
        // Routing: the handler itself decides which branch serves the URI.
        if (request.getUri().startsWith(healthCheckUri)) {
            return healthCheckEndpoint.handle(request, response);
        } else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {
            // Processing: read the body, parse it as JSON, and reply with an acknowledgement.
            return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8")))
                    .map(s -> {
                        try {
                            return objectMapper.readValue(s, Message.class);
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong"))
                    .flatMap(ack -> {
                        try {
                            return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));
                        } catch (Exception e) {
                            response.setStatus(HttpResponseStatus.BAD_REQUEST);
                            return response.close();
                        }
                    });
        } else {
            // Any other URI gets a 404.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }

}


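The problems are:


  1. The routing logic is not centralized: the request handler contains both routing logic and processing logic.
  2. Dependencies are not injected cleanly.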


Looking at the Karyon2 samples, both of these problems are in fact addressed very cleanly there, and I want to document that here.

Routing


Routing can be centralized using a custom Rx-netty RequestHandler called SimpleUriRouter.

Routes are registered on the SimpleUriRouter, which in this sample is created through a Guice Provider.
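The idea of registering every route in one place can be sketched in plain Java. This is only a simplified stand-in and not Karyon2's SimpleUriRouter API: `PrefixRouter`, `RoutingSketch`, the `/healthcheck` path, and the string responses are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified router; NOT the Karyon2 SimpleUriRouter class.
final class PrefixRouter {
    // LinkedHashMap keeps registration order, so earlier routes win.
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    // Registers a handler for every URI that starts with the given prefix.
    PrefixRouter addUri(String prefix, Function<String, String> handler) {
        routes.put(prefix, handler);
        return this;
    }

    // Dispatches to the first matching handler, or falls back to a 404 marker.
    String route(String uri) {
        for (Map.Entry<String, Function<String, String>> entry : routes.entrySet()) {
            if (uri.startsWith(entry.getKey())) {
                return entry.getValue().apply(uri);
            }
        }
        return "404 Not Found";
    }
}

final class RoutingSketch {
    // Every routing decision lives here; the handlers contain no URI checks.
    static PrefixRouter buildRouter() {
        return new PrefixRouter()
                .addUri("/healthcheck", uri -> "200 OK")   // assumed health check path
                .addUri("/message", uri -> "200 Pong");
    }

    public static void main(String[] args) {
        PrefixRouter router = buildRouter();
        System.out.println(router.route("/healthcheck")); // prints "200 OK"
        System.out.println(router.route("/message"));     // prints "200 Pong"
        System.out.println(router.route("/unknown"));     // prints "404 Not Found"
    }
}
```

With the routes collected in `buildRouter`, adding an endpoint means adding one registration line rather than another `else if` branch inside a handler.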

This router is then registered through a custom Guice module.

That is essentially it: the routing logic is now cleanly separated from the processing logic.


Dependency Injection


Dependency injection is handled through a custom Guice module. I have a service, call it MessageHandlerService, that takes in a message and returns an acknowledgement.
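A service with that contract might look roughly like the sketch below. It is an assumption rather than the actual definition from the sample: the method name `handleMessage`, the synchronous return type, and the field names are hypothetical, while the getters and the three-argument `MessageAcknowledgement` constructor follow their usage in the first listing.

```java
// Hypothetical message type; getId and getPayload mirror the getters used in the first listing.
final class Message {
    private final String id;
    private final String payload;

    Message(String id, String payload) {
        this.id = id;
        this.payload = payload;
    }

    String getId() { return id; }
    String getPayload() { return payload; }
}

// Hypothetical acknowledgement type: the message id, the payload received, and the reply.
final class MessageAcknowledgement {
    private final String id;
    private final String received;
    private final String response;

    MessageAcknowledgement(String id, String received, String response) {
        this.id = id;
        this.received = received;
        this.response = response;
    }

    String getId() { return id; }
    String getReceived() { return received; }
    String getResponse() { return response; }
}

// The contract: take a message, return an acknowledgement (method name assumed).
interface MessageHandlerService {
    MessageAcknowledgement handleMessage(Message message);
}

// Concrete implementation that always answers "Pong", as the first listing does.
final class MessageHandlerServiceImpl implements MessageHandlerService {
    @Override
    public MessageAcknowledgement handleMessage(Message message) {
        return new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong");
    }
}
```

Keeping the contract in an interface is what lets a module decide later which implementation callers receive.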

A Guice module then specifies the binding between the MessageHandlerService interface and the concrete MessageHandlerServiceImpl.

With this in place, the MessageHandlerService can be injected.
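What injection buys here can be sketched without any framework. In the sketch below the wiring that a Guice module would express is done by hand in `InjectionSketch.wire()`. All names except MessageHandlerService and MessageHandlerServiceImpl are hypothetical, and the method signatures are assumptions rather than the sample's real code.

```java
// Minimal hypothetical types so the sketch compiles on its own.
final class Message {
    private final String id;
    private final String payload;
    Message(String id, String payload) { this.id = id; this.payload = payload; }
    String getId() { return id; }
    String getPayload() { return payload; }
}

final class MessageAcknowledgement {
    private final String id;
    private final String received;
    private final String response;
    MessageAcknowledgement(String id, String received, String response) {
        this.id = id;
        this.received = received;
        this.response = response;
    }
    String getId() { return id; }
    String getReceived() { return received; }
    String getResponse() { return response; }
}

interface MessageHandlerService {
    MessageAcknowledgement handleMessage(Message message);
}

final class MessageHandlerServiceImpl implements MessageHandlerService {
    @Override
    public MessageAcknowledgement handleMessage(Message message) {
        return new MessageAcknowledgement(message.getId(), message.getPayload(), "Pong");
    }
}

// Hypothetical handler: it receives the service through its constructor
// instead of constructing it, so it depends only on the interface.
final class ApplicationMessageHandler {
    private final MessageHandlerService service;

    ApplicationMessageHandler(MessageHandlerService service) {
        this.service = service;
    }

    String handle(String id, String payload) {
        MessageAcknowledgement ack = service.handleMessage(new Message(id, payload));
        return ack.getId() + ":" + ack.getReceived() + ":" + ack.getResponse();
    }
}

final class InjectionSketch {
    // Plays the role of the module: the single place that picks the implementation.
    static ApplicationMessageHandler wire() {
        MessageHandlerService service = new MessageHandlerServiceImpl();
        return new ApplicationMessageHandler(service);
    }

    public static void main(String[] args) {
        System.out.println(wire().handle("1", "Ping")); // prints "1:Ping:Pong"
    }
}
```

Because the handler only sees the interface, a test can pass in a stub, for example `new ApplicationMessageHandler(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Stub"))`, without touching the handler itself.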

With both of these in place, the application using Karyon2 is also greatly simplified. The complete working application is in my GitHub repository: https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong