从dubbo看Netty使用 有更新!

  |   0 评论   |   2,445 浏览

    38套精品Java架构师高并发高性能高可用分布式集群电商缓存性能调优项目实战教程(高级进阶篇460GB)
    20套精品《Java从零基础到架构师学习路径》配套教程(基础中级篇600GB)

    Netty是个高性能的Java网络传输框架,在很多中间件或者分布式框架中几乎都能看到它的身影。既然Netty这么受欢迎,那到底怎么把netty嵌入到我们的系统中了?笔者在几年前就接触了Netty,也开发了个小项目。一直问题困扰自己,Netty使用场景是什么?怎么使用Netty?我们可以从dubbo中找到些答案。dubbo是高性能轻量级的RPC框架。

    netty本质的功能负责网络传输,dubbo使用netty作为网络传输框架。说到网络传输自然离不开SocketSocket是端到端的连接。dubbo是无中心化,每个client端都能与server端连接,每个client端同时又是server端。

    dubboclient端主要实现AbstractClientNettyClient扩展继承了它。一般来说对于同一个server端来说(ipport相同),只有一个client实例对应,也就是dubbo所说的共享连接。从DubboProtocol类实现可以找到

        private ExchangeClient[] getClients(URL url){

            //是否共享连接

            boolean service_share_connect = false;

            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);

            //如果connections不配置,则共享连接,否则每服务每连接

            if (connections == 0){

                service_share_connect = true;

                connections = 1;

            }

     

            ExchangeClient[] clients = new ExchangeClient[connections];

            for (int i = 0; i < clients.length; i++) {

                if (service_share_connect){

                    clients[i] = getSharedClient(url);

                } else {

                    clients[i] = initClient(url);

                }

            }

            return clients;

    }

    从代码可以看出NettyClient实例并不轻,尽量减少NettyClient实例,这也是多个服务共享连接的原因之一。在设计类似NettyClient的时候,不要忘了实例化NettyClient的开销。一定不要忘记了缓存或者连接池的使用。大都优秀的框架都是运用了这两个思想。

        private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);

     

        protected static final String CLIENT_THREAD_POOL_NAME  ="DubboClientHandler";

     

        private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();

     

        private final Lock            connectLock = new ReentrantLock();

     

        //重连调度器

        private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));

     

        private volatile  ScheduledFuture<?> reconnectExecutorFuture = null;

     

        protected volatile ExecutorService executor;

     

        private final boolean send_reconnect ;

     

        private final AtomicInteger reconnect_count = new AtomicInteger(0);

     

        //重连的error日志是否已经被调用过.

        private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false) ;

     

        //重连warning的间隔.(waring多少次之后,warning一次) //for test

        private final int reconnect_warning_period ;

     

        //the last successed connected time

        private long lastConnectedTime = System.currentTimeMillis();

     

    private final long shutdown_timeout ;

    nettyClient怎么初始化的打开的?从代码可以看出写的中规中矩,并没有惊奇,从很多教程几乎都能看到这样的实现。

        @Override

        protected void doOpen() throws Throwable {

            NettyHelper.setNettyLoggerFactory();

            bootstrap = new ClientBootstrap(channelFactory);

            // config

            // @see org.jboss.netty.channel.socket.SocketChannelConfig

            bootstrap.setOption("keepAlive", true);

            bootstrap.setOption("tcpNoDelay", true);

            bootstrap.setOption("connectTimeoutMillis", getTimeout());

            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

                public ChannelPipeline getPipeline() {

                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);

                    ChannelPipeline pipeline = Channels.pipeline();

                    pipeline.addLast("decoder", adapter.getDecoder());

                    pipeline.addLast("encoder", adapter.getEncoder());

                    pipeline.addLast("handler", nettyHandler);

                    return pipeline;

                }

            });

    }

    dubbo为了实现对Channel的抽象,不依赖Netty的实现,自己设计了Channel类,而NettyChannel只不过是dubboChannel其中一种实现而已。NettyChannel类保存了一个静态变量channelMap,这个是map型变量。原生的Channeldubbo定制化的NettyChannel一对一对应绑定起来。

    final class NettyChannel extends AbstractChannel {

     

        private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    一对一绑定实现

        static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {

            if (ch == null) {

                return null;

            }

            NettyChannel ret = channelMap.get(ch);

            if (ret == null) {

                NettyChannel nc = new NettyChannel(ch, url, handler);

                if (ch.isConnected()) {

                    ret = channelMap.putIfAbsent(ch, nc);

                }

                if (ret == null) {

                    ret = nc;

                }

            }

            return ret;

        }

    NettyHandler是对ChannelHandler一层封装。ChannelHandler大量采用装饰器模式和委托模式,这类似Java中的IOStream。通过装饰器模式使得ChannelHandler具有解码,统计,分发等等功能。最里层ChannelHandlerDubboProtocol类中的内部类。reply方法看起来不来,主要做了2件事:获取对应的Invoker,执行invoke调用。

       private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

     

            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {

                if (message instanceof Invocation) {

                    Invocation inv = (Invocation) message;

                    Invoker<?> invoker = getInvoker(channel, inv);

                    //如果是callback 需要处理高版本调用低版本的问题

                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){

                        String methodsStr = invoker.getUrl().getParameters().get("methods");

                        boolean hasMethod = false;

                        if (methodsStr == null || methodsStr.indexOf(",") == -1){

                            hasMethod = inv.getMethodName().equals(methodsStr);

                        } else {

                            String[] methods = methodsStr.split(",");

                            for (String method : methods){

                                if (inv.getMethodName().equals(method)){

                                    hasMethod = true;

                                    break;

                                }

                            }

                        }

                        if (!hasMethod){

                            logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );

                            return null;

                        }

                    }

                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());

                    return invoker.invoke(inv);

                }

                throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());

            }

    ...

    NettyHandler继承了SimpleChannelHandler,是我们最需要关注和设计的类,因为它是Netty提供开发者最有控制权的类。任何依赖Netty的框架都需要定制化NettyHandler类。dubbo也不例外,对NettyHandler进行了大量抽象和封装,使其能满足自身功能的需要。

    评论

    发表评论

    validate