跳至主要內容

最大空闲时间回收连接扫描线程的实现

张威大约 4 分钟mysqlmysql连接池

最大空闲时间回收连接扫描线程的实现

给对象添加一个属性-记录时间

/*
 * 实现MySQL数据库的操作
*/

#ifndef _CONNECTION_H
#define _CONNECTION_H
#include <string>
#include <mysql/mysql.h>
#include <ctime>

using std::string;

class Connection {
public:
    ...

	// 刷新一下连接的起始的空闲时间点
    void refreshAliveTime() {
        _aliveTime = clock();
    }
	// 返回存活的时间
    clock_t getAliveTime() {
        return clock() - _aliveTime;
    }

private:
    MYSQL* _conn;    //表示和MySQL的一条连接
    clock_t _aliveTime; // 记录进入空闲状态后的起始存活时间
};


#endif  // _CONNECTION_H

空闲时间-也就是进入队列了

#ifndef _CONNECTIONPOOL_H
#define _CONNECTIONPOOL_H

#include <string>
#include <queue>
#include <mutex>
#include <atomic>   //atomic_int  原子类型
#include <memory>	//shared_ptr
#include <thread>
#include <condition_variable>	
#include <functional>	//bind
#include "connection.h"

using std::string;
using std::queue;
using std::mutex;
using std::thread;
using std::atomic_int;
using std::shared_ptr;
using std::bind;
using std::condition_variable;
using std::unique_lock;
class ConnectionPool {
public:
	// 获取连接池对象实例
    static ConnectionPool* getConnectionPool();
	// 给外部提供接口,从连接池中获取一个可用的空闲连接
	shared_ptr<Connection> getConnection();
private:
    ConnectionPool();
    ~ConnectionPool();

	// 运行在独立的线程中,专门负责生产新连接
	//之所以写成成员函数而不是全局函数,是因为可以更好访问成员变量
	void produceConnectionTask();

	// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
	void scannerConnectionTask();

private:
	string _ip; // mysql的ip地址
	unsigned short _port; // mysql的端口号 3306
	string _username; // mysql登录用户名
	string _password; // mysql登录密码
	string _dbname; // 连接的数据库名称
	int _initSize; // 连接池的初始连接量
	int _maxSize; // 连接池的最大连接量
	int _maxIdleTime; // 连接池最大空闲时间
	int _connectionTimeout; // 连接池获取连接的超时时间
    
    queue<Connection*> _connectionQue;  // 存储mysql连接的队列
    mutex _queueMutex;  // 维护连接队列的线程安全互斥锁
    atomic_int _connectionCnt; // 记录连接所创建的connection连接的总数量 
	condition_variable cond;	// 设置条件变量,用于连接生产线程和连接消费线程的通信

};

#endif //_CONNECTIONPOOL_H

加入队列的连接需要刷新时间

  • 初始化时创建_initSize个连接
  • 生产者创建连接
  • 智能指针析构函数放回队列

线程处理函数则定时检查队头元素(FIFO,最先加入队列的元素)是否超时

#include "connectionPool.h"
#include "public.h"
#include <iostream>

using std::cout;
using std::endl;

ConnectionPool::ConnectionPool()
{
    if (!loadConfigFile()) { // 配置文件加载失败
        return;
    }

    // 创建初始的数量连接
    // 这一块是在连接池启动的时候做的,不用考虑线程安全
    for (int i = 0; i < _initSize; ++i) {
        Connection *conn = new Connection();
        conn->connection(_ip, _port, _username, _password, _dbname);
        conn->refreshAliveTime(); // 刷新一下开始空闲的起始时间
        _connectionQue.push(conn);
        _connectionCnt++;
    }

    // 启动一个新的线程,作为连接的生产者 linux thread => pthread_create
    // 因为本身还是c接口,所以线程处理函数void* (*)(void*)类型,需要使用bind
    thread produce(bind(&ConnectionPool::produceConnectionTask, this)); // 线程对象
    produce.detach();                                                   // 设置成分离线程

    // 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
    thread scanner(bind(&ConnectionPool::scannerConnectionTask, this));
    scanner.detach();
}

void ConnectionPool::produceConnectionTask()
{
    while (1) {
        unique_lock<mutex> lock(_queueMutex);
        while (!_connectionQue.empty()) { // 队列不为空
            cond.wait(lock);
        }
        if (_connectionCnt < _maxSize) {
            Connection *conn = new Connection();
            conn->connection(_ip, _port, _username, _password, _dbname);
            conn->refreshAliveTime(); // 刷新一下开始空闲的起始时间
            _connectionQue.push(conn);
            _connectionCnt++;
        }

        // 通知消费者线程,可以消费连接了
        cond.notify_all();
    }
}

// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
void ConnectionPool::scannerConnectionTask()
{
    while (1) {
        // 通过sleep模拟定时效果
        std::this_thread::sleep_for(std::chrono::seconds(_maxIdleTime));
       
       // 扫描整个队列,释放多余的连接
        while (_connectionCnt > _initSize) {
            Connection *p = _connectionQue.front();
            if (p->getAliveTime() > _maxIdleTime) {
                _connectionQue.pop();
                delete p;
                --_connectionCnt;   // 调用~Connection()释放连接
            }
            else {
                break;  // 队头的连接没有超过_maxIdleTime,其它连接肯定没有
            }
        }
    }
}


// 消费者线程函数,从队列中获取连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
    unique_lock<mutex> lock(_queueMutex);
    while (_connectionQue.empty()) {
        if (std::cv_status::timeout == cond.wait_for(lock, std::chrono::milliseconds(_connectionTimeout))) {
            if (_connectionQue.empty()) {
                LOG("获取空闲连接超时了...获取连接失败!");
                return nullptr;
            }
        }
    }

    /*
    shared_ptr智能指针析构时,会把connection资源直接delete掉,相当于
    调用connection的析构函数,connection就被close掉了。
    这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到queue当中
    */
    shared_ptr<Connection> sp(_connectionQue.front(),
                              [&](Connection *pconn) {
                                  // 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
                                  unique_lock<mutex> lock(_queueMutex);
                                  pconn->refreshAliveTime();
                                  _connectionQue.push(pconn);
                              });
    _connectionQue.pop();
    if (_connectionQue.empty()) {
        cond.notify_all();
    }

    return sp;
}

C++ 多线程detach()操作的坑以及传参_c++ thread detach-CSDN博客open in new window