# Files
# api/app/document_generator.py
# Xuan Loi 4079761962 changes
# 2026-01-17 12:14:53 +07:00
#
# 584 lines
# 24 KiB
# Python

import os
import re
import subprocess
from datetime import datetime
from django.db import models
import numpy as np
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Inches, Pt
from django.apps import apps
from num2words import num2words
from django.conf import settings
from app.models import Document_Configuration
from decimal import Decimal
# =============================================================================
# Constants
# =============================================================================
# Directory one level above the folder that contains this file. Computed from
# __file__, independently of settings.BASE_DIR used on the next line.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# "static" folder under the Django project's settings.BASE_DIR; template paths
# and the generated "contract" output folder are resolved relative to it.
static_folder = os.path.join(settings.BASE_DIR, "static")
# =============================================================================
# Utility Functions
# =============================================================================
def replace_text_in_doc(doc, old_text, new_text):
    """Replace every occurrence of ``old_text`` with ``new_text`` in a document.

    The search is done on the concatenated text of each paragraph's runs, so a
    placeholder that the editor split across several runs is still found. The
    replacement text is written into the first run that held part of the
    match; the matched characters are removed from the following runs.

    Body paragraphs, table cells, section headers and section footers are
    processed. Footer paragraphs whose run XML contains "PAGE" are skipped so
    that page-number fields are left untouched.

    Args:
        doc: Document-like object exposing ``paragraphs``, ``tables`` and
            ``sections`` (as python-docx documents do).
        old_text: Text to search for. An empty value is a no-op.
        new_text: Replacement; converted with ``str()``, ``None`` becomes "".
    """
    new_text = str(new_text) if new_text is not None else ""
    if not old_text:
        # Nothing meaningful to search for.
        return

    def replace_in_paragraph(para):
        # Resume each search *after* the text just inserted. Rescanning from
        # the start would loop forever when new_text itself contains old_text.
        search_from = 0
        while True:
            runs = list(para.runs)
            full_text = "".join(run.text for run in runs)
            start_idx = full_text.find(old_text, search_from)
            if start_idx == -1:
                return
            end_idx = start_idx + len(old_text)

            position = 0
            inserted = False
            for run in runs:
                run_text = run.text
                run_start = position
                run_end = run_start + len(run_text)
                position = run_end
                if run_end <= start_idx or run_start >= end_idx:
                    continue  # run does not overlap the match
                # Slice of this run (in run-local offsets) covered by the match.
                cut_from = max(start_idx, run_start) - run_start
                cut_to = min(end_idx, run_end) - run_start
                if not inserted:
                    # First overlapping run keeps its prefix and receives the
                    # replacement, so the new text inherits that run's format.
                    run.text = run_text[:cut_from] + new_text + run_text[cut_to:]
                    inserted = True
                else:
                    # Later runs only lose their matched leading characters.
                    run.text = run_text[cut_to:]
            if not inserted:
                return
            search_from = start_idx + len(new_text)

    for para in doc.paragraphs:
        replace_in_paragraph(para)

    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                for para in cell.paragraphs:
                    replace_in_paragraph(para)

    for section in doc.sections:
        if section.header:
            for para in section.header.paragraphs:
                replace_in_paragraph(para)
        if section.footer:
            for para in section.footer.paragraphs:
                # Leave page-number fields alone.
                if any("PAGE" in run._element.xml for run in para.runs):
                    continue
                replace_in_paragraph(para)
def docx_to_pdf(input_path, output_dir=None):
    """Convert a .docx file to PDF with headless LibreOffice.

    A non-zero exit code is tolerated as long as a non-empty PDF was actually
    written; otherwise the conversion is treated as failed.

    Args:
        input_path: Path of the .docx file to convert.
        output_dir: Directory for the PDF. Defaults to the directory that
            contains ``input_path``.

    Returns:
        The expected path of the generated PDF (input base name with its last
        extension replaced by ``.pdf``).

    Raises:
        FileNotFoundError: The ``libreoffice`` executable is not on PATH.
        RuntimeError: LibreOffice exited non-zero and no usable PDF exists.
        subprocess.TimeoutExpired: The conversion took longer than 60 seconds.
    """
    if output_dir is None:
        output_dir = os.path.dirname(os.path.abspath(input_path))

    # Swap only the final extension. str.replace(".docx", ".pdf") would also
    # rewrite ".docx" in the middle of a name and would miss ".DOCX", leaving
    # the existence check below pointed at the wrong file.
    stem = os.path.splitext(os.path.basename(input_path))[0]
    pdf_path = os.path.join(output_dir, stem + ".pdf")

    command = [
        "libreoffice",
        "--headless",
        "--convert-to",
        "pdf",
        "--outdir",
        output_dir,
        input_path,
    ]
    try:
        result = subprocess.run(
            command,
            timeout=60,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            pdf_missing = not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0
            if pdf_missing:
                raise RuntimeError(
                    f"PDF conversion failed and output file was not created. STDERR: {result.stderr}"
                )
            print("INFO: PDF file was created successfully despite the non-zero exit code.")
    except FileNotFoundError:
        print("ERROR: libreoffice command not found. Please ensure it is installed and in your PATH.")
        raise
    except Exception as e:
        # Log and propagate: callers decide how to report a failed conversion.
        print(f"ERROR: An unexpected error occurred during PDF conversion for {input_path}. Error: {e}")
        raise
    return pdf_path
def insert_image_after_keyword(doc, keywords, image_path, full_name, time):
    """Append a centered image, name and time to the first table cell that matches.

    Table cells are walked in document order. For the first cell paragraph
    whose text contains any of ``keywords``, three centered paragraphs are
    appended to that cell: the picture (1.5 inches wide), ``full_name`` in
    bold, and ``time``. Only the first match is handled.

    Nothing happens when ``image_path`` does not exist. Any exception raised
    while searching or inserting is printed and not propagated.
    """
    if not os.path.exists(image_path):
        return

    def centered_paragraph(target_cell):
        # New paragraph at the end of the cell, centered horizontally.
        paragraph = target_cell.add_paragraph()
        paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
        return paragraph

    try:
        all_cells = (
            cell
            for table in doc.tables
            for row in table.rows
            for cell in row.cells
        )
        for cell in all_cells:
            for para in cell.paragraphs:
                if not any(keyword in para.text for keyword in keywords):
                    continue
                centered_paragraph(cell).add_run().add_picture(image_path, width=Inches(1.5))
                name_run = centered_paragraph(cell).add_run(full_name)
                name_run.bold = True
                centered_paragraph(cell).add_run(time)
                return
    except Exception as e:
        print(f"==INSERT IMAGE ERROR== {e}")
# =============================================================================
# Document Generator Class
# =============================================================================
class DocumentGenerator:
    """Fill a configured .docx template with database values and export a PDF.

    Workflow (see ``generate``): load the ``Document_Configuration`` row for
    ``document_code``, fetch the objects described by its ``mappings`` into
    ``data_context``, resolve every ``[placeholder]`` found in the template
    into ``replacements``, write the filled .docx and convert it to PDF.

    Attributes:
        document_code: Code of the ``Document_Configuration`` to use.
        context_pks: Values supplied by the caller, keyed by parameter name;
            mappings and formats may reference them by key.
        config: The loaded ``Document_Configuration`` instance.
        data_context: ``alias -> object | list`` filled by ``fetch_data``.
        replacements: ``placeholder text -> replacement text``.
    """

    # One path segment carrying a list index, e.g. "payment_plan[0]".
    _INDEXED_PART = re.compile(r"(\w+)\[(\d+)\]")
    # Dynamic placeholder: "[Alias.path.to.field(optional format args)]".
    _DYNAMIC_PLACEHOLDER = re.compile(
        r'\[([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_\[\]]+)*)(?:\((.*?)\))?\]'
    )

    def __init__(self, document_code, context_pks: dict):
        self.document_code = document_code
        self.context_pks = context_pks
        self.config = self._get_config()
        self.data_context = {}
        self.replacements = {}

    def _get_config(self):
        """Load the configuration row for ``document_code``.

        Raises:
            ValueError: No configuration with that code exists.
        """
        try:
            return Document_Configuration.objects.get(code=self.document_code)
        except Document_Configuration.DoesNotExist:
            raise ValueError(f"Document configuration '{self.document_code}' not found.")

    def _get_model(self, model_string):
        """Return the Django model class for an "app_label.ModelName" string."""
        app_label, model_name = model_string.split(".")
        return apps.get_model(app_label, model_name)

    def _resolve_lookup_value(self, lookup_from):
        """Resolve a reference to a caller parameter or to already fetched data.

        ``lookup_from`` is either a key of ``context_pks`` or has the form
        "alias.field.path", where ``alias`` is a key of ``data_context``.

        Raises:
            ValueError: The reference is neither of the two.
        """
        if lookup_from in self.context_pks:
            return self.context_pks[lookup_from]

        message = (
            f"Could not resolve '{lookup_from}'. It is not a valid API parameter "
            f"or a reference to another data source."
        )
        if not isinstance(lookup_from, str):
            # e.g. a format config without the expected "*_from" key.
            raise ValueError(message)
        try:
            alias, field_path = lookup_from.split(".", 1)
            if alias not in self.data_context:
                raise ValueError(f"Alias '{alias}' not found in data context.")
            return self._get_value_from_object(self.data_context.get(alias), field_path)
        except ValueError as exc:
            raise ValueError(message) from exc

    def _get_value_from_object(self, obj, field_path):
        """Walk a dotted ``field_path`` starting at ``obj``.

        Each segment is read as a dict key (for dicts) or as an attribute.
        A segment such as "items[2]" additionally indexes into the list,
        tuple or manager/queryset-like object (anything with ``.all()``) it
        names. A non-indexed segment that yields a manager-like object is
        reduced to its first record via ``.first()``.

        Returns:
            The resolved value, or ``None`` when ``obj`` is falsy, a segment
            is missing, or an index is out of range.
        """
        if not obj:
            return None

        value = obj
        for part in field_path.split("."):
            if value is None:
                break

            indexed = self._INDEXED_PART.match(part)
            if indexed:
                attr_name = indexed.group(1)
                index = int(indexed.group(2))
                # Fetch the container; dicts (e.g. JSON data) are read by key.
                if isinstance(value, dict):
                    value = value.get(attr_name)
                else:
                    value = getattr(value, attr_name, None)
                try:
                    if isinstance(value, (list, tuple)):
                        value = value[index]
                    elif hasattr(value, "all"):
                        # Related managers are not iterable themselves;
                        # go through .all() to obtain the records.
                        value = list(value.all())[index]
                except (IndexError, TypeError):
                    return None
                continue

            if isinstance(value, dict):
                value = value.get(part)
            else:
                value = getattr(value, part, None)
            # Reverse relation (manager): take the first related record.
            if hasattr(value, "all") and not isinstance(value, models.Model):
                value = value.first()
        return value

    def fetch_data(self):
        """Populate ``data_context`` from the configuration's ``mappings``.

        Each mapping filters ``model`` by ``lookup_field`` = resolved
        ``lookup_value_from`` and stores the first match (type "object") or
        all matches as a list (type "list") under ``alias``. When the lookup
        value resolves to ``None`` the alias is set to ``None`` (object) or
        ``[]`` (any other type) without querying.

        Raises:
            TypeError: ``config.mappings`` is not a list.
        """
        if not isinstance(self.config.mappings, list):
            raise TypeError("Document configuration 'mappings' must be a list.")

        for mapping in self.config.mappings:
            model_cls = self._get_model(mapping["model"])
            lookup_field = mapping["lookup_field"]
            lookup_value = self._resolve_lookup_value(mapping["lookup_value_from"])
            alias = mapping["alias"]
            mapping_type = mapping["type"]

            if lookup_value is None:
                self.data_context[alias] = None if mapping_type == "object" else []
                continue

            queryset = model_cls.objects.filter(**{lookup_field: lookup_value})
            if mapping_type == "object":
                self.data_context[alias] = queryset.first()
            elif mapping_type == "list":
                self.data_context[alias] = list(queryset)

    def _format_value(self, value, format_config, obj=None):
        """Format ``value`` according to ``format_config`` and return a string.

        ``format_config`` is either a format name (str) or a dict with a
        "type" key plus options; a dict may chain a further format under
        "next", applied to the previous result. Supported types:

        * "currency": dot as thousands separator, comma as decimal separator,
          decimals only when the rounded value is not whole.
        * "date": ``strftime`` with "format" (default "%d/%m/%Y"; the tokens
          "dd", "mm", "YYYY" are accepted).
        * "number_to_words": ``num2words`` in "lang" (default "vi").
        * "conditional": "true_value" / "false_value" by truthiness.
        * "computed_months": days between the dates referenced by
          "start_date_from" and "end_date_from", divided by 30 and rounded.
        * "expression": arithmetic over attributes of ``obj`` given in "expr".

        Unknown types leave the value unchanged. ``None`` input or result
        yields "".
        """
        if value is None:
            return ""

        def apply_format(val, fmt, obj):
            # A bare string is shorthand for {"type": <string>} with no options.
            if isinstance(fmt, dict):
                fmt_type, options = fmt.get("type"), fmt
            else:
                fmt_type, options = fmt, {}

            if fmt_type == "currency":
                try:
                    num_val = round(float(val), 2)
                    if num_val == int(num_val):
                        return "{:,}".format(int(num_val)).replace(",", ".")
                    # Swap separators: 1,234.50 -> 1.234,50
                    s = f"{num_val:,.2f}"
                    return s.replace(",", "X").replace(".", ",").replace("X", ".")
                except Exception:
                    return str(val)

            if fmt_type == "date":
                date_format = (
                    options.get("format", "%d/%m/%Y")
                    .replace("dd", "%d")
                    .replace("mm", "%m")
                    .replace("YYYY", "%Y")
                )
                try:
                    return val.strftime(date_format)
                except Exception:
                    return str(val)

            if fmt_type == "number_to_words":
                try:
                    return num2words(val, lang=options.get("lang", "vi"))
                except Exception:
                    return str(val)

            if fmt_type == "conditional":
                return options.get("true_value") if val else options.get("false_value")

            if fmt_type == "computed_months":
                start_date = self._resolve_lookup_value(options.get("start_date_from"))
                end_date = self._resolve_lookup_value(options.get("end_date_from"))
                if start_date and end_date:
                    # Local import: the module-level name "datetime" is the
                    # class, whose ".date" attribute is a method, not the type.
                    from datetime import date as date_type
                    if not isinstance(start_date, date_type):
                        return ""
                    if not isinstance(end_date, date_type):
                        return ""
                    return str(int(round((end_date - start_date).days / 30, 0)))
                return ""

            if fmt_type == "expression":
                expr = options.get("expr")
                if not expr:
                    return ""
                # Every identifier-like token becomes a numeric variable read
                # from obj; anything missing or non-numeric counts as 0.
                local_dict = {}
                for token in re.findall(r"[a-zA-Z0-9_\.]+", expr):
                    if "__" in token or "." in token:
                        token_value = self._get_value_from_object(obj, token)
                    else:
                        token_value = getattr(obj, token, None)
                    try:
                        text = str(token_value) if token_value is not None else ""
                        text = text.replace(",", "")
                        local_dict[token] = float(text) if text else 0
                    except Exception:
                        local_dict[token] = 0
                try:
                    # NOTE(security): eval() with builtins disabled is NOT a
                    # sandbox. "expr" must only ever come from trusted
                    # configuration/templates, never from end-user input.
                    result = eval(expr, {"__builtins__": None}, local_dict)
                    return int(round(result, 0)) if isinstance(result, (int, float)) else result
                except Exception:
                    return 0

            return val

        cur_fmt = format_config
        val = value
        # Apply each link of the "next" chain, then the last format itself.
        while isinstance(cur_fmt, dict) and cur_fmt.get("next") is not None:
            val = apply_format(val, cur_fmt, obj)
            cur_fmt = cur_fmt["next"]
        val = apply_format(val, cur_fmt, obj)
        # A formatter may yield None (e.g. conditional without a configured
        # value); render that as empty text, not the literal "None".
        return "" if val is None else str(val)

    def _scan_placeholders_in_doc(self, doc):
        """Collect the unique ``[placeholder]`` strings present in ``doc``.

        Body paragraphs, table cells, section headers and footers are
        scanned. Brackets may nest (e.g.
        ``[Transaction.payment_plan[0].amount(type:currency)]``) and a
        placeholder may be split across runs of one paragraph. Paragraphs
        containing a "PAGE" field are skipped.

        Returns:
            A set of placeholder strings including the outer brackets.
        """
        placeholders = set()

        def scan_paragraphs(paragraphs):
            for para in paragraphs:
                if any("PAGE" in run._element.xml for run in para.runs):
                    continue
                text_buffer = ""
                bracket_depth = 0  # nesting level of the placeholder being read
                for run in para.runs:
                    for char in run.text:
                        if char == "[":
                            if bracket_depth == 0:
                                text_buffer = ""  # start of a new placeholder
                            else:
                                text_buffer += char  # nested bracket
                            bracket_depth += 1
                        elif char == "]":
                            if bracket_depth == 0:
                                # Unmatched "]": ignore it. Letting the depth
                                # go negative would hide later placeholders.
                                continue
                            bracket_depth -= 1
                            if bracket_depth == 0:
                                if text_buffer:
                                    placeholders.add(f"[{text_buffer}]")
                                text_buffer = ""
                            else:
                                text_buffer += char
                        elif bracket_depth > 0:
                            text_buffer += char

        scan_paragraphs(doc.paragraphs)
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    scan_paragraphs(cell.paragraphs)
        for section in doc.sections:
            if section.header:
                scan_paragraphs(section.header.paragraphs)
            if section.footer:
                scan_paragraphs(section.footer.paragraphs)
        return placeholders

    def _parse_format_args(self, args_string):
        """Parse inline format arguments such as "type:currency,lang:vi".

        Pairs are comma separated "key:value" items. A "next" key opens a
        nested format dict; its value may itself be a pair
        ("next:type:number_to_words") and all following pairs go into the
        nested dict. Items without ":" are ignored.

        Returns:
            The format dict (empty when ``args_string`` is empty).
        """
        if not args_string:
            return {}

        root = {}
        current = root
        for part in args_string.split(","):
            part = part.strip()
            if ":" not in part:
                continue
            key, value = (piece.strip() for piece in part.split(":", 1))
            if key != "next":
                current[key] = value
                continue
            # Create the nested format on first use, then descend into it.
            if not isinstance(current.get("next"), dict):
                current["next"] = {}
            current = current["next"]
            # Supports the compact form "next:type:number_to_words".
            if ":" in value:
                sub_key, sub_val = value.split(":", 1)
                current[sub_key.strip()] = sub_val.strip()
        return root

    def _render_configured_field(self, source_obj, config):
        """Return the replacement text for one explicitly configured field.

        ``config`` is either a field path (str) or a dict with "source" and an
        optional "format". A missing ``source_obj`` renders as "".
        """
        if source_obj is None:
            return ""
        if isinstance(config, dict):
            value = self._get_value_from_object(source_obj, config["source"])
            # "format" is optional; without it the value is just stringified.
            return self._format_value(value, config.get("format"), source_obj)
        value = self._get_value_from_object(source_obj, config)
        return str(value) if value is not None else ""

    def prepare_replacements(self, doc):
        """Fill ``self.replacements`` for every placeholder found in ``doc``.

        1. Built-in date placeholders: ``[day]``, ``[month]``, ``[year]``,
           ``[date]`` (current local date).
        2. Pass 1: placeholders listed under a mapping's "fields". For
           "list" mappings the "{index}" token in the placeholder template is
           replaced by 1..max_items (default 4); missing items render as "".
        3. Pass 2: remaining placeholders of the form
           ``[Alias.field.path(format args)]`` are resolved against
           ``data_context``. Unknown aliases and errors are rendered as
           visible ``[ALIAS_NOT_FOUND: ...]`` / ``[ERROR: ...]`` markers.
        """
        today = datetime.now()
        self.replacements['[day]'] = str(today.day)
        self.replacements['[month]'] = str(today.month)
        self.replacements['[year]'] = str(today.year)
        self.replacements['[date]'] = today.strftime("%d/%m/%Y")

        placeholders_in_doc = self._scan_placeholders_in_doc(doc)

        # PASS 1: explicit configuration (backward compatibility).
        if isinstance(self.config.mappings, list):
            for mapping in self.config.mappings:
                if "fields" not in mapping:
                    continue
                data = self.data_context.get(mapping["alias"])

                if mapping["type"] == "list":
                    items = data or []
                    for i in range(mapping.get("max_items", 4)):
                        item = items[i] if i < len(items) else None
                        for p_template, config in mapping["fields"].items():
                            placeholder = p_template.replace("{index}", str(i + 1))
                            if placeholder not in placeholders_in_doc:
                                continue
                            self.replacements[placeholder] = self._render_configured_field(item, config)
                            placeholders_in_doc.discard(placeholder)

                elif mapping["type"] == "object":
                    for placeholder, config in mapping["fields"].items():
                        if placeholder not in placeholders_in_doc:
                            continue
                        self.replacements[placeholder] = self._render_configured_field(data, config)
                        placeholders_in_doc.discard(placeholder)

        # PASS 2: remaining dynamic placeholders.
        for placeholder in list(placeholders_in_doc):
            match = self._DYNAMIC_PLACEHOLDER.fullmatch(placeholder)
            if not match:
                continue
            data_path, format_args_str = match.groups()
            if '.' not in data_path:
                # No alias part (e.g. "[day]"): not a data reference.
                continue
            try:
                alias, field_path = data_path.split('.', 1)
                if alias not in self.data_context:
                    self.replacements[placeholder] = f"[ALIAS_NOT_FOUND: {alias}]"
                    continue
                source_object = self.data_context.get(alias)
                value = self._get_value_from_object(source_object, field_path)
                if format_args_str:
                    format_config = self._parse_format_args(format_args_str)
                    self.replacements[placeholder] = self._format_value(value, format_config, source_object)
                else:
                    self.replacements[placeholder] = str(value) if value is not None else ""
            except Exception as e:
                # Surface the problem in the document instead of aborting.
                self.replacements[placeholder] = f"[ERROR: {e}]"

    def generate(self, signature_info=None, output_filename=None):
        """Produce the filled .docx and its PDF in ``<static>/contract``.

        Args:
            signature_info: Optional dict with "keywords", "file_path",
                "full_name" and "timestamp"; when given, a signature image
                block is inserted via ``insert_image_after_keyword``.
            output_filename: Optional output name; its extension is forced to
                ".docx". By default a name is built from the document code,
                the ``context_pks`` values and the current timestamp.

        Returns:
            dict with "code", "name", "file" (.docx name) and "pdf" (.pdf name).

        Raises:
            FileNotFoundError: The configured template does not exist.
        """
        self.fetch_data()

        clean_template_path = self.config.template_path.lstrip('/')
        template_full_path = os.path.join(static_folder, clean_template_path)
        if not os.path.exists(template_full_path):
            raise FileNotFoundError(f"Template file not found at: {template_full_path}")

        doc = Document(template_full_path)
        self.prepare_replacements(doc)

        if output_filename:
            if not output_filename.endswith(".docx"):
                base_name = os.path.splitext(output_filename)[0]
                output_filename = f"{base_name}.docx"
        else:
            pk_values = "_".join(str(v) for v in self.context_pks.values())
            output_filename = f"{self.document_code}_{pk_values}_{int(datetime.now().timestamp())}.docx"

        output_dir = os.path.join(static_folder, "contract")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, output_filename)
        # Swap only the final extension; str.replace would also rewrite any
        # ".docx" occurring earlier in the name.
        pdf_filename = os.path.splitext(output_filename)[0] + ".pdf"

        for old_text, new_text in self.replacements.items():
            replace_text_in_doc(doc, old_text, new_text)

        if signature_info:
            insert_image_after_keyword(
                doc,
                signature_info["keywords"],
                signature_info["file_path"],
                signature_info["full_name"],
                signature_info["timestamp"],
            )

        doc.save(output_path)
        docx_to_pdf(output_path, output_dir)

        return {
            "code": self.document_code,
            "name": self.config.name,
            "file": output_filename,
            "pdf": pdf_filename,
        }