在上篇介绍了重叠 IO 的基本知识并使用事件的方式实现了第一个版本,但大家知道使用事件的缺点,因为 WaitForSingleObject 函数最多只能等待 64 个事件,所以要想处理更多的客户端得通过多个工作者线程来同时监视 Event 对象,这样便使处理更加麻烦,而且和事件选择其实一样依旧存在着部分阻塞。

所以今天来看第二种实现重叠 IO 的方法,即使用 Completion Routine。接着上篇,我们说这种方法是跟 WSASendWSARecv 函数的最后一个参数有关的,所以再来看看其原型:

int WSARecv (
  SOCKET s,  // 套接字句柄
  LPWSABUF lpBuffers,  // 指向待接收数据缓冲区
  DWORD dwBufferCount,  // lpBuffers数组的长度
  LPDWORD lpNumberOfBytesRecvd,  // 保存实际接收的字节数
  LPDWORD lpFlags,  // 数据传输标志
  LPWSAOVERLAPPED lpOverlapped,  // 指向重叠结构
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionROUTINE  // 指向Completion Routine函数
);

WSASendWSARecv 同理,只看一个便好。可以看到,最后一个参数是一个函数指针,这个指向的函数原型必须是这样的:

void CALLBACK CompletionRoutine(
    DWORD dwError,  // 错误信息
    DWORD dwTransferred,  // 实际收发的字节数
    LPWSAOVERLAPPED lpOverlapped,  // 就是WSARecv和WSASend中的lpOverlapped
    DWORD dwFlags  // 标志
);

WSASendWSARecv 函数会保存最后一个参数中我们传入的函数,当系统处理完后再自动调用这个函数,那么系统是如何得知这些参数的呢?其实就是在 WSARecv 函数的参数中指定的,想想上篇说的 WSAOVERLAPPED 结构体中的 InternalInternalHigh 就分别保存着错误码和实际收发的字节数。

当有接收到用户消息后,系统就通过这些参数来调用我们传进去的函数指针,我们在自己创建的函数中作处理便可以了。这就把如何确认发生了消息这部分变得非常简单了,而且这部分的阻塞也不存在了,这才真正实现了非阻塞处理。想想我们前面学习的各种方法都得去开个循环确认是否有消息要处理,处理不便且有阻塞,你就会发现这种方式真的是方便多了。

所以这种方法的基本操作都和第一种方法是一样的,主要是在这最后一个函数指针上。还是直接通过代码来说吧,毕竟千言不及一码清晰,首先还是来先定义一个结构体供我们使用:

typedef struct {
    SOCKET hClntSock;
    char buf[BUF_SIZE];
    WSABUF wsaBuf;
}PER_IO_DATA, *LPPER_IO_DATA;

这就是我们在这里用的小纸条了,其中包含着客户端套接字和接收数据的缓冲区,我们为其多加一个指针别名方便使用。

接着来看类的定义:

class COverlappedServer
{
public:
    COverlappedServer(int port);
    ~COverlappedServer();
    void Accept();

private:
    void InitSock();
    void RequestHandler();
    static void CALLBACK ReadCmplRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);
    static void CALLBACK WriteCmplRoutine(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);

private:
    SOCKET m_hListenSock;  // 监听套接字
    SOCKADDR_IN m_listenAddr;  // 监听套接字地址
    LPWSAOVERLAPPED m_lpOverlapped;  // 重叠结构
    LPPER_IO_DATA m_hIoInfo;  // 单IO数据
    int m_nPort;  // 端口
};

在这里我们声明了两个 Completion Routine 函数,ReadCmplRoutine 函数用于在接收数据后调用,WriteCmplRoutine 函数用于在发送数据后调用。CALLBACK 其实就是 stdcall 调用约定,函数调用约定的目的是指定栈平衡的方式,关于这个若是讲逆向时我会专门总结一篇文章的。

大家可能发现这里还有一个 WSAOVERLAPPED 指针,不是使用第二种方法了吗?怎么还需要这个参数。其实在这里它的作用不再是用于事件了,稍后你将看到它起的作用。

InitSock 函数和第一种方法的操作是一样的,看看就好,若有哪里不懂,可回上篇参考。

void COverlappedServer::InitSock()
{
    WSADATA wsaData;
    int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
    assert(ret == 0);

    // 创建重叠IO套接字
    m_hListenSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    // 更改socket IO选项为非0,非0即为非阻塞套接字
    ULONG ulMode = 1;
    ioctlsocket(m_hListenSock, FIONBIO, &ulMode);

    memset(&m_listenAddr, 0, sizeof(m_listenAddr));
    m_listenAddr.sin_family = AF_INET;
    m_listenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    m_listenAddr.sin_port = htons(m_nPort);

    ret = bind(m_hListenSock, (SOCKADDR *)&m_listenAddr, sizeof(m_listenAddr));
    assert(ret != SOCKET_ERROR);

    ret = listen(m_hListenSock, 5);
    assert(ret != SOCKET_ERROR);
}

现在来看最主要的部分,RequestHandler 函数:

void COverlappedServer::RequestHandler()
{
    SOCKET hClntSock;
    SOCKADDR_IN clntAddr;
    int addrLen = sizeof(clntAddr);
    while (true)
    {
        SleepEx(100, TRUE);  // 设为alertable wait以调用例行程序

        hClntSock = accept(m_hListenSock, (SOCKADDR *)&clntAddr, &addrLen);
        if (hClntSock == INVALID_SOCKET)
        {
            if (WSAGetLastError() == WSAEWOULDBLOCK)
                continue;
            else
            {
                std::cerr << "accept() error!" << std::endl;
                break;
            }
        }

        std::cout << "connected client...." << hClntSock <<  std::endl;

        m_lpOverlapped = new WSAOVERLAPPED;
        ZeroMemory(m_lpOverlapped, sizeof(m_lpOverlapped));

        m_hIoInfo = new PER_IO_DATA;
        m_hIoInfo->hClntSock = hClntSock;
        m_hIoInfo->wsaBuf.buf = m_hIoInfo->buf;
        m_hIoInfo->wsaBuf.len = BUF_SIZE;

        m_lpOverlapped->hEvent = m_hIoInfo;
        DWORD recvBytes = 0;
        DWORD flags = 0;
        WSARecv(hClntSock, &m_hIoInfo->wsaBuf, 1,
            &recvBytes, &flags, m_lpOverlapped, ReadCmplRoutine);
    }
}

前面部分便不再说了,主要来说下部分,在 24 行创建了一个重叠结构,并用 ZeroMemory 函数初始化为 0。在 27 行,我们创建了“小纸条”,对于每位连接进来的用户我们都得为其分配一个,所以我们用堆分配。在这里,保存着客户端套接字和接收数据的缓冲区,别忘了,系统会在这里填写需要的数据。

那么我们应该把“小纸条”保存到那里呢?这东西是要在 Completion Routine 函数中使用的,可再在上面观察其原型,发现只有一个 lpOverlapped 成员没有用到,事实上,当使用 Completion Routine 时根本就不会用到 OVERLAPPED 中的 hEvent 成员,所以我们便能将其利用起来传递我们的“小纸条”。所以有了 32 行的代码,事件本就是一个指针,所以当然可以指向我们的结构体了。

最后要说说系统是如何调用这个回调函数(ReadCmplRoutine)的,当程序运行时,系统会创建一个线程,同时会创建一个和该线程相关联的队列,这个队列称为 APC 队列。当 IO 请求完成时,系统会将 IO 完成通知添加到 APC 队列中,以执行处理,不过系统会以任意的顺序来处理队列中的 IO 请求。所以在回调函数调用前线程中的其它正在处理的东西必须处理完,然后系统才能检查 APC 队列,对队列中的每一项调用回调函数,并传入错误码,实际传输的字节数,和 OVERLAPPED 结构体的地址。

也就是说要给线程标记一个时间点,在这个点上线程的其它东西都处理完了,然后系统再检查APC队列中是否有待处理项。线程可以把自己设为 alertable wait 状态来设置这个点,这样系统就知道何时需要检查线程的 APC 队列了,然后便可为其中的每一项调用回调函数。

我们在第 8 行就通过 SleepEx 函数将线程设为了 alertable wait 状态,我们熟悉的具有此功能的函数有:

DWORD SleepEx(
  DWORD dwMilliseconds,  // time-out interval in milliseconds
  BOOL bAlertable        // early completion flag
);

DWORD WaitForSingleObjectEx(
  HANDLE hHandle,        // handle to object to wait for
  DWORD dwMilliseconds,  // time-out interval, in milliseconds
  BOOL bAlertable        // return to execute I/O completion routine if TRUE
);

DWORD WaitForMultipleObjectsEx(
  DWORD nCount,             // number of handles in handle array
  CONST HANDLE *lpHandles,  // points to the object-handle array
  BOOL fWaitAll,            // wait flag
  DWORD dwMilliseconds,     // time-out interval in milliseconds
  BOOL bAlertable           // alertable wait flag
);

DWORD WSAWaitForMultipleEvents(
  DWORD cEvents,
  const WSAEVENT FAR *lphEvents,
  BOOL fWaitAll,
  DWORD dwTimeOUT,
  BOOL bAlertable
);

可以发现这些函数皆有一个 bAlertable成员,将其设为 true 就可设为 alertable wait 状态,我们还见过这些函数的非Ex版本,比如 Sleep 函数,其实在这些函数的内部就调用了Ex版本并将 bAlertable 成员设为 false

最后,我们来看在 ReadCmplRoutieWriteCmplRoutie 回调函数中如何处理,先来看 ReadCmplRoutie 函数:

void COverlappedServer::ReadCmplRoutine(DWORD dwError,
    DWORD dwRecvBytes, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
{
    LPPER_IO_DATA hIoInfo = static_cast<LPPER_IO_DATA>(lpOverlapped->hEvent);
    if (dwRecvBytes == 0)
    {
        std::cout << "disconnected client " << hIoInfo->hClntSock << std::endl;

        closesocket(hIoInfo->hClntSock);
        delete lpOverlapped;
        delete hIoInfo;
    }
    else
    {
        std::cout << "received: " << hIoInfo->buf << std::endl;

        hIoInfo->wsaBuf.len = dwRecvBytes;
        DWORD sendBytes;
        WSASend(hIoInfo->hClntSock, &hIoInfo->wsaBuf,
            1, &sendBytes, 0, lpOverlapped, WriteCmplRoutine);
    }
}

可以看到,在第一行取到了传入的“小纸条”,也就是说我们获得了客户端的套接字和接收数据的缓冲区,很棒。

接着判断是否是退出消息,接收的字节长度为0便为退出消息,这些系统在调用回调函数时已经为我们填好了,所以直接判断 dwRecvBytes 就可以了。在这里,关闭断开连接的套接字句柄,并释放为其分配的内存,我们要接受上千个客户端,若是只分配不释放电脑会崩的,而且崩的很快。

接着,依旧将客户端发过来的数据再转发回去,这里使用的是 Completion Routine,所以必须得使用 WSASend 函数。

现在在 WriteCmplRoutine 中需要做的处理就非常少了:

void COverlappedServer::WriteCmplRoutine(DWORD dwError,
    DWORD dwRecvBytes, LPWSAOVERLAPPED lpOverlapped, DWORD dwFlags)
{
    LPPER_IO_DATA hIoInfo = static_cast<LPPER_IO_DATA>(lpOverlapped->hEvent);
    DWORD recvBytes = 0;
    DWORD flags = 0;
    WSARecv(hIoInfo->hClntSock, &hIoInfo->wsaBuf,
        1, &recvBytes, &flags, lpOverlapped, ReadCmplRoutine);
}

在这里,依旧拿到我们的小纸条,继续调用 WSARecv 函数来接受下一个数据请求。

这次使用重叠IO实现的版本是目前为止我们实现过的最高效的,四五千个用户应该是没有问题的,我开了四千个客户端测试,未崩。

Output

最后不得不说说使用 alertable wait 这种方式的缺点。首先是上下文环境的问题,因为这个函数是 static 的,所以不能访问类中成员,在这里我们巧妙地通过借用 OVERLAPPEDhEvent 指向“小纸条”解决了这个问题,要不然就只能全放在全局作用域,这里算是解决了这个问题。

主要问题是在一个线程中既要发出 IO 请求,又要对完成通知进行处理。若是 CPU 有多个核,这就会使其它线程即使空闲也不会对完成通知做出响应,就造成了忙的忙死了,闲的闲死了,所以它并没能充分利用起 CPU,这样的程序伸缩性并不太好。所以微软又开发了 IOCP 模型来解决了这个问题,下篇便来讲解。

Leave a Reply

Your email address will not be published. Required fields are marked *

You can use the Markdown in the comment form.