yield-bytes

沉淀、分享与无限进步

使用连接池方式和多线程方式连接mysql的测试说明

  前面文章讨论了mysql做高可用的配置,参考文章链接,而本文则是开发项目过程需要用的部分,从配置数据库到实用数据库,以及再用SQL做BI分析再到SQL优化,这些都是全栈工程师的基本功。

1、连接池测试mysql默认连接配置

  先出简单的测试连接池或多线程并发的脚本,这里先借用DBUtils创建连接池,文章后面会给出无须借用第三方库也可以实现实用的连接池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB

db_info={
'host':'192.168.100.5',
'port':34312, # 改掉默认端口号,安全考虑
'user':'***',
'password':'***',
'db':'erp_app',
'charset':'utf8'
}


db_pool = PooledDB(
creator=pymysql,
maxconnections=3000, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=0, # 初始化时,连接池中至少创建的空闲的链接,0表示不创建
maxcached=0, # 连接池中最多闲置的连接,0和None不限制
maxshared=0, # 连接池中最多共享的连接数量,0和None表示全部共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True
maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制
**db_info # 数据库账户等信息
)

# 测试插入数据
insert_sql = ("insert into apps_name "
"(id,app_log_name,log_path,log_date) "
"values(null,%(app_log_name)s,%(log_path)s,null)")
insert_data={
'app_log_name':'BI-Access-Log',
'log_path':'/opt/data/apps_log/'
}


def save_data(mode,inst_sql,inst_data):
'''
m:多线程模式,c:连接池模式
'''
if mode == 'm':
conn=pymysql.connect(**db_info)
else:
conn = db_pool.connection()
try:
with conn.cursor() as cur:
resl=cur.execute(inst_sql,inst_data)
conn.commit()
except pymysql.MySQLError as err:
conn.rollback()
finally:
# PooledDB连接池关闭方法其实不是真的把该连接关闭,而是将该连接由放入池的队列里,在后文会看到该逻辑的实现
conn.close()


def multi_insert(mode,nums=151):
treads=[]
for i in range(nums):
t=threading.Thread(target=save_data,args=(mode,insert_sql,insert_data))
treads.append(t)
for t in treads:
t.start()
for t in treads:
t.join()

def run(mode,request_nums):
start=time.time()
multi_insert(mode,request_nums)
end=time.time()
cost=end-start
print('cost:{0:.3} s'.format(cost))


if __name__=='__main__':
run('c',100000)

查看mariadb默认设置的最大连接数为151

1
2
3
4
5
6
7
8
9
10
MariaDB [erp_app]> show variables like '%max_con%';
+---------------------------------------+-------+
| Variable_name | Value |
+---------------------------------------+-------+
| extra_max_connections | 1 |
| max_connect_errors | 100 |
| max_connections | 151 |
| performance_schema_max_cond_classes | 80 |
| performance_schema_max_cond_instances | -1 |
+---------------------------------------+-------+

  运行测试脚本,注意这里是连接池10万个连接的并发,对于server端的mysql来说,也就是同时有200个并发connections,就已经出错了,这就是模拟了客户端多线程大量并发消耗完mysql 最大连接资源引起error

1
pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on '192.168.100.5' ([Errno 24] Too many open files)")

2、高并发连接数据库出错分析与测试

  出现以上情况,可通过设置mysql max_connections最大值,来保证并发量,测试mysql在有限物理资源条件下可达到的最大连接数,随便设一个大值例如10000000,最后可mysql单机可设定的最大连接数为10万

1
2
3
4
5
6
7
8
9
10
11
12
13
14
MariaDB [erp_app]> set GLOBAL max_connections=10000000;
Query OK, 0 rows affected, 1 warning (0.000 sec)

MariaDB [erp_app]> show variables like '%max_con%';
+---------------------------------------+--------+
| Variable_name | Value |
+---------------------------------------+--------+
| extra_max_connections | 1 |
| max_connect_errors | 100 |
| max_connections | 100000 |
| performance_schema_max_cond_classes | 80 |
| performance_schema_max_cond_instances | -1 |
+---------------------------------------+--------+
5 rows in set (0.001 sec)

  在这里,==使用多线程方式==,测试脚本发起10万个线程并发请求,运行后程序很快出现socket请求打满客户端系统缓冲区,导致系统级别出错,如下
1
2
3
pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on '192.168.
100.5' ([WinError 10055] 由于系统缓冲区空间不足或队列已满,不能执行套接字上的操
作。)")

再查看服务器响应的最大连接数,成功连接仅有745个

1
2
3
4
5
6
7
MariaDB [erp_app]> show global status like 'Max_used_connections';
+----------------------+-------+
| Variable_name | Value |
+----------------------+-------+
| Max_used_connections | 745 |
+----------------------+-------+
1 row in set (0.001 sec)

3、为何选用连接池优化连接?

  从第二部分的测试可知,因为每个线程创建单独的连接,当并发量大时,会造成client和msyql server之间的频繁“线程创建tcp连接-登录-线程退出关闭tcp连接”,
若采用连接池方式连接mysql,则是重用数据库服务端的tcp通道,以达到client和MySQL之间只需维持较少的连接,单个客户端可以提高其并发量,且消耗较低的物理资源。

打个不一定恰当的通俗比喻:

有10000辆车同时要从A点到达B点,出发前,A、B之间没有路,需要先搭建

1)多线程方式:需要1000个路面施工队同时搭建完1000条“高速路”后,才能同时出发,可见需要消耗非常多资源(10000个施工队以及10000条高速路资源),等10000辆车到达B点后,10000个施工队又得去拆除10000条高速路,非常耗资源

2)连接池方式(假设连接池大小为1000):需要1000个路面施工队同时搭建完1000条“高速路”后,前面1000辆车到达B点,后面9000辆车出发时,施工队不需要再新建高速路,继续重用前面搭建的1000条“高速路”,极大降低的物理资源浪费。

连接池重要两个逻辑:

  • 在程序创建连接的时候,可以从连接池队列中取出一个空闲的连接,不需要重新初始化连接,提升获取连接的速度

  • 关闭连接的时候,把连接放回存放连接池的队列中,而不是真正的关闭,所以可以减少频繁地打开和关闭tcp通道

4、继续测试两种连接效果

  mysql的默认最大连接数已设置为1万个
1)开启2000个线程,重复1次,使用多线程并发,不出意外,mysql接收到已用连接数为1022个,之后的请求连接全部error 中断

1
2
3
4
5
6
7
8
9
10
11
12
(py36) [root@localhost opt]# python insert_test.py
cost:1.99 s

pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on '192.168.100.4' ([Errno 24] Too many open files)")

MariaDB [(none)]> show global status like 'Max_used_connections';
+----------------------+-------+
| Variable_name | Value |
+----------------------+-------+
| Max_used_connections | 1022 |
+----------------------+-------+
1 row in set (0.000 sec)

2) 开启多线程6000个并发,运行出错,6000个线程直接把客户端的系统缓冲区打满,无法继续运行

1
pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on '192.168.100.4' ([WinError 10055] 由于系统缓冲区空间不足或队列已满,不能执行套接字上的操作。)")

3)连接池开启6000个并发,连接池最大连接限制3000个,运行没有问题,cost:24.3 s

在mysql也可以看到最大已用连接数为3000个

1
2
3
4
5
6
7
MariaDB [(none)]> show global status like 'Max_used_connections';
+----------------------+-------+
| Variable_name | Value |
+----------------------+-------+
| Max_used_connections | 3000 |
+----------------------+-------+
1 row in set (0.001 sec)

5 、自行实现简易使用的mysql连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import threading
from queue import Queue
import pymysql
from pymysql.cursors import DictCursor

db_info={
'host':'192.168.100.4',
'port':3306,
'user':'***',
'password':'****',
'db':'erp_app',
'charset':'utf8'
}


class ConnPoolException(Exception):
"""连接池出错 """


class MariaDBPool(object):
_inst_lock=threading.RLock()

def __init__(self, connections, **db_conf):
self.__connections = connections
self.__pool = Queue(connections)
# 在init阶段,就已经创建好指定的连接,全部put到共享队列
for i in range(self.__connections):
try:
conn = pymysql.connect(**db_conf)
self.__pool.put(conn)
except ConnPoolException as e:
raise IOError

# 单例模式创建连接池,个人喜欢用__new__方法创建,简洁,且使用了递归锁,保证在多线程方式创建单例模式的对象都是同一对象
def __new__(cls, *args, **kwargs):
with cls._inst_lock:
if not hasattr(cls,'_inst'):
cls._inst=object.__new__(cls)
return cls._inst

def execute_insert(self,sql,data_dict=None):
conn = self.__pool.get()
cursor = conn.cursor(DictCursor)
try:
result=cursor.execute(sql,data_dict) if data_dict else cursor.execute(sql)
conn.commit()
except ConnPoolException as e:
# 这里就是重点,只是关闭了游标,连接对像又返回池里
conn.rollback()
cursor.close()
self.__pool.put(conn)
return False
else:
# 这里就是重点,只是关闭了游标,连接对像又返回池里
cursor.close()
self.__pool.put(conn)
return result

def executemany_insert(self,sql,data_dict_list=None):
conn = self.__pool.get()
cursor = conn.cursor(DictCursor)
try:
result=cursor.execute(sql,data_dict) if data_dict_list else cursor.executemany(sql)
except ConnPoolException as e:
conn.rollback()
cursor.close()
self.__pool.put(conn)
return False
else:
cursor.close()
self.__pool.put(conn)
return result

# 这里才是真正的关闭所有连接池
def close(self):
for i in range(self.__connections):
self.__pool.get().close()

  就本文测试的数据库以及简单表结构而言,使用该连接池模块,注意测试之前,需重启mysql,保证数据库最大已连接数为1,也即清空历史残留连接,以便做新测试对比。12000个并发,mysql设定最大可用连接数:3000,单例模式,cost:22.2 s,程序不会出现任何异常,当然因表结构和服务器性能情况而不同。

  这里顺便提下oracle线程池,部分项目使用oracle数据库,cx_Oracle也支持使用连接池方式连接,大致流程如下,也可根据使用习惯封装更适合自身业务需要的模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import cx_Oracle
db_user='foo'
db_pwd='barbar'
db_host='192.168.100.7'
db_name='erp_app'
dsn = cx_Oracle.makedsn(db_host, "1521", db_name)
db_config = {
'user': db_user,
'password': db_pwd,
'dsn': dsn,
'min': 1,
'max': 1000,
'increment': 1,
'threaded': True
}
orc_pool = cx_Oracle.SessionPool(**dbConfig)

  若使用多线程方式,尤其数量大的情况下(500以上),很容易把底层bug爆出来,cx_Oracle会引发python解释器崩溃。其实建议,只要是连接数据库,所引入的第三方库支持线程池方法的话,都建议用线程池,哪怕你的插入数据不是太频繁或并发量不大,减少程序自身出错,也降低数据库连接压力。