709 lines
28 KiB
Python
709 lines
28 KiB
Python
import os
|
|
import subprocess
|
|
from datetime import datetime
|
|
from django.db import models
|
|
import numpy as np
|
|
from docx import Document
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
from docx.shared import Inches, Pt
|
|
from django.apps import apps
|
|
from num2words import num2words
|
|
from django.conf import settings
|
|
from app.models import Document_Configuration
|
|
|
|
# =============================================================================
|
|
# Constants
|
|
# =============================================================================
|
|
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
static_folder = os.path.join(settings.BASE_DIR, "static")
|
|
|
|
|
|
# =============================================================================
|
|
# Utility Functions
|
|
# =============================================================================
|
|
|
|
def replace_text_in_doc(doc, old_text, new_text):
|
|
"""Thay thế tất cả các lần xuất hiện của old_text bằng new_text trong tài liệu, xử lý split run."""
|
|
new_text = str(new_text) if new_text is not None else ""
|
|
|
|
def replace_in_paragraph(para):
|
|
runs = list(para.runs)
|
|
full_text = ''.join(run.text for run in runs)
|
|
if old_text not in full_text:
|
|
return
|
|
|
|
start_idx = full_text.find(old_text)
|
|
if start_idx == -1:
|
|
return
|
|
|
|
current_pos = 0
|
|
runs_to_modify = []
|
|
for run in runs:
|
|
run_len = len(run.text)
|
|
run_start = current_pos
|
|
run_end = current_pos + run_len
|
|
current_pos = run_end
|
|
|
|
if run_start < start_idx + len(old_text) and run_end > start_idx:
|
|
runs_to_modify.append(run)
|
|
|
|
if not runs_to_modify:
|
|
return
|
|
|
|
first_run = runs_to_modify[0]
|
|
first_run_index = next(i for i, r in enumerate(runs) if r is first_run)
|
|
|
|
local_start = start_idx - sum(len(runs[i].text) for i in range(first_run_index))
|
|
|
|
remaining_old = old_text
|
|
|
|
for i, run in enumerate(runs_to_modify):
|
|
run_text = run.text
|
|
if i == 0:
|
|
prefix = run_text[:local_start]
|
|
remove_len = min(len(remaining_old), len(run_text) - local_start)
|
|
suffix = run_text[local_start + remove_len:]
|
|
run.text = prefix + suffix
|
|
remaining_old = remaining_old[remove_len:]
|
|
else:
|
|
remove_len = min(len(remaining_old), len(run_text))
|
|
suffix = run_text[remove_len:]
|
|
run.text = suffix
|
|
remaining_old = remaining_old[remove_len:]
|
|
|
|
first_run = runs_to_modify[0]
|
|
first_run.text = first_run.text[:local_start] + new_text + first_run.text[local_start:]
|
|
|
|
replace_in_paragraph(para)
|
|
|
|
for para in doc.paragraphs:
|
|
replace_in_paragraph(para)
|
|
|
|
for table in doc.tables:
|
|
for row in table.rows:
|
|
for cell in row.cells:
|
|
for para in cell.paragraphs:
|
|
replace_in_paragraph(para)
|
|
|
|
for section in doc.sections:
|
|
footer = section.footer
|
|
for para in footer.paragraphs:
|
|
if any("PAGE" in run._element.xml for run in para.runs):
|
|
continue
|
|
replace_in_paragraph(para)
|
|
|
|
|
|
def docx_to_pdf(input_path, output_dir=None):
|
|
"""Converts a .docx file to .pdf using LibreOffice, handling non-zero exit codes gracefully."""
|
|
if output_dir is None:
|
|
output_dir = os.path.dirname(os.path.abspath(input_path))
|
|
|
|
pdf_path = os.path.join(output_dir, os.path.basename(input_path).replace(".docx", ".pdf"))
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
"libreoffice",
|
|
"--headless",
|
|
"--convert-to",
|
|
"pdf",
|
|
"--outdir",
|
|
output_dir,
|
|
input_path,
|
|
],
|
|
timeout=60,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
# Log the warning/error from LibreOffice
|
|
print(f"WARNING: libreoffice command returned non-zero exit code ({result.returncode}) for {input_path}.")
|
|
print(f" STDOUT: {result.stdout}")
|
|
print(f" STDERR: {result.stderr}")
|
|
|
|
# Check if the PDF was created anyway
|
|
if not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0:
|
|
# This is a real failure
|
|
raise Exception(f"PDF conversion failed and output file was not created. STDERR: {result.stderr}")
|
|
else:
|
|
print(f"INFO: PDF file was created successfully despite the non-zero exit code.")
|
|
|
|
except FileNotFoundError:
|
|
print("ERROR: libreoffice command not found. Please ensure it is installed and in your PATH.")
|
|
raise
|
|
except Exception as e:
|
|
# Re-raise other exceptions (like timeout)
|
|
print(f"ERROR: An unexpected error occurred during PDF conversion for {input_path}. Error: {e}")
|
|
raise
|
|
|
|
|
|
def insert_image_after_keyword(doc, keywords, image_path, full_name, time):
|
|
"""Finds a keyword in a table and inserts an image and text after it."""
|
|
if not os.path.exists(image_path):
|
|
print(f"==INSERT IMAGE ERROR== File not found: {image_path}")
|
|
return
|
|
|
|
try:
|
|
for table in doc.tables:
|
|
for row in table.rows:
|
|
for cell in row.cells:
|
|
for para in cell.paragraphs:
|
|
for keyword in keywords:
|
|
if keyword in para.text:
|
|
p_img = cell.add_paragraph()
|
|
p_img.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
p_img.add_run().add_picture(image_path, width=Inches(1.5))
|
|
|
|
p_name = cell.add_paragraph()
|
|
p_name.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
run_name = p_name.add_run(full_name)
|
|
run_name.bold = True
|
|
|
|
p_time = cell.add_paragraph()
|
|
p_time.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
p_time.add_run(time)
|
|
return
|
|
except Exception as e:
|
|
print(f"==INSERT IMAGE ERROR== {e}")
|
|
|
|
|
|
# =============================================================================
|
|
# Document Generator Class
|
|
# =============================================================================
|
|
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from datetime import datetime
|
|
from django.db import models
|
|
import numpy as np
|
|
from docx import Document
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
from docx.shared import Inches, Pt
|
|
from django.apps import apps
|
|
from num2words import num2words
|
|
from django.conf import settings
|
|
from app.models import Document_Configuration
|
|
|
|
# =============================================================================
|
|
# Constants
|
|
# =============================================================================
|
|
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
static_folder = os.path.join(settings.BASE_DIR, "static")
|
|
|
|
|
|
# =============================================================================
|
|
# Utility Functions
|
|
# =============================================================================
|
|
|
|
def replace_text_in_doc(doc, old_text, new_text):
|
|
"""Thay thế tất cả các lần xuất hiện của old_text bằng new_text trong tài liệu, xử lý split run."""
|
|
new_text = str(new_text) if new_text is not None else ""
|
|
|
|
def replace_in_paragraph(para):
|
|
runs = list(para.runs)
|
|
full_text = ''.join(run.text for run in runs)
|
|
if old_text not in full_text:
|
|
return
|
|
|
|
start_idx = full_text.find(old_text)
|
|
if start_idx == -1:
|
|
return
|
|
|
|
current_pos = 0
|
|
runs_to_modify = []
|
|
for run in runs:
|
|
run_len = len(run.text)
|
|
run_start = current_pos
|
|
run_end = current_pos + run_len
|
|
current_pos = run_end
|
|
|
|
if run_start < start_idx + len(old_text) and run_end > start_idx:
|
|
runs_to_modify.append(run)
|
|
|
|
if not runs_to_modify:
|
|
return
|
|
|
|
first_run = runs_to_modify[0]
|
|
first_run_index = next(i for i, r in enumerate(runs) if r is first_run)
|
|
|
|
local_start = start_idx - sum(len(runs[i].text) for i in range(first_run_index))
|
|
|
|
remaining_old = old_text
|
|
|
|
for i, run in enumerate(runs_to_modify):
|
|
run_text = run.text
|
|
if i == 0:
|
|
prefix = run_text[:local_start]
|
|
remove_len = min(len(remaining_old), len(run_text) - local_start)
|
|
suffix = run_text[local_start + remove_len:]
|
|
run.text = prefix + suffix
|
|
remaining_old = remaining_old[remove_len:]
|
|
else:
|
|
remove_len = min(len(remaining_old), len(run_text))
|
|
suffix = run_text[remove_len:]
|
|
run.text = suffix
|
|
remaining_old = remaining_old[remove_len:]
|
|
|
|
first_run = runs_to_modify[0]
|
|
first_run.text = first_run.text[:local_start] + new_text + first_run.text[local_start:]
|
|
|
|
replace_in_paragraph(para)
|
|
|
|
for para in doc.paragraphs:
|
|
replace_in_paragraph(para)
|
|
|
|
for table in doc.tables:
|
|
for row in table.rows:
|
|
for cell in row.cells:
|
|
for para in cell.paragraphs:
|
|
replace_in_paragraph(para)
|
|
|
|
for section in doc.sections:
|
|
footer = section.footer
|
|
for para in footer.paragraphs:
|
|
if any("PAGE" in run._element.xml for run in para.runs):
|
|
continue
|
|
replace_in_paragraph(para)
|
|
|
|
|
|
def docx_to_pdf(input_path, output_dir=None):
|
|
"""Converts a .docx file to .pdf using LibreOffice, handling non-zero exit codes gracefully."""
|
|
if output_dir is None:
|
|
output_dir = os.path.dirname(os.path.abspath(input_path))
|
|
|
|
pdf_path = os.path.join(output_dir, os.path.basename(input_path).replace(".docx", ".pdf"))
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[
|
|
"libreoffice",
|
|
"--headless",
|
|
"--convert-to",
|
|
"pdf",
|
|
"--outdir",
|
|
output_dir,
|
|
input_path,
|
|
],
|
|
timeout=60,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
# Log the warning/error from LibreOffice
|
|
print(f"WARNING: libreoffice command returned non-zero exit code ({result.returncode}) for {input_path}.")
|
|
print(f" STDOUT: {result.stdout}")
|
|
print(f" STDERR: {result.stderr}")
|
|
|
|
# Check if the PDF was created anyway
|
|
if not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0:
|
|
# This is a real failure
|
|
raise Exception(f"PDF conversion failed and output file was not created. STDERR: {result.stderr}")
|
|
else:
|
|
print(f"INFO: PDF file was created successfully despite the non-zero exit code.")
|
|
|
|
except FileNotFoundError:
|
|
print("ERROR: libreoffice command not found. Please ensure it is installed and in your PATH.")
|
|
raise
|
|
except Exception as e:
|
|
# Re-raise other exceptions (like timeout)
|
|
print(f"ERROR: An unexpected error occurred during PDF conversion for {input_path}. Error: {e}")
|
|
raise
|
|
|
|
|
|
def insert_image_after_keyword(doc, keywords, image_path, full_name, time):
|
|
"""Finds a keyword in a table and inserts an image and text after it."""
|
|
if not os.path.exists(image_path):
|
|
print(f"==INSERT IMAGE ERROR== File not found: {image_path}")
|
|
return
|
|
|
|
try:
|
|
for table in doc.tables:
|
|
for row in table.rows:
|
|
for cell in row.cells:
|
|
for para in cell.paragraphs:
|
|
for keyword in keywords:
|
|
if keyword in para.text:
|
|
p_img = cell.add_paragraph()
|
|
p_img.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
p_img.add_run().add_picture(image_path, width=Inches(1.5))
|
|
|
|
p_name = cell.add_paragraph()
|
|
p_name.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
run_name = p_name.add_run(full_name)
|
|
run_name.bold = True
|
|
|
|
p_time = cell.add_paragraph()
|
|
p_time.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
p_time.add_run(time)
|
|
return
|
|
except Exception as e:
|
|
print(f"==INSERT IMAGE ERROR== {e}")
|
|
|
|
|
|
# =============================================================================
|
|
# Document Generator Class
|
|
# =============================================================================
|
|
|
|
|
|
class DocumentGenerator:
|
|
def __init__(self, document_code, context_pks: dict):
|
|
self.document_code = document_code
|
|
self.context_pks = context_pks
|
|
self.config = self._get_config()
|
|
self.data_context = {}
|
|
self.replacements = {}
|
|
|
|
def _get_config(self):
|
|
try:
|
|
return Document_Configuration.objects.get(code=self.document_code)
|
|
except Document_Configuration.DoesNotExist:
|
|
raise ValueError(f"Document configuration '{self.document_code}' not found.")
|
|
|
|
def _get_model(self, model_string):
|
|
app_label, model_name = model_string.split(".")
|
|
return apps.get_model(app_label, model_name)
|
|
|
|
def _resolve_lookup_value(self, lookup_from):
|
|
if lookup_from in self.context_pks:
|
|
return self.context_pks[lookup_from]
|
|
|
|
try:
|
|
alias, field_path = lookup_from.split(".", 1)
|
|
if alias not in self.data_context:
|
|
raise ValueError(f"Alias '{alias}' not found in data context.")
|
|
|
|
source_object = self.data_context.get(alias)
|
|
return self._get_value_from_object(source_object, field_path)
|
|
except ValueError:
|
|
raise ValueError(f"Could not resolve '{lookup_from}'. It is not a valid API parameter or a reference to another data source.")
|
|
|
|
def _get_value_from_object(self, obj, field_path):
|
|
if not obj:
|
|
return None
|
|
|
|
import re
|
|
parts = field_path.split('.')
|
|
value = obj
|
|
for part in parts:
|
|
if value is None:
|
|
break
|
|
|
|
# 1. Kiểm tra nếu part chứa index mảng, ví dụ: "payment_plan[0]"
|
|
array_match = re.match(r"(\w+)\[(\d+)\]", part)
|
|
|
|
if array_match:
|
|
attr_name = array_match.group(1) # Lấy "payment_plan"
|
|
index = int(array_match.group(2)) # Lấy 0
|
|
|
|
# Lấy list từ object
|
|
value = getattr(value, attr_name, None)
|
|
|
|
# Truy cập phần tử theo index
|
|
try:
|
|
if isinstance(value, (list, tuple)):
|
|
value = value[index]
|
|
elif hasattr(value, 'all'): # QuerySet
|
|
value = value[index]
|
|
except (IndexError, TypeError):
|
|
return None
|
|
else:
|
|
# 2. Xử lý truy cập thuộc tính hoặc key của Dict (JSON)
|
|
if isinstance(value, dict):
|
|
# Nếu là dict (phần tử trong JSONField), dùng .get()
|
|
value = value.get(part)
|
|
else:
|
|
# Nếu là object, dùng getattr()
|
|
value = getattr(value, part, None)
|
|
|
|
# 3. Hỗ trợ lấy bản ghi đầu tiên nếu gặp Quan hệ ngược (Manager)
|
|
if hasattr(value, 'all') and not isinstance(value, models.Model):
|
|
value = value.first()
|
|
|
|
return value
|
|
|
|
def fetch_data(self):
|
|
if not isinstance(self.config.mappings, list):
|
|
raise TypeError("Document configuration 'mappings' must be a list.")
|
|
|
|
for mapping in self.config.mappings:
|
|
model_cls = self._get_model(mapping["model"])
|
|
lookup_field = mapping["lookup_field"]
|
|
lookup_value = self._resolve_lookup_value(mapping["lookup_value_from"])
|
|
alias = mapping["alias"]
|
|
|
|
if lookup_value is None:
|
|
self.data_context[alias] = None if mapping["type"] == "object" else []
|
|
continue
|
|
|
|
queryset = model_cls.objects.filter(**{lookup_field: lookup_value})
|
|
|
|
if mapping["type"] == "object":
|
|
self.data_context[alias] = queryset.first()
|
|
elif mapping["type"] == "list":
|
|
self.data_context[alias] = list(queryset)
|
|
|
|
def _format_value(self, value, format_config, obj=None):
|
|
if value is None:
|
|
return ""
|
|
|
|
def apply_format(val, fmt, obj):
|
|
fmt_type = fmt.get("type") if isinstance(fmt, dict) else fmt
|
|
if fmt_type == "currency":
|
|
try:
|
|
# Đảm bảo val là số trước khi format, và làm tròn về số nguyên
|
|
num_val = int(round(float(val), 0))
|
|
# Format tiền tệ kiểu VN (dấu chấm phân cách hàng nghìn)
|
|
return "{:,}".format(num_val).replace(",", ".")
|
|
except Exception:
|
|
return str(val)
|
|
if fmt_type == "date":
|
|
date_format = fmt.get("format", "%d/%m/%Y").replace("dd", "%d").replace("mm", "%m").replace("YYYY", "%Y")
|
|
try:
|
|
return val.strftime(date_format)
|
|
except Exception:
|
|
return str(val)
|
|
if fmt_type == "number_to_words":
|
|
try:
|
|
return num2words(val, lang=fmt.get("lang", "en"))
|
|
except Exception:
|
|
return str(val)
|
|
if fmt_type == "conditional":
|
|
return fmt.get("true_value") if val else fmt.get("false_value")
|
|
if fmt_type == "computed_months":
|
|
start_date = self._resolve_lookup_value(fmt.get("start_date_from"))
|
|
end_date = self._resolve_lookup_value(fmt.get("end_date_from"))
|
|
if start_date and end_date:
|
|
# Thêm kiểm tra type để tránh lỗi
|
|
import datetime
|
|
if not isinstance(start_date, datetime.date): return ""
|
|
if not isinstance(end_date, datetime.date): return ""
|
|
|
|
return str(int(round(((end_date - start_date).days) / 30, 0)))
|
|
return ""
|
|
if fmt_type == "expression":
|
|
expr = fmt.get("expr")
|
|
if not expr:
|
|
return ""
|
|
import re
|
|
tokens = re.findall(r"[a-zA-Z0-9_\.]+", expr)
|
|
local_dict = {}
|
|
for token in tokens:
|
|
if "__" in token or "." in token:
|
|
val2 = self._get_value_from_object(obj, token)
|
|
else:
|
|
val2 = getattr(obj, token, None)
|
|
|
|
try:
|
|
val_str = str(val2) if val2 is not None else ""
|
|
|
|
val_to_parse = val_str.replace(',', '')
|
|
|
|
# Nếu val_to_parse là chuỗi rỗng, đặt bằng 0
|
|
local_dict[token] = float(val_to_parse) if val_to_parse else 0
|
|
|
|
except Exception:
|
|
local_dict[token] = 0
|
|
|
|
try:
|
|
result = eval(expr, {"__builtins__": None}, local_dict)
|
|
|
|
# Làm tròn kết quả về số nguyên (theo yêu cầu trước đó)
|
|
# Trả về kết quả số để formatter 'next' (currency) xử lý tiếp
|
|
return int(round(result, 0)) if isinstance(result, (int, float)) else result
|
|
|
|
except Exception as e:
|
|
return 0
|
|
return val
|
|
|
|
# Áp dụng lồng định dạng qua khóa 'next'
|
|
cur_fmt = format_config
|
|
val = value
|
|
|
|
# Vòng lặp này sẽ đảm bảo định dạng 'expression' được thực thi,
|
|
# sau đó kết quả số (chưa format) sẽ được truyền sang định dạng 'currency'
|
|
while isinstance(cur_fmt, dict) and cur_fmt.get("next") is not None:
|
|
val = apply_format(val, cur_fmt, obj)
|
|
cur_fmt = cur_fmt["next"]
|
|
|
|
val = apply_format(val, cur_fmt, obj)
|
|
return str(val)
|
|
|
|
def _scan_placeholders_in_doc(self, doc):
|
|
"""Scans the entire document and returns a set of unique placeholders."""
|
|
placeholders = set()
|
|
pattern = re.compile(r'\[([^\[\]]+)\]')
|
|
|
|
def scan_paragraph(para):
|
|
full_text = ''.join(run.text for run in para.runs)
|
|
for match in pattern.finditer(full_text):
|
|
placeholders.add(f"[{match.group(1)}]")
|
|
|
|
for para in doc.paragraphs:
|
|
scan_paragraph(para)
|
|
|
|
for table in doc.tables:
|
|
for row in table.rows:
|
|
for cell in row.cells:
|
|
for para in cell.paragraphs:
|
|
scan_paragraph(para)
|
|
|
|
for section in doc.sections:
|
|
footer = section.footer
|
|
for para in footer.paragraphs:
|
|
if any("PAGE" in run._element.xml for run in para.runs):
|
|
continue
|
|
scan_paragraph(para)
|
|
|
|
return placeholders
|
|
|
|
def _parse_format_args(self, args_string):
|
|
"""Parses a string like 'lang:vi, type:number_to_words' into a dictionary."""
|
|
if not args_string:
|
|
return {}
|
|
format_config = {}
|
|
args = args_string.split(',')
|
|
for arg in args:
|
|
if ':' in arg:
|
|
key, value = arg.split(':', 1)
|
|
format_config[key.strip()] = value.strip()
|
|
return format_config
|
|
|
|
def prepare_replacements(self, doc):
|
|
"""
|
|
Prepares all replacements by implementing a hybrid approach:
|
|
1. Prioritizes manual configuration from 'fields'.
|
|
2. Automatically handles any remaining dynamic placeholders.
|
|
"""
|
|
today = datetime.now()
|
|
self.replacements['[day]'] = str(today.day)
|
|
self.replacements['[month]'] = str(today.month)
|
|
self.replacements['[year]'] = str(today.year)
|
|
self.replacements['[date]'] = today.strftime("%d/%m/%Y")
|
|
|
|
placeholders_in_doc = self._scan_placeholders_in_doc(doc)
|
|
|
|
# PASS 1: Handle manual/explicit configuration (backward compatibility)
|
|
if isinstance(self.config.mappings, list):
|
|
for mapping in self.config.mappings:
|
|
if "fields" not in mapping:
|
|
continue
|
|
|
|
alias = mapping["alias"]
|
|
data = self.data_context.get(alias)
|
|
|
|
if mapping["type"] == "list":
|
|
items = data or []
|
|
max_items = mapping.get("max_items", 4)
|
|
for i in range(max_items):
|
|
item = items[i] if i < len(items) else None
|
|
for p_template, config in mapping["fields"].items():
|
|
placeholder = p_template.replace("{index}", str(i + 1))
|
|
if placeholder in placeholders_in_doc:
|
|
if item is None:
|
|
self.replacements[placeholder] = ""
|
|
else:
|
|
if isinstance(config, dict):
|
|
value = self._get_value_from_object(item, config["source"])
|
|
self.replacements[placeholder] = self._format_value(value, config["format"], item)
|
|
else:
|
|
value = self._get_value_from_object(item, config)
|
|
self.replacements[placeholder] = str(value) if value is not None else ""
|
|
placeholders_in_doc.discard(placeholder)
|
|
|
|
elif mapping["type"] == "object":
|
|
if data is None:
|
|
for placeholder in mapping["fields"]:
|
|
if placeholder in placeholders_in_doc:
|
|
self.replacements[placeholder] = ""
|
|
placeholders_in_doc.discard(placeholder)
|
|
continue
|
|
|
|
for placeholder, config in mapping["fields"].items():
|
|
if placeholder in placeholders_in_doc:
|
|
if isinstance(config, dict):
|
|
value = self._get_value_from_object(data, config["source"])
|
|
self.replacements[placeholder] = self._format_value(value, config["format"], data)
|
|
else:
|
|
value = self._get_value_from_object(data, config)
|
|
self.replacements[placeholder] = str(value) if value is not None else ""
|
|
placeholders_in_doc.discard(placeholder)
|
|
|
|
# PASS 2: Handle remaining dynamic placeholders
|
|
dynamic_pattern = re.compile(r'\[([a-zA-Z0-9_]+\.[a-zA-Z0-9_.]*)(?:\((.*?)\))?\]')
|
|
|
|
for placeholder in list(placeholders_in_doc):
|
|
match = dynamic_pattern.fullmatch(placeholder)
|
|
if not match:
|
|
continue
|
|
|
|
data_path, format_args_str = match.groups()
|
|
|
|
try:
|
|
alias, field_path = data_path.split('.', 1)
|
|
|
|
if alias not in self.data_context:
|
|
self.replacements[placeholder] = f"[ALIAS_NOT_FOUND: {alias}]"
|
|
continue
|
|
|
|
source_object = self.data_context.get(alias)
|
|
value = self._get_value_from_object(source_object, field_path)
|
|
|
|
if format_args_str:
|
|
format_config = self._parse_format_args(format_args_str)
|
|
self.replacements[placeholder] = self._format_value(value, format_config, source_object)
|
|
else:
|
|
self.replacements[placeholder] = str(value) if value is not None else ""
|
|
|
|
except Exception as e:
|
|
self.replacements[placeholder] = f"[ERROR: {e}]"
|
|
|
|
def generate(self, signature_info=None, output_filename=None):
|
|
self.fetch_data()
|
|
|
|
clean_template_path = self.config.template_path.lstrip('/')
|
|
template_full_path = os.path.join(static_folder, clean_template_path)
|
|
if not os.path.exists(template_full_path):
|
|
raise FileNotFoundError(f"Template file not found at: {template_full_path}")
|
|
|
|
doc = Document(template_full_path)
|
|
|
|
self.prepare_replacements(doc)
|
|
|
|
if output_filename:
|
|
if not output_filename.endswith(".docx"):
|
|
base_name = os.path.splitext(output_filename)[0]
|
|
output_filename = f"{base_name}.docx"
|
|
else:
|
|
pk_values = "_".join(str(v) for v in self.context_pks.values())
|
|
output_filename = f"{self.document_code}_{pk_values}_{int(datetime.now().timestamp())}.docx"
|
|
|
|
output_dir = os.path.join(static_folder, "contract")
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
output_path = os.path.join(output_dir, output_filename)
|
|
pdf_filename = output_filename.replace(".docx", ".pdf")
|
|
|
|
for old_text, new_text in self.replacements.items():
|
|
replace_text_in_doc(doc, old_text, new_text)
|
|
|
|
if signature_info:
|
|
insert_image_after_keyword(
|
|
doc,
|
|
signature_info["keywords"],
|
|
signature_info["file_path"],
|
|
signature_info["full_name"],
|
|
signature_info["timestamp"],
|
|
)
|
|
|
|
doc.save(output_path)
|
|
docx_to_pdf(output_path, output_dir)
|
|
|
|
return {
|
|
"code": self.document_code,
|
|
"name": self.config.name,
|
|
"file": output_filename,
|
|
"pdf": pdf_filename,
|
|
} |