[Original] Python 3 with dual data sources (Doris and MySQL): read from the replica, update business data on the primary
Author: whooyun
Published: 2025-09-03 11:45
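The complete script follows. MySQLDualDatabase opens two mysql.connector connections, one to a read-only source and one to the writable primary. It first pulls the rows whose fa_expense.create_time is missing from the read side (caching them in memory and, as a backup, in a local JSON file), then replays them against the primary in batches: committing every commit_size rows, retrying failed batches, and sleeping briefly between batches to keep the load on the primary down.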
import mysql.connector
from mysql.connector import Error
import json
import os
from datetime import datetime
from time import sleep


class MySQLDualDatabase:
    def __init__(self, read_config, write_config):
        """
        Initialize the dual-data-source helper.

        Args:
            read_config: connection settings for the read-only database
            write_config: connection settings for the writable database
        """
        self.read_config = {
            **read_config,
            'charset': 'utf8mb4',
            'auth_plugin': 'mysql_native_password'
        }
        self.write_config = {
            **write_config,
            'charset': 'utf8mb4',
            'auth_plugin': 'mysql_native_password'
        }
        self.read_connection = None
        self.write_connection = None
        self.cached_data = []      # local data cache
        self.max_retry_count = 3   # maximum number of retries
        self.retry_delay = 1       # delay between retries (seconds)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def connect(self):
        """Open both connections."""
        try:
            # Connect to the read-only database
            self.read_connection = mysql.connector.connect(**self.read_config)
            print("Read-only database connected")
            # Connect to the writable database
            self.write_connection = mysql.connector.connect(**self.write_config)
            print("Writable database connected")
        except Error as e:
            print(f"Database connection failed: {e}")
            self.close()  # make sure a partially opened connection is closed too
            raise

    def close(self):
        """Close both connections."""
        # Close the read-only connection
        try:
            if self.read_connection and self.read_connection.is_connected():
                self.read_connection.close()
                print("Read-only database connection closed")
        except Exception as e:
            print(f"Error while closing the read-only connection: {e}")
        finally:
            self.read_connection = None

        # Close the writable connection
        try:
            if self.write_connection and self.write_connection.is_connected():
                self.write_connection.close()
                print("Writable database connection closed")
        except Exception as e:
            print(f"Error while closing the writable connection: {e}")
        finally:
            self.write_connection = None

    def _validate_connections(self):
        """Check that both connections are alive."""
        if not self.read_connection or not self.read_connection.is_connected():
            raise Exception("Read-only database connection is invalid or has been lost")
        if not self.write_connection or not self.write_connection.is_connected():
            raise Exception("Writable database connection is invalid or has been lost")

    def _validate_cached_data(self):
        """Check the integrity of the cached rows."""
        if not self.cached_data:
            return True
        for i, record in enumerate(self.cached_data):
            # Each row must be a (id, create_time) pair
            if not isinstance(record, (tuple, list)) or len(record) != 2:
                print(f"Warning: cached record {i + 1} has an unexpected format: {record}")
                return False
            # The id must be a positive number
            if not isinstance(record[0], (int, float)) or record[0] <= 0:
                print(f"Warning: cached record {i + 1} has an invalid id: {record[0]}")
                return False
            # The time field must not be empty
            if record[1] is None:
                print(f"Warning: cached record {i + 1} has an empty time field: {record}")
                return False
        return True

    def fetch_all_data_from_readonly(self, start_id=37882966, end_id=201173267):
        """
        Fetch every row that needs updating from the read-only database
        and cache it locally.

        Args:
            start_id: starting id (exclusive)
            end_id: ending id (inclusive)

        Returns:
            int: number of rows fetched
        """
        self._validate_connections()
        retry_count = 0
        while retry_count < self.max_retry_count:
            try:
                cursor = self.read_connection.cursor()
                self.cached_data = []  # clear the cache
                print(f"Querying the read-only database, id range: {start_id + 1} - {end_id}")

                # Select all rows that need to be updated
                select_sql = """
                    SELECT e.id, sub.min_create_time
                    FROM fa_expense e
                    INNER JOIN (
                        SELECT master_fee_id,
                               MIN(create_time) AS min_create_time
                        FROM fa_expense_sub
                        WHERE create_time IS NOT NULL
                          AND master_fee_id > %s
                          AND master_fee_id <= %s
                        GROUP BY master_fee_id
                    ) sub ON e.id = sub.master_fee_id
                    WHERE e.create_time IS NULL
                      AND e.id > %s
                      AND e.id <= %s
                    ORDER BY e.id
                """
                cursor.execute(select_sql, (start_id, end_id, start_id, end_id))
                self.cached_data = cursor.fetchall()
                cursor.close()

                # Validate the cached rows
                if not self._validate_cached_data():
                    raise Exception("Cached data failed validation")

                print(f"Fetched {len(self.cached_data)} rows that need updating from the read-only database")

                # Optional: save the data to a local file as a backup
                self._save_cache_to_file()
                return len(self.cached_data)
            except Error as e:
                retry_count += 1
                print(f"Query against the read-only database failed (attempt {retry_count}): {e}")
                if retry_count < self.max_retry_count:
                    print(f"Retrying in {self.retry_delay} second(s)...")
                    sleep(self.retry_delay)
                else:
                    print("Maximum number of retries reached, query failed")
                    raise

    def _save_cache_to_file(self):
        """Save the cached rows to a local JSON file."""
        try:
            cache_file = f"cache_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            cache_data = {
                'timestamp': datetime.now().isoformat(),
                'total_records': len(self.cached_data),
                'data': [
                    (int(record[0]), record[1].isoformat() if record[1] else None)
                    for record in self.cached_data
                ]
            }
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
            print(f"Cached data saved to file: {cache_file}")
        except Exception as e:
            print(f"Failed to save the cache file: {e}")

    def update_from_cached_data(self, commit_size=1000):
        """
        Run the batch update against the writable database using the cached rows.

        Args:
            commit_size: number of rows per transaction commit

        Returns:
            int: number of rows actually updated
        """
        self._validate_connections()

        if not self.cached_data:
            print("No cached data to update")
            return 0

        if not self._validate_cached_data():
            raise Exception("Cached data failed validation, aborting the update")

        retry_count = 0
        while retry_count < self.max_retry_count:
            try:
                cursor = self.write_connection.cursor()
                update_sql = ("UPDATE fa_expense SET create_time = %s "
                              "WHERE id = %s AND create_time IS NULL")
                total_updated = 0
                total_records = len(self.cached_data)
                print(f"Starting the batch update on the writable database, total rows: {total_records}")

                # Process the cached rows in batches
                for i in range(0, total_records, commit_size):
                    batch_data = self.cached_data[i:i + commit_size]
                    # executemany expects the parameters as (create_time, id)
                    update_data = [(record[1], record[0]) for record in batch_data]

                    batch_retry_count = 0
                    while batch_retry_count < self.max_retry_count:
                        try:
                            # Run the batch update and commit
                            cursor.executemany(update_sql, update_data)
                            affected_rows = cursor.rowcount
                            self.write_connection.commit()

                            batch_count = len(batch_data)
                            total_updated += affected_rows
                            print(f"Batch {i // commit_size + 1} done: processed {batch_count} rows, "
                                  f"actually updated {affected_rows}")

                            # Pause 0.2 s after each batch to reduce database load
                            sleep(0.2)
                            break
                        except Error as batch_error:
                            batch_retry_count += 1
                            print(f"Batch {i // commit_size + 1} failed "
                                  f"(attempt {batch_retry_count}): {batch_error}")
                            self.write_connection.rollback()
                            if batch_retry_count < self.max_retry_count:
                                print(f"Retrying batch {i // commit_size + 1} in {self.retry_delay} second(s)...")
                                sleep(self.retry_delay)
                            else:
                                print(f"Batch {i // commit_size + 1} reached the maximum number of retries, skipping it")
                                break

                cursor.close()
                print(f"\nBatch update finished! Processed {total_records} rows in total, "
                      f"actually updated {total_updated}")
                return total_updated
            except Error as e:
                retry_count += 1
                print(f"Batch update failed (attempt {retry_count}): {e}")
                if self.write_connection:
                    self.write_connection.rollback()
                if retry_count < self.max_retry_count:
                    print(f"Retrying the whole update in {self.retry_delay} second(s)...")
                    sleep(self.retry_delay)
                else:
                    print("Maximum number of retries reached, update failed")
                    raise

    def process_complete_workflow(self, start_id=3378534, end_id=201554384, commit_size=1000):
        """
        Complete workflow: query the read-only database, cache the rows locally,
        then run the update against the writable database.

        Args:
            start_id: starting id
            end_id: ending id
            commit_size: number of rows per transaction commit

        Returns:
            dict: statistics about the query and the update
        """
        workflow_start_time = datetime.now()
        try:
            # Step 1: fetch the data from the read-only database
            print("=== Step 1: query the read-only database ===")
            fetched_count = self.fetch_all_data_from_readonly(start_id, end_id)

            if fetched_count == 0:
                print("Nothing to update")
                return {
                    'fetched_count': 0,
                    'updated_count': 0,
                    'duration': str(datetime.now() - workflow_start_time)
                }

            # Step 2: run the batch update on the writable database from the cache
            print("\n=== Step 2: batch update on the writable database ===")
            updated_count = self.update_from_cached_data(commit_size)

            workflow_end_time = datetime.now()
            duration = workflow_end_time - workflow_start_time
            result = {
                'fetched_count': fetched_count,
                'updated_count': updated_count,
                'duration': str(duration),
                'start_time': workflow_start_time.isoformat(),
                'end_time': workflow_end_time.isoformat()
            }

            print("\n=== Workflow finished ===")
            print(f"Rows fetched: {fetched_count}")
            print(f"Rows updated: {updated_count}")
            print(f"Total time: {duration}")
            return result
        except Exception as e:
            print(f"Workflow failed: {e}")
            raise


if __name__ == "__main__":
    # Read-only database settings (used for the query)
    read_db_config = {
        'host': '192.168.2.35',     # read-only database address
        'port': 3306,
        'user': 'tt_finance_ro',    # read-only account
        'password': 'tt@123',
        'database': 'tt-dev'
    }

    # Writable database settings (used for the update)
    write_db_config = {
        'host': '192.168.2.34',     # writable database address
        'port': 3306,
        'user': 'tt_finance_rw',    # read-write account
        'password': 'tt@123',       # read-write password
        'database': 'tt-dev'
    }

    # Run the complete workflow with the two data sources
    with MySQLDualDatabase(read_db_config, write_db_config) as db:
        try:
            # Full workflow: query -> cache -> update
            result = db.process_complete_workflow(
                start_id=37882966,
                end_id=201173267,
                commit_size=1000
            )
            print("\n=== Run statistics ===")
            print(f"Rows fetched: {result['fetched_count']}")
            print(f"Rows actually updated: {result['updated_count']}")
            print(f"Total time: {result['duration']}")
            print(f"Start time: {result['start_time']}")
            print(f"End time: {result['end_time']}")
        except Exception as e:
            print(f"Program failed: {e}")
            import traceback
            traceback.print_exc()
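The title names Doris as the read-side source, while the sample configuration above points at a MySQL replica. Apache Doris speaks the MySQL wire protocol, so the same mysql.connector client can usually be pointed at a Doris FE node without changing the class. Below is a minimal sketch, reusing MySQLDualDatabase and write_db_config from the script above; the host, account, password, and database names are placeholders, and it assumes your Doris version accepts the mysql_native_password handshake that __init__ forces.

# Sketch (not from the original script): use a Doris FE node as the read-only source.
# Doris exposes the MySQL wire protocol on its query port (9030 by default),
# so mysql-connector-python can usually connect to it directly.
doris_read_config = {
    'host': '192.168.2.36',   # placeholder: Doris FE address
    'port': 9030,             # Doris FE MySQL-protocol query port (default)
    'user': 'doris_ro',       # placeholder read-only account
    'password': 'change_me',  # placeholder password
    'database': 'tt_dw'       # placeholder database
}

with MySQLDualDatabase(doris_read_config, write_db_config) as db:
    db.process_complete_workflow(start_id=37882966, end_id=201173267, commit_size=1000)

Because the read side is only ever queried and the update SQL runs exclusively on the writable MySQL primary, swapping the source in this way does not change the write path.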