网络模型之IOCP服务器实例二(四)
前言
这是IOCP的末篇了,本次的实例使用IOCP配合扩展函数来实现服务器,并对之前的版本做一些优化,比如这里使用了内存池,日志记录,所以这也是效率最好的一个版本,作为一个例子来说已经很完整了。
因为前面已经介绍了较多的基础内容,并且也写出了实例一,所以这篇大部分内容都不会再详细讲解相关内容了,这写起来太费时间了。取而代之的是代码会全部列出来,若大家有哪里看不懂,那就说明前三篇未全部理解,这时应该再看看前面的文章。
类预览
首先来预览下类的定义,看看我们需要做些什么:
#include <thread> // 线程库
#include <mutex> // 线程同步互斥量
#include <string> // 不用说
#include <memory> // 用到智能指针
#include <cassert> // 断言
#include <vector> // 使用vector保存连接的用户
#include <algorithm> // 算法库
#include <boost/pool/singleton_pool.hpp> // 内存池
#include <boost/format.hpp> // 格式化字符
#include <WinSock2.h> // winsock2
#include <MSWSock.h> // 扩展函数
#include "utilities.h" // 包含了一些自己写的小工具,这里用到Log日志工具
#pragma comment(lib, "ws2_32.lib")
namespace network
{
class IOCPServer;
using namespace utilities;
// 缓冲区大小
const int BUF_SIZE = 1024;
// 操作类型
enum class OP_TYPE {
OP_ACCEPT,
OP_READ,
OP_WRITE
};
// 单句柄数据
typedef struct {
SOCKET hSock;
SOCKADDR_IN sockAddr;
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
// 单IO数据
typedef struct {
WSAOVERLAPPED overlapped;
WSABUF wsaBuf;
char buf[BUF_SIZE];
OP_TYPE opType;
SOCKET clntSock;
}PER_IO_DATA, *LPPER_IO_DATA;
class IOCPServer
{
public:
using handle_spl = boost::singleton_pool<PER_HANDLE_DATA, sizeof(PER_HANDLE_DATA)>;
using io_spl = boost::singleton_pool<PER_IO_DATA, sizeof(PER_IO_DATA)>;
using handle_ptr = std::shared_ptr<PER_HANDLE_DATA>;
using phandle_vector = std::vector<LPPER_HANDLE_DATA>;
// 构造函数
IOCPServer(int port);
~IOCPServer();
void Accept();
void Run();
private:
void InitSock(); // 初始化套接字
void CreateIocp(); // 创建IOCP
bool LoadExtensionFunc(); // 加载扩展函数
void AcceptEx(); // 接受操作
void AcceptDelivery(); // 投递接受操作
void AcceptHandler(LPPER_IO_DATA); // 接受处理
void RequestHandler(); // 请求处理
void RecvMsg(SOCKET, LPPER_IO_DATA); // 接收消息
void SendMsg(SOCKET, LPPER_IO_DATA, const std::string& msg); // 发送消息
void CloseSock(LPPER_HANDLE_DATA, LPPER_IO_DATA); // 关闭套接字
private:
handle_ptr m_servSock; // 服务器单句柄数据
phandle_vector m_vAcceptedSock; // 保存连进来的用户信息
LPPER_HANDLE_DATA m_clntSock; // 客户端单句柄数据
LPPER_IO_DATA m_ioInfo; // 单IO数据
LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx函数指针
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAccetExSockaddrs函数指针
HANDLE m_hCompPort; // 完成端口
int m_nThreads; // 线程数量
int m_nPort; // 端口
std::mutex m_mtx; // 同步的互斥量
Log log; // 日志类
};
}
大多函数名都和之前的版本保持一致,所以大家应该不会觉得太陌生,这里来说说和之前不同的部分。
先是操作类型,这里是如此定义的:
enum class OP_TYPE {
OP_ACCEPT, // 接受操作
OP_READ, // 读取操作
OP_WRITE // 写入操作
};
这里使用了作用域限制的 enum
(scoped enum),这是C++11的东西,它的语法就是在 enum
后面加一个 class
,这样这些enums就有了作用域限制,亦即访问变成了 OP_TYPE::OP_READ
,于是避免了枚举元素直接包含到 namespace
下。这里还加了一个类型 OP_ACCEPT
,用于标记接受操作,后面会用到。
在 PER_IO_DATA
中还添加了一个 clntSock
成员用于在”小纸条“上保存远程套接字,亦即客户端套接字。
typedef struct {
WSAOVERLAPPED overlapped;
WSABUF wsaBuf;
char buf[BUF_SIZE];
OP_TYPE opType;
SOCKET clntSock;
}PER_IO_DATA, *LPPER_IO_DATA;
接着使用声明别名简化了几个名字较长的类型,这也是C++11的东西,一般来说和 typedef
没什么不同,但 typedef
不支持模板化,而且在声明模板别名时也不如 using
方便。
using handle_spl = boost::singleton_pool<PER_HANDLE_DATA, sizeof(PER_HANDLE_DATA)>;
using io_spl = boost::singleton_pool<PER_IO_DATA, sizeof(PER_IO_DATA)>;
using handle_ptr = std::shared_ptr<PER_HANDLE_DATA>;
using phandle_vector = std::vector<LPPER_HANDLE_DATA>;
这里还使用了 boost
库中的内存池 singleton_pool
,之前我们都是使用的 new
来动态分配,在客户端退出时再 delete
掉,这样就会有大量的申请释放动作,影响效率。其实退出客户端的内存不用急着释放,可以接着准备给后面连接的用户使用,这样就会省掉许多申请释放的操作,统一从池中分配,最后再统一释放,这就是内存池的作用。
在定义中多了许多扩展函数的类型,这些上篇讲过,主要是如何运用到IOCP中,在后面我们再来看实现。
日志工具(Log)
在这个版本中不再使用 cout
来输出了,我们将相关信息记录到文件中,这样不仅可以方便查看,而且可以保存的更久。
这里使用的是我自己以前写的一个小工具,包含在 utilities.h
中,其中有一个 Log
类,专门用于日志记录。
这个 Log
工具使用起来非常简单,就像这样:
Log log;
log.d(""); // 调试信息
log.i(""); // 普通信息
log.w(""); // 警告信息
log.e(""); // 错误信息
其中重载了 operator<<
,所以也可以这样使用:
LOG_DEBUG << "" << ""; // 调度信息
LOG_INFO << ""; // 普通信息
LOG_WARNING << ""; // 警告信息
LOG_ERROR << ""; // 错误信息
若没有指定目录,它会在当前程序目录下新建:
"log/当前日期/log_xx.log"
xx
对应 Log
等级,若是普通信息则为 log_i.log
,调试信息则为 log_d.log
,总共4个文件。这样我们就把日志类型分开了,有错误时直接看 log_e.log
,有警告时直接看 log_w.log
。当然,也可以使用 SetLogDestination
函数来手动设置这4个文件的输出目录。
若在调试时我们想直接在 dos
中看,那么可以调用 ENABLE_LOG_TOSTERR
函数并设置为 true
,再想输出到文件可以重新设为 false
。
输出的内容中可能处于各个不同的程序,如何区分呢?可以使用 ENABLE_LOG_TAG
函数来为不同程序设置不同标记,设置后日志的内容为:
"当前时间--标记内容:日志内容"
构造与析构函数
这两个函数未做什么改变,和第一版的实现一样,可以简单的看下:
network::IOCPServer::IOCPServer(int port)
: m_nPort(port)
{
m_servSock = std::make_shared<PER_HANDLE_DATA>();
assert(m_servSock != nullptr);
ZeroMemory(m_servSock.get(), sizeof(PER_HANDLE_DATA));
InitSock();
ENABLE_LOG_TAG("IOCPServer");
}
在这里我使用 ENABLE_LOG_TAG
开启了日志的标志功能,并将其设为 IOCPServer
,这样日志记录的时候就会包含这个标志了。
析构函数:
network::IOCPServer::~IOCPServer()
{
for (int i = 0; i < m_nThreads; ++i)
{
PostQueuedCompletionStatus(m_hCompPort, 0xFFFFFFFF, NULL, NULL);
}
std::for_each(m_vAcceptedSock.begin(), m_vAcceptedSock.end(),
[&](LPPER_HANDLE_DATA& hd) {
closesocket(hd->hSock);
hd->hSock = INVALID_SOCKET;
});
WSACleanup();
}
初始化套接字与创建完成端口
InitSock
函数用于初始化套接字相关,其实现如下:
void network::IOCPServer::InitSock()
{
WSADATA wsaData;
auto ret = WSAStartup(0x0202, &wsaData);
assert(ret == 0);
// 创建支持重叠IO的套接字
m_servSock->hSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
// 绑定
m_servSock->sockAddr.sin_family = AF_INET;
m_servSock->sockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
m_servSock->sockAddr.sin_port = htons(m_nPort);
ret = bind(m_servSock->hSock, (SOCKADDR*)&m_servSock->sockAddr, sizeof(SOCKADDR_IN));
assert(ret != SOCKET_ERROR);
// 创建CP对象
CreateIocp();
// 连接服务器套接字与CP对象
CreateIoCompletionPort((HANDLE)m_servSock->hSock, m_hCompPort, (ULONG_PTR)m_servSock.get(), NULL);
// 监听
ret = listen(m_servSock->hSock, 5);
assert(ret != SOCKET_ERROR);
}
这里,和版本一不同的地方在于22行将服务器套接字与CP对象也进行了关联,这样在处理消息的线程中我们可以获得到服务端的接收操作,因为已经不用 accept
函数了,所以不能在那里处理了。
CreateIocp
函数用于创建CP对象并开启线程,内容和实例一中相同,所以不多作解释了:
void network::IOCPServer::CreateIocp()
{
// 创建CP对象
m_hCompPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
m_nThreads = sysInfo.dwNumberOfProcessors * 2;
for (int i = 0; i < m_nThreads; ++i)
{
std::thread t(&IOCPServer::RequestHandler, this);
t.detach();
}
}
加载扩展函数
要使用 Accept
和 GetAcceptExSockaddrs
函数,首先通过 WSAIoctl
函数加载到函数指针上,这是上篇的内容,这里只是专门放到了一个函数中处理。
bool network::IOCPServer::LoadExtensionFunc()
{
GUID guidAcceptEx = WSAID_ACCEPTEX;
GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD dwBytes;
// 加载AcceptEx函数
auto ret = WSAIoctl(m_servSock->hSock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx, sizeof(guidAcceptEx),
&m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx),
&dwBytes, NULL, NULL
);
boost::format fmt;
if (ret == SOCKET_ERROR)
{
if (WSAGetLastError() != WSA_IO_PENDING)
{
fmt = boost::format("%s%d") % "Load AcceptEx faild! Error code:" % WSAGetLastError();
log.e(fmt.str());
return false;
}
}
// 加载GetAcceptExSockaddrs函数
ret = WSAIoctl(m_servSock->hSock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockaddrs, sizeof(guidGetAcceptExSockaddrs),
&m_lpfnGetAcceptExSockaddrs, sizeof(m_lpfnGetAcceptExSockaddrs),
&dwBytes, NULL, NULL
);
if (ret == SOCKET_ERROR)
{
fmt = boost::format("%s%d") % "Load GetAcceptExSockaddrs failed! Error code:" % WSAGetLastError();
log.e(fmt.str());
return false;
}
return true;
}
加载事项和上篇所说一致,这里首次使用了 Log
来记录错误消息,因为普通字符串无法和 DWORD
直接相+,所以这里使用了 boost 中的另一个工具 format
,format
可以格式化不同的内容,就像 printf
一样,使用 %
号来连接后面的内容,即 %s%d
的实值,使用起来十分简单。通过使用其中的 str
函数,可以获得到字符串形式的内容。
投递接受处理
在 AcceptDelivery
函数中,投递了10个接收处理来准备接受连接的用户:
void network::IOCPServer::AcceptDelivery()
{
for (int i = 0; i < 10; ++i)
{
AcceptEx();
}
}
具体功能是通过 AcceptEx
函数实现的:
void network::IOCPServer::AcceptEx()
{
// 从内存池中申请内存
m_ioInfo = (LPPER_IO_DATA)io_spl::malloc();
assert(io_spl::is_from(m_ioInfo));
ZeroMemory(m_ioInfo, sizeof(PER_IO_DATA));
m_ioInfo->wsaBuf.buf = m_ioInfo->buf;
m_ioInfo->wsaBuf.len = BUF_SIZE;
m_ioInfo->opType = OP_TYPE::OP_ACCEPT;
// 为将要接受的连接创建客户机套接字
m_ioInfo->clntSock = WSASocketW(PF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
DWORD dwBytes;
auto ret = m_lpfnAcceptEx(m_servSock->hSock,
m_ioInfo->clntSock,
m_ioInfo->wsaBuf.buf,
m_ioInfo->wsaBuf.len - (sizeof(SOCKADDR_IN) + 16) * 2, // 若不希望AcceptEx建立连接后等待用户发送数据,则设为0
sizeof(SOCKADDR_IN) + 16,
sizeof(SOCKADDR_IN) + 16,
&dwBytes,
&m_ioInfo->overlapped
);
if (!ret)
{
if (WSAGetLastError() != WSA_IO_PENDING)
{
auto fmt = boost::format("%s%d") % "AcceptEx failed! Error code:" % WSAGetLastError();
log.e(fmt.str());
}
}
}
这里我们从内存池中为”小纸条“申请内存,singleton_pool
中提供了 malloc
函数可以从内存池中申请内存,使用 is_from
函数可以确认该块内存是否是从该内存池中申请的。
接着初始化了”小纸条“,并为客户端先准备好套接字,原理在扩展函数那篇也已经讲过了,所以这里也不再赘述。
主要来看看扩展函数 AcceptEx
的使用:
auto ret = m_lpfnAcceptEx(m_servSock->hSock, // 服务端套接字
m_ioInfo->clntSock, // 客户机套接字
m_ioInfo->wsaBuf.buf, // 用于接收数据的缓冲区
m_ioInfo->wsaBuf.len - (sizeof(SOCKADDR_IN) + 16) * 2, // 若不希望AcceptEx建立连接后等待用户发送数据,则设为0
sizeof(SOCKADDR_IN) + 16, // 本地套接字地址结构大小
sizeof(SOCKADDR_IN) + 16, // 远程套接字地址结构大小
&dwBytes, // 实际收到的数据大小
&m_ioInfo->overlapped // 重叠结构
);
注释已经很清楚的解释了各个参数的意义,扩展函数 AcceptEx
是通过 WSAIoctl
函数获取到函数指针 m_lpfnAcceptEx
里的,这个函数是异步的,所以不会阻塞。
我们传入为每个客户端准备的”小纸条“(m_ioInfo
),系统去替我们执行耗时操作,处理完了它会在上面记下我们需要的内容,稍后我们可以在处理消息函数中通过 GetQueuedCompletionStatus
函数拿到。
处理用户请求
程序的主要逻辑都集中在了 RequestHandler
函数中,也是我们前面开启的线程函数,异步处理的消息我们都能在这里获得到。实现如下:
void network::IOCPServer::RequestHandler()
{
LPPER_HANDLE_DATA lpHandleInfo;
LPPER_IO_DATA lpIoInfo;
DWORD dwBytesTrans;
for (;;)
{
auto ret = GetQueuedCompletionStatus(m_hCompPort, &dwBytesTrans,
(LPDWORD)&lpHandleInfo, (LPOVERLAPPED*)&lpIoInfo, INFINITE);
if (ret == 0)
{
if (WSAGetLastError() == WAIT_TIMEOUT)
continue;
else
{
auto fmt = boost::format("%s%d") % "GetQueuedCompletionStatus failed! Error code:" % GetLastError();
log.e(fmt.str());
break;
}
}
std::lock_guard<std::mutex> lg(m_mtx);
if (dwBytesTrans == 0xFFFFFFFF)
{
break;
}
switch (lpIoInfo->opType)
{
case OP_TYPE::OP_ACCEPT:
AcceptHandler(lpIoInfo);
break;
case OP_TYPE::OP_READ:
{
if (dwBytesTrans == 0) // EOP
{
CloseSock(lpHandleInfo, lpIoInfo);
continue;
}
// 将客户端消息转发回去
std::string msg(lpIoInfo->buf);
log.i("received:" + msg);
SendMsg(lpHandleInfo->hSock, lpIoInfo, msg);
}
break;
case OP_TYPE::OP_WRITE:
RecvMsg(lpHandleInfo->hSock, lpIoInfo);
break;
}
}
}
关于IOCP的主要原理我们在实例一中已经写过了,所以这里不会再说重复的内容,主要来看有关扩展函数的部分。
通过 OP_ACCEPT
操作类型获取到连接用户的请求,这里的数据就是扩展函数 AcceptEx
投递过来的,对于连接进来的用户,我们专门用 AcceptHandler
函数来处理,其实现如下:
void network::IOCPServer::AcceptHandler(LPPER_IO_DATA lpIoInfo)
{
// 使客户端套接字具有和服务器套接相同的属性
setsockopt(lpIoInfo->clntSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char*)&lpIoInfo->clntSock, sizeof(SOCKET));
SOCKADDR_IN *lpLocalSockAddr = nullptr, *lpRemoteSockAddr = nullptr;
int LocalSockAddr, RemoteSockAddr;
m_lpfnGetAcceptExSockaddrs(lpIoInfo->wsaBuf.buf,
lpIoInfo->wsaBuf.len - (sizeof(SOCKADDR_IN) + 16) * 2,
sizeof(SOCKADDR_IN) + 16,
sizeof(SOCKADDR_IN) + 16,
(SOCKADDR**)&lpLocalSockAddr, &LocalSockAddr,
(SOCKADDR**)&lpRemoteSockAddr, &RemoteSockAddr
);
auto fmt = boost::format("%s%d") % "connected client..." % lpIoInfo->clntSock;
log.i(fmt.str());
// 从内存池分配内存
m_clntSock = (LPPER_HANDLE_DATA)handle_spl::malloc();
assert(handle_spl::is_from(m_clntSock));
// 写入连接进来的客户端套接字和地址
m_clntSock->hSock = lpIoInfo->clntSock;
memcpy(&m_clntSock->sockAddr, lpRemoteSockAddr, sizeof(SOCKADDR_IN));
// 连接客户端套接字与CP对象
CreateIoCompletionPort((HANDLE)m_clntSock->hSock, m_hCompPort, (ULONG_PTR)m_clntSock, NULL);
// 保存用户信息
m_vAcceptedSock.push_back(m_clntSock);
fmt = boost::format("%s%d") % "waiting for data arrived..." % m_clntSock->hSock;
log.i(fmt.str());
if (lpIoInfo->overlapped.InternalHigh != 0)
{
std::string msg(lpIoInfo->buf);
// 输出普通日志
log.i("received:" + msg);
SendMsg(m_clntSock->hSock, lpIoInfo, msg);
}
else
{
RecvMsg(m_clntSock->hSock, lpIoInfo);
}
// 处理好一位客户后,投递下一个连接等待
AcceptEx();
}
这里就相当于实例一中 accept
函数处理的那部分了,有用户连接进来就使用 GetAcceptExSockaddrs
获取到其地址信息,为其分配单句柄数据,再和CP对象连接起来,最后使用 vector
保存已连接用户的信息。
由于我们在扩展函数 AcceptEx
中设置了同时接受第一份数据,所以其实在连接时就已经获得了客户端发来的第一份数据,而这份数据就保存在"小纸条”中的缓冲区里,而大小保存在哪里呢?在重叠IO中我们就说边可以直接通过重叠结构的 InternalHigh
字段获取到的。若包含第一份数据,我们则输出该数据,然后再转发给客户端;若没有包含,我们就直接接收下一份数据。
处理好一位用户之后,再调用 AcceptEx
函数投递下一个连接等待,以此处理所有的用户请求。
接收与发送消息
这部分和实例一完全没有区别,便只列出实现。
void network::IOCPServer::RecvMsg(SOCKET sock, LPPER_IO_DATA lpIoInfo)
{
ZeroMemory(lpIoInfo, sizeof(PER_IO_DATA));
lpIoInfo->wsaBuf.buf = lpIoInfo->buf;
lpIoInfo->wsaBuf.len = BUF_SIZE;
lpIoInfo->opType = OP_TYPE::OP_READ;
DWORD recvBytes, flags = 0;
WSARecv(sock, &lpIoInfo->wsaBuf, 1, &recvBytes, &flags, &lpIoInfo->overlapped, NULL);
}
void network::IOCPServer::SendMsg(SOCKET sock, LPPER_IO_DATA lpIoInfo, const std::string& msg)
{
ZeroMemory(lpIoInfo, sizeof(PER_IO_DATA));
memcpy(lpIoInfo->buf, msg.c_str(), msg.size());
lpIoInfo->wsaBuf.buf = lpIoInfo->buf;
lpIoInfo->wsaBuf.len = msg.size();
lpIoInfo->opType = OP_TYPE::OP_WRITE;
WSASend(sock, &lpIoInfo->wsaBuf, 1, NULL, 0, &lpIoInfo->overlapped, NULL);
}
清除释放
CloseSock
函数也与实例一区别不大,只是这里释放内存是归还给内存池,而非直接 delete
。
void network::IOCPServer::CloseSock(LPPER_HANDLE_DATA lpHandleInfo, LPPER_IO_DATA lpIoInfo)
{
// 输出普通日志
auto fmt = boost::format("%s%d") % "disconnected client...." % lpHandleInfo->hSock;
log.i(fmt.str());
auto pos = std::find_if(m_vAcceptedSock.begin(), m_vAcceptedSock.end(),
[&](LPPER_HANDLE_DATA& hd) {
return hd->hSock == lpHandleInfo->hSock;
});
if (pos != m_vAcceptedSock.end())
{
closesocket((*pos)->hSock);
m_vAcceptedSock.erase(pos);
// 归还从内存池申请的内存
if(handle_spl::is_from(lpHandleInfo))
handle_spl::free(lpHandleInfo);
if(io_spl::is_from(lpIoInfo))
io_spl::free(lpIoInfo);
}
}
总结
本篇算是一个综合性较强的应用,若能全部理解,那么对于IOCP模型你就理解的差不多了。Windows的网络模型在这里也就全部介绍完了,后面主要介绍网络库的使用,而要理解网络库,必须得有这些知识做基础,这些后面再说吧。大家可以发现这个例子也仅仅是个基本的网络服务器雏形罢了,要想写一个完整的项目,随便都得上千行了,那其中还包含许多sql,加密解密,界面等等技术,要是游戏的服务器那涉及的就更多了。这些东西后面有时间我都会总结一些文章的,等介绍了更多基础东西我们再来写一个真正完整的项目吧。