1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Import the required libraries.
from peewee import Model, CharField
from playhouse.pool import PooledPostgresqlExtDatabase
import random
# Connection settings for the primary (write) database.
# NOTE(review): credentials are hard-coded here; anything beyond a local demo
# should presumably load them from the environment or a secrets store.
MASTER_DB = dict(
    host='127.0.0.1',
    port=5432,
    user='postgres',
    password='meiyoumima',
    database='postgres',
)

# Connection settings for the read replicas. Each entry is a copy of the
# primary's settings with only the port changed; add more ports to register
# more replicas.
SLAVE_DBS = [
    dict(MASTER_DB, port=replica_port)
    for replica_port in (5433, 5434)
]
def _open_pool(settings):
    """Build a pooled PostgreSQL database handle from one settings dict.

    ``settings`` must provide the ``database``, ``host``, ``port``, ``user``
    and ``password`` keys. Every pool is created with ``max_connections=20``
    and ``stale_timeout=300``.
    """
    return PooledPostgresqlExtDatabase(
        settings['database'],
        host=settings['host'],
        port=settings['port'],
        user=settings['user'],
        password=settings['password'],
        max_connections=20,
        stale_timeout=300,
    )


# Connection pool for the primary (write) database.
master_db = _open_pool(MASTER_DB)

# One connection pool per read replica, in the same order as SLAVE_DBS.
slave_dbs = [_open_pool(replica_settings) for replica_settings in SLAVE_DBS]
class ReadWriteManager:
    """Route reads to a randomly chosen replica and writes to the primary.

    Attributes:
        master: Database handle used for all writes.
        slaves: Sequence of database handles that reads are spread across.
    """

    def __init__(self, master, slaves):
        self.master = master
        self.slaves = slaves

    def get_read_db(self):
        """Return a database handle for a read query.

        A replica is picked at random with ``random.choice``. When no
        replicas are configured the primary is returned instead, so an empty
        replica list degrades to single-database operation rather than
        raising ``IndexError``.
        """
        if not self.slaves:
            return self.master
        return random.choice(self.slaves)

    def get_write_db(self):
        """Return the database handle for a write query (always the primary)."""
        return self.master
# Module-wide router: writes go to master_db, reads are spread across
# slave_dbs.
db_manager = ReadWriteManager(master=master_db, slaves=slave_dbs)
class BaseModel(Model):
    """Base model that sends reads to a replica and writes to the primary.

    Each query returned by ``select``/``insert``/``update``/``delete`` is
    bound to its target database individually via the query's ``bind()``
    method. ``_meta.database`` is never reassigned, so it always remains
    ``master_db``: any peewee code path that falls back to the model's
    default database (i.e. one not routed through these four methods) runs
    against the primary, and one caller's query cannot change the routing of
    another caller's query.
    """

    class Meta:
        database = master_db  # Default for anything not routed explicitly.

    @classmethod
    def select(cls, *args, **kwargs):
        """Build a SELECT query bound to a read database."""
        return super().select(*args, **kwargs).bind(db_manager.get_read_db())

    @classmethod
    def insert(cls, *args, **kwargs):
        """Build an INSERT query bound to the write database."""
        return super().insert(*args, **kwargs).bind(db_manager.get_write_db())

    @classmethod
    def update(cls, *args, **kwargs):
        """Build an UPDATE query bound to the write database."""
        return super().update(*args, **kwargs).bind(db_manager.get_write_db())

    @classmethod
    def delete(cls, *args, **kwargs):
        """Build a DELETE query bound to the write database."""
        return super().delete(*args, **kwargs).bind(db_manager.get_write_db())
# Example model.
class User(BaseModel):
    """Sample model used to exercise the routing provided by BaseModel."""

    # User's name.
    name = CharField()
    # E-mail address; declared unique.
    email = CharField(unique=True)
def create_user(name, email):
    """Fetch the user matching ``name`` and ``email``, creating it if absent.

    Prints the resulting user's name; nothing is returned.
    """
    # get_or_create returns a (instance, created) pair; only the instance
    # is needed here.
    user = User.get_or_create(name=name, email=email)[0]
    print(f"创建用户:{user.name}")
def get_user(email):
    """Look up the user with the given e-mail address and print its name.

    The lookup uses ``User.get``; nothing is returned.
    """
    matches_email = User.email == email
    user = User.get(matches_email)
    print(f"获取用户:{user.name}")
# Exercise the read/write separation.
def test_read_write_separation():
    """Smoke-test the routing: write one user, then read it back by e-mail."""
    email = "zhangsan@example.com"
    create_user("张三", email)
    get_user(email)
if __name__ == "__main__":
    # Create the table through the primary connection pool.
    master_db.create_tables([User])
    test_read_write_separation()

    # get_or_none() returns None when the query matches no row, so guard
    # before dereferencing the result.
    # NOTE(review): this read is routed to a replica, which presumably can
    # lag behind the primary -- confirm against the replication setup.
    user = User.select().order_by(User.id).get_or_none()
    if user is None:
        print("未找到任何用户")
    else:
        print(user.id, user.name, user.email)
|