概述
本文介绍如何通过Python脚本实现MySQL数据库的备份与恢复。需要注意的是,这里的脚本只是实现了基础的备份与恢复功能,如果你需要个性化,比如进度条,大文件分割恢复等功能,你可以将该脚本进行优化即可。
需要的准备工作
因为我是基于mysqldump
命令实现的备份,因此需要安装该工具。因为这里我不需要MySQL服务端,因此只需要实现客户端安装即可。
# Ubuntu/Debian
sudo apt-get install mysql-client
# CentOS/RHEL
sudo yum install holland-mysqldump.noarch
# 若 holland-mysqldump.noarch 不存在,可以安装MySQL,即
sudo yum install mysql
python
使用的是 python3,这里不做安装说明。
备份数据库脚本
为了方便,这里省去了部分注释内容,如果有不懂的地方,可以将脚本发给AI做解释说明哦~
import subprocess
import os
import argparse
import datetime
import sys
import gzip
import re
def clean_old_backups(backup_dir, retention_days):
"""
删除超过保留天数的备份文件
参数:
backup_dir: 备份目录
retention_days: 备份保留天数
"""
try:
# 获取当前时间
now = datetime.datetime.now()
# 计算保留截止日期
cutoff_date = now - datetime.timedelta(days=retention_days)
# 遍历备份目录中的所有文件
for filename in os.listdir(backup_dir):
filepath = os.path.join(backup_dir, filename)
# 跳过目录
if not os.path.isfile(filepath):
continue
# 检查是否是备份文件 (匹配备份文件名模式)
match = re.match(r'backup_(.+)_(\d{8}_\d{6})\.sql(\.gz)?$', filename)
if not match:
continue
# 从文件名中提取时间戳
timestamp_str = match.group(2)
try:
# 将时间戳转换为datetime对象
file_date = datetime.datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
except ValueError:
continue
# 检查文件是否超过保留期限
if file_date < cutoff_date:
try:
os.remove(filepath)
print(f"Deleted old backup: {filename}")
except Exception as e:
print(f"Failed to delete {filename}: {str(e)}", file=sys.stderr)
except Exception as e:
print(f"Error cleaning old backups: {str(e)}", file=sys.stderr)
def backup_mysql(host, port, user, password, backup_dir, databases=None, compress=True,retention_days=0):
"""
备份MySQL数据库
参数:
host: MySQL主机地址
port: MySQL端口
user: MySQL用户名
password: MySQL密码
backup_dir: 备份文件存放目录
databases: 要备份的数据库列表(默认备份所有数据库)
compress: 是否使用gzip压缩备份文件
retention_days: 备份保留天数 (0表示不删除旧备份)
"""
try:
# 确保备份目录存在
os.makedirs(backup_dir, exist_ok=True)
# 生成时间戳
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# 构建备份文件名
if databases:
db_list = '_'.join(databases)
filename = f"backup_{db_list}_{timestamp}.sql"
else:
filename = f"backup_all_databases_{timestamp}.sql"
filepath = os.path.join(backup_dir, filename)
final_file = filepath + '.gz' if compress else filepath
# 构建mysqldump命令
cmd = [
'mysqldump',
f'--host={host}',
f'--port={port}',
f'--user={user}',
f'--password={password}',
'--single-transaction',
'--routines',
'--triggers',
'--events'
]
# 添加数据库参数
if databases:
cmd.append('--databases')
cmd.extend(databases)
else:
cmd.append('--all-databases')
# 执行备份并处理压缩
if compress:
with gzip.open(final_file, 'wb') as gz_file:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
if process.returncode != 0:
raise Exception(f"mysqldump failed: {stderr.decode().strip()}")
gz_file.write(stdout)
else:
with open(filepath, 'wb') as sql_file:
process = subprocess.Popen(cmd, stdout=sql_file, stderr=subprocess.PIPE)
_, stderr = process.communicate()
if process.returncode != 0:
raise Exception(f"mysqldump failed: {stderr.decode().strip()}")
print(f"Backup completed successfully: {final_file}")
# 清理旧备份
if retention_days > 0:
print(f"Cleaning backups older than {retention_days} days...")
clean_old_backups(backup_dir, retention_days)
return True
except Exception as e:
print(f"Backup failed: {str(e)}", file=sys.stderr)
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='MySQL Database Backup Tool')
parser.add_argument('--host', default='127.0.0.1', help='MySQL host address')
parser.add_argument('--port', type=int, default=3306, help='MySQL port (default: 3306)')
parser.add_argument('--user', default='root', help='MySQL username')
parser.add_argument('--password', required=True, help='MySQL password')
parser.add_argument('--backup-dir', required=True, help='Backup directory')
parser.add_argument('--databases', nargs='+', help='Specific databases to backup (default: all)')
parser.add_argument('--retention-days', type=int, default=0,help='Number of days to keep backups (0=keep forever)')
parser.add_argument('--no-compress', action='store_false', help='Disable gzip compression')
args = parser.parse_args()
# 执行备份
result = backup_mysql(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
backup_dir=args.backup_dir,
databases=args.databases,
compress=args.no_compress,
retention_days=args.retention_days
)
sys.exit(0 if result else 1)
该备份脚本具有以下功能
基础备份功能
- 支持备份所有数据库或指定数据库
- 生成带时间戳的备份文件名
- 自动创建备份目录
高级选项
- 使用gzip压缩备份文件(默认启用)
- 支持清理过期的备份
错误处理
- 验证备份目录可访问性
- 捕获并报告mysqldump错误
- 返回适当的退出代码
如何使用呢?
创建一个mysql_backup.py
的文件,然后把代码复制粘贴到文件中。赋予文件可执行权限。
参数说明
参数 | 说明 |
---|---|
–host | 数据库服务器地址 ,默认127.0.0.1 |
–port | 数据库服务器端口 ,默认3306 |
–user | 用户名 ,默认root |
–password | 密码 |
–backup-dir | 备份文件存放的目录 |
–databases | 备份的数据库,不传则备份所有数据库 |
–retention-days | 保留指定天数内的备份,超过则删除 |
–no-compress | 不使用压缩,默认使用压缩 |
使用示例
备份所有数据库
python3 mysql_backup.py --host=127.0.0.1 --port=3306 --user=root --password=123456 --backup-dir=/var/backup/
备份指定数据库,不压缩
python3 mysql_backup.py --password=123456 --backup-dir=/var/backup/ --databases=db1 db2 --no-compress
备份指定数据库,同时删除超过30天的备份
python3 mysql_backup.py --password=123456 --backup-dir=/var/backup/ --databases=db1 db2 --retention-days=30
备份后,可在/var/backup/目录看到备份的文件
还原备份脚本
有备份就有还原,下面是针对备份脚本实现的还原脚本
import subprocess
import os
import argparse
import sys
import gzip
import re
from tqdm import tqdm
def restore_mysql(host, port, user, password, backup_file):
"""
还原MySQL数据库
参数:
host: MySQL主机地址
port: MySQL端口
user: MySQL用户名
password: MySQL密码
backup_file: 备份文件路径
"""
try:
# 检查备份文件是否存在
if not os.path.isfile(backup_file):
raise FileNotFoundError(f"Backup file not found: {backup_file}")
# 构建mysql命令
mysql_cmd = [
'mysql',
f'--host={host}',
f'--port={port}',
f'--user={user}',
f'--password={password}',
'--batch'
]
# 根据文件类型选择处理方式
if backup_file.endswith('.gz'):
# 处理gzip压缩文件
with gzip.open(backup_file, 'rb') as gz_file:
process = subprocess.Popen(
mysql_cmd,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate(gz_file.read())
else:
# 处理未压缩SQL文件
with open(backup_file, 'rb') as sql_file:
process = subprocess.Popen(
mysql_cmd,
stdin=sql_file,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
# 检查执行结果
if process.returncode != 0:
error_msg = stderr.decode().strip() if stderr else "Unknown error"
raise Exception(f"Restore failed: {error_msg}")
print("Database restore completed successfully")
return True
except Exception as e:
print(f"Restore failed: {str(e)}", file=sys.stderr)
return False
def get_databases_from_backup(backup_file):
"""尝试从备份文件中提取数据库名"""
try:
opener = gzip.open if backup_file.endswith('.gz') else open
with opener(backup_file, 'rb') as f:
content = f.read(4096).decode('utf-8', errors='ignore')
# 查找数据库名模式
db_patterns = [
r"CREATE DATABASE /\*!32312 IF NOT EXISTS\*/ `([^`]+)`",
r"USE `([^`]+)`;",
r"Current Database: `([^`]+)`"
]
for pattern in db_patterns:
match = re.search(pattern, content)
if match:
return match.group(1)
# 尝试从文件名猜测
filename = os.path.basename(backup_file)
if 'backup_' in filename:
return filename.split('_')[1]
except:
pass
return "unknown database(s)"
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='MySQL Database Restore Tool')
parser.add_argument('--host', default='127.0.0.1', help='MySQL host address')
parser.add_argument('--port', type=int, default=3306, help='MySQL port (default: 3306)')
parser.add_argument('--user', default='root', help='MySQL username')
parser.add_argument('--password', required=True, help='MySQL password')
parser.add_argument('--backup-file', required=True, help='Path to the backup file (.sql or .sql.gz)')
parser.add_argument('--force', action='store_true', help='Skip confirmation prompt')
args = parser.parse_args()
# 获取备份中的数据库名
db_name = get_databases_from_backup(args.backup_file)
# 安全确认
if not args.force:
print(f"\nWARNING: This will OVERWRITE data in database(s): {db_name}")
print(f"You are about to restore from: {args.backup_file}")
confirmation = input("Are you sure you want to proceed? (yes/no): ")
if confirmation.lower() not in ['y', 'yes']:
print("Restore cancelled")
sys.exit(0)
# 执行还原
result = restore_mysql(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
backup_file=args.backup_file
)
sys.exit(0 if result else 1)
该备份脚本具有以下功能
基础还原功能
- 支持压缩(.gz)和未压缩(.sql)备份文件
- 自动检测文件类型并相应处理
- 完整的错误处理和状态报告
安全机制
- 备份文件存在性验证
- 数据库覆盖操作确认提示(可通过–force跳过)
- 尝试从备份文件中提取数据库名用于确认
智能恢复
- 自动从备份文件头解析数据库名
- 支持从文件名猜测数据库名
- 批量执行模式提高恢复效率
如何使用呢?
创建一个mysql_restore.py
的文件,然后把代码复制粘贴到文件中。赋予文件可执行权限。
参数说明
参数 | 说明 |
---|---|
–host | 数据库服务器地址 ,默认127.0.0.1 |
–port | 数据库服务器端口 ,默认3306 |
–user | 用户名 ,默认root |
–password | 密码 |
–backup-file | 备份文件绝对路径 |
–force | 强制恢复 |
使用示例
基本还原
python3 mysql_restore.py --host=127.0.0.1 --port=3306 --user=root --password=123456 --backup-file=/var/backup/backup_db1_20250731_105804.sql.gz
强制还原,没有确认步骤
python3 mysql_backup.py --password=123456 --backup-file=/var/backup/backup_db1_20250731_105804.sql.gz --force
还原请注意
- 还原操作会覆盖现有数据,请谨慎使用,若确定覆盖,也建议先备份当前数据库
- 确保备份文件与MySQL服务器版本兼容
- 大型数据库恢复可能需要较长时间,可以针对脚本实现分割还原
文章来源:
鸿辰
版权声明:本网站可能会转载或引用其他来源的文章、图片、数据等信息。对于这些转载内容,版权归原作者所有。本站尊重原作者的劳动成果,并在可能的情况下注明来源和作者。如有任何版权问题,请及时联系,收到后将第一时间处理。