import datetime
import logging
import traceback

import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from croniter import croniter
from django.apps import apps
from django.conf import settings
from django.db import close_old_connections, transaction
from django.utils import timezone

from app.models import Batch_Job, Batch_Log, Task_Status
from app.workflow_engine import run_workflow
from app.workflow_utils import resolve_value

# Basic logging configuration.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _normalize_cron_schedule(cron_schedule):
    """Return *cron_schedule* as a standard 5-field cron expression.

    Two quirks of stored schedules are tolerated:

    * a 4-field expression gets ``*`` appended as the day-of-week field;
    * a zero-padded, purely numeric minute field (e.g. ``"05"``) is un-padded.

    Non-numeric minute fields such as ``"0/5"`` or ``"0,30"`` are left as-is;
    they are valid cron syntax and must not be passed through ``int()``.
    """
    parts = cron_schedule.split()
    if len(parts) == 4:
        parts.append("*")
    minute = parts[0] if parts else ""
    if len(minute) > 1 and minute.startswith("0") and minute.isdigit():
        parts[0] = str(int(minute))
    return " ".join(parts)


def _compute_next_run(cron_schedule, now):
    """Return the next aware datetime after *now* that matches *cron_schedule*.

    The cron expression is interpreted in ``settings.TIME_ZONE`` wall-clock
    time. *now* is converted to that zone and stripped of tzinfo, so croniter
    works on naive local values. The naive result is then re-localized.

    Raises:
        ValueError: if *cron_schedule* is empty or None.
        Whatever croniter raises for an invalid expression.
    """
    if not cron_schedule or not cron_schedule.strip():
        raise ValueError(f"Empty cron schedule: {cron_schedule!r}")
    tz = pytz.timezone(settings.TIME_ZONE)
    naive_now = timezone.localtime(now, tz).replace(tzinfo=None)
    naive_next = croniter(
        _normalize_cron_schedule(cron_schedule), naive_now
    ).get_next(datetime.datetime)
    return tz.localize(naive_next)


def _initialize_missing_next_runs(now):
    """Set ``next_run_at`` for active jobs that have a schedule but no next run.

    A 4-field schedule is rewritten to its 5-field form and saved along with
    the computed next run. Failures are logged per job, so one bad schedule
    does not block the others.
    """
    uninitialized_jobs = (
        Batch_Job.objects.filter(is_active=True, next_run_at__isnull=True)
        .exclude(cron_schedule__isnull=True)
        .exclude(cron_schedule__exact="")
    )
    for job in uninitialized_jobs:
        try:
            parts = job.cron_schedule.split()
            if len(parts) == 4:
                # Save the 5-field form so later reschedules see a full expression.
                job.cron_schedule = " ".join(parts + ["*"])
            job.next_run_at = _compute_next_run(job.cron_schedule, now)
            job.save()
            logger.debug(" -> Initialized job '%s'. Next run at: %s", job.name, job.next_run_at)
        except Exception as e:
            logger.error(" -> Failed to initialize job '%s': %s", job.name, e)


def _process_job_targets(job, now):
    """Run the job's workflow once for each row selected by its parameters.

    ``job.parameters`` is read for:
    ``model`` (required; a model name in the ``app`` app),
    ``filter`` (field lookups; each value is passed through ``resolve_value``)
    and ``context_key`` (default ``"target"``).

    Each target runs in its own savepoint (nested ``atomic()``). A target that
    fails is rolled back alone and logged, and the enclosing transaction stays
    usable for the remaining targets.

    Returns:
        ``(processed_count, total_count)``, counted in a single pass.

    Raises:
        ValueError: if ``model`` is missing from the parameters.
        LookupError: if the model cannot be found (raised by ``apps.get_model``).
    """
    params = job.parameters or {}
    model_name = params.get("model")
    filter_conditions = params.get("filter") or {}
    context_key = params.get("context_key", "target")

    if not model_name:
        raise ValueError("Job parameters must include a 'model' to query.")

    TargetModel = apps.get_model("app", model_name)
    # "today" is the local calendar date, consistent with the local-time cron handling.
    filter_context = {"now": now, "today": timezone.localdate(now)}
    resolved_filters = {
        key: resolve_value(value, filter_context)
        for key, value in filter_conditions.items()
    }
    targets = TargetModel.objects.filter(**resolved_filters)

    processed_count = 0
    total_count = 0
    for target_item in targets:
        total_count += 1
        try:
            with transaction.atomic():
                run_workflow(
                    workflow_code=job.workflow.code,
                    trigger="create",
                    context={
                        context_key: target_item,
                        "batch_job": job,
                        "now": now,
                    },
                )
            processed_count += 1
        except Exception as e:
            logger.error(
                " > Error processing target %s in job %s: %s",
                getattr(target_item, "pk", "N/A"), job.name, e,
                exc_info=True,
            )
    logger.debug(" > Processed %s of %s targets for job '%s'.", processed_count, total_count, job.name)
    return processed_count, total_count


def _run_job(job, now, running_status):
    """Run one due job and record the outcome in a ``Batch_Log`` row.

    The log row is created with *running_status*. It is then updated to the
    'success' or 'failed' status, with end time and duration. If the job itself
    fails, the error is written to the log row and not re-raised.
    """
    # Use the real start time, not the scan time: jobs in one scan run one
    # after another, and each duration should cover only this job.
    job_started = timezone.now()
    log = Batch_Log.objects.create(
        system_date=timezone.localdate(now),
        start_time=job_started,
        status=running_status,
        log={"message": f"Starting job {job.name}"},
    )
    try:
        with transaction.atomic():
            processed_count, total_count = _process_job_targets(job, now)
            success_status, _ = Task_Status.objects.get_or_create(
                code="success", defaults={"name": "Success"}
            )
        log.status = success_status
        log.log = {"message": f"Successfully processed {processed_count} of {total_count} items."}
    except Exception as e:
        logger.error("!!! Job '%s' failed: %s", job.name, e, exc_info=True)
        failed_status, _ = Task_Status.objects.get_or_create(
            code="failed", defaults={"name": "Failed"}
        )
        log.status = failed_status
        log.log = {"error": str(e), "traceback": traceback.format_exc()}
    finally:
        end_time = timezone.now()
        log.end_time = end_time
        log.duration = (end_time - log.start_time).total_seconds()
        log.save()


def _schedule_next_run(job, now):
    """Store *now* as the job's last run and compute and save its next run.

    Never raises. If the schedule cannot be parsed, ``next_run_at`` is
    cleared; leaving it in the past would re-run the job on every scan.
    """
    job.last_run_at = now
    try:
        job.next_run_at = _compute_next_run(job.cron_schedule, now)
    except Exception as e:
        logger.error(
            "!!! Could not compute next run for job '%s' (schedule %r): %s",
            job.name, job.cron_schedule, e,
        )
        job.next_run_at = None
    try:
        job.save()
    except Exception:
        logger.exception("!!! Could not save schedule for job '%s'", job.name)
    else:
        logger.debug("--- Finished job: %s. Next run at: %s ---", job.name, job.next_run_at)


def scan_and_run_due_jobs():
    """Run every active batch job whose ``next_run_at`` is due.

    Step 1 sets ``next_run_at`` for active jobs that have a cron schedule but
    have never been scheduled.
    Step 2 runs each job with ``next_run_at <= now``, one after another, and
    computes its next run from the scan time.

    One job failing (including its bookkeeping) does not stop the others.
    """
    # Background threads do not get Django's per-request connection cleanup.
    # Drop stale or expired connections so a DB restart does not break the scheduler.
    close_old_connections()
    try:
        now = timezone.now()

        # STEP 1: set next_run_at for jobs that do not have one yet.
        try:
            _initialize_missing_next_runs(now)
        except Exception as e:
            logger.error("Error during job initialization phase: %s", e)

        # STEP 2: run the due jobs. Every job, including "* * * * *", is gated
        # on next_run_at, so an every-minute job runs once per minute, not once per scan.
        logger.debug("Scanning for due batch jobs...")
        due_jobs = Batch_Job.objects.filter(is_active=True, next_run_at__lte=now)
        if not due_jobs.exists():
            logger.debug("-> No due jobs found at this time.")
            return

        running_status, _ = Task_Status.objects.get_or_create(
            code="running", defaults={"name": "Running"}
        )
        for job in due_jobs:
            logger.debug("--- Running job: %s (ID: %s) ---", job.name, job.id)
            try:
                _run_job(job, now, running_status)
            except Exception:
                logger.exception("!!! Bookkeeping for job '%s' failed", job.name)
            finally:
                _schedule_next_run(job, now)
    finally:
        close_old_connections()


def start():
    """Start APScheduler and register the periodic due-job scan.

    The scan runs every 5 seconds. Returns the started scheduler so callers
    can shut it down if needed.
    """
    scheduler = BackgroundScheduler(timezone="Asia/Ho_Chi_Minh")
    # max_instances=1 (the APScheduler default, stated here on purpose): if a
    # scan takes longer than the interval, the next tick is skipped rather than
    # run in parallel, so a job cannot be picked up twice.
    scheduler.add_job(
        scan_and_run_due_jobs,
        "interval",
        seconds=5,
        id="scan_due_jobs_job",
        replace_existing=True,
        max_instances=1,
    )
    scheduler.start()
    return scheduler