生产者线程的实现
大约 3 分钟
生产者线程的实现
#ifndef _CONNECTIONPOOL_H
#define _CONNECTIONPOOL_H
#include <string>
#include <queue>
#include <mutex>
#include <atomic> //atomic_int 原子类型
#include <memory> //shared_ptr
#include <thread>
#include <condition_variable>
#include <functional> //bind
#include "connection.h"
using std::string;
using std::queue;
using std::mutex;
using std::thread;
using std::atomic_int;
using std::shared_ptr;
using std::bind;
using std::condition_variable;
using std::unique_lock;
class ConnectionPool {
public:
// 获取连接池对象实例
static ConnectionPool* getConnectionPool();
// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> getConnection();
private:
ConnectionPool();
~ConnectionPool();
#ifdef TEST_LOAD_CONFIG_FILE
public: //测试的时候可以先变成共有的
#endif //TEST_LOAD_CONFIG_FILE
bool loadConfigFile();
// 运行在独立的线程中,专门负责生产新连接
//之所以写成成员函数而不是全局函数,是因为可以更好访问成员变量
void produceConnectionTask();
private:
string _ip; // mysql的ip地址
unsigned short _port; // mysql的端口号 3306
string _username; // mysql登录用户名
string _password; // mysql登录密码
string _dbname; // 连接的数据库名称
int _initSize; // 连接池的初始连接量
int _maxSize; // 连接池的最大连接量
int _maxIdleTime; // 连接池最大空闲时间
int _connectionTimeout; // 连接池获取连接的超时时间
queue<Connection*> _connectionQue; // 存储mysql连接的队列
mutex _queueMutex; // 维护连接队列的线程安全互斥锁
atomic_int _connectionCnt; // 记录连接所创建的connection连接的总数量
condition_variable cond; // 设置条件变量,用于连接生产线程和连接消费线程的通信
};
#endif //_CONNECTIONPOOL_H
#include "connectionPool.h"
#include <iostream>
#include "public.h"
using std::cout;
using std::endl;
// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool() { //静态函数的实现,不写static
static ConnectionPool pool; //静态局部变量的初始化,编译器会生成lock和unlock
return &pool;
}
// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile() {
FILE* pf = fopen("mysql.cnf","r");
if(pf == nullptr) {
LOG("mysql.cnf file is not exit!");
return false;
}
while(!feof(pf)) { //文件不为空
char line[1024] = {0};
fgets(line, sizeof(line), pf);
string str = line;
int idx = str.find('=', 0); //从第0位开始找'='的下标
if(idx == -1) { //无效的配置项
continue;
}
// password=123456\n
int endidx = str.find('\n', idx); //从第idx位开始找'\n'的下标
string key = str.substr(0,idx); //从第0位开始idx个字符
string value = str.substr(idx + 1, endidx - (idx + 1));
#ifdef DEBUG
cout << key << '=' << value << endl;
#endif //DEBUG
if(key == "ip") {
_ip = value;
}
else if(key == "port") {
_port = atoi(value.c_str()); //string->const char* ->int
}
else if(key == "username") {
_username = value;
}
else if(key == "password") {
_password = value;
}
else if(key == "dbname") {
_dbname = value;
}
else if(key == "initSize") {
_initSize = atoi(value.c_str());
}
else if(key == "maxSize") {
_maxSize = atoi(value.c_str());
}
else if(key == "maxIdleTime") {
_maxIdleTime = atoi(value.c_str());
}
else if(key == "connectionTimeOut") {
_connectionTimeout = atoi(value.c_str());
}
}
return true;
}
ConnectionPool::ConnectionPool() {
if(!loadConfigFile()) { //配置文件加载失败
return;
}
//创建初始的数量连接
//这一块是在连接池启动的时候做的,不用考虑线程安全
for(int i = 0; i < _initSize; ++i) {
Connection* conn = new Connection();
conn->connection(_ip, _port, _username, _password, _dbname);
_connectionQue.push(conn);
_connectionCnt++;
}
// 启动一个新的线程,作为连接的生产者 linux thread => pthread_create
// 因为本身还是c接口,所以线程处理函数void* (*)(void*)类型,需要使用bind
thread produce(bind(&ConnectionPool::produceConnectionTask,this)); //线程对象
}
//线程处理函数,生产连接池
void ConnectionPool::produceConnectionTask() {
while(1) {
unique_lock<mutex> lock(_queueMutex);
while(!_connectionQue.empty()) { //队列不为空
cond.wait(lock);
}
if(_connectionCnt < _maxSize) {
Connection* conn = new Connection();
conn->connection(_ip, _port, _username, _password, _dbname);
_connectionQue.push(conn);
_connectionCnt++;
}
// 通知消费者线程,可以消费连接了
cond.notify_all();
}
}
ConnectionPool::~ConnectionPool() {}
shared_ptr<Connection> ConnectionPool::getConnection() {
}