上篇简单地介绍了IOCP模型所需的基础内容,并给出了服务器版本一的声明,更多的内容会在本篇的实现中来展开说明,学完这篇就基本会明白怎样用IOCP来实现一个还不错的上万级别的服务器了。

开始之前得对上篇的类声明做一些改变,因为前天本来是用智能指针来管理客户端单句柄数据和单IO数据的,但当我那晚实现完后测试发现有点问题。当客户端程序低并发访问时没有任何问题,但在用脚本同时开很多客户端时有些客户端得不到正常关闭,这就说明服务器有问题了。开始我以为是线程同步的问题,跟踪了好久发现原来是智能指针的问题。在收到用户连接请求准备接收数据后就会准备接收下一位用户连接了,而在处理IO消息的线程中并非一直拥有指针的所有权,会在退出当前线程时丢失那么一小会儿,一当智能指针的引用计数变为0时原来分配的单IO数据就被释放了,但稍后却还需要用到它,因为已被释放所以稍后会导致访问已被释放的内存,就算能访问到里面也是一堆垃圾数据,后面的操作就得不到执行了,客户端也就无法正常关闭了。

所以在这种多线程中智能指针的使用也得格外小心,于是我换成了普通指针,之后没有任何问题。来看看改变后的类声明:

class IOCPServer
{
public:
    IOCPServer(int port);
    ~IOCPServer();
    void Accept();

private:
    void InitSock();  // 初始化套接字
    void CreateIocp();  // 创建IOCP并开启线程
    void AcceptHandler();  // 接受处理
    void RequestHandler();  // IO消息处理线程
    void RecvMsg(SOCKET, LPPER_IO_DATA);  // 接收消息
    void SendMsg(SOCKET, LPPER_IO_DATA, const std::string&);  // 发送消息
    void CloseSock(SOCKET, LPPER_HANDLE_DATA, LPPER_IO_DATA);  // 关闭套接字

private:
    std::shared_ptr<PER_HANDLE_DATA> m_servSock;  // 服务端单句柄数据
    LPPER_HANDLE_DATA m_clntSock;  // 客户端单句柄数据
    LPPER_IO_DATA m_ioInfo;  // 客户端单IO数据
    std::vector<LPPER_HANDLE_DATA> m_vAcceptedSock;  // 保存客户端
    HANDLE m_hCompPort;  // 完成端口
    int m_nPort;  // 端口
    int m_nThreads;  // 保存开启的线程数
    std::mutex m_mtx;  // 线程同步的互斥量
};

这里把要在多线程中使用的单句柄数据和单IO数据都换成了普通指针,并更改了一些成员函数的参数。

首先来看构造函数:

IOCPServer::IOCPServer(int port)
    : m_nPort(port)
{
    m_servSock = std::make_shared<PER_HANDLE_DATA>();
    assert(m_servSock != nullptr);
    ZeroMemory(m_servSock.get(), sizeof(PER_HANDLE_DATA));

    InitSock();
}

在这里初始化了端口号,并通过智能指针分配了服务端的单句柄数据,接着调用 InitSock 函数来初始化套接字相关内容,该函数实现如下:

void IOCPServer::InitSock()
{
    WSADATA wsaData;
    int ret = WSAStartup(0x0202, &wsaData);
    assert(ret == 0);

    // 创建支持重叠IO的套接字
    m_servSock->hSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    // 创建IOCP
    CreateIocp();

    // 绑定
    m_servSock->sockAddr.sin_family = AF_INET;
    m_servSock->sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    m_servSock->sockAddr.sin_port = htons(m_nPort);

    ret = bind(m_servSock->hSock, (SOCKADDR*)&m_servSock->sockAddr, sizeof(m_servSock->sockAddr));
    assert(ret != SOCKET_ERROR);

    // 监听
    ret = listen(m_servSock->hSock, 5);
    assert(ret != SOCKET_ERROR);
}

这里只有 CreateIocp 这个函数是新内容,其他大家应该不陌生了,所以直接来看这个函数:

void IOCPServer::CreateIocp()
{
    // 创建CP对象
    m_hCompPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

    // 根据CPU核心数开线程
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    m_nThreads = sysInfo.dwNumberOfProcessors * 2;
    for (unsigned i = 0; i < m_nThreads; ++i)
    {
        std::thread t(&IOCPServer::RequestHandler, this);
        t.detach();
    }
}

首先,使用 CreateIoCompletionPort 函数的第一个功能创建了CP对象,最后一个参数传0表示默认允许的最大线程数为CPU核心数。

其次,使用 GetSystemInfo 函数获取系统信息,其中就包含着CPU核心数。一般来说我们会开 CPU核心数 * 2 个线程,开启过多的线程在使用时就会进行多次的上下文切换,较多时间都花在这上面就会导致处理具体内容的时间就少了,影响效率。而让开启的线程数恰好与CPU核心数保持一致就不用进行这种开销了,所以默认允许的最大线程数就为CPU的核心数。

那为何我们还要多开2倍的线程呢?这是因为IOCP中实际最大允许同时运行的线程数就是创建CP对象时 CreateIoCompletionPort 函数的最后一个参数指定的,比如我的CPU是8核的,所以同时最多就只有8条线程处于可运行状态,即使我们开启了16个线程,IOCP最多也只会运行8个线程,那么多开的线程有什么用呢?这是为了防止处于可运行状态的这8个线程中有调用 SleepWaitForSingleObject 等函数使线程变成了不可调度,此时就会开启第9个线程来进行处理,等过段时间变为不可调试的线程可运行后结束执行线程数就又下降了,这样就使CPU处于满负载状态下工作。所以就算你开启更多的线程也没有用,就算8条线程都变为不可调度了,也最多只会再启动8条来救场,所以此处我们开启CPU核心数*2条线程。

最后,这里还用到了 threaddetach 函数,我说一般不会用到这个函数,这里却得使用它来分离线程,若是使用 join 程序便会阻塞到这了,这不是我们想要的,而且我们也不需要线程的所有权。

接着,来看析构函数的处理:

IOCPServer::~IOCPServer()
{
    // 关闭服务端套接字
    closesocket(m_servSock->hSock);
    m_servSock->hSock = INVALID_SOCKET;

    for(int i = 0; i < m_nThreads; ++i)
        PostQueuedCompletionStatus(m_hCompPort, 0xFFFFFFFF, NULL, NULL);

    std::for_each(m_vAcceptedSock.begin(), m_vAcceptedSock.end(),
        [&](LPPER_HANDLE_DATA& sp) {
        closesocket(sp->hSock);
        sp->hSock = INVALID_SOCKET;
    });
    WSACleanup();
}

在这里进行清理工作,首先关闭服务端的套接字,因为使用了智能指针,所以我们无需手动释放内存,它会自动为我们释放。

然后使用 PostQueuedCompletionStatus 函数来向所有开启的线程发送退出通知,我们使用 0xFFFFFFFF 来表示字节数,在线程中若是收到字节为 0xFFFFFFFF 的数据我们就退出线程。

最后使用了 for_each 算法配合 lambda 表达式来清理保存起来的客户端,这是因为有些客户端可能会有异常退出导致未正常清理(实际上这种情况很少很少),所以析构函数中我们来二次确认保证资源都被释放了。

现在来看重要的第一部分——接受客户端,这部分是由 AcceptHandler 函数完成的,实现如下:

void IOCPServer::AcceptHandler()
{
    SOCKET hClntSock;
    SOCKADDR_IN clntAddr;
    int addrLen = sizeof(clntAddr);
    for (;;)
    {
        hClntSock = accept(m_servSock->hSock, (SOCKADDR*)&clntAddr, &addrLen);

        std::lock_guard<std::mutex> lg(m_mtx);
        std::cout << "connected client..." << hClntSock << std::endl;

        // 动态分配,并写入连接进来的客户端套接字和地址
        m_clntSock = new PER_HANDLE_DATA;
        m_clntSock->hSock = hClntSock;
        memcpy(&m_clntSock->sockAddr, &clntAddr, addrLen);

        // 保存客户信息
        m_vAcceptedSock.push_back(m_clntSock);

        // 连接完成端口与客户端套接字
        CreateIoCompletionPort((HANDLE)m_clntSock->hSock, m_hCompPort, (ULONG_PTR)m_clntSock, 0);

        // 接收消息
        m_ioInfo = new PER_IO_DATA;
        RecvMsg(m_clntSock->hSock, m_ioInfo);
    }
}

由于现在实现的是第一个版本,所以这部分使用是 accept 函数,实现起来非常简单,但大家知道使用这个函数就不能完全发挥性能,因为accept 函数是阻塞的。虽然如此,一般来说也是够用的,因为一般不会有高并发的连接,主要是IO消息的处理,这部分是由第二部分——处理数据去处理的,所以多数时间这个线程是无所事事的,关键是实现起来简单了许多,所以很多人还在使用这个搞服务器。不过我们自然不满足于此,在第二个版本中我们会使用扩展函数来实现。

这里使用 accept 函数接受连接请求,然后为每个用户动态分配一个单句柄数据,保存该用户的套接字与地址信息到 m_vAcceptedSock 里面,后面要想扩展的话可以在里面保存更多信息,如用户名,用户级别等等。

接着使用 CreateIoCompletionPort 函数的第二个功能来连接客户端socket与完成端口,在第3个参数中我们传入了用户的单IO数据,就是将这部分操作交给系统去为我们做了,之后当该套接字有消息时我们便可通过 GetQueuedCompletionStatus 函数获取到并得到该用户的单IO数据。

最后动态分配了我们的“小纸条” m_ioInfo,这也是要为每个用户分配一个的,其中包含着重叠结构和接收消息的缓冲区还有操作类型。

大家别忘了这里是有一些公共资源要在线程中访问的,所以自然得用到线程同步,在第10行我们使用了 lock_guard 来加锁,以保证数据的安全访问。

RecvMsg 是我们的接收消息函数,实现如下:

void IOCPServer::RecvMsg(SOCKET sock, LPPER_IO_DATA lpIoInfo)
{
    // 清理内存
    ZeroMemory(lpIoInfo, sizeof(PER_IO_DATA));

    lpIoInfo->wsaBuf.buf = lpIoInfo->buf;
    lpIoInfo->wsaBuf.len = BUF_SIZE;
    lpIoInfo->opType = OP_READ;
    DWORD recvBytes, flags = 0;
    WSARecv(sock, &lpIoInfo->wsaBuf, 1, &recvBytes, &flags, &lpIoInfo->overlapped, NULL);
}

每次申请后清理内存是个好习惯,所以在最开始我们就将该结构全部置0,不然其中可能会有一些垃圾数据。接着这些就是重叠IO中所学习的东西了,只是我们在此处多了向“小纸条”上添加了操作类型为 OP_READ,表示这是一条接收消息。然后调用 WSARecv 函数异步接收用户数据的到来。对于这部分若有不明白的就直接参考重叠IO模型的文章,然后再来看就懂了。

读取说了,现在来看发送消息,即 SendMsg 函数:

void IOCPServer::SendMsg(SOCKET sock, LPPER_IO_DATA lpIoInfo, const std::string& msg)
{
    ZeroMemory(lpIoInfo, sizeof(PER_IO_DATA));
    memcpy(lpIoInfo->buf, msg.c_str(), msg.size());
    lpIoInfo->wsaBuf.buf = lpIoInfo->buf;
    lpIoInfo->wsaBuf.len = msg.size();
    lpIoInfo->opType = OP_WRITE;
    WSASend(sock, &lpIoInfo->wsaBuf, 1, NULL, 0, &lpIoInfo->overlapped, NULL);
}

发送和接收消息基本是一样的,第一个参数是套接字句柄,第二个参数是“小纸条”,第三个是要发送的消息。不同之处在于得把操作类型设为 OP_WRITE

终于到了讨论第二部分——数据处理的时候了,即我们的线程函数 RequestHandler:

void IOCPServer::RequestHandler()
{
    LPPER_IO_DATA lpIoInfo;
    LPPER_HANDLE_DATA lpHandleInfo;
    DWORD dwBytesTrans;
    for (;;)
    {
        // 获取IO完成队列
        int ret = GetQueuedCompletionStatus(m_hCompPort, &dwBytesTrans,
            (LPDWORD)&lpHandleInfo, (LPOVERLAPPED*)&lpIoInfo, INFINITE);
        if (ret == 0)
        {
            if (WSAGetLastError() == WAIT_TIMEOUT)
                continue;
            else
            {
                std::cerr << "GetQueuedCompletionStatus failed! Error code:" << WSAGetLastError() << std::endl;
                break;
            }
        }

        // 使用互斥量同步
        std::lock_guard<std::mutex> lg(m_mtx);

        // 退出线程
        if (dwBytesTrans == 0xFFFFFFFF)
        {
            break;
        }

        if (lpIoInfo->opType == OP_READ)  // 接收到数据
        {
            if (dwBytesTrans == 0)
            {
                CloseSock(lpHandleInfo->hSock, lpHandleInfo, lpIoInfo);
                continue;
            }

            std::cout << "received:" << lpIoInfo->buf << std::endl;

            std::string msg("reply-message");
            SendMsg(lpHandleInfo->hSock, lpIoInfo, msg);
        }
        else if (lpIoInfo->opType == OP_WRITE)  // 发送了数据
        {
            RecvMsg(lpHandleInfo->hSock, lpIoInfo);
        }
    }
}

此处便是获取系统通知并作处理的关键部分了,实际上在我们创建CP对象的时候系统就会创建一个IO完成队列,而其中的每条记录就包含着传输的字节数,我们传入的单句柄数据,单IO数据。当与CP对象连接的套接字有消息后系统就会往IO完成队列中添加一条记录,再有消息就往其中添加一条记录,这些记录是FIFO(先入先出)的,所以最先收到的消息就会最先被处理。

当调用 GetQueuedCompletionStatus 函数时,就会把当前线程变为等待状态,并将该线程添加到在创建CP对象时系统创建的一个等待线程队列中,比如我们创建此处创建了16条线程,那么开始时这16条线程就都会被添加到等待线程队列中。而这个队列的顺序是LIFO(后入先出)的,在当IO完成队列中有记录时(即有消息了),便会从队列最后开始唤醒线程来进行处理,此时会把该线程添加到已释放的线程列表中。若唤醒的线程很快就把处理完了,那么它又会调用 GetQueuedCompletionStatus 函数,此时线程就从已释放的线程列表中移除又被添加到等待线程队列了,因为是LIFO的,所以该线程还是最后一个。此时若IO完成队列中还有记录,那么被唤醒的就还是这个线程。

只有在当前线程未处理完时IO完成队列中的其它记录才会再从等待线程队列中唤醒线程来处理。而若记录很多,那么最多也只会等待线程队列中唤醒指定的最大线程数个线程,这样就避免了上下文切换带来的效率影响。而多余的线程如何唤醒呢?咱们在前面已经讲过了,当唤醒的线程中有调用 Sleep 等函数使线程变为阻塞了,那么系统会把线程从已释放线程列表中移除而添加到已暂停线程列表中,然后再从等待线程队列中唤醒下一个线程。此时就唤醒了多余开启的线程,而当阻塞的线程完毕了,此时就会运行的线程就会多于CPU的核心数,所以现在有些CPU就得处理一个以上的线程,但这期间很短,阻塞线程执行完了后就又被加入等待线程队列中了。这样一使CPU处于满负载状态下工作,效率很高。

现在回到代码,当获得到消息时,我们的线程就会被唤醒,此时得判断函数的返回值若为0则表示有错误发生,若有错误,我们进行了相应的处理。

这里还得说下 GetQueuedCompletionStatus 函数的的第三个和第四个参数,第三个就是我们的单句柄数据,这个是在将套接字与CP对象进行关联时使用 CreateCompletionPort 函数的第重叠参数传入的。而第四个参数是我们的“小纸条”,这是在接收和发送消息时我们通过 WSASendWSARecv 函数传入的,大家可能奇怪我们传入的不是其中的 overlapped 吗,这里怎么能完整地获取到小纸条呢?

WSARecv(sock, &lpIoInfo->wsaBuf, 1, &recvBytes, &flags, &lpIoInfo->overlapped, NULL);

其实大家C语言基础若是好的话应该能想明白的,因为结构体的地址就等于结构体中第一个元素的地址,所以:

LPPER_IO_DATA lpIoInfo = ...;
if(&lpIoInfo == &lpIoInfo->overlapped) {
    return true;
}

return false;

将会返回 true,所以得到了结构体的地址也就得到了整张“小纸条”,这也就是为何我们在前面要将overlapped放在结构体第一位的原因。

接着,我们又使用了同步操作,因为是在多线程中,我们要保证数据不能同时被两个线程访问。

最后,若数据类型是收到的消息,那么我们输出消息并回应客户端一条消息,而若收到的消息长度为0时,则表示客户端在关闭套接字时发送了 EOF,此时我们调用关闭函数 CloseSock 做清理工作。

而当向客户端发送了消息后,我们将又会获取到发送消息,此时操作类型为 OP_WRITE,这是我们在 SendMsg 中设置的,然后将又调用 RecvMsg 接收客户端的下一条消息。此处应该是很清晰的,便不做更多介绍了。

还剩下一个函数,即 CloseSock 函数,用来做清理与释放工作,实现如下:

void IOCPServer::CloseSock(SOCKET sock, LPPER_HANDLE_DATA lpHandleInfo, LPPER_IO_DATA lpIoInfo)
{
    std::cout << "disconnected client..." << sock << std::endl;

    auto pos = std::find_if(m_vAcceptedSock.begin(), m_vAcceptedSock.end(),
        [&](LPPER_HANDLE_DATA& sp) {
        return sp->hSock == sock;
    });

    if (pos != m_vAcceptedSock.end())
    {
        closesocket((*pos)->hSock);
        m_vAcceptedSock.erase(pos);
        delete lpHandleInfo;
        delete lpIoInfo;
    }
}

这里我们使用STL算法来从 m_vAcceptedSock 中找出退出的套接字位置,然后关闭该套接字并从列表中清除,最后,别忘了释放分配的内存,在高并发中,有上万用户,若忘记释放内存会被耗尽而导致死机的。

这里附一份测试图:

Output

我测试过一万多个客户端的高并发访问,完全没有问题的。不过还有一些东西得学,以增强我们的服务器,下篇继续来看。

1 thought on “网络模型之IOCP实现版本一(二)”

  1. 这里的单句柄数据只要把它当作一个保存着客户端信息的结构体就可以了。而对于IOCP,你可以结合Linux上的EPOLL来理解,IOCP中使用 CreateIoCompletion 函数的第一个功能创建CP对象,对应EPOLL中使用 epoll_create 函数创建epoll例程;IOCP中使用 CreateIoCompletionPort 函数的第二个功能将CP对象和套接字绑定起来,对应Epoll中使用 epoll_ctl 函数来监视文件描述符,而 epool_ctl 函数中可以指定 EPOLL_CTL_ADD 这些操作类型和 EPOLLIN 等不同的事件类型,所以它不需要IOCP中使用的单句柄数据和单IO数据,IOCP中只会通知有无数据,没有类型这些,也没有套接字信息,所以我们得使用在绑定时传入单句柄数据,在收发消息时传入单IO数据来做一些标记。最后的 GetIoCompletionStatus 函数就对应着Epoll中的 epoll_wait 函数了。本来我们使用的select模型在每次调用select的时候都得向操作系统传入监视对象的信息,这就是最影响效率的地方,而IOCP和EPOOL中只会向操作系统传递1次监视对象的信息,在有消息时通知我们,解决了这个问题。不过我感觉epool的并发有点不及iocp。

Leave a Reply

Your email address will not be published. Required fields are marked *

You can use the Markdown in the comment form.