Files
api/app/backup.py
2025-12-30 11:27:14 +07:00

181 lines
7.0 KiB
Python

import os, subprocess, logging
from rest_framework.decorators import api_view
from rest_framework.response import Response
from app.models import *
from django.db import close_old_connections
from datetime import datetime, timedelta
# Project root: the directory two levels above this module's absolute path.
BASE_DIR = os.path.abspath(os.path.join(__file__, os.pardir, os.pardir))
# Folder under the project root that holds the backup dumps and log files.
static_folder = os.path.join(BASE_DIR, "static")
#==========================================================================================
def _record_backup(status_pk, start_time, file_name=None):
    """
    Save a Backup row describing one finished backup attempt.
    Args:
        status_pk (int): Primary key of the Task_Status row to attach
        start_time (datetime): When the attempt started
        file_name (str | None): Dump file name; left unset for failed attempts
    """
    fields = {
        "code": "DB{}".format(datetime.now().strftime('%Y%m%d%H%M')),
        "name": "database-backup",
        "status": Task_Status.objects.get(pk=status_pk),
        "start_time": start_time,
        "end_time": datetime.now(),
    }
    if file_name is not None:
        fields["file"] = file_name
    Backup(**fields).save()


def _discard_partial_dump(path):
    """Remove an incomplete dump file left behind by a failed backup run."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        # The failure happened before anything was written.
        pass
    except OSError as e:
        logging.warning(f"Could not remove incomplete backup file {path}: {e}")


def backup_postgres(db_name, db_user, db_host, db_port):
    """
    Backup a PostgreSQL database using pg_dump.

    The dump is written to <static_folder>/database and the outcome is
    recorded as a Backup row (Task_Status pk=4 on success, pk=3 on failure).

    Args:
        db_name (str): Name of the database to backup
        db_user (str): Database user
        db_host (str): Database host
        db_port (str | int): Database port
    Returns:
        dict: {"status": "success"} or {"status": "error", "text": <message>}
    """
    import re

    # close old connections
    close_old_connections()
    # Taken before the try block so every failure handler can reference it.
    start_time = datetime.now()
    log_dir = os.path.join(static_folder, 'log')
    output_dir = "{}/database".format(static_folder)
    # Path of a dump that has been started but not completed (None otherwise).
    partial_file = None
    try:
        # The log directory must exist or basicConfig cannot open the file.
        os.makedirs(log_dir, exist_ok=True)
        # Configure logging (no-op when the root logger is already configured)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename="{}/{}/backup_postgres.log".format(static_folder, 'log')
        )
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)
        # Generate timestamp for backup file
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # db_name may come from an API request: strip path separators and other
        # unusual characters so the dump cannot be written outside output_dir.
        safe_name = re.sub(r'[^\w.-]', '_', str(db_name))
        file_name = f'{safe_name}_backup_{timestamp}.sql'
        backup_file = os.path.join(output_dir, file_name)
        # Construct pg_dump command
        pg_dump_cmd = [
            'pg_dump',
            '-U', db_user,
            '-h', db_host,
            '-p', str(db_port),  # subprocess requires str arguments
            '-F', 'p',  # Plain SQL format
            '-b',  # Include large objects
            '-v',  # Verbose output
            # Passed via -d so a name starting with "-" is never parsed as an option
            '-d', db_name,
        ]
        # Hand the password to pg_dump only, through a private copy of the
        # environment, instead of leaving it in os.environ for every later
        # subprocess.
        # NOTE(review): the fallback literal is the credential that was already
        # hard-coded here; set BACKUP_PGPASSWORD and remove it from the source.
        env = os.environ.copy()
        env['PGPASSWORD'] = os.environ.get('BACKUP_PGPASSWORD', "V59yNLN42a9Q7xT")
        # Execute pg_dump and save output to file
        logging.info(f"Starting backup of database {db_name} to {backup_file}")
        partial_file = backup_file
        with open(backup_file, 'w') as f:
            subprocess.run(
                pg_dump_cmd,
                stdout=f,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
                env=env
            )
        # The dump is complete: from here on it must never be discarded.
        partial_file = None
        logging.info(f"Backup completed successfully: {backup_file}")
        print(f"Backup saved to {backup_file}")
        _record_backup(4, start_time, file_name)
        return {"status": "success"}
    except subprocess.CalledProcessError as e:
        logging.error(f"Backup failed: {e.stderr}")
        print(f"Error during backup: {e.stderr}")
        _discard_partial_dump(partial_file)
        _record_backup(3, start_time)
        return {"status": "error", "text": str(e.stderr)}
    except Exception as e:
        logging.error(f"Unexpected error: {str(e)}")
        print(f"Unexpected error: {str(e)}")
        _discard_partial_dump(partial_file)
        _record_backup(3, start_time)
        return {"status": "error", "text": str(e)}
#==========================================================================================
def restore_postgres(db_name, db_user, db_host, db_port, backup_file):
    """
    Restore a PostgreSQL database from a .sql backup file using psql.

    Failures are logged and printed, never raised; the function returns None
    in every case.

    Args:
        db_name (str): Name of the database to restore
        db_user (str): Database user
        db_host (str): Database host
        db_port (str | int): Database port
        backup_file (str): Path to the backup .sql file
    """
    try:
        # The log directory must exist or basicConfig cannot open the file.
        os.makedirs(os.path.join(static_folder, 'log'), exist_ok=True)
        # Configure logging (no-op when the root logger is already configured)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename="{}/{}/backup_postgres.log".format(static_folder, 'log')
        )
        # Verify backup file exists
        if not os.path.exists(backup_file):
            raise FileNotFoundError(f"Backup file not found: {backup_file}")
        # Construct psql command
        psql_cmd = [
            'psql',
            '-U', db_user,
            '-h', db_host,
            '-p', str(db_port),  # subprocess requires str arguments
            '-d', db_name,
            '-f', backup_file
        ]
        # Hand the password to psql only, through a private copy of the
        # environment, so os.environ is neither polluted nor stripped of a
        # PGPASSWORD that was already set.
        # NOTE(review): the fallback literal is the credential that was already
        # hard-coded here; set BACKUP_PGPASSWORD and remove it from the source.
        env = os.environ.copy()
        env['PGPASSWORD'] = os.environ.get('BACKUP_PGPASSWORD', "V59yNLN42a9Q7xT")
        # Execute psql to restore the database
        logging.info(f"Starting restore of database {db_name} from {backup_file}")
        process = subprocess.run(
            psql_cmd,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            check=True,
            env=env
        )
        # psql exits with 0 even when individual statements fail (ON_ERROR_STOP
        # is not set), so surface whatever it wrote to stderr instead of
        # dropping it.
        if process.stderr and process.stderr.strip():
            logging.warning(f"psql reported messages during restore: {process.stderr}")
        logging.info(f"Restore completed successfully for database {db_name}")
        print(f"Database {db_name} restored successfully from {backup_file}")
    except subprocess.CalledProcessError as e:
        logging.error(f"Restore failed: {e.stderr}")
        print(f"Error during restore: {e.stderr}")
    except FileNotFoundError as e:
        logging.error(str(e))
        print(str(e))
    except Exception as e:
        logging.error(f"Unexpected error: {str(e)}")
        print(f"Unexpected error: {str(e)}")
#==========================================================================================
def delete_old_dbbackup():
    """
    Delete the dump files of database backups that are at least 7 days old.

    Looks at Backup rows named 'database-backup' with status 4 whose
    create_time date is on or before the cut-off, removes each file from
    <static_folder>/database and marks the row with note 'file was deleted'.
    Errors on individual files are printed and do not stop the loop.
    """
    # close old connections
    close_old_connections()
    # start
    begin_date = (datetime.now() - timedelta(days=7)).date()
    rows = Backup.objects.filter(name='database-backup', create_time__date__lte=begin_date, status=4)
    for row in rows:
        # Rows handled by an earlier run still match the filter; skip them so
        # each run does not retry the removal and report a missing file.
        if getattr(row, 'note', None) == 'file was deleted':
            continue
        file_path = "{}/database/{}".format(static_folder, row.file)
        try:
            os.remove(file_path)
            print("Đã xóa file:", file_path)
            row.note = 'file was deleted'
            row.save()
        except FileNotFoundError:
            # Deliberately not marked as deleted: the file may only be
            # temporarily unreachable.
            print("File không tồn tại:", file_path)
        except PermissionError:
            print("Không có quyền xóa file:", file_path)
        except Exception as e:
            print("Lỗi khác:", e)
#==========================================================================================
@api_view(['GET', 'POST'])
def backup_database(request):
    """
    API endpoint that triggers a PostgreSQL backup.

    GET backs up the built-in default database. POST backs up the database
    described by the request body fields db_name, db_user, db_host and
    db_port.

    Returns:
        Response: the dict produced by backup_postgres, or HTTP 400 with
        {"status": "error", "text": ...} when a POST field is missing.
    """
    if request.method == 'GET':
        result = backup_postgres("y99", "postgres", "107.155.65.79", "5423")
        return Response(result)

    # A non-mapping body (e.g. a JSON array) has no fields to read.
    payload = request.data if hasattr(request.data, "get") else {}
    required = ("db_name", "db_user", "db_host", "db_port")
    missing = [key for key in required if payload.get(key) in (None, "")]
    if missing:
        # Answer with a client error instead of letting a KeyError become a 500.
        return Response(
            {"status": "error", "text": "Missing required field(s): {}".format(", ".join(missing))},
            status=400,
        )
    result = backup_postgres(
        payload["db_name"],
        payload["db_user"],
        payload["db_host"],
        # JSON clients often send the port as a number; pg_dump arguments must be str.
        str(payload["db_port"]),
    )
    return Response(result)