开始之前,再来看看流程图。

Flow chart

程序阻塞的主要有两部分,一个是等待数据到来,一个是将数据从内核复制到程序缓冲区。

事件选择模型,其实是在 select 模型的基础上更进一步地做了优化,这次优化把等待数据到来的这部分变成了非阻塞。而这主要是使用 WSAEventSelect 函数完成的,其原型如下:

int WSAEventSelect (
  SOCKET s,              // 套接字
  WSAEVENT hEventObject, // 事件对象
  long lNetworkEvents    // 网络事件
);

事件选择给每个套接字绑定了一个事件,当发生指定网络事件的时候它会把绑定的事件变为 signaled 状态。

WSAEVENT 其实和之前在 Windows 线程同步中所说的内核对象事件是一样的,只是在这个函数中需要手动重置事件,为了清晰方便,所以弄了个 typedef,并提供了一个 WSACreateEvent 函数来创建手动重置的事件。

WSAEVENT event = WSACreateEvent();

其实,这个 WSACreateEvent 函数就相当于调用以下语句:

CreateEvent(NULL, TRUE, FALSE, NULL);

lNetworkEvents 可以指定一些指定的网络事件,我们感兴趣的值如下:

FD_READ
FD_WRITE
FD_ACCEPT
FD_CONNECT
FD_CLOSE

当将套接字和事件绑定起来后,在网络事件发生的时候,OS就把事件设为signaled状态,然后我们就可以根据事件来进行处理。

那么如何验证是否发生了事件呢?

大家可能还记得在 Windows 线程中所说的 WaitForMultipleObjects,它用于等待多个内核对象变为 signaled 状态。在这里,它同样提供了一个 WSAWaitForMultipleEvents 函数来验证是否发生事件,原型如下:

DWORD WSAWaitForMultipleEvents(
  DWORD cEvents,  // 对象个数
  const WSAEVENT FAR *lphEvents,  // 事件数组地址
  BOOL fWaitAll,  // 等待标志
  DWORD dwTimeOUT,  //超时时间
  BOOL fAlertable
);

前四个参数的意义和 WSAForMultipleObjects 相同,最后一个参数 fAlerable 用于触发 alertable wait 状态,这个在重叠 IO 中会用到,现在只需填入 FALSE 即可。

若是超时,则会返回 WAIT_TIMEOUT;否则会返回一个常量,使用这个常量减去 WSA_WAIT_EVENT_0 可以得到事件数组中变为 signaled 状态的事件索引,若有多个事件都为 signaled 状态,那么返回的便是最小的那个索引。知道最小索引也就知道全部的了,可以对后面的依次再调用一次 WSAWaitForMultipleEvents 来确认所有发生事件的索引,具体的操作可以稍后在代码中看。

因为这个函数最多可等待 64 个事件,所以一次就只能监视 64 个,但可以创建线程或扩展保存句柄的数组,然后多次调用这个函数以监视更多事件。

现在已经得到发生事件的索引了,接下来如何确认发生的事件类型呢?

这得使用以下函数:

int WSAEnumNetworkEvents (
  SOCKET s,
  WSAEVENT hEventObject,
  LPWSANETWORKEVENTS lpNetworkEvents
);

前两个分别是发生事件的套接字和与之相连的事件,这两个参数通过上一步得到的索引便能得到了。lpNetworkEvents 是个指向 WSANETWORKEVENTS 的结构体,用于保存发生的事件类型或错误信息,原型如下:

typedef struct _WSANETWORKEVENTS {
   long lNetworkEvents;
   int iErrorCode[FD_MAX_EVENTS];
} WSANETWORKEVENTS, FAR * LPWSANETWORKEVENTS;

lNetworkEvents 就是用于保存发生的事件类型的,比如是 FD_READ 事件它就为 FD_READ。若是有错误,iErrorCode 中就会记录,若是 FD_READ, 则 iErrorCode[FD_READ_BIT] 中就为非 0,应该加以判断。

现在,创建一个 CEventSelect 类来实现上述知识。

class CEventSelect
{
public:
    CEventSelect(int port);
    ~CEventSelect();
    void Accept();

private:
    void InitSock();  // 初始化相关操作
    void RequestHandler();  // 处理请求
    void CompressSocksAndEvents(int index);  // 有客户端退出时,同步删除套接字和对应的事件

private:
    WSADATA m_wsaData;
    SOCKET m_servSock, m_clntSock;
    SOCKADDR_IN m_servAddr, m_clntAddr;
    SOCKET m_hSockAddr[WSA_MAXIMUM_WAIT_EVENTS];  // 套接字数组
    WSAEVENT m_hEventAddr[WSA_MAXIMUM_WAIT_EVENTS];  // 事件数组
    int m_nPort;  // 端口
    int m_nNumOfSock;  // 记录当前的套接字数量
};

同样先从 InitSock 中来看。

void CEventSelect::InitSock()
{
    // 初始化套按字库
    int ret = WSAStartup(MAKEWORD(2, 2), &m_wsaData);
    assert(ret == 0);

    // 创建服务端套接字
    m_servSock = socket(PF_INET, SOCK_STREAM, 0);
    memset(&m_servAddr, 0, sizeof(m_servAddr));
    m_servAddr.sin_family = AF_INET;
    m_servAddr.sin_addr.s_addr = htonl(ADDR_ANY);
    m_servAddr.sin_port = htons(m_nPort);

    // 绑定地址
    ret = bind(m_servSock, (SOCKADDR *)&m_servAddr, sizeof(m_servAddr));
    assert(ret != SOCKET_ERROR);

    //监听
    ret = listen(m_servSock, 5);
    assert(ret != SOCKET_ERROR);

    // 创建一个事件
    WSAEVENT newEvent = WSACreateEvent();

    //将服务端套接字与新建的事件绑定起来,关注FD_ACCEPT和FD_CLOSE消息
    ret = WSAEventSelect(m_servSock, newEvent, FD_ACCEPT | FD_CLOSE);
    assert(ret != SOCKET_ERROR);

    //将套接字与事件同时保存入数组
    m_hSockAddr[m_nNumOfSock] = m_servSock;
    m_hEventAddr[m_nNumOfSock] = newEvent;
    m_nNumOfSock++;  //套接字计数++
}

主要的就是 WSAEventSelect 函数,这里我们创建了一个事件来和服务端的套接字绑定起来,并监听 FD_ACCEPTFD_CLOSE 消息,这样就把本来占用程序时间片的 accept 交给了 OS,由它去处理,有了消息再通知我们。此处就解决了等待数据到来的这部分阻塞,这也正是事件选择模型的改进效果。

我们把每一个套接字和与之相对应的事件同步保存到数组中,这步操作很重要,因为套接字要和事件一一对应,得能通过套接字找到事件,也能通过事件找到套接字。

同样在构造函数中去调用:

CEventSelect::CEventSelect(int port)
    : m_nPort(port),
      m_nNumOfSock(0)
{
    InitSock();
}

现在的工作只剩下处理 OS 传递的通知,这也是关键所在。

void CEventSelect::RequestHandler()
{
    int posInfo, startIndex;
    while (true)
    {
        // 1.验证是否发生了事件
        posInfo = WSAWaitForMultipleEvents(m_nNumOfSock, m_hEventAddr, FALSE, WSA_INFINITE, FALSE);
        startIndex = posInfo - WSA_WAIT_EVENT_0;  // 得到最小索引

        for (int i = startIndex; i < m_nNumOfSock; ++i)
        {
            int sigEventIndex = WSAWaitForMultipleEvents(1, &m_hEventAddr[i], TRUE, 0, FALSE);
            if (sigEventIndex == WSA_WAIT_FAILED || sigEventIndex == WSA_WAIT_TIMEOUT)
            {
                continue;
            }
            else
            {
                // 2.区分事件类型
                WSANETWORKEVENTS netEvents;
                WSAEnumNetworkEvents(m_hSockAddr[i], m_hEventAddr[i], &netEvents);
                if (netEvents.lNetworkEvents & FD_ACCEPT)  // 请求连接时
                {
                    if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)  // Error
                        break;

                    int clntAddrSize = sizeof(m_clntAddr);
                    m_clntSock = accept(m_servSock, (SOCKADDR *)&m_clntAddr, &clntAddrSize);

                    WSAEVENT newEvent = WSACreateEvent();
                    WSAEventSelect(m_clntSock, newEvent, FD_READ | FD_CLOSE);
                    m_hSockAddr[m_nNumOfSock] = m_clntSock;
                    m_hEventAddr[m_nNumOfSock] = newEvent;
                    m_nNumOfSock++;
                    printf("connected new client:%d \n", m_clntSock);
                }

                if (netEvents.lNetworkEvents & FD_READ)  // 接收数据时
                {
                    if (netEvents.iErrorCode[FD_READ_BIT] != 0)
                        break;

                    char buf[BUF_SIZE] = "";
                    int recvLen = recv(m_hSockAddr[i], buf, sizeof(buf), 0);
                    send(m_hSockAddr[i], buf, recvLen, 0);
                }

                if (netEvents.lNetworkEvents & FD_CLOSE)  // 断开连接时
                {
                    if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
                        break;

                    WSACloseEvent(m_hEventAddr[i]);
                    closesocket(m_hSockAddr[i]);
                    m_nNumOfSock--;
                    printf("closed client-%d...\n", m_hSockAddr[i]);

                    CompressSocksAndEvents(i);
                }
            }
        }
    }
}

先来看第一部分验证是否发生了事件,这主要在前 16 行。

int posInfo, startIndex;
    while (true)
    {
        //验证是否发生了事件
        posInfo = WSAWaitForMultipleEvents(m_nNumOfSock, m_hEventAddr, FALSE, WSA_INFINITE, FALSE);
        startIndex = posInfo - WSA_WAIT_EVENT_0;  //得到最小索引

        for (int i = startIndex; i < m_nNumOfSock; ++i)
        {
            int sigEventIndex = WSAWaitForMultipleEvents(1, &m_hEventAddr[i], TRUE, 0, FALSE);
            if (sigEventIndex == WSA_WAIT_FAILED || sigEventIndex == WSA_WAIT_TIMEOUT)
            {
                continue;
            }
// ...

通过调用 WSAWaitForMultipleEvents 函数来等待系统通知我们,第一个参数是 m_nNumOfSock,基中就保存着当前拥有的套接字数量,第二个参数是事件数组,这个的个数和 m_nNumOfSock 是同步的,所以也就是一样的。

此处无需等待所有的事件,只要有一个有消息了我们就返回,所以第三个参数传入 FALSE。然后传入 WSA_INFINITE 来一直等待系统的通知。

有通知了后减去 WSA_WAIT_EVENT_0 便可得到对应的最小索引,我们再依次对最小索引后面的事件依次调用 WSAWaitForMultipleEvents 就能得到所有发生消息的事件,这里我们只监听一个事件,所以第一个参数只需传入 1 便可。接着就是一些错误检验。

现在来看第二部分,得到了发生的事件索引,现在就得区分事件类型:

WSANETWORKEVENTS netEvents;
WSAEnumNetworkEvents(m_hSockAddr[i], m_hEventAddr[i], &netEvents);

只需给它个 netEvents 用来保存类型或错误,很简单。

当有客户端请求连接时,我们处理 FD_ACCEPT 消息:

if (netEvents.lNetworkEvents & FD_ACCEPT)  // 请求连接时
{
    if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0)  //Error
        break;

    int clntAddrSize = sizeof(m_clntAddr);
    m_clntSock = accept(m_servSock, (SOCKADDR *)&m_clntAddr, &clntAddrSize);

    WSAEVENT newEvent = WSACreateEvent();
    WSAEventSelect(m_clntSock, newEvent, FD_READ | FD_CLOSE);
    m_hSockAddr[m_nNumOfSock] = m_clntSock;
    m_hEventAddr[m_nNumOfSock] = newEvent;
    m_nNumOfSock++;
    printf("connected new client:%d \n", m_clntSock);
}

只需和对应的消息进行与操作就能检验是否发生了该消息,接着再这里我们又看到了 accept 函数,因为现在肯定已经有消息了,所以 accept 就能立即处理,而不用一直傻傻地等待。

然后为每位连接进来的客户端也绑定上事件,关注 FD_READFD_CLOSE 消息,这样客户端发消息过来或退出我们就能知道了。

接收数据没什么特别的就不说了,现在来看退出消息:

if (netEvents.lNetworkEvents & FD_CLOSE)  // 断开连接时
{
    if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
        break;

    WSACloseEvent(m_hEventAddr[i]);
    closesocket(m_hSockAddr[i]);
    m_nNumOfSock--;
    printf("closed client-%d...\n", m_hSockAddr[i]);

    CompressSocksAndEvents(i);
}

在关闭了对应的事件和套接字后,要对计数进行更新,也要在数组中删除对应的事件和套接字,这个主要是在 CompressSocksAndEvents 函数中完成的:

void CEventSelect::CompressSocksAndEvents(int index)
{
    for (int i = index; i < m_nNumOfSock; ++i)
    {
        m_hSockAddr[i] = m_hSockAddr[i + 1];
        m_hEventAddr[i] = m_hEventAddr[i + 1];
    }
}

这里同步更新数组,这小算法没啥说的,现在就使用事件选择写好了数据库,我们在 Accept 函数中调用 RequestHanlder 以供用户使用就好。

可以看出事件选择主要是对套接字绑定一个事件,然后提供感兴趣的消息,交给 OS 去处理,只需经过一个短暂的阻塞(将数据从网卡缓冲区拷贝到程序的内存),就可以完成接收过程了。

这个模型稍微扩展便可以监听几百个事件,而且使用简单,对于这个规模的服务器可以使用其来开发。

但扩展太多管理可能有点麻烦,且还是存在第二部分阻塞的,所以想要支持更多的客户端还是得使用重叠 IO 和完成端口,这些下次来看。

Leave a Reply

Your email address will not be published. Required fields are marked *

You can use the Markdown in the comment form.