前面已经说过 select 和 Event selct 模型,这两个还是比较小型的,今天来说重叠 IO,这个可以支持上千个用户,当然理解起来也越来越难了点。

重叠 IO 模型是典型的非阻塞模型,接收数据和拷贝数据这两部分全部占用系统时间片,实现了效率最大化,只要将一个结构体投给系统它便会替我们完成耗时的处理。

首先来看如何创建一个支持重叠 IO 的套接字,使用的是 WSASocket 函数,原型如下:

SOCKET WSASocket (
  int af,  // 地址族
  int type,  // 传输方式
  int protocol,  // 通信协议
  LPWSAPROTOCOL_INFO lpProtocolInfo,
  GROUP g,
  DWORD dwFlags
);

前3个参数就是以前使用的 socket 函数的参数,主要来看后面的几个参数。lpProtocolInfo 可以传递一个 WSAPROTOCOL_INFO 结构体的地址,其中包含创建套接字的信息,不使用时传入 NULLg 是作为扩展函数而预约的参数,也为 NULL 便好;dwFlags 表示套接字的标志,要使用重叠 IO,得设为 WSA_FLAG_OVERLAPPED

所以要创建一个重叠 IO 套接字,可以这样写:

WSASocket(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

对于 sendrecv 也有对应的函数 WSASend和WSARecv,先来看 WSASend 函数:

1int WSASend (
2  SOCKET s,  // 套接字句柄
3  LPWSABUF lpBuffers,  // 待传输数据的缓冲区
4  DWORD dwBufferCount,  // lpBuffers数组的长度
5  LPDWORD lpNumberOfBytesSent,  // 保存实际发送的字节数
6  DWORD dwFlags,  // 设置数据传输特性
7  LPWSAOVERLAPPED lpOverlapped,  // 指向一个WSAOVERLAPPED结构体,这个可用事件确认数据是否传输完毕
8  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionROUTINE  // 是一个函数指针,在传输完成的时候可以自动调用指向的函数
9);

大多参数都不难理解,主要是多了几个新的结构体,我们先来看 WSABUF 结构体:

typedef struct __WSABUF {
    u_longlen;     // buffer length
    char FAR *buf; // pointer to buffer
} WSABUF, FAR * LPWSABUF;

其中包含了待传输数据的大小和其缓冲区地址。

再来看最后两个参数,所谓异步就是把本来需要使用程序时间片的运算交给系统去处理,这样耗费的就是系统的时间片,这样自己的程序就不会阻塞执行了。而系统把这些耗时操作处理完了还是得通知我们来做后续处理,这就得有确认数据是否处理完毕的方法了。重叠 IO 有两种方法可以确认数据是否传输完毕,而这两种方法就关系到这后两个参数。

第一种方法是使用内核对象的事件来确认,关于这个大家应该已经很熟悉了,从线程同步时我们就接触了这个,然后又在事件选择模型中主要使用这个来确认是否有消息要进行处理,此时重叠 IO 又可以使用它来确认数据是否处理完毕。现在来看看 WSAOVERLAPPED 结构体的原型:

typedef struct _WSAOVERLAPPED {
    DWORD        Internal;
    DWORD        InternalHigh;
    DWORD        Offset;
    DWORD        OffsetHigh;
    WSAEVENT     hEvent;
} WSAOVERLAPPED, FAR * LPWSAOVERLAPPED;

InternalInternalHigh 这两个参数原本是供系统内部使用的,后来开放了,Internal 中保存着错误码,InternalHigh 中保存着传输的字节数。其实在这两个参数未开放时,Windows 专门提供了一个 GetOverlappedResult 函数来获取这两个信息,很多书中也使用的这个,其实直接使用这两个字段获取就好了。

OffsetOffsetHigh 构成了一个 64 位的偏移量,表示读取的偏移地址。hEvent 就是事件,可以用上篇说的 WSACreateEvent 函数来创建手动重置的事件。

在这里我们主要是使用 hEvent 这个参数,为了理解其它各个参数的意义,这里便说说文件中的重叠 IO。

重叠 IO 不仅在套接字中会使用,其它设备中也有用到,套接字也是一种文件罢了,在我们使用 ReadFile 函数读取文件的时候,要是文件太大了也会阻塞在那里,要是观察其原型我们会发现它也提供了一个重叠 IO 参数。

BOOL ReadFile(
  HANDLE hFile,                // handle of file to read
  LPVOID lpBuffer,             // pointer to buffer that receives data
  DWORD nNumberOfBytesToRead,  // number of bytes to read
  LPDWORD lpNumberOfBytesRead, // pointer to number of bytes read
  LPOVERLAPPED lpOverlapped    // pointer to structure for data
);

可以发现最后一个参数指向一个 OVERLAPPED,当指定了重叠 IO 后,读取大文件时 ReadFile 就会把请求提交给操作系统,由操作系统来处理,读取完了再通知,我们要做的就是获取这个通知。下面来看使用重叠 IO 读取文件的一个小例子:

// 保存读取字节的缓冲区
std::shared_ptr<byte> buf(new byte[1000], std::default_delete<byte[]>());

// 以重叠IO方式打开文件,注意倒数第2个参数设置了重叠IO标志
HANDLE hFile = CreateFileW(L"movie.avi", GENERIC_ALL,
    FILE_SHARE_READ | FILE_SHARE_WRITE,
    0, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0);

// 获取文件大小
DWORD dwSize = GetFileSize(hFile, 0);

DWORD dwRead;
OVERLAPPED ovp = { 0 };
ovp.Offset = 5;  // 从偏移5的位置开始读
ovp.OffsetHigh = 0;

// 读取文件,因为是异步的IO,只是提交给操作系统了,并没有读取就返回了
// 真正的读取操作由系统完成
int ret = ReadFile(hFile, buf.get(), 1000, &dwRead, &ovp);
if (ret)
{
    std::cout << buf.get() << std::endl;
}
else
{
    // 最后一次错误是ERROR_IO_PENDING时,代表系统还未读完
    if (GetLastError() == ERROR_IO_PENDING)
    {
        // 等待读取完毕
        WaitForSingleObject(hFile, INFINITE);
        // 输出错误码,读取的字节数
        std::cout << ovp.Internal << " " << ovp.InternalHigh << std::endl;
        // 输出读取的字节
        std::cout << buf.get() << std::endl;
    }
    else  // 其它错误则读取出错
    {
        DWORD dwError = GetLastError();
        std::cerr << "read failed! Error:" << dwError << std::endl;
    }
}

CloseHandle(hFile);  // 关闭文件句柄

这里主要是说明 OVERLAPPED 结构体的使用方法,真正读取大文件时我们可以开几个线程分别从不同的偏移地址同时读取。

从注释已经可以很清晰地理解 OVERLAPPED 了,这个东西其实就像是一个小纸条,我们在上面写下读取位置,交给系统,系统根据我们给的信息开始读取,再在上面记录下一些信息给我们。

现在回到之前,来说第二种方法,WSASend 函数的最后一个参数是一个函数指针,我们需要声明一个 Completion Routine 函数,当数据操作完成时它会自动调用这个函数。这个函数的原型必须是这样的:

void CALLBACK CompletionRoutine(
    DWORD dwError,  // 错误信息
    DWORD cbTransferred,  // 实际收发的字节数
    LPWSAOVERLAPPED lpOverlapped,  // 就是WSASend中的lpOverlapped
    DWORD dwFlags  // 标志
);

关于更多信息我们在实例中讲解,现在来看接收函数 WSARecv,原型如下:

int WSARecv (
  SOCKET s,
  LPWSABUF lpBuffers,
  DWORD dwBufferCount,
  LPDWORD lpNumberOfBytesRecvd,
  LPDWORD lpFlags,
  LPWSAOVERLAPPED lpOverlapped,
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionROUTINE
);

因为已经详细说过 WSASend 函数了,所以这个函数已经没什么好说的了。

这些基本知识就讲完了,其实第一种方法和事件选择模型有相似之处,而且也有着一样的缺点,我们主要看的是第二种方法,但为了连贯完整性,我还是用第一种方法写了一份完整的例子。

首先先来定义几个结构体方便我们使用:

typedef struct {
    SOCKET hSockAddr[NUM_CLIENT];
    WSAEVENT hEventAddr[NUM_CLIENT];
}PER_HANDLE_DATA;

typedef struct {
    OVERLAPPED overlapped;
    WSABUF wsaBuf;
    char buf[BUF_SIZE];
}PER_IO_DATA;

PER_HANDLE_DATA 结构体用来保存连接进来的客户端和与其绑定的事件,在上篇我们说过等待事件的函数最多只能监视 64 个事件,所以这个 NUM_CLIENT 其实是 WSA_MAXIMUM_WAIT_EVENTS 的一个宏定义:

#define NUM_CLIENT WSA_MAXIMUM_WAIT_EVENTS

要想监视更多事件还是得创建线程来多次调用。

PER_IO_DATA 结构体用于保存单IO数据,就是前面说的小纸条,在这个小纸条上我们保存的有重叠结构,缓冲区和接收数据的缓冲区。 BUF_SIZE 是缓冲区长度,这个我定义为 1024。

接下来来看使用事件的重叠IO服务器的定义:

class COvpEventServer
{
public:
    COvpEventServer(int port);
    ~COvpEventServer();
    void Accpet();

private:
    void InitSock();
    void AcceptHandler();  // 处理接收请求
    void RequestHandler();  // 处理其它请求
    void Cleanup(int index);  // 清除退出的客户端

private:
    SOCKET m_hListenSock;  // 服务器监听套接字
    SOCKADDR_IN m_listenAddr;  // 监听套接字地址
    std::shared_ptr<PER_IO_DATA> m_hIoInfo[NUM_CLIENT];
    PER_HANDLE_DATA m_hbInfo;  // 保存连接的用户和关系的事件
    int m_nPort;  // 端口号
    int m_nNumOfClient;  // 记录连接的用户数 
};

基本注释已经全部给出,主要来说这个 m_hIoInfo,因为每一个连接进来的用户都得分配一个重叠结构和缓冲区以接收系统处理后的结果,所以同样声明为数组。这里使用了智能指针 shared_ptr 来管理内存。

现在来依次看这些函数的实现,先来看 InitSock 函数,这里主要是初始化套接字相关操作:

void COvpEventServer::InitSock()
{
    WSADATA wsaData;
    int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
    assert(ret == 0);

    // 创建支持重叠IO的套接字
    m_hListenSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    // 更改socket IO选项为非0,非0即为非阻塞套接字
    ULONG ulMode = 1;
    ioctlsocket(m_hListenSock, FIONBIO, &ulMode);

    memset(&m_listenAddr, 0, sizeof(m_listenAddr));
    m_listenAddr.sin_family = AF_INET;
    m_listenAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    m_listenAddr.sin_port = htons(m_nPort);

    ret = bind(m_hListenSock, (SOCKADDR *)&m_listenAddr, sizeof(m_listenAddr));
    assert(ret != SOCKET_ERROR);

    ret = listen(m_hListenSock, 5);
    assert(ret != SOCKET_ERROR);

    std::thread t(&COvpEventServer::RequestHandler, this);
    t.detach();
}

这里我们使用 WSASocketW 函数创建了一个支持重叠 IO 的套接字,接着把套接字设为非阻塞的,这是使用 ioctsocket 函数来完成的,这个函数的原型如下:

int ioctlsocket (
  SOCKET s,
  long cmd,
  u_long FAR* argp
);

将第二个参数设为 FIONBIO 后,第三个参数只要是非 0 就会把指定的套接字设为非阻塞,设为非阻塞的套接字在调用 accept 这些函数时就不会阻塞了,会直接返回。

最后,使用 thread 开了一个线程,这个线程用于处理其它请求的,稍后再来看。

接着来看处理连接请求的 AcceptHandler 函数:

void COvpEventServer::AcceptHandler()
{
    SOCKET hClntSock;
    SOCKADDR_IN clntAddr;
    int addrLen = sizeof(clntAddr);
    while (TRUE)
    {
        // 接受请求,因为已经设为非阻塞套接字,所以不会阻塞
        hClntSock = accept(m_hListenSock, (SOCKADDR *)&clntAddr, &addrLen);
        if (hClntSock == INVALID_SOCKET)
        {
            // 若无连接请求,最后一次错误会设为WSAEWOULDBLOCK
            if (WSAGetLastError() == WSAEWOULDBLOCK)
                continue;
            else
            {
                std::cerr << "accept error!" << std::endl;
                break;
            }
        }
        std::cout << "connected client..." << std::endl;

        // 创建手动重置的事件
        WSAEVENT newEvent = WSACreateEvent();

        // 分配内存,设置小纸条
        m_hIoInfo[m_nNumOfClient] = std::make_shared<PER_IO_DATA>();
        memset(&m_hIoInfo[m_nNumOfClient]->overlapped, 0, sizeof(WSAOVERLAPPED));
        m_hIoInfo[m_nNumOfClient]->overlapped.hEvent = newEvent;
        m_hIoInfo[m_nNumOfClient]->wsaBuf.buf = m_hIoInfo[m_nNumOfClient]->buf;
        m_hIoInfo[m_nNumOfClient]->wsaBuf.len = BUF_SIZE;

        // 同步更新客户套接字和关联的事件
        m_hbInfo.hSockAddr[m_nNumOfClient] = hClntSock;
        m_hbInfo.hEventAddr[m_nNumOfClient] = newEvent;

        DWORD recvBytes = 0;
        DWORD flags = 0;
        // 接收数据
        WSARecv(hClntSock, &m_hIoInfo[m_nNumOfClient]->wsaBuf, 1, &recvBytes, &flags, &m_hIoInfo[m_nNumOfClient]->overlapped, NULL);

        m_nNumOfClient++;
    }
}

在这里主要处理的是连接请求,因为套接字已经是非阻塞的了,所以 accept 不会阻塞,但需要加个判断。

接着设置重叠结构和缓冲区,就是所谓的填写小纸条,设置分配的事件和接收数据的缓冲区。这个小纸条是传给 WSARecv 函数的,因为重叠结构是和事件绑定上的,所以我们便可坐享其成,等到系统为我们处理完成以后,它会设置这个事件状态,我们只要监视这个事件状态就可以了。

其它细节便不多说了,除了几个重叠 IO 相关的函数,事件这些我们在事件选择模型里面已经详细说过了,若是忘了可再次参考那篇文章。

现在来看处理其它请求的 RequestHandler 函数:

void COvpEventServer::RequestHandler()
{
    int posInfo, index;
    while (true)
    {
        // 检测发生事件的索引
        posInfo = WSAWaitForMultipleEvents(m_nNumOfClient, m_hbInfo.hEventAddr, FALSE, 1000, FALSE);
        if (posInfo == WSA_WAIT_FAILED || posInfo == WSA_WAIT_TIMEOUT)
            continue;

        // 得到索引
        index = posInfo - WSA_WAIT_EVENT_0;

        // 手动重置事件
        WSAResetEvent(m_hbInfo.hEventAddr[index]);

        // 获取实际接收的字节数
        DWORD dwRecvBytes = m_hIoInfo[index]->overlapped.InternalHigh;
        if (dwRecvBytes == 0)  // 字节数为0则表示客户端退出
        {
            std::cout << "disconnected..." << m_hbInfo.hSockAddr[index] << std::endl;
            Cleanup(index);
        }
        else
        {
            // 输出接收的消息
            std::cout << m_hIoInfo[index]->buf << std::endl;
            send(m_hbInfo.hSockAddr[index], m_hIoInfo[index]->buf, dwRecvBytes, 0);

            // 再次接收其它消息
            DWORD recvBytes = 0;
            DWORD flags = 0;
            WSARecv(m_hbInfo.hSockAddr[index], &m_hIoInfo[index]->wsaBuf, 1,
                &recvBytes, &flags, &m_hIoInfo[index]->overlapped, NULL);
        }
    }
}

基本框架和上篇差不多,但我感觉这里其实要比事件选择模型简单清晰点的。

还是通过 WSAWaitForMultipleEvents 函数检测发生消息的事件位置,然后减去 WSA_WAIT_EVENT_0 得到最小索引。
得到索引就等于得到了对应的套接字和 PER_IO_DATA,因为我们是同步更新这三个地方的。

接着通过 OVERLAPPED 结构体的 InternalHigh 字段来得到实际接收的字节数,忘了的可以看前面这个结构的说明。

字节数不为 0 就表示有收到数据,我们通过 PER_IO_DATAbuf 字段可以得到接收的数据,我们把小纸条投递上去,系统已经帮我们把这些内容都准备好了。

大家可能会说这里怎么还是使用的 send 函数发送数据,而非 WSASend 函数呢?可以想一下。

WSARecvWSASend 函数其实更多的是为 Completion Routine 方式使用的,也就是前面所说的第二种方法,因为我们这里需要系统帮我们填写小纸条中的数据,所以有必要使用 WSARecv 函数。但在发送数据的时候已经万事俱备了,数据,大小,套接字都准备好了,所以也就没必要使用 WSASend 函数了。

最后,再次调用一次 WSARecv 函数以接收其它的数据请求。当客户端退出时,收到的字节数会为 0,在这里我们专门使用 Cleanup 函数来处理:

void COvpEventServer::Cleanup(int index)
{
    closesocket(m_hbInfo.hSockAddr[index]);
    WSACloseEvent(m_hbInfo.hEventAddr[index]);
    m_hIoInfo[index].reset();
    for (int i = index; i < m_nNumOfClient; ++i)
    {
        m_hbInfo.hSockAddr[index] = m_hbInfo.hSockAddr[index + 1];
        m_hbInfo.hEventAddr[index] = m_hbInfo.hEventAddr[index + 1];
        m_hIoInfo[index] = m_hIoInfo[index + 1];
    }
}

这个函数比较简单,在这里关闭断开连接的套接字和事件,对于引用计数,可以调用 reset() 重置下计数。然后重置下数组,没什么好说的。

第一种方法说完了,可以结合事件选择模型对比着看,测试就不截图了。

其实重叠 IO 主要用的是第二种方式,这种方式没有客户端的限制,实现起来也稍微简单点,但这篇好像已经有点长了,就放到下篇来专门说吧。

Leave a Reply

Your email address will not be published. Required fields are marked *

You can use the Markdown in the comment form.