Connector(连接器)组件负责生成请求对象和响应对象的,Tomcat默认为HttpConnector,负责根据收到的Http请求报文生成Request对象和Response对象,并把这两个对象传递给Container,然后根据Response中的内容生成相应的HTTP报文。

Connector是使用ProtocolHandler来处理请求的,不同的ProtocolHandler代表不同的连接类型。
Http11Protocol使用的是普通Socket来连接的,Http11NioProtocol使用的是NioSocket来连接的。

ProtocolHandler

  • Endpoint
    Endpoint用来处理底层Socket的网络连接,由于是处理底层的Socket网络连接,因此Endpoint是用来实现TCP/IP协议的。

    Endpoint的抽象实现类AbstractEndpoint里面定义的Acceptor和AsyncTimeout两个内部类和一个Handler接口。

    • Acceptor
      Acceptor用于监听请求。
    • Handler
      Handler用于处理接收到的Socket,在内部调用Processor进行处理。
    • AsyncTimeout
      AsyncTimeout用于检查异步Request的超时。
  • Processor
    Processor用于将Endpoint接收到的Socket封装成Request,用来实现HTTP协议的。

  • Adaptor
    Adapter用于将Request交给Container进行具体的处理,用来将请求适配到Servlet容器进行具体的处理。

本文源码基于Tomcat 8.5.x版本。

Http11NioProtocol

NioEndpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// NioEndpoint#startInternal()
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
@Override
public void startInternal() throws Exception {
if (!running) {
// ...

// Create worker collection
if (getExecutor() == null) {
createExecutor();
}

initializeConnectionLatch();

// Start poller thread
poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();

// Start acceptor thread
startAcceptorThread();
}
}
}

// AbstractEndpoint
public abstract class AbstractEndpoint<S,U> {

public void createExecutor() {
internalExecutor = true;
// Tomcat的自定义阻塞队列,重写offer方法,从而使得线程池的最大线程数在阻塞队列前生效
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}

protected void startAcceptorThread() {
acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor";
acceptor.setThreadName(threadName);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}

Acceptor

acceptor线程的start:NioEndPoint的startInternal()方法中的startAcceptorThreads()方法调用。

主要负责socket的accept事件,而且默认情况下采用的是阻塞式。
采用阻塞式循环等待客户端请求,将接收到的请求包装一下,然后放入Poller的同步队列events中。

acceptor的run()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Acceptor<U> implements Runnable {
@Override
public void run() {

int errorDelay = 0;
long pauseStart = 0;

try {
// Loop until we receive a shutdown command
while (!stopCalled) {
while (endpoint.isPaused() && !stopCalled) {
// ...
}

if (stopCalled) {
break;
}
state = AcceptorState.RUNNING;

try {
//if we have reached max connections, wait
// 检查当前最大连接数
// 若未达到maxConnections则加1,否则等待
endpoint.countUpOrAwaitConnection();

// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}

U socket = null;
try {
// Accept the next incoming connection from the server
// socket
// 以NioEndpoint为例,以nio的方式建立socket连接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// ....
}
// Successful accept, reset the error delay
errorDelay = 0;

// Configure the socket
if (!stopCalled && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
// setSocketOptions() NioEndpoint交给Poller线程处理
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
// ....
}
}
} finally {
stopLatch.countDown();
}
state = AcceptorState.ENDED;
}
}
  1. socket = endpoint.serverSocketAccept()
    NioEndpoint中的serverSocketChannel一直接收客户端请求。
  2. endpoint.setSocketOptions(socket)
    setSocketOptions -> poller.register(socketWrapper) -> addEvent(pollerEvent) -> events.offer(event) -> queue[insert++] = t;
    serversocketchannel接收到客户端的channel后,取到一个poller线程,包装channel后注册channel,然后添加到poller的事件队列(同步队列SyncronizedQueue)events中。

Poller

poller线程的start:NioEndPoint的startInternal()方法中调用,其中poller的空参构造调用了selector.open(),即一个poller对应一个nio的selector。

主要负责check selecor是否准备好read/write,如果准备好了则将该channel包装包装丢给线程池去处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
/**
* Poller class.
*/
public class Poller implements Runnable {
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
// If we are here, means we have other stuff to do
// Do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
// Either we timed out or we woke up, process events first
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}

Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (socketWrapper != null) {
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
}
}

Poller的run()方法

  1. while true循环中,hasEvents = events();
    检查事件同步队列中events.poll()是否有事件,有就开始处理事件(此事件是由acceptor线程添加到队列当中的);
  2. Iterator iterator =selector.selectedKeys().iterator()并遍历,处理所有事件 -> processKey() -> processSocket(READ/WRITE分别处理) -> Executor executor = getExecutor(); -> executor.execute(sc);调用线程池处理。

Executor