`

dubbo remoting 层分析

阅读更多
remote 层关注 transport 和 exchange 这两个包即可. 因为这两个包下封装了通讯相关的内容.


先来补充点额外知识:

1.Endpoint 是端点的概念,我们可以看到,对其抽象,最主要的是一个 send 方法,用于发送数据. 但是我有一个疑问?为啥 Endpoint 不抽象出 receive ?

2.Endpoint 的 send 方法和 ChannelHandler 的 sent 方法的区别和联系?

刚刚我看了下 netty4 下面的 NettyServerHandler 和 NettyChannel,发现 ChannelHandler 中定义的 sent 方法、receive 方法和 Netty Channel 的实现有关,而 NettyChannel 中的 send 方法是和数据发送有关的.

换句话说:NettyChannel 中的 send 方法在发送的时候,会调用 ChannelHandler 中定义的对应方法进行处理.

其实上面问的 Endpoint 为啥不抽象出 receive?其实这个问题就很怪异. 因为 send 方法是一个主动的过程,我发送一个消息,主动的调用下就好,但是我收到一个消息,就不是主动调用下就可以完成的。或许有人会说,可以向 socket 编程那样阻塞住啊。但是仔细想想,netty 不是有 handler 可以在收到消息时进行处理吗?所以在 Endpoint 接口中抽象 receive 方法很鸡肋.

channel 是通讯的载体,信息通过 channel 进行发送和接收. channel 继承 Endpoint. 同时定义了和 attribute 相关的方法.

channelHandler 和 Netty 中的 channelHandler 相关联,是对 channel 的逻辑处理,同时该接口是一个可扩展接口.

client 继承 Endpoint,Channel,Resetable,IdleSensible 接口,客户端同样是一个端点,需要注意的是,客户端可以重连服务端.

server 继承 Endpoint,Resetable,IdleSensible 接口,可以返回所有连接到该服务端的所有客户端,同时可以根据 ip 地址获取具体的客户端.

Codec2 是编解码类

Decodeable 解码接口,用来判断消息是否可以被解码等

Dispatcher 接口用于调度,选择不同的 handler 来进行处理(将消息派发到线程池进行处理).

Transport 接口用于获取 Client 和 Server.

先去网上盗一张图.


下面我们按照上图进行分析:

AbstractPeer 实现 Endpoint 和 ChannelHandler. 主要是增加了对通道是否关闭的判断.

AbstractEndpoint 继承 AbstractPeer 类,增加了 codec2,超时时间,连接超时时间等属性.

AbstractServer 类继承 AbstractEndpoint 以及实现 Server 接口. 这在一方面印证了 Server 其实是一个端.

1.AbstractServer 中的 executor 是在哪个地方用到的了?首先我们看到在构造方法中看到其从 dataStore 中获取 executor. 那么是在哪里放进去的了?我们可以明确的从 WrappedChannelHandler 类的构造方法中看到这里对 executor 进行了赋值,如下:
public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
2.现在分析下这里的 executor 干了个啥事了?优雅关机.
优雅停机是啥意思了?核心业务在服务器中正在执行时中断,可能出现严重后果,所以需要等待服务执行完后关闭.
dubbo 是如何做的了?
1.收到 kill 9 进程退出信号,Spring 容器会出发容器销毁事件
2.privoder 端会取消服务元数据信息
3.consumer 端会收到最新地址列表(不包含准备停机的地址)
4.Dubbo 协议会发送 readonly 事件报文通知 consumer 服务不可用
5.服务端等待已经执行的任务结束并拒绝新任务执行

AbstractServer 最主要是实现了 send 和 close 方法. 同时值得注意的是该构造方法调用了 doOpen 方法打开一个服务端.

AbstractClient 继承 AbstractEndpoint 并实现 Client 接口,也就是说 AbstractClient 在语义上是一个端. 同时值得注意的是该构造方法调用了 doOpen 方法和 connect 方法,完成客户端的初始化工作. 我们需要注意下,AbstractClient 中的 wrapChannelHandler 对 handler 的语义进行了增强,增加了心跳和多消息处理 handler. AbstractClient 的 send 方法和 AbstractServer 的 send 方法在实现的时候是不同的,Server 端的 send 方法是遍历 channel,然后调用每一个 channel 进行发送. Client 端只是调用 channel.send 进行发送.

connect 方法加锁了,避免了多线程同时对一个 channel 调用 connect 方法. Client 主要实现 connect、disconnect、reconnect 及 close 的功能.

AbstractChannel 继承 AbstractPeer,实现 Channel 接口.  Channel 在 dubbo 看来,也是一个 Endpoint. 是数据的载体,客户端和服务端通过 channel 发送和接收数据,需要注意的是:AbstractPeer 实现了 ChannelHandler 接口.


ChannelHandlerDelegate 作为装饰者模式的 component. 该接口只定义了一个 getHandler 方法.

AbstractChannelHandlerDelegate 装饰角色,实现的方法都是直接调用成员变量 handler 中的方法.

DecodeHandler 继承 AbstractChannelHandlerDelegate,重写了 handler 的 receive 方法. 在接收到消息的时候进行了判断,如果消息可解码,则直接调用 decode 方法,如果消息是  request 类型的,则对数据部分进行解码,如果消息是 response 类型的,则对结果进行解码. 解码完成后,向上进行传递,换句话说可以无限对 handler 的功能进行增强. 其实这个设计和责任链的设计是两种思路吧,一种是责任链的方式,一种是装饰者模式(任何被解码的消息都必须可以被解码,即:实现 Decodeable 接口).

MultiMessageHandler 同样实现 AbstractChannelHandlerDelegate 接口,但是可以处理 MultiMessage. 我们可以把 MultiMessage 看成是一个聚合的消息(ArrayList). 所以处理的时候也是简单粗暴,直接遍历,然后调用 received 方法.

WrappedChannelHandler 同样是装饰者角色,同样是直接调用成员变量 handler 中的方法. 但是值得注意的是:该类增加了线程池,它的子类都是关心那些消息需要交给线程池处理,那些消息直接交由 I/O 线程处理.

public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
            componentKey = CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

MessageOnlyChannelHandler 只有请求响应消息分发到线程池,其他连接断开、心跳等事件在 I/O 线程上执行. MessageOnlyChannelHandler 在重写 received 方法时,并没有 AllChannelHandler 中的那样处理,为什么了?因为该类抛出的异常,会在 HeaderExchangeHandler.caught 方法中进行统一处理.

ExecutionChannelHandler 只有请求消息派发到线程池(不含响应),响应、断开连接、心跳等事件在 I/O 线程上执行. 但是我们看到这里执行的时候,又和 MessageOnlyChannelHandler 中的处理方式不同?为什么了?因为 MessageOnlyChannelHandler 中不仅要处理 request,还要处理 response. 所以进行了统一的处理,但是 ExecutionChannelHandler 只需要在线程池中处理 request,所以就可以和 AllDispatcher 一样了(不必须).

AllChannelHandler 所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等事件. 关于使用 AllDispatcher 的时候,在早期版本 dubbo provider 线程池可能被 exhausted. 这是由于异常也需要使用线程池来进行处理,但是线程池还是满的,所以无法进一步处理,consumer 只能死等,直至超时. 如何规避了?我们看到在 dubbo2.7 中,received 方法收到消息后,如果线程池抛出了拒绝异常,且这是一个请求,那么将会收到服务端线程池被打满的消息.

direct 所有消息都不派发到线程池,直接在 I/O 线程上执行.

ConnectionOrderedChannelHandler 在 I/O 线程上,将连接、断开事件放入队列,有序逐个执行,其他事件派发到线程池. 需要说一下的是:该类有自己的线程池 connectionExecutor. 因为该类的 caught 方法同样将任务分给了线程池,所以其 received 方法,必须要进行判断处理,如下:

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

Dispatcher 就不说了,直接 new 对应的 handler.

ChannelHandlers 类是对 Dispatcher 返回的 channelHandler 的功能进行了增强,增加了处理心跳和多消息的 handler.

AbstractCodec 实现 Codec2 接口,主要定义了检查数据长度方法、判断是客户端侧,还是服务端侧方法,获取序列化方式方法.

TransportCodec 继承 AbstractCodec,主要对 encode 方法、decode 方法进行了重写. 实现序列化和反序列化的功能.

CodecAdapter 将 Codec 适配为 Codec2.

ChannelDelegate 实现 Channel 接口,为 Channel 的装饰角色.

ClientDelegate 实现 Client 接口,为 Client 的装饰角色.

ServerDelegate 实现 Server 接口,为 Server 的装饰角色.

ChannelHandlerAdapter 实现 ChannelHandler 接口,实现的都是空方法,方便子类重写.

ChannelHandlerDispatcher 为 ChannelHandler 的调度器,该类有一个成员变量 channelHandlers,用于保存 channelHandlers. 在实现 connected、disconnected 等方法的时候,都会遍历这个属性列表,依次调用.

CodecSupport 编解码工具类,用于查询序列化方式.

参考:
    ① https://segmentfault.com/a/1190000017390253#articleHeader28
1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics