"""Workflow action handlers registered through ``register_action``.

Each handler receives ``params`` (the action's configuration) and ``context``
(the mutable workflow context dict) and returns the action's result.
"""
import datetime
import re

from django.apps import apps
from django.test import Client
from django.utils.module_loading import import_string

from app.document_generator import DocumentGenerator
from app.jobemail import EmailJobRunner
from app.payment import account_entry_api
from app.workflow_registry import register_action
from app.workflow_utils import resolve_value

# Shared Django test client; API_CALL uses it to invoke endpoints in-process.
client = Client()

# Patterns are compiled once at import time instead of on every call.
_MAP_EXPRESSION_RE = re.compile(r"^\$map\(([^,]+),\s*\{(.*)\}\)$")
_MAP_PAIR_RE = re.compile(r"(\w+):\s*(\$add_days\([^)]+\)|[^{},]+)")
_ADD_DAYS_RE = re.compile(r"\$add_days\(([^,]+),\s*([^)]+)\)")


# ============================
# Map expression ($map) handling
# ============================
def _item_or_context_value(name, item, item_context):
    """Return ``item[name]`` if *item* is a dict containing *name*, else resolve *name*."""
    # The isinstance guard keeps scalar list elements from raising on ``in`` /
    # indexing, and stops string elements from being substring-tested.
    if isinstance(item, dict) and name in item:
        return item[name]
    return resolve_value(name, item_context)


def _add_days(base_date, days):
    """Return ``base_date + days`` as an ISO string, or ``str(base_date)`` on failure."""
    try:
        if isinstance(base_date, str):
            # Only the leading YYYY-MM-DD part is parsed; any time suffix is dropped.
            base_date = datetime.datetime.strptime(base_date[:10], "%Y-%m-%d").date()
        shifted = base_date + datetime.timedelta(days=int(days or 0))
        return shifted.isoformat()
    except (TypeError, ValueError, OverflowError, AttributeError):
        # Best effort: fall back to the (possibly already parsed) base value.
        return str(base_date)


def handle_map_expression(expression, context):
    """Transform a list from the context into a list of dicts.

    Syntax::

        $map(data.installments, {amount: amount, due_date: $add_days(created_at, gap)})

    The first argument is resolved with ``resolve_value``; the second is a
    ``key: value`` template evaluated once per list element. Supported values:

    * ``$index`` - the element's position in the list.
    * ``$item`` - the element itself.
    * ``$add_days(base, days)`` - ``base`` shifted by ``days`` days, as an ISO
      string. Both arguments are looked up on the element first, then
      resolved against the context.
    * anything else - looked up on the element first, then resolved against
      the context (extended with ``item`` and ``index``).

    Args:
        expression: The ``$map(...)`` expression string.
        context: The workflow context dict.

    Returns:
        A list with one dict per source element, or ``[]`` when the expression
        does not match the syntax or the source does not resolve to a list.
    """
    match = _MAP_EXPRESSION_RE.match(expression.strip())
    if not match:
        return []

    source_path = match.group(1).strip()
    template_content = match.group(2).strip()

    source_data = resolve_value(source_path, context)
    if not isinstance(source_data, list):
        return []

    # The template is the same for every element, so parse it once up front.
    pairs = [
        (key, val_expr.strip())
        for key, val_expr in _MAP_PAIR_RE.findall(template_content)
    ]

    results = []
    for index, item in enumerate(source_data):
        # Per-element context so template values can reference item/index.
        item_context = {**context, "item": item, "index": index}
        row = {}

        for key, val_expr in pairs:
            if val_expr == "$index":
                row[key] = index
            elif "$add_days" in val_expr:
                call = _ADD_DAYS_RE.search(val_expr)
                # A malformed $add_days(...) leaves the key out of the row.
                if call:
                    base_date = _item_or_context_value(
                        call.group(1).strip(), item, item_context
                    )
                    days = _item_or_context_value(
                        call.group(2).strip(), item, item_context
                    )
                    row[key] = _add_days(base_date, days)
            elif val_expr == "$item":
                row[key] = item
            else:
                row[key] = _item_or_context_value(val_expr, item, item_context)

        results.append(row)
    return results


# ============================
# CRUD through the existing API
# ============================
def deep_resolve_values(data, context):
    """Recursively resolve every string inside nested dicts/lists.

    Args:
        data: A dict, list, string or any other value.
        context: The workflow context dict.

    Returns:
        A structure of the same shape with each string passed through
        ``resolve_value``; non-string leaves are returned unchanged.
    """
    if isinstance(data, dict):
        return {k: deep_resolve_values(v, context) for k, v in data.items()}
    if isinstance(data, list):
        return [deep_resolve_values(entry, context) for entry in data]
    if isinstance(data, str):
        # Workaround for a resolver bug: a string that is only a placeholder
        # ("{transaction_detail.id}") is resolved by its raw inner path.
        placeholder = re.fullmatch(r"\{([^}]+)\}", data)
        if placeholder:
            return resolve_value(placeholder.group(1), context)
        # Composite strings such as "/prefix/{path}/" and plain strings.
        return resolve_value(data, context)
    return data


@register_action("API_CALL", schema={"required": ["method", "url"]})
def api_call_action(params, context):
    """Call an internal API endpoint through the Django test client.

    Args:
        params: Requires ``method`` and ``url``; optional ``body`` (a ``$map``
            expression string, a dict, or any value accepted by
            ``resolve_value``) and ``save_as`` (context key for the result).
        context: The workflow context dict; updated when ``save_as`` is set.

    Returns:
        The decoded JSON response, or ``None`` when the response has no body
        (for example 204 No Content).

    Raises:
        Exception: If the response status code is 400 or above.
    """
    method = params["method"].upper()
    url = resolve_value(params["url"], context)
    save_as = params.get("save_as")
    raw_body = params.get("body")

    # A $map body is expanded into a list of rows before being sent.
    if isinstance(raw_body, str) and raw_body.startswith("$map"):
        body = handle_map_expression(raw_body, context)
    elif isinstance(raw_body, dict):
        body = deep_resolve_values(raw_body, context)
    else:
        body = resolve_value(raw_body, context)

    print(f" [API_CALL] {method} {url}")

    if method in ("POST", "PATCH", "PUT"):
        # Send an empty JSON object rather than None when no body is given;
        # the test client only applies that default itself for POST.
        payload = {} if body is None else body
        send = getattr(client, method.lower())
        resp = send(url, payload, content_type="application/json")
    elif method == "DELETE":
        # Previously DELETE fell through to GET and nothing was deleted.
        if body is None:
            resp = client.delete(url)
        else:
            resp = client.delete(url, body, content_type="application/json")
    else:
        # GET, and (as before) any other verb, is issued as a GET.
        resp = client.get(url)

    print(f" [API_CALL] Status Code: {resp.status_code}")

    if resp.status_code >= 400:
        error_content = resp.content.decode('utf-8')
        print(f" [API_CALL] Error: {error_content}")
        raise Exception(f"API Call failed: {error_content}")

    # Bodiless successes cannot be JSON-decoded; report them as None.
    result = resp.json() if resp.content else None
    print(f" [API_CALL] Result: {result}")

    if save_as:
        context[save_as] = result
    return result


# ============================
# Utility / external API calls
# ============================
@register_action("CALL_UTILITY", schema={"required": ["utility_code"]})
def call_utility_action(params, context):
    """Invoke the integration function configured on a ``Utility`` record.

    Args:
        params: Requires ``utility_code``; optional ``params`` dict whose
            values are resolved and passed to the function as keyword args.
        context: The workflow context dict.

    Returns:
        Whatever the integration function returns, or
        ``{"error": "utility has no module"}`` when the record has no
        ``integration_module``.
    """
    Utility = apps.get_model("app", "Utility")
    util = Utility.objects.get(code=params["utility_code"])

    module_path = util.integration_module
    if not module_path:
        return {"error": "utility has no module"}

    func = import_string(module_path)
    resolved_params = {
        k: resolve_value(v, context) for k, v in params.get("params", {}).items()
    }
    return func(**resolved_params)


@register_action("SEND_EMAIL", schema={"required": ["template"]})
def send_email_action(params, context):
    """Send an email from a named ``Email_Template`` via ``EmailJobRunner``.

    Args:
        params: Requires ``template`` (template name); optional
            ``context_pks`` dict whose values are resolved before use.
        context: The workflow context dict.

    Returns:
        ``{"sent": <result of runner.run()>}``.

    Raises:
        Exception: If no template with the given name exists.
    """
    tpl_name = params["template"]
    tpl_pks = {
        k: resolve_value(v, context)
        for k, v in params.get("context_pks", {}).items()
    }

    Email_Template = apps.get_model("app", "Email_Template")
    try:
        template_obj = Email_Template.objects.get(name=tpl_name)
    except Email_Template.DoesNotExist:
        raise Exception(f"Email template '{tpl_name}' not found")

    runner = EmailJobRunner(template=template_obj, context_pks=tpl_pks)
    success = runner.run()
    return {"sent": success}


# ============================
# Document generation
# ============================
@register_action("GENERATE_DOCUMENT", schema={"required": ["document_code"]})
def generate_document_action(params, context):
    """Generate a document with ``DocumentGenerator``.

    Args:
        params: Requires ``document_code``; optional ``context_pks`` (values
            are resolved and stringified) and ``save_as``.
        context: The workflow context dict; updated when ``save_as`` is set.

    Returns:
        A one-element list holding a dict with the ``pdf``, ``file`` and
        ``name`` entries of the generator result plus the resolved ``code``.
    """
    code = resolve_value(params["document_code"], context)
    pks = {
        k: str(resolve_value(v, context))
        for k, v in params.get("context_pks", {}).items()
    }
    save_as = params.get("save_as")

    print(f" [GEN_DOC] Generating for code: {code}")
    gen = DocumentGenerator(document_code=code, context_pks=pks)
    result = gen.generate()

    formatted_result = [{
        "pdf": result.get("pdf"),
        "file": result.get("file"),
        "name": result.get("name"),
        "code": code,
    }]

    if save_as:
        context[save_as] = formatted_result
        print(f" [GEN_DOC] Success: Saved to context as '{save_as}'")
    return formatted_result


# ============================
# Accounting entries
# ============================
@register_action("ACCOUNT_ENTRY", schema={"required": ["amount", "category_code"]})
def account_entry_action(params, context):
    """Post an accounting entry through ``account_entry_api``.

    Args:
        params: Requires ``amount`` and ``category_code``; optional
            ``content``, ``userid``, ``internal_account`` (default
            ``"HOAC02VND"``), ``type`` (default ``"CR"``) and ``ref``.
            Only ``amount`` and ``userid`` are resolved against the context.
        context: The workflow context dict.

    Returns:
        Whatever ``account_entry_api`` returns.
    """
    amount = resolve_value(params["amount"], context)
    content = params.get("content", "")
    userid = resolve_value(params.get("userid"), context)

    Entry_Category = apps.get_model("app", "Entry_Category")
    category_id = Entry_Category.objects.get(code=params["category_code"]).id

    return account_entry_api(
        code=params.get("internal_account", "HOAC02VND"),
        amount=amount,
        content=content,
        type=params.get("type", "CR"),
        category=category_id,
        userid=userid,
        ref=params.get("ref"),
    )


# ============================
# Record lookup
# ============================
@register_action("LOOKUP_DATA", schema={"required": ["model_name", "lookup_field", "lookup_value"]})
def lookup_data_action(params, context):
    """Find the first record of a model matching ``lookup_field == lookup_value``.

    Args:
        params: Requires ``model_name``, ``lookup_field`` and ``lookup_value``
            (resolved against the context); optional ``save_as``.
        context: The workflow context dict; when ``save_as`` is set the model
            instance (not just its id) is stored under that key.

    Returns:
        The ``id`` of the matching record.

    Raises:
        Exception: If no record matches. Any other error raised during the
            lookup is logged and re-raised unchanged.
    """
    model_name = params["model_name"]
    field = params["lookup_field"]
    save_as = params.get("save_as")

    # Resolve to the concrete value (e.g. "reserved").
    value = resolve_value(params["lookup_value"], context)

    print(f" [LOOKUP] Searching {model_name} where {field} = '{value}'")

    try:
        Model = apps.get_model("app", model_name)
        obj = Model.objects.filter(**{field: value}).first()

        if not obj:
            print(" [LOOKUP] ERROR: Not found!")
            raise Exception(f"Lookup failed: {model_name} with {field}={value} not found.")

        if save_as:
            context[save_as] = obj
            print(f" [LOOKUP] Success: Found ID {obj.id}, saved to context as '{save_as}'")

        return obj.id
    except Exception as e:
        print(f" [LOOKUP] EXCEPTION: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise