Files
api/app/workflow_actions.py
anhduy-tech 20cbb4ab39 changes
2026-01-20 11:33:42 +07:00

291 lines
10 KiB
Python

from django.test import Client
from app.workflow_registry import register_action
from app.workflow_utils import resolve_value
from app.document_generator import DocumentGenerator
from app.jobemail import EmailJobRunner
from app.payment import account_entry_api
from django.apps import apps
import re
import datetime
client = Client()
# ============================
# Logic xử lý Map Expression ($map)
# ============================
def handle_map_expression(expression, context):
"""
Xử lý biểu thức đặc biệt để biến đổi danh sách dữ liệu.
Cú pháp: $map(data.installments, {amount: amount, due_date: $add_days(created_at, gap)})
"""
# Regex tách nguồn dữ liệu và template
match = re.match(r"^\$map\(([^,]+),\s*\{(.*)\}\)$", expression.strip())
if not match:
return []
source_path = match.group(1).strip()
template_content = match.group(2).strip()
# Lấy danh sách dữ liệu gốc từ context (ví dụ: data.installments)
source_data = resolve_value(source_path, context)
if not isinstance(source_data, list):
return []
# Tìm các cặp key: value trong template định nghĩa
# Hỗ trợ cả trường hợp value là một hàm lồng như $add_days
pairs = re.findall(r"(\w+):\s*(\$add_days\([^)]+\)|[^{},]+)", template_content)
results = []
for index, item in enumerate(source_data):
# Tạo context riêng cho từng item để resolve
item_context = {**context, "item": item, "index": index}
processed_row = {}
for key, val_expr in pairs:
val_expr = val_expr.strip()
# 1. Xử lý biến chỉ mục ($index)
if val_expr == "$index":
processed_row[key] = index
# 2. Xử lý hàm cộng ngày ($add_days)
elif "$add_days" in val_expr:
m = re.search(r"\$add_days\(([^,]+),\s*([^)]+)\)", val_expr)
if m:
base_key = m.group(1).strip()
days_key = m.group(2).strip()
# Tìm giá trị ngày gốc và số ngày cần cộng
base_date = item.get(base_key) if base_key in item else resolve_value(base_key, item_context)
days = item.get(days_key) if days_key in item else resolve_value(days_key, item_context)
try:
# Chuyển đổi string sang date object
if isinstance(base_date, str):
base_date = datetime.datetime.strptime(base_date[:10], "%Y-%m-%d").date()
new_date = base_date + datetime.timedelta(days=int(days or 0))
processed_row[key] = new_date.isoformat()
except Exception:
processed_row[key] = str(base_date) # Trả về bản gốc nếu lỗi
# 3. Xử lý lấy giá trị từ item hiện tại hoặc context chung
else:
if val_expr == "$item":
processed_row[key] = item
elif val_expr == "$index":
processed_row[key] = index
elif val_expr in item:
processed_row[key] = item[val_expr]
else:
processed_row[key] = resolve_value(val_expr, item_context)
results.append(processed_row)
return results
# ============================
# CRUD thông qua API có sẵn
# ============================
def deep_resolve_values(data, context):
if isinstance(data, dict):
return {k: deep_resolve_values(v, context) for k, v in data.items()}
elif isinstance(data, list):
return [deep_resolve_values(item, context) for item in data]
elif isinstance(data, str):
# Workaround for resolver bug: handle strings that are only a placeholder
match = re.fullmatch(r"\{([^}]+)\}", data)
if match:
# The path is the content inside the braces, e.g., "transaction_detail.id"
path = match.group(1)
# resolve_value works on raw paths, so call it directly
return resolve_value(path, context)
else:
# This handles complex strings like "/prefix/{path}/" or normal strings
return resolve_value(data, context)
else:
return data
@register_action("API_CALL", schema={"required": ["method", "url"]})
def api_call_action(params, context):
"""Thực hiện gọi API nội bộ bằng Django Test Client"""
method = params["method"].upper()
url = resolve_value(params["url"], context)
save_as = params.get("save_as")
raw_body = params.get("body")
# ============================
# Resolve body
# ============================
if isinstance(raw_body, str) and raw_body.startswith("$map"):
body = handle_map_expression(raw_body, context)
elif isinstance(raw_body, dict):
body = deep_resolve_values(raw_body, context)
elif raw_body is None:
body = None
else:
body = resolve_value(raw_body, context)
print(f" [API_CALL] {method} {url}")
print(f" [API_CALL] Resolved Body: {body}")
# ============================
# Execute request
# ============================
if method == "POST":
resp = client.post(url, body, content_type="application/json")
elif method == "PATCH":
resp = client.patch(url, body, content_type="application/json")
elif method == "PUT":
resp = client.put(url, body, content_type="application/json")
elif method == "DELETE":
resp = client.delete(url)
else:
resp = client.get(url)
print(f" [API_CALL] Status Code: {resp.status_code}")
# ============================
# Handle error
# ============================
if resp.status_code >= 400:
error_content = resp.content.decode("utf-8") if resp.content else ""
print(f" [API_CALL] Error: {error_content}")
raise Exception(f"API Call failed: {error_content}")
# ============================
# Handle response safely
# ============================
if resp.status_code == 204 or not resp.content:
# DELETE / No Content
result = {"deleted": True}
else:
try:
result = resp.json()
except ValueError:
# Fallback nếu response không phải JSON
result = resp.content.decode("utf-8")
print(f" [API_CALL] Result: {result}")
if save_as:
context[save_as] = result
return result
# ============================
# Gọi Utility / API bên ngoài
# ============================
@register_action("CALL_UTILITY", schema={"required": ["utility_code"]})
def call_utility_action(params, context):
Utility = apps.get_model("app", "Utility")
util = Utility.objects.get(code=params["utility_code"])
module_path = util.integration_module
if not module_path:
return {"error": "utility has no module"}
from django.utils.module_loading import import_string
func = import_string(module_path)
resolved_params = {k: resolve_value(v, context) for k,v in params.get("params", {}).items()}
return func(**resolved_params)
@register_action("SEND_EMAIL", schema={"required": ["template"]})
def send_email_action(params, context):
tpl_name = params["template"]
tpl_pks = {k: resolve_value(v, context) for k,v in params.get("context_pks", {}).items()}
Email_Template = apps.get_model("app", "Email_Template")
try:
template_obj = Email_Template.objects.get(name=tpl_name)
except Email_Template.DoesNotExist:
raise Exception(f"Email template '{tpl_name}' not found")
runner = EmailJobRunner(template=template_obj, context_pks=tpl_pks)
success = runner.run()
return {"sent": success}
# ============================
# Tạo Document
# ============================
@register_action("GENERATE_DOCUMENT", schema={"required": ["document_code"]})
def generate_document_action(params, context):
code = resolve_value(params["document_code"], context)
pks = {k: str(resolve_value(v, context)) for k, v in params.get("context_pks", {}).items()}
save_as = params.get("save_as")
print(f" [GEN_DOC] Generating for code: {code}")
gen = DocumentGenerator(document_code=code, context_pks=pks)
result = gen.generate()
formatted_result = [{
"pdf": result.get("pdf"),
"file": result.get("file"),
"name": result.get("name"),
"code": code
}]
if save_as:
context[save_as] = formatted_result
print(f" [GEN_DOC] Success: Saved to context as '{save_as}'")
return formatted_result
# ============================
# Hạch toán
# ============================
@register_action("ACCOUNT_ENTRY", schema={"required": ["amount", "category_code"]})
def account_entry_action(params, context):
amount = resolve_value(params["amount"], context)
content = params.get("content", "")
userid = resolve_value(params.get("userid"), context)
return account_entry_api(
code=params.get("internal_account", "HOAC02VND"),
amount=amount,
content=content,
type=params.get("type", "CR"),
category=apps.get_model("app", "Entry_Category").objects.get(code=params["category_code"]).id,
userid=userid,
ref=params.get("ref")
)
# ============================
# Tìm bản ghi
# ============================
@register_action("LOOKUP_DATA", schema={"required": ["model_name", "lookup_field", "lookup_value"]})
def lookup_data_action(params, context):
model_name = params["model_name"]
field = params["lookup_field"]
save_as = params.get("save_as")
# Lấy giá trị thực tế (ví dụ: "reserved")
value = resolve_value(params["lookup_value"], context)
print(f" [LOOKUP] Searching {model_name} where {field} = '{value}'")
try:
Model = apps.get_model("app", model_name)
obj = Model.objects.filter(**{field: value}).first()
if not obj:
print(f" [LOOKUP] ERROR: Not found!")
raise Exception(f"Lookup failed: {model_name} with {field}={value} not found.")
if save_as:
context[save_as] = obj
print(f" [LOOKUP] Success: Found ID {obj.id}, saved to context as '{save_as}'")
return obj.id
except Exception as e:
print(f" [LOOKUP] EXCEPTION: {str(e)}")
raise e