跳至主要內容

生产者线程的实现

张威大约 3 分钟mysqlmysql连接池

生产者线程的实现

#ifndef _CONNECTIONPOOL_H
#define _CONNECTIONPOOL_H

#include <string>
#include <queue>
#include <mutex>
#include <atomic>   //atomic_int  原子类型
#include <memory>	//shared_ptr
#include <thread>
#include <condition_variable>	
#include <functional>	//bind
#include "connection.h"

using std::string;
using std::queue;
using std::mutex;
using std::thread;
using std::atomic_int;
using std::shared_ptr;
using std::bind;
using std::condition_variable;
using std::unique_lock;
class ConnectionPool {
public:
	// 获取连接池对象实例
    static ConnectionPool* getConnectionPool();
	// 给外部提供接口,从连接池中获取一个可用的空闲连接
	shared_ptr<Connection> getConnection();
private:
    ConnectionPool();
    ~ConnectionPool();

#ifdef TEST_LOAD_CONFIG_FILE
public: //测试的时候可以先变成共有的
#endif	//TEST_LOAD_CONFIG_FILE
    bool loadConfigFile();
	
	// 运行在独立的线程中,专门负责生产新连接
	//之所以写成成员函数而不是全局函数,是因为可以更好访问成员变量
	void produceConnectionTask();

private:
	string _ip; // mysql的ip地址
	unsigned short _port; // mysql的端口号 3306
	string _username; // mysql登录用户名
	string _password; // mysql登录密码
	string _dbname; // 连接的数据库名称
	int _initSize; // 连接池的初始连接量
	int _maxSize; // 连接池的最大连接量
	int _maxIdleTime; // 连接池最大空闲时间
	int _connectionTimeout; // 连接池获取连接的超时时间
    
    queue<Connection*> _connectionQue;  // 存储mysql连接的队列
    mutex _queueMutex;  // 维护连接队列的线程安全互斥锁
    atomic_int _connectionCnt; // 记录连接所创建的connection连接的总数量 
	condition_variable cond;	// 设置条件变量,用于连接生产线程和连接消费线程的通信

};

#endif //_CONNECTIONPOOL_H

C++多线程pthread和thread-CSDN博客open in new window

#include "connectionPool.h"
#include <iostream>
#include "public.h"

using std::cout;
using std::endl;


// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool() { //静态函数的实现,不写static
    static ConnectionPool pool; //静态局部变量的初始化,编译器会生成lock和unlock
    return &pool;
}
// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile() {
    FILE* pf = fopen("mysql.cnf","r");
    if(pf == nullptr) {
        LOG("mysql.cnf file is not exit!");
        return false;
    }

    while(!feof(pf)) {  //文件不为空
        char line[1024] = {0};
        fgets(line, sizeof(line), pf);
        string str = line;
        int idx = str.find('=', 0); //从第0位开始找'='的下标
        if(idx == -1) {  //无效的配置项
            continue;
        }

        // password=123456\n
        int endidx = str.find('\n', idx); //从第idx位开始找'\n'的下标

        string key = str.substr(0,idx); //从第0位开始idx个字符
        string value = str.substr(idx + 1, endidx - (idx + 1));
#ifdef DEBUG
        cout << key << '=' << value << endl;
#endif //DEBUG
        if(key == "ip") {
            _ip = value;
        }
        else if(key == "port") {
            _port = atoi(value.c_str());    //string->const char* ->int
        }
        else if(key == "username") {
            _username = value;
        }
        else if(key == "password") {
            _password = value;
        }
        else if(key == "dbname") {
            _dbname = value;
        }
        else if(key == "initSize") {
            _initSize = atoi(value.c_str());
        }
        else if(key == "maxSize") {
            _maxSize = atoi(value.c_str());
        }
        else if(key == "maxIdleTime") {
            _maxIdleTime = atoi(value.c_str());
        }
        else if(key == "connectionTimeOut") {
            _connectionTimeout = atoi(value.c_str());
        }
    }
    
    return true;
}

ConnectionPool::ConnectionPool() {
    if(!loadConfigFile()) { //配置文件加载失败
        return;
    }

    //创建初始的数量连接
    //这一块是在连接池启动的时候做的,不用考虑线程安全
    for(int i = 0; i < _initSize; ++i) {
        Connection* conn = new Connection();
        conn->connection(_ip, _port, _username, _password, _dbname);
        _connectionQue.push(conn);
        _connectionCnt++;
    }

    // 启动一个新的线程,作为连接的生产者 linux thread => pthread_create
    // 因为本身还是c接口,所以线程处理函数void* (*)(void*)类型,需要使用bind
    thread produce(bind(&ConnectionPool::produceConnectionTask,this));  //线程对象

}

//线程处理函数,生产连接池
void ConnectionPool::produceConnectionTask() {
    while(1) {
        unique_lock<mutex> lock(_queueMutex);
        while(!_connectionQue.empty()) {    //队列不为空
            cond.wait(lock);
        }   
        if(_connectionCnt < _maxSize) { 
            Connection* conn = new Connection();
            conn->connection(_ip, _port, _username, _password, _dbname);
            _connectionQue.push(conn);
            _connectionCnt++;
        }
       
        // 通知消费者线程,可以消费连接了
        cond.notify_all();
    }
}

ConnectionPool::~ConnectionPool() {}


shared_ptr<Connection> ConnectionPool::getConnection() {
        
}