From 2ac88177d8a571a8473bf5c1f55137dc4029fe9d Mon Sep 17 00:00:00 2001 From: anhduy-tech Date: Fri, 23 Jan 2026 22:04:52 +0700 Subject: [PATCH] changes --- app/models.py | 23 ++++++++-- app/run_batch_jobs.py | 104 ++++++++++++++++++++++++++++++++++++++++++ app/workflow_utils.py | 6 ++- requirements.txt | 1 + 4 files changed, 129 insertions(+), 5 deletions(-) create mode 100644 app/run_batch_jobs.py diff --git a/app/models.py b/app/models.py index 0b75d56a..7e0d6e23 100644 --- a/app/models.py +++ b/app/models.py @@ -1998,6 +1998,7 @@ class Email_Job(models.Model): create_time = models.DateTimeField(null=True, auto_now_add=True) update_time = models.DateTimeField(null=True, auto_now=True) + class Meta: db_table = 'email_job' @@ -2011,8 +2012,6 @@ class Transaction_Discount(models.Model): class Meta: db_table = 'transaction_discount' - unique_together = ('transaction', 'discount') - class Co_Ownership(models.Model): transaction = models.ForeignKey(Transaction, null=False, related_name='co_op', on_delete=models.PROTECT) @@ -2142,4 +2141,22 @@ class Utility(models.Model): verbose_name_plural = 'Utilities' def __str__(self): - return f"{self.name} ({self.code}) - Type: {self.utility_type}" \ No newline at end of file + return f"{self.name} ({self.code}) - Type: {self.utility_type}" + +class Batch_Job(models.Model): + name = models.CharField(max_length=255) + workflow = models.ForeignKey(Workflow, on_delete=models.PROTECT, help_text="Workflow to execute") + cron_schedule = models.CharField(max_length=100, help_text="Cron-like schedule (e.g., '0 0 * * *' for daily at midnight)") + parameters = models.JSONField(default=dict, blank=True, help_text="Parameters to find data for the workflow context") + is_active = models.BooleanField(default=True, db_index=True) + last_run_at = models.DateTimeField(null=True, blank=True) + next_run_at = models.DateTimeField(null=True, blank=True, db_index=True) + + create_time = models.DateTimeField(auto_now_add=True) + update_time = models.DateTimeField(auto_now=True) + + def __str__(self): + return self.name + + class Meta: + db_table = 'batch_job' \ No newline at end of file diff --git a/app/run_batch_jobs.py b/app/run_batch_jobs.py new file mode 100644 index 00000000..613e74db --- /dev/null +++ b/app/run_batch_jobs.py @@ -0,0 +1,104 @@ +from django.core.management.base import BaseCommand +from django.utils import timezone +from django.db import transaction +from django.apps import apps +from croniter import croniter +import traceback + +from app.models import Batch_Job, Batch_Log, Task_Status +from app.workflow_engine import run_workflow +from app.workflow_utils import resolve_value + +class Command(BaseCommand): + help = 'Runs all active batch jobs that are due.' + + def handle(self, *args, **options): + self.stdout.write(f"[{timezone.now()}] Starting batch job runner...") + + now = timezone.now() + # Lấy các job cần chạy (active và đã đến lúc chạy) + due_jobs = Batch_Job.objects.filter( + is_active=True, + next_run_at__lte=now + ) + + self.stdout.write(f"Found {due_jobs.count()} due jobs to run.") + + for job in due_jobs: + self.stdout.write(f"--- Running job: {job.name} (ID: {job.id}) ---") + + # Bắt đầu ghi log + running_status, _ = Task_Status.objects.get_or_create(code='running', defaults={'name': 'Running'}) + log = Batch_Log.objects.create( + system_date=now.date(), + start_time=now, + status=running_status + ) + + try: + with transaction.atomic(): + # 1. Tìm dữ liệu để xử lý dựa trên tham số của job + params = job.parameters or {} + model_name = params.get("model") + filter_conditions = params.get("filter", {}) + context_key = params.get("context_key", "target") # Tên biến trong context workflow + + if not model_name: + raise ValueError("Job parameters must include a 'model' to query.") + + TargetModel = apps.get_model('app', model_name) + + # Resolve các giá trị động trong điều kiện filter (ví dụ: $today) + resolved_filters = {k: resolve_value(v, {"now": now, "today": now.date()}) for k, v in filter_conditions.items()} + + targets = TargetModel.objects.filter(**resolved_filters) + self.stdout.write(f" > Found {targets.count()} target objects to process.") + + # 2. Thực thi workflow cho từng đối tượng + processed_count = 0 + for target_item in targets: + try: + # Chuẩn bị context cho workflow + workflow_context = { + context_key: target_item, + "batch_job": job, + "now": now, + } + run_workflow( + workflow_code=job.workflow.code, + trigger="BATCH_JOB", + context=workflow_context + ) + processed_count += 1 + except Exception as e: + self.stderr.write(f" > Error processing target {getattr(target_item, 'pk', 'N/A')} in job {job.name}: {e}") + # Hiện tại, nếu một item lỗi thì bỏ qua và chạy item tiếp theo + # Có thể thay đổi logic để dừng cả job nếu cần + + # 3. Cập nhật log khi thành công + success_status, _ = Task_Status.objects.get_or_create(code='success', defaults={'name': 'Success'}) + log.status = success_status + log.log = {"message": f"Successfully processed {processed_count} of {targets.count()} items."} + + except Exception as e: + self.stderr.write(f"!!! Job '{job.name}' failed: {e}") + traceback.print_exc() + failed_status, _ = Task_Status.objects.get_or_create(code='failed', defaults={'name': 'Failed'}) + log.status = failed_status + log.log = {"error": str(e), "traceback": traceback.format_exc()} + + finally: + # 4. Hoàn tất log và đặt lịch chạy lại cho job + end_time = timezone.now() + log.end_time = end_time + log.duration = (end_time - log.start_time).total_seconds() + log.save() + + # Đặt lịch cho lần chạy tiếp theo + job.last_run_at = now + job.next_run_at = croniter(job.cron_schedule, now).get_next(timezone.datetime) + job.save() + + self.stdout.write(f"--- Finished job: {job.name}. Next run at: {job.next_run_at} ---") + + self.stdout.write(f"[{timezone.now()}] Batch job runner finished.") diff --git a/app/workflow_utils.py b/app/workflow_utils.py index 3b160130..45d1f455 100644 --- a/app/workflow_utils.py +++ b/app/workflow_utils.py @@ -36,9 +36,9 @@ def resolve_value(expr, context): return datetime.now().isoformat(timespec='seconds') # 2025-12-21T14:30:45 # ============================================= - # 2. Hàm toán học: $add(a, b), $sub(a, b) + # 2. Hàm toán học: $add(a, b), $sub(a, b), $multiply(a, b) # ============================================= - func_match = re.match(r"^\$(add|sub)\(([^,]+),\s*([^)]+)\)$", expr) + func_match = re.match(r"^\$(add|sub|multiply)\(([^,]+),\s*([^)]+)\)$", expr) if func_match: func_name = func_match.group(1) arg1_val = resolve_value(func_match.group(2).strip(), context) @@ -51,6 +51,8 @@ def resolve_value(expr, context): return num1 + num2 if func_name == "sub": return num1 - num2 + if func_name == "multiply": + return num1 * num2 except (ValueError, TypeError): print(f" [ERROR] Math function {func_name} failed with values: {arg1_val}, {arg2_val}") return 0 diff --git a/requirements.txt b/requirements.txt index f5a1692b..f330c3bc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,4 +20,5 @@ num2words mammoth paramiko channels +croniter uvicorn[standard] \ No newline at end of file