Files
api/app/workflow_actions.py
anhduy-tech 5b360753d8 changes
2026-02-04 21:01:54 +07:00

375 lines
14 KiB
Python

from django.test import Client
from app.workflow_registry import register_action
from app.workflow_utils import resolve_value
from app.document_generator import DocumentGenerator
from app.jobemail import EmailJobRunner
from app.payment import account_entry_api
from django.apps import apps
import re
import datetime
client = Client()
# ============================
# Logic xử lý Map Expression ($map)
# ============================
def handle_map_expression(expression, context):
"""
Xử lý biểu thức đặc biệt để biến đổi danh sách dữ liệu.
Cú pháp: $map(data.installments, {amount: amount, due_date: $add_days(created_at, gap)})
"""
# Regex tách nguồn dữ liệu và template
match = re.match(r"^\$map\(([^,]+),\s*\{(.*)\}\)$", expression.strip())
if not match:
return []
source_path = match.group(1).strip()
template_content = match.group(2).strip()
# Lấy danh sách dữ liệu gốc từ context (ví dụ: data.installments)
source_data = resolve_value(source_path, context)
if not isinstance(source_data, list):
return []
# Tìm các cặp key: value trong template định nghĩa
# Hỗ trợ cả trường hợp value là một hàm lồng như $add_days
pairs = re.findall(r"(\w+):\s*(\$add_days\([^)]+\)|[^{},]+)", template_content)
results = []
for index, item in enumerate(source_data):
# Tạo context riêng cho từng item để resolve
item_context = {**context, "item": item, "index": index}
processed_row = {}
for key, val_expr in pairs:
val_expr = val_expr.strip()
# 1. Xử lý biến chỉ mục ($index)
if val_expr == "$index":
processed_row[key] = index
# 2. Xử lý hàm cộng ngày ($add_days)
elif "$add_days" in val_expr:
m = re.search(r"\$add_days\(([^,]+),\s*([^)]+)\)", val_expr)
if m:
base_key = m.group(1).strip()
days_key = m.group(2).strip()
# Tìm giá trị ngày gốc và số ngày cần cộng
base_date = item.get(base_key) if base_key in item else resolve_value(base_key, item_context)
days = item.get(days_key) if days_key in item else resolve_value(days_key, item_context)
try:
# Chuyển đổi string sang date object
if isinstance(base_date, str):
base_date = datetime.datetime.strptime(base_date[:10], "%Y-%m-%d").date()
new_date = base_date + datetime.timedelta(days=int(days or 0))
processed_row[key] = new_date.isoformat()
except Exception:
processed_row[key] = str(base_date) # Trả về bản gốc nếu lỗi
# 3. Xử lý lấy giá trị từ item hiện tại hoặc context chung
else:
if val_expr == "$item":
processed_row[key] = item
elif val_expr == "$index":
processed_row[key] = index
elif val_expr in item:
processed_row[key] = item[val_expr]
else:
processed_row[key] = resolve_value(val_expr, item_context)
results.append(processed_row)
return results
# ============================
# CRUD thông qua API có sẵn
# ============================
def deep_resolve_values(data, context):
if isinstance(data, dict):
return {k: deep_resolve_values(v, context) for k, v in data.items()}
elif isinstance(data, list):
return [deep_resolve_values(item, context) for item in data]
elif isinstance(data, str):
# Workaround for resolver bug: handle strings that are only a placeholder
match = re.fullmatch(r"\{([^}]+)\}", data)
if match:
# The path is the content inside the braces, e.g., "transaction_detail.id"
path = match.group(1)
# resolve_value works on raw paths, so call it directly
return resolve_value(path, context)
else:
# This handles complex strings like "/prefix/{path}/" or normal strings
return resolve_value(data, context)
else:
return data
@register_action("API_CALL", schema={"required": ["method", "url"]})
def api_call_action(params, context):
"""Thực hiện gọi API nội bộ bằng Django Test Client"""
method = params["method"].upper()
url = resolve_value(params["url"], context)
save_as = params.get("save_as")
raw_body = params.get("body")
# ============================
# Resolve body
# ============================
if isinstance(raw_body, str) and raw_body.startswith("$map"):
body = handle_map_expression(raw_body, context)
elif isinstance(raw_body, dict):
body = deep_resolve_values(raw_body, context)
elif raw_body is None:
body = None
else:
body = resolve_value(raw_body, context)
print(f" [API_CALL] {method} {url}")
print(f" [API_CALL] Resolved Body: {body}")
# ============================
# Execute request
# ============================
if method == "POST":
resp = client.post(url, body, content_type="application/json")
elif method == "PATCH":
resp = client.patch(url, body, content_type="application/json")
elif method == "PUT":
resp = client.put(url, body, content_type="application/json")
elif method == "DELETE":
resp = client.delete(url)
else:
resp = client.get(url)
print(f" [API_CALL] Status Code: {resp.status_code}")
# ============================
# Handle error
# ============================
if resp.status_code >= 400:
error_content = resp.content.decode("utf-8") if resp.content else ""
print(f" [API_CALL] Error: {error_content}")
raise Exception(f"API Call failed: {error_content}")
# ============================
# Handle response safely
# ============================
if resp.status_code == 204 or not resp.content:
# DELETE / No Content
result = {"deleted": True}
else:
try:
result = resp.json()
except ValueError:
# Fallback nếu response không phải JSON
result = resp.content.decode("utf-8")
print(f" [API_CALL] Result: {result}")
if save_as:
context[save_as] = result
return result
# ============================
# Gọi Utility / API bên ngoài
# ============================
@register_action("CALL_UTILITY", schema={"required": ["utility_code"]})
def call_utility_action(params, context):
Utility = apps.get_model("app", "Utility")
util = Utility.objects.get(code=params["utility_code"])
module_path = util.integration_module
if not module_path:
return {"error": "utility has no module"}
from django.utils.module_loading import import_string
func = import_string(module_path)
resolved_params = {k: resolve_value(v, context) for k,v in params.get("params", {}).items()}
return func(**resolved_params)
@register_action("SEND_EMAIL", schema={"required": ["template"]})
def send_email_action(params, context):
tpl_name = params["template"]
tpl_pks = {k: resolve_value(v, context) for k,v in params.get("context_pks", {}).items()}
Email_Template = apps.get_model("app", "Email_Template")
try:
template_obj = Email_Template.objects.get(name=tpl_name)
except Email_Template.DoesNotExist:
raise Exception(f"Email template '{tpl_name}' not found")
runner = EmailJobRunner(template=template_obj, context_pks=tpl_pks)
success = runner.run()
return {"sent": success}
# ============================
# Tạo Document
# ============================
@register_action("GENERATE_DOCUMENT", schema={"required": ["document_code"]})
def generate_document_action(params, context):
code = resolve_value(params["document_code"], context)
pks = {k: str(resolve_value(v, context)) for k, v in params.get("context_pks", {}).items()}
save_as = params.get("save_as")
print(f" [GEN_DOC] Generating for code: {code}")
gen = DocumentGenerator(document_code=code, context_pks=pks)
result = gen.generate()
formatted_result = [{
"pdf": result.get("pdf"),
"file": result.get("file"),
"name": result.get("name"),
"code": code
}]
if save_as:
context[save_as] = formatted_result
print(f" [GEN_DOC] Success: Saved to context as '{save_as}'")
return formatted_result
# ============================
# Hạch toán
# ============================
@register_action("ACCOUNT_ENTRY", schema={"required": ["amount", "category_code"]})
def account_entry_action(params, context):
amount = resolve_value(params["amount"], context)
content = params.get("content", "")
userid = resolve_value(params.get("userid"), context)
return account_entry_api(
code=params.get("internal_account", "HOAC02VND"),
amount=amount,
content=content,
type=params.get("type", "CR"),
category=apps.get_model("app", "Entry_Category").objects.get(code=params["category_code"]).id,
userid=userid,
ref=params.get("ref")
)
# ============================
# Tìm bản ghi
# ============================
@register_action("LOOKUP_DATA", schema={"required": ["model_name", "lookup_field", "lookup_value"]})
def lookup_data_action(params, context):
model_name = params["model_name"]
field = params["lookup_field"]
save_as = params.get("save_as")
# Lấy giá trị thực tế (ví dụ: "reserved")
value = resolve_value(params["lookup_value"], context)
print(f" [LOOKUP] Searching {model_name} where {field} = '{value}'")
try:
Model = apps.get_model("app", model_name)
obj = Model.objects.filter(**{field: value}).first()
if not obj:
print(f" [LOOKUP] ERROR: Not found!")
raise Exception(f"Lookup failed: {model_name} with {field}={value} not found.")
if save_as:
context[save_as] = obj
print(f" [LOOKUP] Success: Found ID {obj.id}, saved to context as '{save_as}'")
return obj.id
except Exception as e:
print(f" [LOOKUP] EXCEPTION: {str(e)}")
raise e
# ============================
# Quét và phân bổ toàn bộ bút toán còn phần dư
# ============================
@register_action("ALLOCATE_ALL_PENDING", schema={})
def allocate_all_pending_action(params, context):
"""
Quét toàn bộ Internal_Entry có allocation_remain > 0 (type CR),
group by product_id, gọi phân bổ cho từng product cho đến khi hết.
"""
from app.payment import allocate_payment_to_schedules, allocate_penalty_reduction
from decimal import Decimal
Internal_Entry = apps.get_model("app", "Internal_Entry")
Payment_Schedule = apps.get_model("app", "Payment_Schedule")
Product_Booked = apps.get_model("app", "Product_Booked")
Transaction_Current = apps.get_model("app", "Transaction_Current")
Transaction_Detail = apps.get_model("app", "Transaction_Detail")
# ---------- Lấy toàn bộ product_id còn entry chưa phân bổ hết ----------
product_ids = list(
Internal_Entry.objects.filter(
type__code="CR",
allocation_remain__gt=0,
product__isnull=False
)
.values_list("product_id", flat=True)
.distinct()
)
print(f" [ALLOCATE_ALL] Tìm được {len(product_ids)} product có entry còn phần dư")
if not product_ids:
return {"total_products": 0, "results": []}
# ---------- DEBUG: dump trạng thái trước khi phân bổ ----------
for pid in product_ids:
print(f"\n [DEBUG] ===== Product {pid} — trạng thái TRƯỚC phân bổ =====")
# Entries
entries = Internal_Entry.objects.filter(
product_id=pid, type__code="CR", allocation_remain__gt=0
).order_by("date", "create_time")
for e in entries:
print(f" Entry id={e.id} | account_id={e.account_id} | amount={e.amount} | allocation_remain={e.allocation_remain} | date={e.date}")
# Lấy txn_detail của product
booked = Product_Booked.objects.filter(product_id=pid).first()
if not booked or not booked.transaction:
print(f" !! Không có Product_Booked / Transaction")
continue
txn = booked.transaction
txn_detail = None
try:
current = Transaction_Current.objects.get(transaction=txn)
txn_detail = current.detail
except Exception:
txn_detail = Transaction_Detail.objects.filter(transaction=txn).order_by("-create_time").first()
if not txn_detail:
print(f" !! Không có Transaction_Detail")
continue
# Schedules
all_schedules = Payment_Schedule.objects.filter(txn_detail=txn_detail).order_by("cycle", "from_date")
unpaid = all_schedules.filter(status__id=1)
print(f" Tổng schedule: {all_schedules.count()} | Chưa thanh toán (status=1): {unpaid.count()}")
for s in all_schedules:
print(f" Schedule id={s.id} | cycle={s.cycle} | status_id={s.status_id} | amount_remain={s.amount_remain} | penalty_remain={s.penalty_remain} | remain_amount={s.remain_amount}")
# ---------- Chạy phân bổ ----------
results = []
for product_id in product_ids:
try:
normal = allocate_payment_to_schedules(product_id)
reduction = allocate_penalty_reduction(product_id)
results.append({"product_id": product_id, "normal": normal, "reduction": reduction})
print(f" [ALLOCATE_ALL] Product {product_id}: OK — normal={normal}")
except Exception as e:
print(f" [ALLOCATE_ALL] Product {product_id}: ERROR - {str(e)}")
results.append({"product_id": product_id, "error": str(e)})
return {"total_products": len(product_ids), "results": results}