`
ahua186186
  • 浏览: 554104 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

summercool-hsf &Netty3.X总结2--客户端建立连接环节

 
阅读更多
客户端建立连接环节:(线程就像车的引擎,是RPC框架设计的关键)

1.主线程(main)初始化2个对象:ChannelFactory 和ChannelPipelineFactory

(1)初始化NioClientSocketChannelFactory的3个属性:bossPool ,workerPool ,sink (new NioClientSocketPipelineSink(bossPool))并启动boss和工作线程(见文章后面的第5点)

(2)初始化DefaultChannelPipeline的属性:name2ctx,tail,

2.主线程(main)通过 clientbootstrap.connect(socketAddress)初始化NioClientSocketChannel的work线程,pipeline,sink:

由此可见:NETTY3.X是一个NioClientSocketChannel一个work线程。

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }

        ChannelPipeline pipeline;
        try {
            pipeline = getPipelineFactory().getPipeline();
        } catch (Exception e) {
            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
        }

        // Set the options.
        Channel ch = getFactory().newChannel(pipeline);
        boolean success = false;
        try {
            ch.getConfig().setOptions(getOptions());
            success = true;
        } finally {
            if (!success) {
                ch.close();
            }
        }

        // Bind.
        if (localAddress != null) {
            ch.bind(localAddress);
        }

        // Connect.
        return ch.connect(remoteAddress);
    }



public SocketChannel newChannel(ChannelPipeline pipeline) {
        return new NioClientSocketChannel(this, pipeline, sink, workerPool.nextWorker());
    }



3.最终经过设定好的DefaultChannelPipeline的handler处理后交给NioClientSocketPipelineSink处理事件:OPEN,CONNECTED:
 public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioClientSocketChannel channel =
                (NioClientSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                break;
            case BOUND:
                if (value != null) {
                    bind(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case CONNECTED:
                if (value != null) {
                    connect(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBufferQueue.offer(event);
            assert offered;
            channel.worker.writeFromUserCode(channel);
        }
    }

4.注册事件:

(1)注册NioClientSocketChannel的OP_READ事件到nioworker线程的selector上进行监听
nioworker类:
private final class RegisterTask implements Runnable {
        private final NioSocketChannel channel;
        private final ChannelFuture future;
        private final boolean server;

        RegisterTask(
                NioSocketChannel channel, ChannelFuture future, boolean server) {

            this.channel = channel;
            this.future = future;
            this.server = server;
        }

        public void run() {
            SocketAddress localAddress = channel.getLocalAddress();
            SocketAddress remoteAddress = channel.getRemoteAddress();

            if (localAddress == null || remoteAddress == null) {
                if (future != null) {
                    future.setFailure(new ClosedChannelException());
                }
                close(channel, succeededFuture(channel));
                return;
            }

            try {
                if (server) {
                    channel.channel.configureBlocking(false);
                }

                channel.channel.register(
                        selector, channel.getInternalInterestOps(), channel);

                if (future != null) {
                    channel.setConnected();
                    future.setSuccess();
                }

                if (server || !((NioClientSocketChannel) channel).boundManually) {
                    fireChannelBound(channel, localAddress);
                }
                fireChannelConnected(channel, remoteAddress);
            } catch (IOException e) {
                if (future != null) {
                    future.setFailure(e);
                }
                close(channel, succeededFuture(channel));
                if (!(e instanceof ClosedChannelException)) {
                    throw new ChannelException(
                            "Failed to register a socket to the selector.", e);
                }
            }
        }
    }



(2)注册NioClientSocketChannel的OP_CONNECT事件到NIOclientBoss线程的selector上进行监听

NioClientBoss类:

 private final class RegisterTask implements Runnable {
        private final NioClientBoss boss;
        private final NioClientSocketChannel channel;

        RegisterTask(NioClientBoss boss, NioClientSocketChannel channel) {
            this.boss = boss;
            this.channel = channel;
        }

        public void run() {
            int timeout = channel.getConfig().getConnectTimeoutMillis();
            if (timeout > 0) {
                if (!channel.isConnected()) {
                    channel.timoutTimer = timer.newTimeout(wakeupTask,
                            timeout, TimeUnit.MILLISECONDS);
                }
            }
            try {
                channel.channel.register(
                        boss.selector, SelectionKey.OP_CONNECT, channel);
            } catch (ClosedChannelException e) {
                channel.worker.close(channel, succeededFuture(channel));
            }

            int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
            if (connectTimeout > 0) {
                channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
            }
        }
    }



5.NioClientSocketChannelFactory启动的IO线程:(1个boss多个work线程)


初始化线程池:

	private static ExecutorService getCachedExecutor(String name) {
		//newCachedThreadPool:根据需要创建线程,需考虑内存是否够用,理论上一个线程1M内存,存在内存溢出的风险
		return Executors.newCachedThreadPool(new NamedThreadFactory(name));
	}

	public AbstractHsfService() {
		this(getCachedExecutor("HSF-BOSS-PROCESSOR"), Runtime.getRuntime().availableProcessors() + 1);
	}

	public AbstractHsfService(Executor bossExecutor, int workerCount) {
		this(bossExecutor, getCachedExecutor("HSF-WORKER-PROCESSOR"), workerCount);
	}

	public AbstractHsfService(Executor bossExecutor, Executor workerExecutor, int workerCount) {
		if (bossExecutor == null) {
			throw new IllegalArgumentException("bossExecutor can not be null.");
		} else if (workerExecutor == null) {
			throw new IllegalArgumentException("workerExecutor can not be null.");
		} else if (workerCount <= 0) {
			throw new IllegalArgumentException("workerCount required > 0.");
		} else if (workerExecutor instanceof ThreadPoolExecutor
				&& ((ThreadPoolExecutor) workerExecutor).getMaximumPoolSize() < workerCount) {
			throw new IllegalArgumentException("the maximum pool size of workerExecutor required >= workerCount.");
		}

		this.bossExecutor = bossExecutor;
		this.workerExecutor = workerExecutor;
		this.workerCount = workerCount;

		init();
	}


启动工作线程:AbstractNioWorkerPool类

   protected void init() {
        if (!initialized.compareAndSet(false, true)) {
            throw new IllegalStateException("initialized already");
        }

        for (int i = 0; i < workers.length; i++) {
            workers[i] = newWorker(workerExecutor);
        }

        waitForWorkerThreads();
    }



启动BOSS线程:AbstractNioBossPool类
 protected void init() {
        if (!initialized.compareAndSet(false, true)) {
            throw new IllegalStateException("initialized already");
        }

        for (int i = 0; i < bosses.length; i++) {
            bosses[i] = newBoss(bossExecutor);
        }

        waitForBossThreads();
    }


6.在DispatchUpStreamHandler中初始化封装的HsfChannel:
@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		logger.warn("channel {} is connected.", e.getChannel());
		//
		HsfChannel hsfChannel = new HsfChannel(eventDispatcher.getService(), e.getChannel());
		ChannelGroupFuture channelFuture = ChannelGroupFutureHolder.remove(hsfChannel.getId());
		hsfChannel.setChannelGroupFuture(channelFuture);
		//
		eventDispatcher.getService().getChannels().put(hsfChannel.getId(), hsfChannel);
		eventDispatcher.dispatchChannelEvent(ctx, hsfChannel, e);

		super.channelConnected(ctx, e);
	}

  • 大小: 17.7 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics