最大空闲时间回收连接扫描线程的实现
大约 4 分钟
最大空闲时间回收连接扫描线程的实现
给对象添加一个属性-记录时间
/*
* 实现MySQL数据库的操作
*/
#ifndef _CONNECTION_H
#define _CONNECTION_H
#include <string>
#include <mysql/mysql.h>
#include <ctime>
using std::string;
class Connection {
public:
...
// 刷新一下连接的起始的空闲时间点
void refreshAliveTime() {
_aliveTime = clock();
}
// 返回存活的时间
clock_t getAliveTime() {
return clock() - _aliveTime;
}
private:
MYSQL* _conn; //表示和MySQL的一条连接
clock_t _aliveTime; // 记录进入空闲状态后的起始存活时间
};
#endif // _CONNECTION_H
空闲时间-也就是进入队列了
#ifndef _CONNECTIONPOOL_H
#define _CONNECTIONPOOL_H
#include <string>
#include <queue>
#include <mutex>
#include <atomic> //atomic_int 原子类型
#include <memory> //shared_ptr
#include <thread>
#include <condition_variable>
#include <functional> //bind
#include "connection.h"
using std::string;
using std::queue;
using std::mutex;
using std::thread;
using std::atomic_int;
using std::shared_ptr;
using std::bind;
using std::condition_variable;
using std::unique_lock;
class ConnectionPool {
public:
// 获取连接池对象实例
static ConnectionPool* getConnectionPool();
// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> getConnection();
private:
ConnectionPool();
~ConnectionPool();
// 运行在独立的线程中,专门负责生产新连接
//之所以写成成员函数而不是全局函数,是因为可以更好访问成员变量
void produceConnectionTask();
// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
void scannerConnectionTask();
private:
string _ip; // mysql的ip地址
unsigned short _port; // mysql的端口号 3306
string _username; // mysql登录用户名
string _password; // mysql登录密码
string _dbname; // 连接的数据库名称
int _initSize; // 连接池的初始连接量
int _maxSize; // 连接池的最大连接量
int _maxIdleTime; // 连接池最大空闲时间
int _connectionTimeout; // 连接池获取连接的超时时间
queue<Connection*> _connectionQue; // 存储mysql连接的队列
mutex _queueMutex; // 维护连接队列的线程安全互斥锁
atomic_int _connectionCnt; // 记录连接所创建的connection连接的总数量
condition_variable cond; // 设置条件变量,用于连接生产线程和连接消费线程的通信
};
#endif //_CONNECTIONPOOL_H
加入队列的连接需要刷新时间
- 初始化时创建_initSize个连接
- 生产者创建连接
- 智能指针析构函数放回队列
线程处理函数则定时检查队头元素(FIFO,最先加入队列的元素)是否超时
#include "connectionPool.h"
#include "public.h"
#include <iostream>
using std::cout;
using std::endl;
ConnectionPool::ConnectionPool()
{
if (!loadConfigFile()) { // 配置文件加载失败
return;
}
// 创建初始的数量连接
// 这一块是在连接池启动的时候做的,不用考虑线程安全
for (int i = 0; i < _initSize; ++i) {
Connection *conn = new Connection();
conn->connection(_ip, _port, _username, _password, _dbname);
conn->refreshAliveTime(); // 刷新一下开始空闲的起始时间
_connectionQue.push(conn);
_connectionCnt++;
}
// 启动一个新的线程,作为连接的生产者 linux thread => pthread_create
// 因为本身还是c接口,所以线程处理函数void* (*)(void*)类型,需要使用bind
thread produce(bind(&ConnectionPool::produceConnectionTask, this)); // 线程对象
produce.detach(); // 设置成分离线程
// 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
thread scanner(bind(&ConnectionPool::scannerConnectionTask, this));
scanner.detach();
}
void ConnectionPool::produceConnectionTask()
{
while (1) {
unique_lock<mutex> lock(_queueMutex);
while (!_connectionQue.empty()) { // 队列不为空
cond.wait(lock);
}
if (_connectionCnt < _maxSize) {
Connection *conn = new Connection();
conn->connection(_ip, _port, _username, _password, _dbname);
conn->refreshAliveTime(); // 刷新一下开始空闲的起始时间
_connectionQue.push(conn);
_connectionCnt++;
}
// 通知消费者线程,可以消费连接了
cond.notify_all();
}
}
// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
void ConnectionPool::scannerConnectionTask()
{
while (1) {
// 通过sleep模拟定时效果
std::this_thread::sleep_for(std::chrono::seconds(_maxIdleTime));
// 扫描整个队列,释放多余的连接
while (_connectionCnt > _initSize) {
Connection *p = _connectionQue.front();
if (p->getAliveTime() > _maxIdleTime) {
_connectionQue.pop();
delete p;
--_connectionCnt; // 调用~Connection()释放连接
}
else {
break; // 队头的连接没有超过_maxIdleTime,其它连接肯定没有
}
}
}
}
// 消费者线程函数,从队列中获取连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
unique_lock<mutex> lock(_queueMutex);
while (_connectionQue.empty()) {
if (std::cv_status::timeout == cond.wait_for(lock, std::chrono::milliseconds(_connectionTimeout))) {
if (_connectionQue.empty()) {
LOG("获取空闲连接超时了...获取连接失败!");
return nullptr;
}
}
}
/*
shared_ptr智能指针析构时,会把connection资源直接delete掉,相当于
调用connection的析构函数,connection就被close掉了。
这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到queue当中
*/
shared_ptr<Connection> sp(_connectionQue.front(),
[&](Connection *pconn) {
// 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
unique_lock<mutex> lock(_queueMutex);
pconn->refreshAliveTime();
_connectionQue.push(pconn);
});
_connectionQue.pop();
if (_connectionQue.empty()) {
cond.notify_all();
}
return sp;
}