复制代码

为懒人提供无限可能,生命不息,code不止

人类感性的情绪,让我们知难行难
我思故我在
日拱一卒,功不唐捐
  • 首页
  • 前端
  • 后台
  • 数据库
  • 运维
  • 资源下载
  • 实用工具
  • 接口文档工具
  • 企业管理平台Demo
  • 登录
  • 注册

其它

【原创】python3多数据源(doris和mysql),从库读取数据,主库更新业务数据

作者: whooyun发表于: 2025-09-03 11:45

import mysql.connector
from mysql.connector import Error
import json
import os
from datetime import datetime
from time import sleep

class MySQLDualDatabase:
    def __init__(self, read_config, write_config):
        """
        初始化双数据源连接
       
        Args:
            read_config: 只读数据库配置字典
            write_config: 可写数据库配置字典
        """
        self.read_config = {
            **read_config,
            'charset': 'utf8mb4',
            'auth_plugin': 'mysql_native_password'
        }
        self.write_config = {
            **write_config,
            'charset': 'utf8mb4',
            'auth_plugin': 'mysql_native_password'
        }
        self.read_connection = None
        self.write_connection = None
        self.cached_data = []  # 本地数据缓存
        self.max_retry_count = 3  # 最大重试次数
        self.retry_delay = 1  # 重试延迟(秒)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def connect(self):
        """建立双数据源连接"""
        try:
            # 连接只读数据库
            self.read_connection = mysql.connector.connect(**self.read_config)
            print("只读数据库连接成功")
           
            # 连接可写数据库
            self.write_connection = mysql.connector.connect(**self.write_config)
            print("可写数据库连接成功")
           
        except Error as e:
            print(f"数据库连接失败: {e}")
            self.close()  # 确保部分连接也被关闭
            raise

    def close(self):
        """关闭双数据源连接"""
        # 关闭只读连接
        try:
            if self.read_connection and self.read_connection.is_connected():
                self.read_connection.close()
                print("只读数据库连接已关闭")
        except Exception as e:
            print(f"关闭只读数据库连接时出错: {e}")
        finally:
            self.read_connection = None
       
        # 关闭可写连接
        try:
            if self.write_connection and self.write_connection.is_connected():
                self.write_connection.close()
                print("可写数据库连接已关闭")
        except Exception as e:
            print(f"关闭可写数据库连接时出错: {e}")
        finally:
             self.write_connection = None

    def _validate_connections(self):
        """验证数据库连接状态"""
        if not self.read_connection or not self.read_connection.is_connected():
            raise Exception("只读数据库连接无效或已断开")
        if not self.write_connection or not self.write_connection.is_connected():
            raise Exception("可写数据库连接无效或已断开")
   
    def _validate_cached_data(self):
        """验证缓存数据的完整性"""
        if not self.cached_data:
            return True
           
        # 检查数据格式
        for i, record in enumerate(self.cached_data):
            if not isinstance(record, (tuple, list)) or len(record) != 2:
                print(f"警告: 第 {i+1} 条缓存数据格式异常: {record}")
                return False
           
            # 检查ID是否为有效数字
            if not isinstance(record[0], (int, float)) or record[0] <= 0:
                print(f"警告: 第 {i+1} 条缓存数据ID无效: {record[0]}")
                return False
               
            # 检查时间字段
            if record[1] is None:
                print(f"警告: 第 {i+1} 条缓存数据时间字段为空: {record}")
                return False
       
        return True

    def fetch_all_data_from_readonly(self, start_id=37882966, end_id=201173267):
        """
        从只读数据库获取所有需要更新的数据并缓存到本地
       
        Args:
            start_id: 起始ID
            end_id: 结束ID
       
        Returns:
            int: 获取到的记录总数
        """
        # 验证连接状态
        self._validate_connections()
           
        retry_count = 0
        while retry_count < self.max_retry_count:
            try:
                cursor = self.read_connection.cursor()
                self.cached_data = []  # 清空缓存
               
                print(f"开始从只读数据库查询数据,ID范围: {start_id+1} - {end_id}")
               
                # 查询所有需要更新的数据
                select_sql = """
                SELECT e.id, sub.min_create_time
                FROM fa_expense e
                INNER JOIN (
                    SELECT
                        master_fee_id,
                        MIN(create_time) as min_create_time
                    FROM fa_expense_sub
                    WHERE create_time IS NOT NULL
                    AND master_fee_id > %s
                    AND master_fee_id <= %s
                    GROUP BY master_fee_id
                ) sub ON e.id = sub.master_fee_id
                WHERE e.create_time IS NULL
                AND e.id > %s
                AND e.id <= %s
                ORDER BY e.id
                """
               
                cursor.execute(select_sql, (start_id, end_id, start_id, end_id))
                self.cached_data = cursor.fetchall()
               
                cursor.close()
               
                # 验证缓存数据
                if not self._validate_cached_data():
                    raise Exception("缓存数据验证失败")
               
                print(f"从只读数据库获取到 {len(self.cached_data)} 条需要更新的记录")
               
                # 可选:将数据保存到本地文件作为备份
                self._save_cache_to_file()
               
                return len(self.cached_data)
               
            except Error as e:
                retry_count += 1
                print(f"从只读数据库查询数据失败 (第{retry_count}次尝试): {e}")
               
                if retry_count < self.max_retry_count:
                    print(f"等待 {self.retry_delay} 秒后重试...")
                    sleep(self.retry_delay)
                else:
                    print("已达到最大重试次数,查询失败")
                    raise
   
    def _save_cache_to_file(self):
        """
        将缓存数据保存到本地文件
        """
        try:
            cache_file = f"cache_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            cache_data = {
                'timestamp': datetime.now().isoformat(),
                'total_records': len(self.cached_data),
                'data': [(int(record[0]), record[1].isoformat() if record[1] else None) for record in self.cached_data]
            }
           
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
           
            print(f"缓存数据已保存到文件: {cache_file}")
           
        except Exception as e:
            print(f"保存缓存文件失败: {e}")

    def update_from_cached_data(self, commit_size=1000):
        """
        使用缓存数据在可写数据库执行批量更新
       
        Args:
            commit_size: 每多少条记录提交一次事务
       
        Returns:
            int: 实际更新的记录数
        """
        # 验证连接状态
        self._validate_connections()
       
        if not self.cached_data:
            print("没有缓存数据可供更新")
            return 0
           
        # 验证缓存数据
        if not self._validate_cached_data():
            raise Exception("缓存数据验证失败,无法进行更新")
           
        retry_count = 0
        while retry_count < self.max_retry_count:
            try:
                cursor = self.write_connection.cursor()
                update_sql = "UPDATE fa_expense SET create_time = %s WHERE id = %s AND create_time IS NULL"
               
                total_updated = 0
                total_records = len(self.cached_data)
               
                print(f"开始使用缓存数据在可写数据库执行更新,总记录数: {total_records}")
               
                # 分批处理缓存数据
                for i in range(0, total_records, commit_size):
                    # 获取当前批次的数据
                    batch_data = self.cached_data[i:i + commit_size]
                    update_data = [(record[1], record[0]) for record in batch_data]
                   
                    batch_retry_count = 0
                    while batch_retry_count < self.max_retry_count:
                        try:
                            # 执行批量更新
                            cursor.executemany(update_sql, update_data)
                            affected_rows = cursor.rowcount
                           
                            # 提交事务
                            self.write_connection.commit()
                           
                            batch_count = len(batch_data)
                            total_updated += affected_rows
                           
                            print(f"第 {i//commit_size + 1} 批处理完成,处理 {batch_count} 条记录,实际更新 {affected_rows} 条")
                           
                            # 执行后暂停0.2秒,减少数据库压力
                            sleep(0.2)
                            break
                           
                        except Error as batch_error:
                            batch_retry_count += 1
                            print(f"第 {i//commit_size + 1} 批处理失败 (第{batch_retry_count}次尝试): {batch_error}")
                            self.write_connection.rollback()
                           
                            if batch_retry_count < self.max_retry_count:
                                print(f"等待 {self.retry_delay} 秒后重试批次 {i//commit_size + 1}...")
                                sleep(self.retry_delay)
                            else:
                                print(f"批次 {i//commit_size + 1} 已达到最大重试次数,跳过该批次")
                                break
               
                cursor.close()
                print(f"\n批量更新完成!总共处理 {total_records} 条记录,实际更新 {total_updated} 条记录")
               
                return total_updated
               
            except Error as e:
                retry_count += 1
                print(f"批量更新失败 (第{retry_count}次尝试): {e}")
                if self.write_connection:
                    self.write_connection.rollback()
                   
                if retry_count < self.max_retry_count:
                    print(f"等待 {self.retry_delay} 秒后重试整个更新过程...")
                    sleep(self.retry_delay)
                else:
                    print("已达到最大重试次数,更新失败")
                    raise
   
    def process_complete_workflow(self, start_id=3378534, end_id=201554384, commit_size=1000):
        """
        完整的工作流程:从只读数据库查询数据,缓存到本地,然后在可写数据库执行更新
       
        Args:
            start_id: 起始ID
            end_id: 结束ID  
            commit_size: 每多少条记录提交一次事务
           
        Returns:
            dict: 包含查询和更新统计信息的字典
        """
        workflow_start_time = datetime.now()
       
        try:
            # 第一步:从只读数据库获取数据
            print("=== 第一步:从只读数据库查询数据 ===")
            fetched_count = self.fetch_all_data_from_readonly(start_id, end_id)
           
            if fetched_count == 0:
                print("没有需要更新的数据")
                return {
                    'fetched_count': 0,
                    'updated_count': 0,
                    'duration': str(datetime.now() - workflow_start_time)
                }
           
            # 第二步:使用缓存数据在可写数据库执行更新
            print("\n=== 第二步:在可写数据库执行批量更新 ===")
            updated_count = self.update_from_cached_data(commit_size)
           
            workflow_end_time = datetime.now()
            duration = workflow_end_time - workflow_start_time
           
            result = {
                'fetched_count': fetched_count,
                'updated_count': updated_count,
                'duration': str(duration),
                'start_time': workflow_start_time.isoformat(),
                'end_time': workflow_end_time.isoformat()
            }
           
            print(f"\n=== 工作流程完成 ===")
            print(f"查询记录数: {fetched_count}")
            print(f"更新记录数: {updated_count}")
            print(f"总耗时: {duration}")
           
            return result
           
        except Exception as e:
            print(f"工作流程执行失败: {e}")
            raise
   
 

if __name__ == "__main__":
    # 只读数据库配置(用于查询数据)
    read_db_config = {
        'host': '192.168.2.35',  # 只读数据库地址
        'port': 3306,
        'user': 'tt_finance_ro',  # 临时使用读写用户进行测试
        'password': 'tt@123',  # 读写密码
        'database': 'tt-dev'
    }
   
    # 可写数据库配置(用于更新数据)
    write_db_config = {
        'host': '192.168.2.34',  # 可写数据库地址
        'port': 3306,
        'user': 'tt_finance_rw',  # 读写用户
        'password': 'tt@123',  # 读写密码
        'database': 'tt-dev'
    }
   
    # 使用双数据源执行完整工作流程
    with MySQLDualDatabase(read_db_config, write_db_config) as db:
        try:
            # 执行完整的工作流程:查询 -> 缓存 -> 更新
            result = db.process_complete_workflow(
                start_id=37882966,
                end_id=201173267,
                commit_size=1000
            )
           
            print("\n=== 执行结果统计 ===")
            print(f"查询到的记录数: {result['fetched_count']}")
            print(f"实际更新的记录数: {result['updated_count']}")
            print(f"执行总耗时: {result['duration']}")
            print(f"开始时间: {result['start_time']}")
            print(f"结束时间: {result['end_time']}")
           
        except Exception as e:
            print(f"程序执行失败: {e}")
            import traceback
            traceback.print_exc()