This commit is contained in:
anhduy-tech
2026-01-26 08:15:42 +07:00
parent 2ac88177d8
commit 559f8169c9
15 changed files with 791 additions and 229 deletions

162
app/scheduler.py Normal file
View File

@@ -0,0 +1,162 @@
import logging
from django.utils import timezone
from django.db import transaction
from django.apps import apps
from croniter import croniter
import traceback
from apscheduler.schedulers.background import BackgroundScheduler
from app.models import Batch_Job, Batch_Log, Task_Status
from app.workflow_engine import run_workflow
from app.workflow_utils import resolve_value
# Cấu hình logging cơ bản
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def scan_and_run_due_jobs():
"""
Quét và chạy tất cả các batch job đang active và đã đến hạn.
Đồng thời tự động khởi tạo next_run_at cho các job mới.
"""
from django.conf import settings
import pytz
import datetime
now = timezone.now()
# BƯỚC 1: Tìm và khởi tạo các job có next_run_at là null
try:
uninitialized_jobs = Batch_Job.objects.filter(
is_active=True,
next_run_at__isnull=True
).exclude(cron_schedule__isnull=True).exclude(cron_schedule__exact='')
if uninitialized_jobs.exists():
#logger.info(f"Found {uninitialized_jobs.count()} uninitialized jobs. Calculating next run time...")
# Lấy timezone của dự án
tz = pytz.timezone(settings.TIME_ZONE)
for job in uninitialized_jobs:
try:
schedule_parts = job.cron_schedule.split()
if len(schedule_parts) == 4:
job.cron_schedule = f"{schedule_parts[0]} {schedule_parts[1]} {schedule_parts[2]} {schedule_parts[3]} *"
cron_parts = job.cron_schedule.split()
if len(cron_parts[0]) > 1 and cron_parts[0].startswith('0'):
cron_parts[0] = str(int(cron_parts[0]))
corrected_schedule = " ".join(cron_parts)
# Xử lý timezone một cách tường minh
# 1. Lấy thời gian hiện tại ở dạng naive (không có thông tin timezone)
naive_now = timezone.localtime(now).replace(tzinfo=None)
# 2. Croniter tính toán ra thời gian naive tiếp theo
naive_next = croniter(corrected_schedule, naive_now).get_next(datetime.datetime)
# 3. Gán lại timezone đúng cho kết quả
aware_next = tz.localize(naive_next)
job.next_run_at = aware_next
job.save()
#logger.info(f" -> Initialized job '{job.name}'. Next run at: {job.next_run_at}")
except Exception as e:
logger.error(f" -> Failed to initialize job '{job.name}': {e}")
except Exception as e:
logger.error(f"Error during job initialization phase: {e}")
# BƯỚC 2: Quét và chạy các job đến hạn như bình thường
#logger.info("Scanning for due batch jobs...")
# Lấy các job cần chạy (có next_run_at không null và đã đến hạn)
due_jobs = Batch_Job.objects.filter(is_active=True, next_run_at__lte=now)
if not due_jobs.exists():
#logger.info("-> No due jobs found at this time.")
return
#logger.info(f"-> Found {due_jobs.count()} due jobs to run.")
for job in due_jobs:
#logger.info(f"--- Running job: {job.name} (ID: {job.id}) ---")
running_status, _ = Task_Status.objects.get_or_create(code='running', defaults={'name': 'Running'})
log = Batch_Log.objects.create(
system_date=now.date(),
start_time=now,
status=running_status,
log = {"message": f"Starting job {job.name}"}
)
try:
with transaction.atomic():
params = job.parameters or {}
model_name = params.get("model")
filter_conditions = params.get("filter", {})
context_key = params.get("context_key", "target")
if not model_name:
raise ValueError("Job parameters must include a 'model' to query.")
TargetModel = apps.get_model('app', model_name)
resolved_filters = {k: resolve_value(v, {"now": now, "today": now.date()}) for k, v in filter_conditions.items()}
targets = TargetModel.objects.filter(**resolved_filters)
#logger.info(f" > Found {targets.count()} target objects to process for job '{job.name}'.")
processed_count = 0
for target_item in targets:
try:
workflow_context = {
context_key: target_item,
"batch_job": job,
"now": now,
}
run_workflow(
workflow_code=job.workflow.code,
trigger="create",
context=workflow_context
)
processed_count += 1
except Exception as e:
logger.error(f" > Error processing target {getattr(target_item, 'pk', 'N/A')} in job {job.name}: {e}", exc_info=True)
success_status, _ = Task_Status.objects.get_or_create(code='success', defaults={'name': 'Success'})
log.status = success_status
log.log = {"message": f"Successfully processed {processed_count} of {targets.count()} items."}
except Exception as e:
logger.error(f"!!! Job '{job.name}' failed: {e}", exc_info=True)
failed_status, _ = Task_Status.objects.get_or_create(code='failed', defaults={'name': 'Failed'})
log.status = failed_status
log.log = {"error": str(e), "traceback": traceback.format_exc()}
finally:
end_time = timezone.now()
log.end_time = end_time
log.duration = (end_time - log.start_time).total_seconds()
log.save()
# Đặt lịch cho lần chạy tiếp theo
job.last_run_at = now
# Tái sử dụng logic tính toán đã được sửa lỗi timezone
tz = pytz.timezone(settings.TIME_ZONE)
naive_now = timezone.localtime(now).replace(tzinfo=None)
naive_next = croniter(job.cron_schedule, naive_now).get_next(datetime.datetime)
job.next_run_at = tz.localize(naive_next)
job.save()
#logger.info(f"--- Finished job: {job.name}. Next run at: {job.next_run_at} ---")
def start():
"""
Khởi động APScheduler và thêm tác vụ quét job.
"""
scheduler = BackgroundScheduler(timezone='Asia/Ho_Chi_Minh')
# Chạy tác vụ quét job mỗi 60 giây
scheduler.add_job(scan_and_run_due_jobs, 'interval', seconds=60, id='scan_due_jobs_job', replace_existing=True)
scheduler.start()
#logger.info("APScheduler started... Jobs will be scanned every 60 seconds.")