百木园-与人分享,
就是让自己快乐。

Dubbo源码(九)

1. 前言

本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

源码分析均基于官方Demo,路径:dubbo/dubbo-demo

如果没有看过之前Dubbo系列的文章,建议先去看看。因为服务调用过程涉及范围较广,需要那些前置知识。

Dubbo 服务调用过程比较复杂,包含众多步骤,比如发送请求、编解码、服务降级、过滤器链处理、序列化、线程派发以及响应请求等步骤。限于篇幅原因,本篇文章无法对所有的步骤一一进行分析。后续挖坑再说吧。本篇文章将会重点分析请求的发送与接收、线程派发以及响应的发送与接收等过程。

2. 源码分析

先了解下 Dubbo 服务调用过程(图片来自官方文档)

Untitled

首先服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。至于响应的发送与接收过程,这张图中没有表现出来。

2.1 服务调用方式

Dubbo 支持同步和异步两种调用方式,其中异步调用还可细分为“有返回值”的异步调用和“无返回值”的异步调用。所谓“无返回值”异步调用是指服务消费方只管调用,但不关心调用结果,此时 Dubbo 会直接返回一个空的 RpcResult。Dubbo 默认使用同步调用方式。

2.1.1 异步调用案例

当有返回值异步和无返回值异步同时存在,无返回值异步优先:

  • 有返回值异步调用

    修改配置,将参数async设置为 true

    <dubbo:reference id=\"demoService\" check=\"false\" interface=\"com.alibaba.dubbo.demo.DemoService\">
        <dubbo:method name=\"sayHello\" async=\"true\" />
    </dubbo:reference>
    

    代码使用如下

    String hello = demoService.sayHello(\"world\");// 返回值为null,要注意
    Future<String> future = RpcContext.getContext().getFuture();
    ... // 业务线程可以开始做其他事情
    result = future.get();
    
  • 无返回值异步调用

    修改配置,将参数return设置为 false

    <dubbo:reference id=\"demoService\" check=\"false\" interface=\"com.alibaba.dubbo.demo.DemoService\">
        <dubbo:method name=\"sayHello\" return=\"false\" />
    </dubbo:reference>
    

    代码使用

    String hello = demoService.sayHello(\"world\");// 返回值为null,要注意
    Future<String> future = RpcContext.getContext().getFuture();// future 为 null
    

下面,我们开始进入源码分析。

2.1.2 InvokerInvocationHandler

当我们通过Spring注入服务接口时,实际上注入的是服务接口的实现类,这个实现类由Dubbo框架生成。请看 服务引用#创建代理对象

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = 1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = (w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

也就是调用 demoService.sayHello 时,实际上是调用 handler.invoke ,而这个 handler 就是InvokerInvocationHandler

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
        if (\"toString\".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if (\"hashCode\".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if (\"equals\".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

invoke 方法判断如果是 java 内置的一下方法,则直接调用,不走 dubbo 的逻辑。所以我们关注的是 invoker.invoke() 。类变量 invoker 实际上是 FailoverClusterInvoker, 但是又被 MockClusterInvoker包装了一层。这个 FailoverClusterInvoker 是由FailoverCluster生成的,请看 服务引用#远程引用 。而 MockClusterInvoker 是由MockClusterWrapper生成,其基于Dubbo的SPI机制,将 FailoverCluster 又包装了一遍。MockClusterInvoker内部封装了服务降级逻辑。以后再开坑聊。

我们在 Dubbo集群 文章中讲过FailoverClusterInvoker,所以直接快进到DubboInvoker#doInvoke()方法。此时是不是一脸懵逼,为啥从 FailoverClusterInvoker 一下子就到了 DubboInvoker ,我们先来看看调用栈

Untitled

我们把视角拉回FailoverClusterInvoker#doInvoke,看看通过负载均衡选出的 invoker

Untitled

从图片可以看到,最外层的invoker是一个内部类,是 服务目录通过订阅注册中心 生成的

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

而 protocol 实际是DubboProtocol,所以 protocol.refer(serviceType, url) 生成的是DubboInvoker,至于为啥调用链这么长,是因为ProtocolFilterWrapper,这个类增加了对Dubbo过滤器的支持。这是一个 protocol 的包装类,它包装了DubboProtocol#refer() ,我们取看看 ProtocolFilterWrapper的源码

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 创建invoker链条
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 获取过滤器
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            // 对invoker进行封装,责任链模式
            last = new Invoker<T>() {
                ......

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

            };
        }
    }
    return last;
}

buildInvokerChain 方法将 invoker 转换成责任链的形式,获取的 filters 为 {ConsumerContextFilter,FutureFilter,MonitorFilter},和图片中的调用栈就对应上了。

那么还剩下ListenerInvokerWrapper,这是一个 Invoker 包装类,由 ProtocolListenerWrapper 生成。

public class ProtocolListenerWrapper implements Protocol {

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        // 封装了Invoker监听器
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

}

public class ListenerInvokerWrapper<T> implements Invoker<T> {

    public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
        if (invoker == null) {
            throw new IllegalArgumentException(\"invoker == null\");
        }
        this.invoker = invoker;
        this.listeners = listeners;
        if (listeners != null && !listeners.isEmpty()) {
            for (InvokerListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.referred(invoker);
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                    }
                }
            }
        }
    }

    @Override
    public void destroy() {
        try {
            invoker.destroy();
        } finally {
            if (listeners != null && !listeners.isEmpty()) {
                for (InvokerListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.destroyed(invoker);
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                        }
                    }
                }
            }
        }
    }

}

总结一下:

ProtocolFilterWrapper是 Invoker 过滤器的支持,dubbo的过滤器用的也是责任链模式ListenerInvokerWrapper是 Invoker 监听器的支持

2.1.3 DubboInvoker

上面啰嗦了很多,终于回到主线 DubboInvoker 。它继承自 AbstractInvoker ,invoke 方法在抽象父类中

public abstract class AbstractInvoker<T> implements Invoker<T> {
    @Override
    public Result invoke(Invocation inv) throws RpcException {
        ...

        RpcInvocation invocation = (RpcInvocation) inv;
        // 设置 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            // 设置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 设置异步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        // 添加调用id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
        if (serializationId != null) {
            invocation.put(SERIALIZATION_ID_KEY, serializationId);
        }

        try {
            // 抽象方法,由子类实现
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            ...
        } catch (RpcException e) {
            ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }
}

invoke 方法中,主要用于添加信息到 RpcInvocation#attachment 中,给后续的逻辑使用。重点是 doInvoke 方法,这是一个抽象方法,由子类 DubboInvoker 实现。

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // 设置 path 和 version 到 attachment 中
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        // 从 clients 数组中获取 ExchangeClient
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 获取异步配置
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // isOneway 为 true,表示“单向”通信
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 异步无返回值
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 发送请求
            currentClient.send(inv, isSent);
            // 设置上下文中的 future 字段为 null
            RpcContext.getContext().setFuture(null);
            // 返回一个空的 RpcResult
            return new RpcResult();
        // 异步有返回值
        } else if (isAsync) {
            // 发送请求,并得到一个 ResponseFuture 实例
            ResponseFuture future = currentClient.request(inv, timeout);
            // 设置 future 到上下文中
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            // 暂时返回一个空结果
            return new RpcResult();
        // 同步调用
        } else {
            RpcContext.getContext().setFuture(null);
            // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, \"Invoke remote method timeout. method: \" + invocation.getMethodName() + \", provider: \" + getUrl() + \", cause: \" + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, \"Failed to invoke remote method: \" + invocation.getMethodName() + \", provider: \" + getUrl() + \", cause: \" + e.getMessage(), e);
    }
}

doInvoke 方法主要是对同步和异步调用的逻辑处理。可以看到,在有返回值的情况下,同步和异步都是通过 currentClient.request 来发送请求。区别在于,同步调用会使用 ResponseFuture#get 方法阻塞,知道请求完成,得到返回值。而异步是将 ResponseFuture 放到上下文对象中,返回空结果。

FutureAdapter 是一个适配器,它实现了 jdk 内置的 Future 接口,将 ResponseFuture 转换成 Future 的用法,更贴合用户习惯。这里我们的重点是ResponseFuture是如何支持异步调用的,这个接口的默认实现是DefaultFuture

public class DefaultFuture implements ResponseFuture {
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();    

    // 构造方法
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 获取请求 id,这个 id 很重要,后面还会见到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存储 <requestId, DefaultFuture> 映射关系到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    // 阻塞等待并获取请求结果
    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        // 检测服务提供方是否成功返回了调用结果
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 循环检测服务提供方是否成功返回了调用结果
                while (!isDone()) {
                    // 如果调用结果尚未返回,这里等待一段时间,默认1000毫秒
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果调用结果成功返回,或等待超时,此时跳出 while 循环,执行后续的逻辑
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 如果调用结果仍未返回,则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    @Override
    public boolean isDone() {
        // 通过检测 response 字段为空与否,判断是否收到了调用结果
        return response != null;
    }

    // 当请求有响应时,调用此方法
    public static void received(Channel channel, Response response) {
        try {
            // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // 这是请求超时,但是结果返回了的警告
                logger.warn(\"...\");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 保存响应对象
            response = res;
            if (done != null) {
                // 唤醒用户线程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

}

上面对DefaultFuture做了部分代码精简。get 方法阻塞等待返回值。而 received 方法则是在请求有相应时,保存响应对象并唤醒 get 方法中的循环。这里是很典型的 future 结构的写法,有疑问的同学可以去了解下 Java 的并发知识。

2.2 服务消费方发送请求

上节讲了 Dubbo 的同步、异步调用方式。本节来讲讲有返回值的情况下,Dubbo 消费方是如何发送请求的。

我们把实现拉回 DubboInvoker#doInvoke 方法中,其有返回值的请求方法为 currentClient.request(inv, timeout),currentClient 为ReferenceCountExchangeClient,我们看下面这张调用栈图

Untitled

  • ReferenceCountExchangeClient:为 ExchangeClient 添加引用计数功能
  • HeaderExchangeClient:内部持有 client ,并封装了心跳的功能

从 DubboInvoker 到 HeaderExchangeChannel,在 服务引用 文章就讲过了,这里不再赘述。下面直接看HeaderExchangeChannel 中的 request 方法

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, \"Failed to send request \" + request + \", cause: The channel \" + this + \" is closed!\");
    }
    // create request.
    // 创建 Request 对象
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    // 设置双向通信标志为 true
    req.setTwoWay(true);
    // 这里的 request 变量类型为 RpcInvocation
    req.setData(request);

    // 创建 DefaultFuture 对象
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // 调用 NettyClient 的 send 方法发送请求(在父类AbstractPeer中)
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    // 返回 DefaultFuture 对象
    return future;
}

从上面的方法可以看到,将请求数据封装成 Request 对象,传递给 DefaultFuture,再发送出去。Request 在构造方法中会创建请求id,用于在接收到响应时,确定是哪个请求的响应。继续看请求的发送方法 channel.send(req),channel 是 NettyClient,结合类图看调用路径

Untitled

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    @Override
    public void send(Object message) throws RemotingException {
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
}

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
        Channel channel = getChannel();

        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, \"message can not send, because channel is closed . url:\" + getUrl());
        }
        // 继续向下调用
        channel.send(message, sent);
    }
}

这里就两个重点,获取 channel 和 使用 channel 继续往下调用。先看看如何获取 channel

public class NettyClient extends AbstractClient {
    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isActive()) {
            return null;
        }
        // 获取一个 NettyChannel 类型对象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {
    // 私有构造方法
    private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException(\"netty channel == null;\");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }

        // 尝试从集合中获取 NettyChannel 实例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,则创建一个新的 NettyChannel 实例
            NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
            if (ch.isActive()) {
                ret = channelMap.putIfAbsent(ch, nettyChannel);
            }
            if (ret == null) {
                ret = nettyChannel;
            }
        }
        return ret;
    }
}

获取 channel 的逻辑很简单,从缓存获取 NettyChannel,没有则创建。下面继续看 channel.send(message, sent)

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 发送消息(包含请求和响应消息)
        ChannelFuture future = channel.writeAndFlush(message);
        // sent 的值源于 <dubbo:method sent=\"true/false\" /> 中 sent 的配置值,有两种配置值:
        //   1. true: 等待消息发出,消息发送失败将抛出异常
        //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
        // 默认情况下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息发出,若在规定时间没能发出,success 会被置为 false
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, \"Failed to send message \" + message + \" to \" + getRemoteAddress() + \", cause: \" + e.getMessage(), e);
    }

    // 若 success 为 false,这里抛出异常
    if (!success) {
        throw new RemotingException(this, \"...\");
    }
}

至此,请求数据的发送过程就结束了。涉及 Netty 的发送编解码处理过程,感兴趣的可以从 NettyClient#doOpen方法入手,这里鉴于篇幅,就不写了。

2.2.1 调用路径

下面我们来总结一下消费端调用发送请求过程的调用栈(以 DemoService 为例)

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

2.3 服务提供方接收请求

默认情况下 Dubbo 使用 Netty 作为底层的通信框架,从 NettyServer#doOpen 方法知道,接收请求的入口在 NettyServerHandler#channelRead,这里已经是解码之后得到的数据。然后数据依次经过 MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler 。至于为什么是这几个类以及顺序,可以去看 NettyServer 的构造方法。下面我们首先看调用栈

Untitled

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

public class ChannelHandlers {
    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
}

public class ChannelHandlers {
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

MultiMessageHandler、HeartbeatHandler 直接通过构造方法创建,而 AllChannelHandler 则由 Dispatcher 的默认自适应拓展类 AllDispatcher 创建。

2.3.1 线程派发模型

刚才讲到了 Dispatcher,这是一个线程派发器。让我们回顾一下 Dubbo 服务调用过程图(图片来自官方文档)

Untitled

Dubbo 将底层通信框架中接收请求的线程称为 IO 线程。如果一些事件处理逻辑可以很快执行完,此时直接在 IO 线程上执行该段逻辑即可。但如果事件的处理逻辑比较耗时,比如该段逻辑会发起数据库查询或者 HTTP 请求。此时我们就不应该让事件处理逻辑在 IO 线程上执行,而是应该派发到线程池中去执行。原因也很简单,IO 线程主要用于接收请求,如果 IO 线程被占满,将导致它不能接收新的请求。PS:像不像Netty的主从模型,万物殊途同归啊。

Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具备线程派发能力。Dubbo 支持 5 种不同的线程派发策略

策略 用途
all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行
message 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行
execution 只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行
connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池

下面我们看看默认的 AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {
    /** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 获取线程池,由自适应拓展生成,默认由 FixedThreadPool 生成
        ExecutorService cexecutor = getExecutorService();
        try {
            // 将请求和响应消息派发到线程池中处理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
            // 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted
            // 错误信息封装到 Response 中,并返回给服务消费方。
        		if(request.isTwoWay()){
        			String msg = \"Server side(\" + url.getIp() + \",\" + url.getPort() + \") threadpool is exhausted ,detail msg:\" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
              // 返回包含错误信息的 Response 对象
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + \" error when process received event .\", t);
        }
    }
}

请求对象会被封装 ChannelEventRunnable 中,也就是 ChannelEventRunnable#run 方法才是实际处理请求的地方。

2.3.2 调用服务

public class ChannelEventRunnable implements Runnable {
    @Override
    public void run() {
        // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn(\"...\", e);
            }
        // 其他消息类型通过 switch 进行处理
        } else {
            switch (state) {
            case CONNECTED:
                ...
            case DISCONNECTED:
                ...
            case SENT:
                ...
            case CAUGHT:
                ...
            default:
                logger.warn(\"unknown state: \" + state + \", message is \" + message);
            }
        }
    }
}

ChannelEventRunnable 依然不进行调用逻辑,只是根据通道的状态将请求转发。可以注意一下,这里特意对 RECEIVED 状态用了 if 判断,然后其它状态使用 switch 来判断,是因为绝大部分的请求都是 RECEIVED 类型。

这里的 handler 是 DecodeHandler,这是一个解码处理器。也许你会以为,这个是不是和 InternalDecoder冲突了?既然解码操作已经在 IO 线程(也就是 Netty 的 WorkerGroup)中处理了,为什么到 Dubbo 线程池中,还要再处理一次?这取决于 decode.in.io 参数,允许将部分解码工作交由 Dubbo 线程池中完成。下面我们略过 DecodeHandler,快进到 HeaderExchangeHandler 中

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 处理请求对象
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 处理事件
                    handlerEvent(channel, request);
                // 处理普通的请求
                } else {
                    // 双向通信
                    if (request.isTwoWay()) {
                        // 向后调用服务,并得到调用结果
                        Response response = handleRequest(exchangeChannel, request);
                        // 将调用结果返回给服务消费端
                        channel.send(response);
                    } else {
                        // 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            // 处理响应对象,服务消费方会执行此处逻辑
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相关
                ...
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    // 处理请求
    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
        if (req.isBroken()) {
            ...
            return res;
        }
        // 获取 data 字段值,也就是 RpcInvocation 对象
        Object msg = req.getData();
        try {
            // handle data.
            // 继续向下调用
            Object result = handler.reply(channel, msg);
            // 设置 OK 状态码
            res.setStatus(Response.OK);
            // 设置调用结果
            res.setResult(result);
        } catch (Throwable e) {
            // 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
}

处理过程注释中已经写了。通过 handleRequest 方法处理请求得到返回值,并通过 channel.send 将结果返回给消费者。(碎碎念:这个 channel 和 Netty 的是真的像)

handleRequest 方法中主要是对 Response 对象的处理,我们继续跟进调用过程 handler.reply(channel, msg),这个 handler 是 DubboProtocol的类变量requestHandler

public class DubboProtocol extends AbstractProtocol {

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 获取 Invoker 实例
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it\'s a callback
                // 回调相关
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    ...
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 通过 Invoker 调用具体的服务
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, \"...\");
        }

        ...
    };

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
        // 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
        // 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, \"Not found exported service: \" + serviceKey + \" in \" + exporterMap.keySet() + \", may be version or group mismatch \" + \", channel: consumer: \" + channel.getRemoteAddress() + \" --> provider: \" + channel.getLocalAddress() + \", message:\" + inv);

        // 获取 Invoker 对象,并返回
        return exporter.getInvoker();
    }
}

reply 方法先是获取 Invoker 实例,然后通过 Invoker 调用具体的服务。想了解 Invoker 的创建以及如何放入到 exporterMap 中的,可以看以前写过的 服务导出 文章。下面这段在 服务导出 文章中均有提过,不想看的可以直接跳到本节末尾看调用路径。

invoke 方法定义在 AbstractProxyInvoker 中

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 调用 doInvoke 执行后续的调用,并将调用结果封装到 RpcResult 中,并
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException(\"Failed to invoke remote proxy method ...\");
        }
    }
    
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的

public class JavassistProxyFactory extends AbstractProxyFactory {
    
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(\'$\') < 0 ? proxy.getClass() : type);
        // 创建匿名类对象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 调用 invokeMethod 方法进行后续的调用
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。

public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 类型转换
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根据方法名调用指定的方法
            if (\"sayHello\".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append(\"Not found method \\\"\").append(string).append(\"\\\" in class com.alibaba.dubbo.demo.DemoService.\").toString());
    }
}

至此,服务端调用服务的过程就讲完了。

2.3.3 调用路径

下面我们来总结一下服务端调用服务过程的调用栈(以 DemoService 为例)

// 这是IO线程的调用过程
NettyServerHandler#channelRead(ChannelHandlerContext, Object)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
// 这是转发到线程池之后的调用过程
ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          —> Filter#invoke(Invoker, Invocation)
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)

2.4 服务提供方返回调用结果

在 2.3.2 节中讲了,调用结果会封装在 Response 对象中,并由NettyChannel 的 send 方法将 Response 对象返回。详情请看 HeaderExchangeHandler。至于返回 Response 过程中的编码过程,我们省略。

2.5 服务消费方接收调用结果

消费者接收响应数据的处理过程中,从 NettyHandler (消费者是 NettyClientHandler,生产者是 NettyServerHandler,不过他们的 channelRead 方法一模一样) 到 AllChannelHandler 的处理过程与服务提供方接收请求(2.3节)的处理过程一致,就不重复分析了。所以本节重点在 Dubbo如何将调用结果传递给用户线程。

2.5.1 向用户线程传递调用结果

我们直接快进到 HeaderExchangeHandler 的 received 方法中(调用路径请看 2.3.2 节末尾)

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 处理请求对象
            if (message instanceof Request) {
                ...
            // 处理响应对象,服务消费方会执行此处逻辑
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                ...
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}

可以看到,是调用 DefaultFuture#receive 方法处理的,DefaultFuture 对象我们在 2.1.3 节有讲到,继续追踪代码

public class DefaultFuture implements ResponseFuture {

    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    public static void received(Channel channel, Response response) {
        try {
            // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // 这是请求超时,但是结果返回了的警告
                logger.warn(\"...\");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 保存响应对象
            response = res;
            if (done != null) {
                // 唤醒用户线程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}

在一次调用过程中,请求和相应的编号是一致的,所以可以根据调用编号从 FUTURES 中得到发起请求时创建的 DefaultFuture 。DefaultFuture.get 方法阻塞等待响应结果,而 DefaultFuture#received 是得到响应结果之后唤醒用户线程(也就是 get 方法中的循环)。这两个方法结合起来看就明白了。

3. 总结

没啥好总结的,Dubbo 系列就写完了。阅读优秀框架的源码从大的方面可以学习其思想以及架构,小的方面就是一个个小功能的写法,比如负载均衡算法、DefaultFuture、SPI 等等。

PS:总感觉 Dubbo 和 Netty 的线程模型殊途同归


参考资料

Dubbo开发指南


来源:https://www.cnblogs.com/konghuanxi/p/16645441.html
本站部分图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » Dubbo源码(九)

相关推荐

  • 暂无文章