Files
api/app/run_batch_jobs.py
anhduy-tech 2ac88177d8 changes
2026-01-23 22:05:18 +07:00

105 lines
4.8 KiB
Python

from django.core.management.base import BaseCommand
from django.utils import timezone
from django.db import transaction
from django.apps import apps
from croniter import croniter
import traceback
from app.models import Batch_Job, Batch_Log, Task_Status
from app.workflow_engine import run_workflow
from app.workflow_utils import resolve_value
class Command(BaseCommand):
    """Management command that runs every active batch job that is due.

    A job is due when ``is_active`` is true and ``next_run_at`` is not later
    than the moment the command started. For each due job the command:

    1. creates a ``Batch_Log`` row with the ``running`` status,
    2. queries the target objects described by ``job.parameters``,
    3. runs the job's workflow once per target object,
    4. stores the outcome on the log row and computes the job's next run
       time from ``job.cron_schedule``.
    """

    help = 'Runs all active batch jobs that are due.'

    def handle(self, *args, **options):
        """Entry point: find the due jobs and run them one after another."""
        self.stdout.write(f"[{timezone.now()}] Starting batch job runner...")
        # A single timestamp is shared by every job of this invocation so
        # that log rows, filters and rescheduling agree on what "now" is.
        now = timezone.now()

        # Jobs to run: active and already past their scheduled time.
        due_jobs = Batch_Job.objects.filter(
            is_active=True,
            next_run_at__lte=now
        )
        self.stdout.write(f"Found {due_jobs.count()} due jobs to run.")

        for job in due_jobs:
            self._run_job(job, now)

        self.stdout.write(f"[{timezone.now()}] Batch job runner finished.")

    def _run_job(self, job, now):
        """Run a single job, record its outcome and schedule its next run.

        Args:
            job: the ``Batch_Job`` instance to execute.
            now: timestamp taken when the command started.

        Any exception raised while the job runs is caught here, written to
        stderr and stored on the log row with the ``failed`` status, so one
        failing job does not prevent the following jobs from running.
        """
        self.stdout.write(f"--- Running job: {job.name} (ID: {job.id}) ---")

        # Open the log entry before doing any work.
        running_status, _ = Task_Status.objects.get_or_create(code='running', defaults={'name': 'Running'})
        log = Batch_Log.objects.create(
            system_date=now.date(),
            start_time=now,
            status=running_status
        )

        try:
            with transaction.atomic():
                processed_count, total_count = self._process_targets(job, now)

                # Mark the log as successful once every target was attempted.
                success_status, _ = Task_Status.objects.get_or_create(code='success', defaults={'name': 'Success'})
                log.status = success_status
                log.log = {"message": f"Successfully processed {processed_count} of {total_count} items."}
        except Exception as e:
            # Caught OUTSIDE the atomic block, so the job's transaction has
            # already been rolled back and the connection is usable again.
            self.stderr.write(f"!!! Job '{job.name}' failed: {e}")
            traceback.print_exc()
            failed_status, _ = Task_Status.objects.get_or_create(code='failed', defaults={'name': 'Failed'})
            log.status = failed_status
            log.log = {"error": str(e), "traceback": traceback.format_exc()}
        finally:
            # Close the log entry whether the job succeeded or failed.
            end_time = timezone.now()
            log.end_time = end_time
            log.duration = (end_time - log.start_time).total_seconds()
            log.save()

            # Schedule the next run. Computing it can fail (for example on
            # a malformed cron expression); report that instead of letting
            # it escape from ``finally`` and abort the remaining jobs.
            job.last_run_at = now
            try:
                job.next_run_at = croniter(job.cron_schedule, now).get_next(timezone.datetime)
            except Exception as e:
                # next_run_at is left unchanged, so the job stays due until
                # its schedule is corrected.
                self.stderr.write(f"!!! Could not schedule next run for job '{job.name}': {e}")
            job.save()

        self.stdout.write(f"--- Finished job: {job.name}. Next run at: {job.next_run_at} ---")

    def _process_targets(self, job, now):
        """Query the job's target objects and run the job's workflow on each.

        ``job.parameters`` is read as a mapping with these keys:

        * ``"model"`` (required): name of a model in the ``app`` application.
        * ``"filter"`` (optional): filter keyword arguments; each value is
          passed through ``resolve_value`` with ``now`` and ``today``.
        * ``"context_key"`` (optional, default ``"target"``): key under
          which the target object is placed in the workflow context.

        Args:
            job: the ``Batch_Job`` being executed.
            now: timestamp taken when the command started.

        Returns:
            A ``(processed_count, total_count)`` tuple: the number of
            targets whose workflow finished without raising, and the number
            of targets matched before processing started.

        Raises:
            ValueError: if the parameters do not name a model.
        """
        params = job.parameters or {}
        model_name = params.get("model")
        # An explicit ``"filter": None`` is treated like a missing filter.
        filter_conditions = params.get("filter") or {}
        context_key = params.get("context_key", "target")

        if not model_name:
            raise ValueError("Job parameters must include a 'model' to query.")

        TargetModel = apps.get_model('app', model_name)

        # Resolve dynamic values used in the filter conditions.
        resolve_context = {"now": now, "today": now.date()}
        resolved_filters = {
            key: resolve_value(value, resolve_context)
            for key, value in filter_conditions.items()
        }
        targets = TargetModel.objects.filter(**resolved_filters)

        # Count once, up front: a workflow may change rows so that they no
        # longer match the filter, which would skew a count taken afterwards.
        total_count = targets.count()
        self.stdout.write(f" > Found {total_count} target objects to process.")

        processed_count = 0
        for target_item in targets:
            try:
                # Nested atomic block = savepoint per item. Without it, a
                # database error swallowed below would leave the enclosing
                # transaction unusable for every following item. It also
                # means a failed item's partial writes are rolled back.
                with transaction.atomic():
                    workflow_context = {
                        context_key: target_item,
                        "batch_job": job,
                        "now": now,
                    }
                    run_workflow(
                        workflow_code=job.workflow.code,
                        trigger="BATCH_JOB",
                        context=workflow_context
                    )
                processed_count += 1
            except Exception as e:
                # Deliberate best-effort: a failing item is reported and
                # skipped, and the job continues with the next item.
                self.stderr.write(f" > Error processing target {getattr(target_item, 'pk', 'N/A')} in job {job.name}: {e}")

        return processed_count, total_count