try { // Loop until we receive a shutdown command while (!stopCalled) { while (endpoint.isPaused() && !stopCalled) { // ... }
if (stopCalled) { break; } state = AcceptorState.RUNNING;
try { //if we have reached max connections, wait // 检查当前最大连接数 // 若未达到maxConnections则加1,否则等待 endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections if (endpoint.isPaused()) { continue; }
U socket = null; try { // Accept the next incoming connection from the server // socket // 以NioEndpoint为例,以nio的方式建立socket连接 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // .... } // Successful accept, reset the error delay errorDelay = 0;
// Configure the socket if (!stopCalled && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // setSocketOptions() NioEndpoint交给Poller线程处理 if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { // .... } } } finally { stopLatch.countDown(); } state = AcceptorState.ENDED; } }
publicclassNioEndpointextendsAbstractJsseEndpoint<NioChannel,SocketChannel> { /** * Poller class. */ publicclassPollerimplementsRunnable{ /** * The background thread that adds sockets to the Poller, checks the * poller for triggered events and hands the associated socket off to an * appropriate processor as events occur. */ @Override publicvoidrun(){ // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { // If we are here, means we have other stuff to do // Do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } // Either we timed out or we woke up, process events first if (keyCount == 0) { hasEvents = (hasEvents | events()); } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; }
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); iterator.remove(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper != null) { processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); } getStopLatch().countDown(); } } }
Poller的run()方法
while true循环中,hasEvents = events(); 检查事件同步队列中events.poll()是否有事件,有就开始处理事件(此事件是由acceptor线程添加到队列当中的);