Files
api/app/scheduler.py
anhduy-tech c2efa46260 changes
2026-02-09 16:56:55 +07:00

168 lines
7.2 KiB
Python

import logging
from django.utils import timezone
from django.db import transaction
from django.apps import apps
from croniter import croniter
import traceback
from apscheduler.schedulers.background import BackgroundScheduler
from app.models import Batch_Job, Batch_Log, Task_Status
from app.workflow_engine import run_workflow
from app.workflow_utils import resolve_value
# Basic logging configuration.
# NOTE(review): basicConfig() configures the process-wide root logger at import
# time; per the Python logging docs it does nothing if the root logger already
# has handlers (e.g. from Django's LOGGING setting) — confirm it is still needed.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the scheduler functions in this file.
logger = logging.getLogger(__name__)
def _expand_cron(schedule):
    """Return *schedule* with a trailing ``*`` appended when it has exactly 4 fields.

    Any other schedule (including one that already has 5 fields) is returned
    unchanged, so the result can be saved back onto the job as-is.
    """
    fields = schedule.split()
    if len(fields) == 4:
        return " ".join(fields + ["*"])
    return schedule


def _normalize_cron(schedule):
    """Return a cron expression suitable for ``croniter``.

    Expands a 4-field schedule (see ``_expand_cron``) and strips leading zeros
    from a purely numeric minute field, e.g. ``"05 8 * * *"`` -> ``"5 8 * * *"``.
    Non-numeric minute fields such as ``"0,30"``, ``"0-15"`` or ``"0/5"`` are
    left untouched, because passing them to ``int()`` would raise ``ValueError``.
    Runs of whitespace between fields are collapsed to single spaces.
    """
    fields = _expand_cron(schedule).split()
    minute = fields[0] if fields else ""
    if len(minute) > 1 and minute.startswith("0") and minute.isdecimal():
        fields[0] = str(int(minute))
    return " ".join(fields)


def _compute_next_run(schedule, now, tz):
    """Return the next fire time of *schedule* after *now* as an aware datetime in *tz*.

    The cron expression is evaluated against naive wall-clock time in *tz*, and
    the result is localized back into *tz*. Schedules are therefore interpreted
    in local time rather than UTC.

    Errors from ``_normalize_cron``/``croniter`` for an invalid schedule are not
    caught here (e.g. ``AttributeError`` when *schedule* is ``None``).
    """
    import datetime

    naive_now = timezone.localtime(now, tz).replace(tzinfo=None)
    naive_next = croniter(_normalize_cron(schedule), naive_now).get_next(datetime.datetime)
    return tz.localize(naive_next)


def _initialize_missing_schedules(now, tz):
    """Compute and save ``next_run_at`` for active jobs that have a schedule but no next run.

    A 4-field ``cron_schedule`` is expanded to 5 fields and that expanded form
    is saved along with ``next_run_at``. Failures are logged per job (and for
    the whole phase) and never propagate.
    """
    try:
        uninitialized_jobs = (
            Batch_Job.objects.filter(is_active=True, next_run_at__isnull=True)
            .exclude(cron_schedule__isnull=True)
            .exclude(cron_schedule__exact='')
        )
        for job in uninitialized_jobs:
            try:
                job.cron_schedule = _expand_cron(job.cron_schedule)
                job.next_run_at = _compute_next_run(job.cron_schedule, now, tz)
                job.save()
                logger.debug("Initialized job '%s'. Next run at: %s", job.name, job.next_run_at)
            except Exception as e:
                logger.error(f" -> Failed to initialize job '{job.name}': {e}")
    except Exception as e:
        logger.error(f"Error during job initialization phase: {e}")


def _run_job(job, now, tz, running_status):
    """Run one batch job, record a ``Batch_Log`` entry, and reschedule the job.

    ``job.parameters`` is read as a dict with keys ``"model"`` (required, a model
    name in the ``app`` app), ``"filter"`` (field lookups whose values go through
    ``resolve_value``) and ``"context_key"`` (default ``"target"``).
    ``run_workflow`` is called once per matching row. A failure for one row is
    logged and does not stop the others.

    Args:
        job: The ``Batch_Job`` to run.
        now: Scan timestamp; used for the filter/workflow context, ``system_date``,
            ``last_run_at`` and as the base for ``next_run_at``.
        tz: pytz timezone in which ``cron_schedule`` is interpreted.
        running_status: ``Task_Status`` assigned to the log while the job runs.
    """
    # Per-job start time, so that log.duration measures this job only and not
    # every job that ran earlier in the same scan.
    started_at = timezone.now()
    # NOTE(review): with USE_TZ=True, timezone.now() is UTC, so now.date() is the
    # UTC date rather than the settings.TIME_ZONE date — confirm this is intended
    # for system_date and the "today" filter value.
    log = Batch_Log.objects.create(
        system_date=now.date(),
        start_time=started_at,
        status=running_status,
        log={"message": f"Starting job {job.name}"},
    )
    try:
        with transaction.atomic():
            params = job.parameters or {}
            model_name = params.get("model")
            filter_conditions = params.get("filter", {})
            context_key = params.get("context_key", "target")
            if not model_name:
                raise ValueError("Job parameters must include a 'model' to query.")
            TargetModel = apps.get_model('app', model_name)
            resolved_filters = {
                k: resolve_value(v, {"now": now, "today": now.date()})
                for k, v in filter_conditions.items()
            }
            targets = TargetModel.objects.filter(**resolved_filters)
            processed_count = 0
            for target_item in targets:
                try:
                    workflow_context = {
                        context_key: target_item,
                        "batch_job": job,
                        "now": now,
                    }
                    # One savepoint per target. A failing target rolls back only its
                    # own writes, and the outer transaction stays usable for the
                    # next target. Catching a DB error inside the outer atomic()
                    # without a savepoint would break it.
                    with transaction.atomic():
                        run_workflow(
                            workflow_code=job.workflow.code,
                            trigger="create",
                            context=workflow_context,
                        )
                    processed_count += 1
                except Exception as e:
                    logger.error(f" > Error processing target {getattr(target_item, 'pk', 'N/A')} in job {job.name}: {e}", exc_info=True)
            success_status, _ = Task_Status.objects.get_or_create(code='success', defaults={'name': 'Success'})
            log.status = success_status
            # targets was fully iterated above, so count() is served from its cache.
            log.log = {"message": f"Successfully processed {processed_count} of {targets.count()} items."}
    except Exception as e:
        logger.error(f"!!! Job '{job.name}' failed: {e}", exc_info=True)
        failed_status, _ = Task_Status.objects.get_or_create(code='failed', defaults={'name': 'Failed'})
        log.status = failed_status
        log.log = {"error": str(e), "traceback": traceback.format_exc()}
    finally:
        end_time = timezone.now()
        log.end_time = end_time
        log.duration = (end_time - log.start_time).total_seconds()
        log.save()

        # Schedule the next run using the same normalization as initialization.
        job.last_run_at = now
        try:
            job.next_run_at = _compute_next_run(job.cron_schedule, now, tz)
        except Exception as e:
            # Clear next_run_at so the job is not re-run on every scan with a stale
            # due time. The initialization step retries (and logs) on each scan
            # until the schedule is fixed.
            logger.error(f"!!! Could not compute next run for job '{job.name}': {e}")
            job.next_run_at = None
        job.save()
        logger.debug("Finished job: %s. Next run at: %s", job.name, job.next_run_at)


def scan_and_run_due_jobs():
    """Initialize missing schedules, then run every active batch job that is due.

    Step 1: each active job with a non-empty ``cron_schedule`` and no
    ``next_run_at`` gets ``next_run_at`` computed and saved.

    Step 2: every active job whose ``cron_schedule`` is exactly ``"* * * * *"``
    (regardless of ``next_run_at``) runs, plus every other active job whose
    ``next_run_at <= now``. Each job is run by ``_run_job``. An unexpected error
    while running or rescheduling one job is logged and does not stop the rest.
    """
    from django.conf import settings
    import pytz

    now = timezone.now()
    tz = pytz.timezone(settings.TIME_ZONE)

    # Step 1: give newly added jobs a next_run_at.
    _initialize_missing_schedules(now, tz)

    # Step 2: collect the jobs that are due.
    active_jobs = Batch_Job.objects.filter(is_active=True)
    # "* * * * *" jobs are selected on every scan, independent of next_run_at.
    # NOTE(review): this function is invoked on an APScheduler interval (see
    # start()), so these jobs run on every scan rather than once per minute —
    # confirm that is intended.
    every_minute_jobs = active_jobs.filter(cron_schedule="* * * * *")
    normal_due_jobs = active_jobs.filter(next_run_at__lte=now).exclude(cron_schedule="* * * * *")
    due_jobs = every_minute_jobs | normal_due_jobs
    if not due_jobs.exists():
        logger.info("-> No due jobs found at this time.")
        return

    running_status, _ = Task_Status.objects.get_or_create(code='running', defaults={'name': 'Running'})
    for job in due_jobs:
        logger.debug("Running job: %s (ID: %s)", job.name, job.id)
        try:
            _run_job(job, now, tz, running_status)
        except Exception:
            # A bookkeeping failure (log or job save) for one job must not block
            # the remaining due jobs.
            logger.exception("!!! Could not run or reschedule job '%s'", job.name)
def start(*, interval_seconds=5):
    """Start a background APScheduler that calls ``scan_and_run_due_jobs`` periodically.

    Args:
        interval_seconds: Seconds between scans (default 5). Must be positive.

    Returns:
        The started ``BackgroundScheduler``, so callers can inspect it or shut it
        down (e.g. ``scheduler.shutdown()``). Callers that ignore the return
        value are unaffected.

    Raises:
        ValueError: if ``interval_seconds`` is not positive.
    """
    if interval_seconds <= 0:
        raise ValueError(f"interval_seconds must be positive, got {interval_seconds!r}")
    scheduler = BackgroundScheduler(timezone='Asia/Ho_Chi_Minh')
    # Scan for due batch jobs every `interval_seconds` seconds.
    scheduler.add_job(
        scan_and_run_due_jobs,
        'interval',
        seconds=interval_seconds,
        id='scan_due_jobs_job',
        replace_existing=True,
    )
    scheduler.start()
    return scheduler