跳至主要內容

连接池压力测试

张威大约 6 分钟mysqlmysql连接池

连接池压力测试

连接池代码

mysql.cnf配置文件

#数据库连接池的配置文件,下面和宏一样就不要加多余的空格了
ip=127.0.0.1
port=3306
username=root
password=147258
dbname=chat
initSize=10
maxSize=1024
// 连接池最大空闲时间默认单位是秒
maxIdleTime=60
// 连接池获取连接的超时时间单位是毫秒
connectionTimeOut=100

public.h公共头文件

/*
 * 这是一个公共的头文件
*/
#ifndef _PUBLIC_H
#define _PUBLIC_H



#define LOG(str) \
    cout << __FILE__ << ":" << __LINE__ << " " <<   \
    __TIMESTAMP__ << ":" << str << endl;


#endif //_PUBLIC_H

connection.h数据库操作头文件

/*
 * 实现MySQL数据库的操作
*/

#ifndef _CONNECTION_H
#define _CONNECTION_H
#include <string>
#include <mysql/mysql.h>
#include <ctime>

using std::string;

class Connection {
public:
    Connection();
    ~Connection();

    //数据库的连接操作
    bool connection(string ip, unsigned short port, string user, string passward, string dbname);
    
    //更新操作 insert delete update
    bool update(string sql);

    //查询操作select
    MYSQL_RES* query(string sql);

	// 刷新一下连接的起始的空闲时间点
    void refreshAliveTime() {
        _aliveTime = clock();
    }
	// 返回存活的时间
    clock_t getAliveTime() {
        return clock() - _aliveTime;
    }

private:
    MYSQL* _conn;    //表示和MySQL的一条连接
    clock_t _aliveTime; // 记录进入空闲状态后的起始存活时间
};


#endif  // _CONNECTION_H

connection.cc数据库操作的封装实现

#include <iostream>
#include "connection.h"
#include "public.h"

using std::cout;
using std::endl;

Connection::Connection() {
    //初始化
    _conn = mysql_init(nullptr);
}

//释放资源
Connection::~Connection() {
    if(_conn != nullptr) {
        mysql_close(_conn);
    }
}

bool Connection::connection(string ip, unsigned short port, 
string user, string passward, string dbname) {
    //建立连接
    MYSQL* p = mysql_real_connect(_conn, ip.c_str(), user.c_str(),
     passward.c_str(),dbname.c_str(), port, nullptr, 0);
     return p != nullptr;
}

bool Connection::update(string sql) {
    //更新操作 insert delete update
    if(mysql_query(_conn, sql.c_str())) {
        LOG("更新失败:" +  sql);
        return false;
    }
    return true;
}

MYSQL_RES* Connection::query(string sql) {
    if(mysql_query(_conn, sql.c_str())) {
        LOG("查询失败" + sql);
    }
    return mysql_use_result(_conn);
}

connectionPool.h连接池的头文件

#ifndef _CONNECTIONPOOL_H
#define _CONNECTIONPOOL_H

#include <string>
#include <queue>
#include <mutex>
#include <atomic>   //atomic_int  原子类型
#include <memory>	//shared_ptr
#include <thread>
#include <condition_variable>	
#include <functional>	//bind
#include "connection.h"

using std::string;
using std::queue;
using std::mutex;
using std::thread;
using std::atomic_int;
using std::shared_ptr;
using std::bind;
using std::condition_variable;
using std::unique_lock;
class ConnectionPool {
public:
	// 获取连接池对象实例
    static ConnectionPool* getConnectionPool();
	// 给外部提供接口,从连接池中获取一个可用的空闲连接
	shared_ptr<Connection> getConnection();
private:
    ConnectionPool();
    ~ConnectionPool();

#ifdef TEST_LOAD_CONFIG_FILE
public: //测试的时候可以先变成共有的
#endif	//TEST_LOAD_CONFIG_FILE
    bool loadConfigFile();
	
	// 运行在独立的线程中,专门负责生产新连接
	//之所以写成成员函数而不是全局函数,是因为可以更好访问成员变量
	void produceConnectionTask();

	// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
	void scannerConnectionTask();

private:
	string _ip; // mysql的ip地址
	unsigned short _port; // mysql的端口号 3306
	string _username; // mysql登录用户名
	string _password; // mysql登录密码
	string _dbname; // 连接的数据库名称
	int _initSize; // 连接池的初始连接量
	int _maxSize; // 连接池的最大连接量
	int _maxIdleTime; // 连接池最大空闲时间
	int _connectionTimeout; // 连接池获取连接的超时时间
    
    queue<Connection*> _connectionQue;  // 存储mysql连接的队列
    mutex _queueMutex;  // 维护连接队列的线程安全互斥锁
    atomic_int _connectionCnt; // 记录连接所创建的connection连接的总数量 
	condition_variable cond;	// 设置条件变量,用于连接生产线程和连接消费线程的通信

};

#endif //_CONNECTIONPOOL_H

connectionPool.cc连接池的实现

#include "connectionPool.h"
#include "public.h"
#include <iostream>

using std::cout;
using std::endl;

// 线程安全的懒汉单例函数接口
ConnectionPool *ConnectionPool::getConnectionPool()
{                               // 静态函数的实现,不写static
    static ConnectionPool pool; // 静态局部变量的初始化,编译器会生成lock和unlock
    return &pool;
}
// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile()
{
    FILE *pf = fopen("mysql.cnf", "r");
    if (pf == nullptr) {
        LOG("mysql.cnf file is not exit!");
        return false;
    }

    while (!feof(pf)) { // 文件不为空
        char line[1024] = {0};
        fgets(line, sizeof(line), pf);
        string str = line;
        int idx = str.find('=', 0); // 从第0位开始找'='的下标
        if (idx == -1) {            // 无效的配置项
            continue;
        }

        // password=123456\n
        int endidx = str.find('\n', idx); // 从第idx位开始找'\n'的下标

        string key = str.substr(0, idx); // 从第0位开始idx个字符
        string value = str.substr(idx + 1, endidx - (idx + 1));
#ifdef DEBUG
        cout << key << '=' << value << endl;
#endif // DEBUG
        if (key == "ip") {
            _ip = value;
        } else if (key == "port") {
            _port = atoi(value.c_str()); // string->const char* ->int
        } else if (key == "username") {
            _username = value;
        } else if (key == "password") {
            _password = value;
        } else if (key == "dbname") {
            _dbname = value;
        } else if (key == "initSize") {
            _initSize = atoi(value.c_str());
        } else if (key == "maxSize") {
            _maxSize = atoi(value.c_str());
        } else if (key == "maxIdleTime") {
            _maxIdleTime = atoi(value.c_str());
        } else if (key == "connectionTimeOut") {
            _connectionTimeout = atoi(value.c_str());
        }
    }

    return true;
}

ConnectionPool::ConnectionPool()
{
    if (!loadConfigFile()) { // 配置文件加载失败
        return;
    }

    // 创建初始的数量连接
    // 这一块是在连接池启动的时候做的,不用考虑线程安全
    for (int i = 0; i < _initSize; ++i) {
        Connection *conn = new Connection();
        conn->connection(_ip, _port, _username, _password, _dbname);
        conn->refreshAliveTime(); // 刷新一下开始空闲的起始时间
        _connectionQue.push(conn);
        _connectionCnt++;
    }

    // 启动一个新的线程,作为连接的生产者 linux thread => pthread_create
    // 因为本身还是c接口,所以线程处理函数void* (*)(void*)类型,需要使用bind
    thread produce(bind(&ConnectionPool::produceConnectionTask, this)); // 线程对象
    produce.detach();                                                   // 设置成分离线程

    // 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
    thread scanner(bind(&ConnectionPool::scannerConnectionTask, this));
    scanner.detach();
}

void ConnectionPool::produceConnectionTask()
{
    while (1) {
        unique_lock<mutex> lock(_queueMutex);
        while (!_connectionQue.empty()) { // 队列不为空
            cond.wait(lock);
        }
        if (_connectionCnt < _maxSize) {
            Connection *conn = new Connection();
            conn->connection(_ip, _port, _username, _password, _dbname);
            conn->refreshAliveTime(); // 刷新一下开始空闲的起始时间
            _connectionQue.push(conn);
            _connectionCnt++;
        }

        // 通知消费者线程,可以消费连接了
        cond.notify_all();
    }
}
// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
void ConnectionPool::scannerConnectionTask()
{
    while (1) {
        // 通过sleep模拟定时效果
        std::this_thread::sleep_for(std::chrono::seconds(_maxIdleTime));
       
       // 扫描整个队列,释放多余的连接
        while (_connectionCnt > _initSize) {
            Connection *p = _connectionQue.front();
            if (p->getAliveTime() > _maxIdleTime) {
                _connectionQue.pop();
                delete p;
                --_connectionCnt;   // 调用~Connection()释放连接
            }
            else {
                break;  // 队头的连接没有超过_maxIdleTime,其它连接肯定没有
            }
        }
    }
}

ConnectionPool::~ConnectionPool() {
    //{
        std::unique_lock<mutex> lock(_queueMutex);
        while(!_connectionQue.empty()) {
            delete _connectionQue.front();
            _connectionQue.pop();
        }
    //}   
}

// 消费者线程函数,从队列中获取连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
    unique_lock<mutex> lock(_queueMutex);
    while (_connectionQue.empty()) {
        if (std::cv_status::timeout == cond.wait_for(lock, std::chrono::milliseconds(_connectionTimeout))) {
            if (_connectionQue.empty()) {
                LOG("获取空闲连接超时了...获取连接失败!");
                return nullptr;
            }
        }
    }

    /*
    shared_ptr智能指针析构时,会把connection资源直接delete掉,相当于
    调用connection的析构函数,connection就被close掉了。
    这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到queue当中
    */
    shared_ptr<Connection> sp(_connectionQue.front(),
                              [&](Connection *pconn) {
                                  // 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
                                  unique_lock<mutex> lock(_queueMutex);
                                  pconn->refreshAliveTime();
                                  _connectionQue.push(pconn);
                              });
    _connectionQue.pop();
    if (_connectionQue.empty()) {
        cond.notify_all();
    }

    return sp;
}

压力测试

压力测试是设计优化类项目所必须的

单线程

const int M = 5000;
    
clock_t begin = clock();
for (int i = 0; i < M; ++i) {
    // 不使用连接池的测试
    Connection conn;
    conn.connection("127.0.0.1", 3306, "root", "147258", "chat");
    conn.update(sql);
}
clock_t end = clock();
cout << "1. 未使用连接池耗时:" << end - begin << " ms" << endl;

// 使用连接池的测试
begin = clock();
ConnectionPool* pool = ConnectionPool::getConnectionPool();
for (size_t i = 0; i < M; i++) {
    shared_ptr<Connection> conn = pool->getConnection();
    conn->update(sql);
}
end = clock();
cout << "2. 使用连接池耗时:" << end - begin << " ms" << endl;
void func1()
{
    char sql[1024] = {0};
    sprintf(sql, "insert into user(name, age, sex) values('%s', %d, '%s')",
            "guan yu1", 34, "M");
    for (int i = 0; i < 250; ++i) {
        // 不使用连接池的测试
        Connection conn;
        conn.connection("127.0.0.1", 3306, "root", "147258", "chat");
        conn.update(sql);
    }
}

void func2()
{
    char sql[1024] = {0};
    sprintf(sql, "insert into user(name, age, sex) values('%s', %d, '%s')",
            "guan yu1", 34, "M");
    ConnectionPool* pool = ConnectionPool::getConnectionPool();
    for (size_t i = 0; i < 250; i++) {
        shared_ptr<Connection> conn = pool->getConnection();
        conn->update(sql);
    }
}

int main() {
    //
    Connection conn;
    conn.connection("127.0.0.1", 3306, "root", "147258", "chat");
    
    clock_t begin = clock();
    thread t1(func1);
    thread t2(func1);
    thread t3(func1);
    thread t4(func1);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    clock_t end = clock();
    cout << end - begin << endl;
    return 0;
}