如何使用Python脚本高效备份、恢复MySQL数据库?

鸿辰 Python 43

概述

本文介绍如何通过Python脚本实现MySQL数据库的备份与恢复。需要注意的是,这里的脚本只是实现了基础的备份与恢复功能,如果你需要个性化,比如进度条,大文件分割恢复等功能,你可以将该脚本进行优化即可。

需要的准备工作

因为我是基于mysqldump 命令实现的备份,因此需要安装该工具。因为这里我不需要MySQL服务端,因此只需要实现客户端安装即可。

# Ubuntu/Debian
sudo apt-get install mysql-client

# CentOS/RHEL
sudo yum install holland-mysqldump.noarch 
# 若 holland-mysqldump.noarch  不存在,可以安装MySQL,即
sudo yum install mysql

python使用的是 python3,这里不做安装说明。

备份数据库脚本

为了方便,这里省去了部分注释内容,如果有不懂的地方,可以将脚本发给AI做解释说明哦~

import subprocess
import os
import argparse
import datetime
import sys
import gzip
import re

def clean_old_backups(backup_dir, retention_days):
    """
    删除超过保留天数的备份文件

    参数:
    backup_dir: 备份目录
    retention_days: 备份保留天数
    """
    try:
        # 获取当前时间
        now = datetime.datetime.now()
        # 计算保留截止日期
        cutoff_date = now - datetime.timedelta(days=retention_days)

        # 遍历备份目录中的所有文件
        for filename in os.listdir(backup_dir):
            filepath = os.path.join(backup_dir, filename)

            # 跳过目录
            if not os.path.isfile(filepath):
                continue

            # 检查是否是备份文件 (匹配备份文件名模式)
            match = re.match(r'backup_(.+)_(\d{8}_\d{6})\.sql(\.gz)?$', filename)
            if not match:
                continue

            # 从文件名中提取时间戳
            timestamp_str = match.group(2)
            try:
                # 将时间戳转换为datetime对象
                file_date = datetime.datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
            except ValueError:
                continue

            # 检查文件是否超过保留期限
            if file_date < cutoff_date:
                try:
                    os.remove(filepath)
                    print(f"Deleted old backup: {filename}")
                except Exception as e:
                    print(f"Failed to delete {filename}: {str(e)}", file=sys.stderr)

    except Exception as e:
        print(f"Error cleaning old backups: {str(e)}", file=sys.stderr)

def backup_mysql(host, port, user, password, backup_dir, databases=None, compress=True,retention_days=0):
    """
    备份MySQL数据库

    参数:
    host: MySQL主机地址
    port: MySQL端口
    user: MySQL用户名
    password: MySQL密码
    backup_dir: 备份文件存放目录
    databases: 要备份的数据库列表(默认备份所有数据库)
    compress: 是否使用gzip压缩备份文件
    retention_days: 备份保留天数 (0表示不删除旧备份)
    """
    try:
        # 确保备份目录存在
        os.makedirs(backup_dir, exist_ok=True)

        # 生成时间戳
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        # 构建备份文件名
        if databases:
            db_list = '_'.join(databases)
            filename = f"backup_{db_list}_{timestamp}.sql"
        else:
            filename = f"backup_all_databases_{timestamp}.sql"

        filepath = os.path.join(backup_dir, filename)
        final_file = filepath + '.gz' if compress else filepath

        # 构建mysqldump命令
        cmd = [
            'mysqldump',
            f'--host={host}',
            f'--port={port}',
            f'--user={user}',
            f'--password={password}',
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events'
        ]

        # 添加数据库参数
        if databases:
            cmd.append('--databases')
            cmd.extend(databases)
        else:
            cmd.append('--all-databases')

        # 执行备份并处理压缩
        if compress:
            with gzip.open(final_file, 'wb') as gz_file:
                process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                stdout, stderr = process.communicate()

                if process.returncode != 0:
                    raise Exception(f"mysqldump failed: {stderr.decode().strip()}")

                gz_file.write(stdout)
        else:
            with open(filepath, 'wb') as sql_file:
                process = subprocess.Popen(cmd, stdout=sql_file, stderr=subprocess.PIPE)
                _, stderr = process.communicate()

                if process.returncode != 0:
                    raise Exception(f"mysqldump failed: {stderr.decode().strip()}")

        print(f"Backup completed successfully: {final_file}")

        # 清理旧备份
        if retention_days > 0:
            print(f"Cleaning backups older than {retention_days} days...")
            clean_old_backups(backup_dir, retention_days)

        return True

    except Exception as e:
        print(f"Backup failed: {str(e)}", file=sys.stderr)
        return False

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='MySQL Database Backup Tool')
    parser.add_argument('--host', default='127.0.0.1', help='MySQL host address')
    parser.add_argument('--port', type=int, default=3306, help='MySQL port (default: 3306)')
    parser.add_argument('--user', default='root', help='MySQL username')
    parser.add_argument('--password', required=True, help='MySQL password')
    parser.add_argument('--backup-dir', required=True, help='Backup directory')
    parser.add_argument('--databases', nargs='+', help='Specific databases to backup (default: all)')
    parser.add_argument('--retention-days', type=int, default=0,help='Number of days to keep backups (0=keep forever)')
    parser.add_argument('--no-compress', action='store_false', help='Disable gzip compression')

    args = parser.parse_args()

    # 执行备份
    result = backup_mysql(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password,
        backup_dir=args.backup_dir,
        databases=args.databases,
        compress=args.no_compress,
        retention_days=args.retention_days
    )

    sys.exit(0 if result else 1)

该备份脚本具有以下功能

基础备份功能

  • 支持备份所有数据库或指定数据库
  • 生成带时间戳的备份文件名
  • 自动创建备份目录

高级选项

  • 使用gzip压缩备份文件(默认启用)
  • 支持清理过期的备份

错误处理

  • 验证备份目录可访问性
  • 捕获并报告mysqldump错误
  • 返回适当的退出代码

如何使用呢?

创建一个mysql_backup.py的文件,然后把代码复制粘贴到文件中。赋予文件可执行权限

参数说明

参数 说明
–host 数据库服务器地址 ,默认127.0.0.1
–port 数据库服务器端口 ,默认3306
–user 用户名 ,默认root
–password 密码
–backup-dir 备份文件存放的目录
–databases 备份的数据库,不传则备份所有数据库
–retention-days 保留指定天数内的备份,超过则删除
–no-compress 不使用压缩,默认使用压缩

使用示例

备份所有数据库

python3 mysql_backup.py --host=127.0.0.1 --port=3306 --user=root --password=123456 --backup-dir=/var/backup/

备份指定数据库,不压缩

python3 mysql_backup.py --password=123456 --backup-dir=/var/backup/ --databases=db1 db2 --no-compress

备份指定数据库,同时删除超过30天的备份

python3 mysql_backup.py --password=123456 --backup-dir=/var/backup/ --databases=db1 db2 --retention-days=30

备份后,可在/var/backup/目录看到备份的文件

还原备份脚本

有备份就有还原,下面是针对备份脚本实现的还原脚本

import subprocess
import os
import argparse
import sys
import gzip
import re
from tqdm import tqdm

def restore_mysql(host, port, user, password, backup_file):
    """
    还原MySQL数据库

    参数:
    host: MySQL主机地址
    port: MySQL端口
    user: MySQL用户名
    password: MySQL密码
    backup_file: 备份文件路径
    """
    try:
        # 检查备份文件是否存在
        if not os.path.isfile(backup_file):
            raise FileNotFoundError(f"Backup file not found: {backup_file}")

        # 构建mysql命令
        mysql_cmd = [
            'mysql',
            f'--host={host}',
            f'--port={port}',
            f'--user={user}',
            f'--password={password}',
            '--batch'
        ]

        # 根据文件类型选择处理方式
        if backup_file.endswith('.gz'):
            # 处理gzip压缩文件
            with gzip.open(backup_file, 'rb') as gz_file:
                process = subprocess.Popen(
                    mysql_cmd,
                    stdin=subprocess.PIPE,
                    stderr=subprocess.PIPE
                )
                stdout, stderr = process.communicate(gz_file.read())
        else:
            # 处理未压缩SQL文件
            with open(backup_file, 'rb') as sql_file:
                process = subprocess.Popen(
                    mysql_cmd,
                    stdin=sql_file,
                    stderr=subprocess.PIPE
                )
                stdout, stderr = process.communicate()

        # 检查执行结果
        if process.returncode != 0:
            error_msg = stderr.decode().strip() if stderr else "Unknown error"
            raise Exception(f"Restore failed: {error_msg}")

        print("Database restore completed successfully")
        return True

    except Exception as e:
        print(f"Restore failed: {str(e)}", file=sys.stderr)
        return False

def get_databases_from_backup(backup_file):
    """尝试从备份文件中提取数据库名"""
    try:
        opener = gzip.open if backup_file.endswith('.gz') else open
        with opener(backup_file, 'rb') as f:
            content = f.read(4096).decode('utf-8', errors='ignore')

            # 查找数据库名模式
            db_patterns = [
                r"CREATE DATABASE /\*!32312 IF NOT EXISTS\*/ `([^`]+)`",
                r"USE `([^`]+)`;",
                r"Current Database: `([^`]+)`"
            ]

            for pattern in db_patterns:
                match = re.search(pattern, content)
                if match:
                    return match.group(1)

            # 尝试从文件名猜测
            filename = os.path.basename(backup_file)
            if 'backup_' in filename:
                return filename.split('_')[1]

    except:
        pass

    return "unknown database(s)"

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='MySQL Database Restore Tool')
    parser.add_argument('--host', default='127.0.0.1', help='MySQL host address')
    parser.add_argument('--port', type=int, default=3306, help='MySQL port (default: 3306)')
    parser.add_argument('--user', default='root', help='MySQL username')
    parser.add_argument('--password', required=True, help='MySQL password')
    parser.add_argument('--backup-file', required=True, help='Path to the backup file (.sql or .sql.gz)')
    parser.add_argument('--force', action='store_true', help='Skip confirmation prompt')

    args = parser.parse_args()

    # 获取备份中的数据库名
    db_name = get_databases_from_backup(args.backup_file)

    # 安全确认
    if not args.force:
        print(f"\nWARNING: This will OVERWRITE data in database(s): {db_name}")
        print(f"You are about to restore from: {args.backup_file}")
        confirmation = input("Are you sure you want to proceed? (yes/no): ")

        if confirmation.lower() not in ['y', 'yes']:
            print("Restore cancelled")
            sys.exit(0)

    # 执行还原
    result = restore_mysql(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password,
        backup_file=args.backup_file
    )

    sys.exit(0 if result else 1)

该备份脚本具有以下功能

基础还原功能

  • 支持压缩(.gz)和未压缩(.sql)备份文件
  • 自动检测文件类型并相应处理
  • 完整的错误处理和状态报告

安全机制

  • 备份文件存在性验证
  • 数据库覆盖操作确认提示(可通过–force跳过)
  • 尝试从备份文件中提取数据库名用于确认

智能恢复

  • 自动从备份文件头解析数据库名
  • 支持从文件名猜测数据库名
  • 批量执行模式提高恢复效率

如何使用呢?

创建一个mysql_restore.py的文件,然后把代码复制粘贴到文件中。赋予文件可执行权限

参数说明

参数 说明
–host 数据库服务器地址 ,默认127.0.0.1
–port 数据库服务器端口 ,默认3306
–user 用户名 ,默认root
–password 密码
–backup-file 备份文件绝对路径
–force 强制恢复

使用示例

基本还原

python3 mysql_restore.py --host=127.0.0.1 --port=3306 --user=root --password=123456 --backup-file=/var/backup/backup_db1_20250731_105804.sql.gz

强制还原,没有确认步骤

python3 mysql_backup.py --password=123456 --backup-file=/var/backup/backup_db1_20250731_105804.sql.gz --force

还原请注意

  • 还原操作会覆盖现有数据,请谨慎使用,若确定覆盖,也建议先备份当前数据库
  • 确保备份文件与MySQL服务器版本兼容
  • 大型数据库恢复可能需要较长时间,可以针对脚本实现分割还原

标签: python mysql