前言

这是IOCP的末篇了,本次的实例使用IOCP配合扩展函数来实现服务器,并对之前的版本做一些优化,比如这里使用了内存池,日志记录,所以这也是效率最好的一个版本,作为一个例子来说已经很完整了。

因为前面已经介绍了较多的基础内容,并且也写出了实例一,所以这篇大部分内容都不会再详细讲解相关内容了,这写起来太费时间了。取而代之的是代码会全部列出来,若大家有哪里看不懂,那就说明前三篇未全部理解,这时应该再看看前面的文章。

类预览

首先来预览下类的定义,看看我们需要做些什么:

#include <thread>  // 线程库
#include <mutex>  // 线程同步互斥量
#include <string>  // 不用说
#include <memory>  // 用到智能指针
#include <cassert>  // 断言
#include <vector>  // 使用vector保存连接的用户
#include <algorithm>  // 算法库
#include <boost/pool/singleton_pool.hpp>  // 内存池
#include <boost/format.hpp>  // 格式化字符
#include <WinSock2.h>  // winsock2
#include <MSWSock.h>  // 扩展函数
#include "utilities.h"  // 包含了一些自己写的小工具,这里用到Log日志工具
#pragma comment(lib, "ws2_32.lib")

namespace network
{
    class IOCPServer;

    using namespace utilities;

    // 缓冲区大小
    const int BUF_SIZE = 1024;

    // 操作类型
    enum class OP_TYPE {
        OP_ACCEPT,
        OP_READ,
        OP_WRITE
    };

    // 单句柄数据
    typedef struct {
        SOCKET hSock;
        SOCKADDR_IN sockAddr;
    }PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

    // 单IO数据
    typedef struct {
        WSAOVERLAPPED overlapped;
        WSABUF wsaBuf;
        char buf[BUF_SIZE];
        OP_TYPE opType;
        SOCKET clntSock;
    }PER_IO_DATA, *LPPER_IO_DATA;

    class IOCPServer
    {
    public:
        using handle_spl     =  boost::singleton_pool<PER_HANDLE_DATA, sizeof(PER_HANDLE_DATA)>;
        using io_spl         =  boost::singleton_pool<PER_IO_DATA, sizeof(PER_IO_DATA)>;
        using handle_ptr     =  std::shared_ptr<PER_HANDLE_DATA>;
        using phandle_vector =  std::vector<LPPER_HANDLE_DATA>;

        // 构造函数
        IOCPServer(int port);
        ~IOCPServer();
        void Accept();
        void Run();

    private:
        void InitSock();  // 初始化套接字
        void CreateIocp();  // 创建IOCP
        bool LoadExtensionFunc();  // 加载扩展函数
        void AcceptEx();  // 接受操作
        void AcceptDelivery();  // 投递接受操作
        void AcceptHandler(LPPER_IO_DATA);  // 接受处理
        void RequestHandler();  // 请求处理
        void RecvMsg(SOCKET, LPPER_IO_DATA);  // 接收消息
        void SendMsg(SOCKET, LPPER_IO_DATA, const std::string& msg);  // 发送消息
        void CloseSock(LPPER_HANDLE_DATA, LPPER_IO_DATA);  // 关闭套接字

    private:
        handle_ptr m_servSock;  // 服务器单句柄数据
        phandle_vector m_vAcceptedSock;  // 保存连进来的用户信息 
        LPPER_HANDLE_DATA m_clntSock;  // 客户端单句柄数据
        LPPER_IO_DATA m_ioInfo;  // 单IO数据
        LPFN_ACCEPTEX m_lpfnAcceptEx;  // AcceptEx函数指针
        LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs;  // GetAccetExSockaddrs函数指针
        HANDLE m_hCompPort;  // 完成端口
        int m_nThreads;  // 线程数量
        int m_nPort;  // 端口
        std::mutex m_mtx;  // 同步的互斥量
        Log log;  // 日志类
    };
}

大多函数名都和之前的版本保持一致,所以大家应该不会觉得太陌生,这里来说说和之前不同的部分。

先是操作类型,这里是如此定义的:

enum class OP_TYPE {
    OP_ACCEPT,  // 接受操作
    OP_READ,  // 读取操作
    OP_WRITE  // 写入操作
};

这里使用了作用域限制的 enum(scoped enum),这是C++11的东西,它的语法就是在 enum 后面加一个 class,这样这些enums就有了作用域限制,亦即访问变成了 OP_TYPE::OP_READ,于是避免了枚举元素直接包含到 namespace 下。这里还加了一个类型 OP_ACCEPT,用于标记接受操作,后面会用到。

PER_IO_DATA 中还添加了一个 clntSock 成员用于在”小纸条“上保存远程套接字,亦即客户端套接字。

typedef struct {
    WSAOVERLAPPED overlapped;
    WSABUF wsaBuf;
    char buf[BUF_SIZE];
    OP_TYPE opType;
    SOCKET clntSock;
}PER_IO_DATA, *LPPER_IO_DATA;

接着使用声明别名简化了几个名字较长的类型,这也是C++11的东西,一般来说和 typedef 没什么不同,但 typedef 不支持模板化,而且在声明模板别名时也不如 using 方便。

using handle_spl     =  boost::singleton_pool<PER_HANDLE_DATA, sizeof(PER_HANDLE_DATA)>;
using io_spl         =  boost::singleton_pool<PER_IO_DATA, sizeof(PER_IO_DATA)>;
using handle_ptr     =  std::shared_ptr<PER_HANDLE_DATA>;
using phandle_vector =  std::vector<LPPER_HANDLE_DATA>;

这里还使用了 boost 库中的内存池 singleton_pool,之前我们都是使用的 new 来动态分配,在客户端退出时再 delete 掉,这样就会有大量的申请释放动作,影响效率。其实退出客户端的内存不用急着释放,可以接着准备给后面连接的用户使用,这样就会省掉许多申请释放的操作,统一从池中分配,最后再统一释放,这就是内存池的作用。

在定义中多了许多扩展函数的类型,这些上篇讲过,主要是如何运用到IOCP中,在后面我们再来看实现。

日志工具(Log)

在这个版本中不再使用 cout 来输出了,我们将相关信息记录到文件中,这样不仅可以方便查看,而且可以保存的更久。

这里使用的是我自己以前写的一个小工具,包含在 utilities.h 中,其中有一个 Log 类,专门用于日志记录。

这个 Log 工具使用起来非常简单,就像这样:

Log log;
log.d("");  // 调试信息
log.i("");  // 普通信息
log.w("");  // 警告信息
log.e("");  // 错误信息

其中重载了 operator<<,所以也可以这样使用:

LOG_DEBUG << "" << "";  // 调度信息
LOG_INFO << "";         // 普通信息
LOG_WARNING << "";      // 警告信息
LOG_ERROR << "";        // 错误信息

若没有指定目录,它会在当前程序目录下新建:

"log/当前日期/log_xx.log"

xx 对应 Log 等级,若是普通信息则为 log_i.log,调试信息则为 log_d.log,总共4个文件。这样我们就把日志类型分开了,有错误时直接看 log_e.log,有警告时直接看 log_w.log。当然,也可以使用 SetLogDestination 函数来手动设置这4个文件的输出目录。

若在调试时我们想直接在 dos 中看,那么可以调用 ENABLE_LOG_TOSTERR 函数并设置为 true,再想输出到文件可以重新设为 false

输出的内容中可能处于各个不同的程序,如何区分呢?可以使用 ENABLE_LOG_TAG 函数来为不同程序设置不同标记,设置后日志的内容为:

"当前时间--标记内容:日志内容"

构造与析构函数

这两个函数未做什么改变,和第一版的实现一样,可以简单的看下:

network::IOCPServer::IOCPServer(int port)
    : m_nPort(port)
{
    m_servSock = std::make_shared<PER_HANDLE_DATA>();
    assert(m_servSock != nullptr);
    ZeroMemory(m_servSock.get(), sizeof(PER_HANDLE_DATA));

    InitSock();

    ENABLE_LOG_TAG("IOCPServer");
}

在这里我使用 ENABLE_LOG_TAG 开启了日志的标志功能,并将其设为 IOCPServer,这样日志记录的时候就会包含这个标志了。

析构函数:

network::IOCPServer::~IOCPServer()
{
    for (int i = 0; i < m_nThreads; ++i)
    {
        PostQueuedCompletionStatus(m_hCompPort, 0xFFFFFFFF, NULL, NULL);
    }

    std::for_each(m_vAcceptedSock.begin(), m_vAcceptedSock.end(),
        [&](LPPER_HANDLE_DATA& hd) {
        closesocket(hd->hSock);
        hd->hSock = INVALID_SOCKET;
    });

    WSACleanup();
}

初始化套接字与创建完成端口

InitSock 函数用于初始化套接字相关,其实现如下:

void network::IOCPServer::InitSock()
{
    WSADATA wsaData;
    auto ret = WSAStartup(0x0202, &wsaData);
    assert(ret == 0);

    // 创建支持重叠IO的套接字
    m_servSock->hSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    // 绑定
    m_servSock->sockAddr.sin_family = AF_INET;
    m_servSock->sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    m_servSock->sockAddr.sin_port = htons(m_nPort);

    ret = bind(m_servSock->hSock, (SOCKADDR*)&m_servSock->sockAddr, sizeof(SOCKADDR_IN));
    assert(ret != SOCKET_ERROR);

    // 创建CP对象
    CreateIocp();

    // 连接服务器套接字与CP对象
    CreateIoCompletionPort((HANDLE)m_servSock->hSock, m_hCompPort, (ULONG_PTR)m_servSock.get(), NULL);

    // 监听
    ret = listen(m_servSock->hSock, 5);
    assert(ret != SOCKET_ERROR);
}

这里,和版本一不同的地方在于22行将服务器套接字与CP对象也进行了关联,这样在处理消息的线程中我们可以获得到服务端的接收操作,因为已经不用 accept 函数了,所以不能在那里处理了。

CreateIocp 函数用于创建CP对象并开启线程,内容和实例一中相同,所以不多作解释了:

void network::IOCPServer::CreateIocp()
{
    // 创建CP对象
    m_hCompPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    m_nThreads = sysInfo.dwNumberOfProcessors * 2;
    for (int i = 0; i < m_nThreads; ++i)
    {
        std::thread t(&IOCPServer::RequestHandler, this);
        t.detach();
    }
}

加载扩展函数

要使用 AcceptGetAcceptExSockaddrs 函数,首先通过 WSAIoctl 函数加载到函数指针上,这是上篇的内容,这里只是专门放到了一个函数中处理。

bool network::IOCPServer::LoadExtensionFunc()
{
    GUID guidAcceptEx = WSAID_ACCEPTEX;
    GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
    DWORD dwBytes;

    // 加载AcceptEx函数
    auto ret = WSAIoctl(m_servSock->hSock,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &guidAcceptEx, sizeof(guidAcceptEx),
        &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx),
        &dwBytes, NULL, NULL
    );

    boost::format fmt;
    if (ret == SOCKET_ERROR)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            fmt = boost::format("%s%d") % "Load AcceptEx faild! Error code:" % WSAGetLastError();
            log.e(fmt.str());
            return false;
        }
    }

    // 加载GetAcceptExSockaddrs函数
    ret = WSAIoctl(m_servSock->hSock,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs),
        &m_lpfnGetAcceptExSockaddrs, sizeof(m_lpfnGetAcceptExSockaddrs),
        &dwBytes, NULL, NULL
    );
    if (ret == SOCKET_ERROR)
    {
        fmt = boost::format("%s%d") % "Load GetAcceptExSockaddrs failed! Error code:" % WSAGetLastError();
        log.e(fmt.str());
        return false;
    }

    return true;
}

加载事项和上篇所说一致,这里首次使用了 Log 来记录错误消息,因为普通字符串无法和 DWORD 直接相+,所以这里使用了 boost 中的另一个工具 formatformat 可以格式化不同的内容,就像 printf 一样,使用 % 号来连接后面的内容,即 %s%d 的实值,使用起来十分简单。通过使用其中的 str 函数,可以获得到字符串形式的内容。

投递接受处理

AcceptDelivery 函数中,投递了10个接收处理来准备接受连接的用户:

void network::IOCPServer::AcceptDelivery()
{
    for (int i = 0; i < 10; ++i)
    {
        AcceptEx();
    }
}

具体功能是通过 AcceptEx 函数实现的:

void network::IOCPServer::AcceptEx()
{
    // 从内存池中申请内存
    m_ioInfo = (LPPER_IO_DATA)io_spl::malloc();
    assert(io_spl::is_from(m_ioInfo));

    ZeroMemory(m_ioInfo, sizeof(PER_IO_DATA));
    m_ioInfo->wsaBuf.buf = m_ioInfo->buf;
    m_ioInfo->wsaBuf.len = BUF_SIZE;
    m_ioInfo->opType = OP_TYPE::OP_ACCEPT;

    // 为将要接受的连接创建客户机套接字
    m_ioInfo->clntSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    DWORD dwBytes;
    auto ret = m_lpfnAcceptEx(m_servSock->hSock,
        m_ioInfo->clntSock,
        m_ioInfo->wsaBuf.buf,
        m_ioInfo->wsaBuf.len - (sizeof(SOCKADDR_IN) + 16) * 2, // 若不希望AcceptEx建立连接后等待用户发送数据,则设为0
        sizeof(SOCKADDR_IN) + 16,
        sizeof(SOCKADDR_IN) + 16,
        &dwBytes,
        &m_ioInfo->overlapped
    );

    if (!ret)
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            auto fmt = boost::format("%s%d") % "AcceptEx failed! Error code:" % WSAGetLastError();
            log.e(fmt.str());
        }
    }
}

这里我们从内存池中为”小纸条“申请内存,singleton_pool 中提供了 malloc 函数可以从内存池中申请内存,使用 is_from 函数可以确认该块内存是否是从该内存池中申请的。

接着初始化了”小纸条“,并为客户端先准备好套接字,原理在扩展函数那篇也已经讲过了,所以这里也不再赘述。

主要来看看扩展函数 AcceptEx 的使用:

auto ret = m_lpfnAcceptEx(m_servSock->hSock,  // 服务端套接字
    m_ioInfo->clntSock,  // 客户机套接字
    m_ioInfo->wsaBuf.buf,  // 用于接收数据的缓冲区
    m_ioInfo->wsaBuf.len - (sizeof(SOCKADDR_IN) + 16) * 2, // 若不希望AcceptEx建立连接后等待用户发送数据,则设为0
    sizeof(SOCKADDR_IN) + 16,  // 本地套接字地址结构大小
    sizeof(SOCKADDR_IN) + 16,  // 远程套接字地址结构大小
    &dwBytes,  // 实际收到的数据大小
    &m_ioInfo->overlapped  // 重叠结构
);

注释已经很清楚的解释了各个参数的意义,扩展函数 AcceptEx 是通过 WSAIoctl 函数获取到函数指针 m_lpfnAcceptEx 里的,这个函数是异步的,所以不会阻塞。

我们传入为每个客户端准备的”小纸条“(m_ioInfo),系统去替我们执行耗时操作,处理完了它会在上面记下我们需要的内容,稍后我们可以在处理消息函数中通过 GetQueuedCompletionStatus 函数拿到。

处理用户请求

程序的主要逻辑都集中在了 RequestHandler 函数中,也是我们前面开启的线程函数,异步处理的消息我们都能在这里获得到。实现如下:

void network::IOCPServer::RequestHandler()
{
    LPPER_HANDLE_DATA lpHandleInfo;
    LPPER_IO_DATA lpIoInfo;
    DWORD dwBytesTrans;
    for (;;)
    {
        auto ret = GetQueuedCompletionStatus(m_hCompPort, &dwBytesTrans,
            (LPDWORD)&lpHandleInfo, (LPOVERLAPPED*)&lpIoInfo, INFINITE);
        if (ret == 0)
        {
            if (WSAGetLastError() == WAIT_TIMEOUT)
                continue;
            else
            {
                auto fmt = boost::format("%s%d") % "GetQueuedCompletionStatus failed! Error code:" % GetLastError();
                log.e(fmt.str());
                break;
            }
        }

        std::lock_guard<std::mutex> lg(m_mtx);

        if (dwBytesTrans == 0xFFFFFFFF)
        {
            break;
        }

        switch (lpIoInfo->opType)
        {
        case OP_TYPE::OP_ACCEPT:
            AcceptHandler(lpIoInfo);
            break;
        case OP_TYPE::OP_READ:
        {
            if (dwBytesTrans == 0)  // EOP
            {
                CloseSock(lpHandleInfo, lpIoInfo);
                continue;
            }

            // 将客户端消息转发回去
            std::string msg(lpIoInfo->buf);
            log.i("received:" + msg);

            SendMsg(lpHandleInfo->hSock, lpIoInfo, msg);
        }
            break;
        case OP_TYPE::OP_WRITE:
            RecvMsg(lpHandleInfo->hSock, lpIoInfo);
            break;
        }
    }
}

关于IOCP的主要原理我们在实例一中已经写过了,所以这里不会再说重复的内容,主要来看有关扩展函数的部分。

通过 OP_ACCEPT 操作类型获取到连接用户的请求,这里的数据就是扩展函数 AcceptEx 投递过来的,对于连接进来的用户,我们专门用 AcceptHandler 函数来处理,其实现如下:

void network::IOCPServer::AcceptHandler(LPPER_IO_DATA lpIoInfo)
{
    // 使客户端套接字具有和服务器套接相同的属性
    setsockopt(lpIoInfo->clntSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
        (char*)&lpIoInfo->clntSock, sizeof(SOCKET));

    SOCKADDR_IN *lpLocalSockAddr = nullptr, *lpRemoteSockAddr = nullptr;
    int LocalSockAddr, RemoteSockAddr;

    m_lpfnGetAcceptExSockaddrs(lpIoInfo->wsaBuf.buf,
        lpIoInfo->wsaBuf.len - (sizeof(SOCKADDR_IN) + 16) * 2,
        sizeof(SOCKADDR_IN) + 16,
        sizeof(SOCKADDR_IN) + 16,
        (SOCKADDR**)&lpLocalSockAddr, &LocalSockAddr,
        (SOCKADDR**)&lpRemoteSockAddr, &RemoteSockAddr
        );

    auto fmt = boost::format("%s%d") % "connected client..." % lpIoInfo->clntSock;
    log.i(fmt.str());

    // 从内存池分配内存
    m_clntSock = (LPPER_HANDLE_DATA)handle_spl::malloc();
    assert(handle_spl::is_from(m_clntSock));

    // 写入连接进来的客户端套接字和地址
    m_clntSock->hSock = lpIoInfo->clntSock;
    memcpy(&m_clntSock->sockAddr, lpRemoteSockAddr, sizeof(SOCKADDR_IN));

    // 连接客户端套接字与CP对象
    CreateIoCompletionPort((HANDLE)m_clntSock->hSock, m_hCompPort, (ULONG_PTR)m_clntSock, NULL);

    // 保存用户信息
    m_vAcceptedSock.push_back(m_clntSock);

    fmt = boost::format("%s%d") % "waiting for data arrived..." % m_clntSock->hSock;
    log.i(fmt.str());

    if (lpIoInfo->overlapped.InternalHigh != 0)
    {
        std::string msg(lpIoInfo->buf);

        // 输出普通日志
        log.i("received:" + msg);

        SendMsg(m_clntSock->hSock, lpIoInfo, msg);
    }
    else
    {
        RecvMsg(m_clntSock->hSock, lpIoInfo);
    }

    // 处理好一位客户后,投递下一个连接等待
    AcceptEx();
}

这里就相当于实例一中 accept 函数处理的那部分了,有用户连接进来就使用 GetAcceptExSockaddrs 获取到其地址信息,为其分配单句柄数据,再和CP对象连接起来,最后使用 vector 保存已连接用户的信息。

由于我们在扩展函数 AcceptEx 中设置了同时接受第一份数据,所以其实在连接时就已经获得了客户端发来的第一份数据,而这份数据就保存在"小纸条”中的缓冲区里,而大小保存在哪里呢?在重叠IO中我们就说边可以直接通过重叠结构的 InternalHigh 字段获取到的。若包含第一份数据,我们则输出该数据,然后再转发给客户端;若没有包含,我们就直接接收下一份数据。

处理好一位用户之后,再调用 AcceptEx 函数投递下一个连接等待,以此处理所有的用户请求。

接收与发送消息

这部分和实例一完全没有区别,便只列出实现。

void network::IOCPServer::RecvMsg(SOCKET sock, LPPER_IO_DATA lpIoInfo)
{
    ZeroMemory(lpIoInfo, sizeof(PER_IO_DATA));
    lpIoInfo->wsaBuf.buf = lpIoInfo->buf;
    lpIoInfo->wsaBuf.len = BUF_SIZE;
    lpIoInfo->opType = OP_TYPE::OP_READ;

    DWORD recvBytes, flags = 0;
    WSARecv(sock, &lpIoInfo->wsaBuf, 1, &recvBytes, &flags, &lpIoInfo->overlapped, NULL);
}

void network::IOCPServer::SendMsg(SOCKET sock, LPPER_IO_DATA lpIoInfo, const std::string& msg)
{
    ZeroMemory(lpIoInfo, sizeof(PER_IO_DATA));
    memcpy(lpIoInfo->buf, msg.c_str(), msg.size());
    lpIoInfo->wsaBuf.buf = lpIoInfo->buf;
    lpIoInfo->wsaBuf.len = msg.size();
    lpIoInfo->opType = OP_TYPE::OP_WRITE;

    WSASend(sock, &lpIoInfo->wsaBuf, 1, NULL, 0, &lpIoInfo->overlapped, NULL);
}

清除释放

CloseSock 函数也与实例一区别不大,只是这里释放内存是归还给内存池,而非直接 delete

void network::IOCPServer::CloseSock(LPPER_HANDLE_DATA lpHandleInfo, LPPER_IO_DATA lpIoInfo)
{
    // 输出普通日志
    auto fmt = boost::format("%s%d") % "disconnected client...." % lpHandleInfo->hSock;
    log.i(fmt.str());

    auto pos = std::find_if(m_vAcceptedSock.begin(), m_vAcceptedSock.end(),
        [&](LPPER_HANDLE_DATA& hd) {
        return hd->hSock == lpHandleInfo->hSock;
    });

    if (pos != m_vAcceptedSock.end())
    {
        closesocket((*pos)->hSock);
        m_vAcceptedSock.erase(pos);

        // 归还从内存池申请的内存
        if(handle_spl::is_from(lpHandleInfo))
            handle_spl::free(lpHandleInfo);
        if(io_spl::is_from(lpIoInfo))
            io_spl::free(lpIoInfo);
    }
}

总结

本篇算是一个综合性较强的应用,若能全部理解,那么对于IOCP模型你就理解的差不多了。Windows的网络模型在这里也就全部介绍完了,后面主要介绍网络库的使用,而要理解网络库,必须得有这些知识做基础,这些后面再说吧。大家可以发现这个例子也仅仅是个基本的网络服务器雏形罢了,要想写一个完整的项目,随便都得上千行了,那其中还包含许多sql,加密解密,界面等等技术,要是游戏的服务器那涉及的就更多了。这些东西后面有时间我都会总结一些文章的,等介绍了更多基础东西我们再来写一个真正完整的项目吧。

Leave a Reply

Your email address will not be published. Required fields are marked *

You can use the Markdown in the comment form.