This commit is contained in:
anhduy-tech
2026-01-07 23:20:58 +07:00
parent 4d6d4daadd
commit 37f2ee75e2
13 changed files with 121 additions and 272 deletions

View File

@@ -1,178 +1,3 @@
import os
import subprocess
from datetime import datetime
from django.db import models
import numpy as np
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Inches, Pt
from django.apps import apps
from num2words import num2words
from django.conf import settings
from app.models import Document_Configuration
# =============================================================================
# Constants
# =============================================================================
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
static_folder = os.path.join(settings.BASE_DIR, "static")
# =============================================================================
# Utility Functions
# =============================================================================
def replace_text_in_doc(doc, old_text, new_text):
    """Replace every occurrence of ``old_text`` with ``new_text`` in a document.

    Matching is done on the concatenated text of each paragraph's runs, so an
    occurrence that is split across several runs is still found. The
    replacement is written into the first run that overlaps the match and the
    matched characters are removed from every other overlapping run.

    Paragraphs visited: the document body, the cells of ``doc.tables``, and
    the header and footer of every section. Footer paragraphs that contain a
    run whose XML mentions "PAGE" are left untouched.

    Args:
        doc: Document-like object exposing ``paragraphs``, ``tables`` and
            ``sections`` (presumably a python-docx ``Document`` - confirm
            against callers).
        old_text: Text to search for. An empty value is a no-op.
        new_text: Replacement value; converted with ``str()``. ``None`` is
            treated as an empty string.
    """
    new_text = str(new_text) if new_text is not None else ""
    if not old_text:
        # Nothing to search for; also prevents an endless empty-match scan.
        return

    def replace_in_paragraph(para):
        # Resume each search *after* the text that was just inserted, so a
        # replacement that itself contains old_text is never re-matched.
        # (Re-scanning from the start would never terminate in that case.)
        search_from = 0
        while True:
            runs = list(para.runs)
            full_text = ''.join(run.text for run in runs)
            start_idx = full_text.find(old_text, search_from)
            if start_idx == -1:
                return
            end_idx = start_idx + len(old_text)

            position = 0
            inserted = False
            for run in runs:
                run_text = run.text
                run_start = position
                run_end = position + len(run_text)
                position = run_end
                if run_end <= start_idx or run_start >= end_idx:
                    continue  # run does not overlap the match
                # Slice of this run that belongs to the match.
                cut_from = max(start_idx, run_start) - run_start
                cut_to = min(end_idx, run_end) - run_start
                # Only the first overlapping run receives the replacement.
                filler = "" if inserted else new_text
                inserted = True
                run.text = run_text[:cut_from] + filler + run_text[cut_to:]

            search_from = start_idx + len(new_text)

    for para in doc.paragraphs:
        replace_in_paragraph(para)

    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                for para in cell.paragraphs:
                    replace_in_paragraph(para)

    for section in doc.sections:
        for para in section.header.paragraphs:
            replace_in_paragraph(para)
        for para in section.footer.paragraphs:
            # Leave page-number field paragraphs alone.
            if any("PAGE" in run._element.xml for run in para.runs):
                continue
            replace_in_paragraph(para)
def docx_to_pdf(input_path, output_dir=None):
    """Convert a .docx file to PDF by running headless LibreOffice.

    A non-zero exit code from LibreOffice is tolerated as long as a non-empty
    PDF exists at the expected location afterwards.

    Args:
        input_path: Path of the document to convert.
        output_dir: Directory that receives the PDF. Defaults to the
            directory containing ``input_path``.

    Returns:
        The expected path of the PDF (``output_dir`` plus the input's base
        name with its final extension replaced by ``.pdf``).

    Raises:
        FileNotFoundError: The ``libreoffice`` executable could not be run.
        Exception: LibreOffice exited non-zero and no usable PDF was found;
            any other error from the subprocess call (for example the
            60-second timeout) is re-raised after being printed.
    """
    if output_dir is None:
        output_dir = os.path.dirname(os.path.abspath(input_path))
    # Swap only the final extension. A plain str.replace(".docx", ".pdf")
    # rewrites every ".docx" occurrence in the name and leaves a non-.docx
    # input pointing at itself, which would make the existence check below
    # succeed against the input file.
    stem, _ = os.path.splitext(os.path.basename(input_path))
    pdf_path = os.path.join(output_dir, stem + ".pdf")
    try:
        result = subprocess.run(
            [
                "libreoffice",
                "--headless",
                "--convert-to",
                "pdf",
                "--outdir",
                output_dir,
                input_path,
            ],
            timeout=60,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # Report what LibreOffice said before deciding whether it failed.
            print(f"WARNING: libreoffice command returned non-zero exit code ({result.returncode}) for {input_path}.")
            print(f" STDOUT: {result.stdout}")
            print(f" STDERR: {result.stderr}")
            # The exit code alone is not decisive: check for the output file.
            if not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0:
                raise Exception(f"PDF conversion failed and output file was not created. STDERR: {result.stderr}")
            else:
                print(f"INFO: PDF file was created successfully despite the non-zero exit code.")
    except FileNotFoundError:
        print("ERROR: libreoffice command not found. Please ensure it is installed and in your PATH.")
        raise
    except Exception as e:
        # Covers the failure raised above as well as subprocess errors such
        # as a timeout; log and propagate.
        print(f"ERROR: An unexpected error occurred during PDF conversion for {input_path}. Error: {e}")
        raise
    return pdf_path
def insert_image_after_keyword(doc, keywords, image_path, full_name, time):
    """Append a picture, a bold name and a time line to the first matching table cell.

    Table cells are visited in document order. The first cell that has a
    paragraph containing any of ``keywords`` receives three new centred
    paragraphs (picture 1.5 inches wide, ``full_name`` in bold, ``time``),
    after which the function stops. A missing image file or any exception
    raised while inserting is printed and otherwise ignored.
    """
    if not os.path.exists(image_path):
        print(f"==INSERT IMAGE ERROR== File not found: {image_path}")
        return
    try:
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    matched = any(
                        keyword in para.text
                        for para in cell.paragraphs
                        for keyword in keywords
                    )
                    if not matched:
                        continue

                    # Picture first, then the signer's name, then the time.
                    picture_para = cell.add_paragraph()
                    picture_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    picture_para.add_run().add_picture(image_path, width=Inches(1.5))

                    name_para = cell.add_paragraph()
                    name_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    name_para.add_run(full_name).bold = True

                    time_para = cell.add_paragraph()
                    time_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    time_para.add_run(time)
                    return
    except Exception as e:
        print(f"==INSERT IMAGE ERROR== {e}")
# =============================================================================
# Document Generator Class
# =============================================================================
import os
import re
import subprocess
@@ -203,14 +28,15 @@ def replace_text_in_doc(doc, old_text, new_text):
new_text = str(new_text) if new_text is not None else ""
def replace_in_paragraph(para):
para_full_text = ''.join(run.text for run in para.runs)
if old_text in para_full_text:
while old_text in ''.join(run.text for run in para.runs):
runs = list(para.runs)
full_text = ''.join(run.text for run in runs)
if old_text not in full_text:
return
start_idx = full_text.find(old_text)
if start_idx == -1:
return
break
current_pos = 0
runs_to_modify = []
@@ -224,15 +50,15 @@ def replace_text_in_doc(doc, old_text, new_text):
runs_to_modify.append(run)
if not runs_to_modify:
return
break
first_run = runs_to_modify[0]
first_run_index = next(i for i, r in enumerate(runs) if r is first_run)
local_start = start_idx - sum(len(runs[i].text) for i in range(first_run_index))
# Clear the old text from the runs
remaining_old = old_text
for i, run in enumerate(runs_to_modify):
run_text = run.text
if i == 0:
@@ -247,11 +73,9 @@ def replace_text_in_doc(doc, old_text, new_text):
run.text = suffix
remaining_old = remaining_old[remove_len:]
first_run = runs_to_modify[0]
# Insert the new text in the first run
first_run.text = first_run.text[:local_start] + new_text + first_run.text[local_start:]
replace_in_paragraph(para)
for para in doc.paragraphs:
replace_in_paragraph(para)
@@ -262,8 +86,11 @@ def replace_text_in_doc(doc, old_text, new_text):
replace_in_paragraph(para)
for section in doc.sections:
footer = section.footer
for para in footer.paragraphs:
if section.header:
for para in section.header.paragraphs:
replace_in_paragraph(para)
if section.footer:
for para in section.footer.paragraphs:
if any("PAGE" in run._element.xml for run in para.runs):
continue
replace_in_paragraph(para)
@@ -293,14 +120,11 @@ def docx_to_pdf(input_path, output_dir=None):
)
if result.returncode != 0:
# Log the warning/error from LibreOffice
print(f"WARNING: libreoffice command returned non-zero exit code ({result.returncode}) for {input_path}.")
print(f" STDOUT: {result.stdout}")
print(f" STDERR: {result.stderr}")
#print(f"WARNING: libreoffice command returned non-zero exit code ({result.returncode}) for {input_path}.")
#print(f" STDOUT: {result.stdout}")
#print(f" STDERR: {result.stderr}")
# Check if the PDF was created anyway
if not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0:
# This is a real failure
raise Exception(f"PDF conversion failed and output file was not created. STDERR: {result.stderr}")
else:
print(f"INFO: PDF file was created successfully despite the non-zero exit code.")
@@ -309,7 +133,6 @@ def docx_to_pdf(input_path, output_dir=None):
print("ERROR: libreoffice command not found. Please ensure it is installed and in your PATH.")
raise
except Exception as e:
# Re-raise other exceptions (like timeout)
print(f"ERROR: An unexpected error occurred during PDF conversion for {input_path}. Error: {e}")
raise
@@ -317,7 +140,7 @@ def docx_to_pdf(input_path, output_dir=None):
def insert_image_after_keyword(doc, keywords, image_path, full_name, time):
"""Finds a keyword in a table and inserts an image and text after it."""
if not os.path.exists(image_path):
print(f"==INSERT IMAGE ERROR== File not found: {image_path}")
#print(f"==INSERT IMAGE ERROR== File not found: {image_path}")
return
try:
@@ -396,8 +219,8 @@ class DocumentGenerator:
array_match = re.match(r"(\w+)\[(\d+)\]", part)
if array_match:
attr_name = array_match.group(1) # Lấy "payment_plan"
index = int(array_match.group(2)) # Lấy 0
attr_name = array_match.group(1)
index = int(array_match.group(2))
# Lấy list từ object
value = getattr(value, attr_name, None)
@@ -406,17 +229,15 @@ class DocumentGenerator:
try:
if isinstance(value, (list, tuple)):
value = value[index]
elif hasattr(value, 'all'): # QuerySet
value = value[index]
elif hasattr(value, 'all'):
value = list(value)[index]
except (IndexError, TypeError):
return None
else:
# 2. Xử lý truy cập thuộc tính hoặc key của Dict (JSON)
if isinstance(value, dict):
# Nếu là dict (phần tử trong JSONField), dùng .get()
value = value.get(part)
else:
# Nếu là object, dùng getattr()
value = getattr(value, part, None)
# 3. Hỗ trợ lấy bản ghi đầu tiên nếu gặp Quan hệ ngược (Manager)
@@ -454,9 +275,7 @@ class DocumentGenerator:
fmt_type = fmt.get("type") if isinstance(fmt, dict) else fmt
if fmt_type == "currency":
try:
# Đảm bảo val là số trước khi format, và làm tròn về số nguyên
num_val = int(round(float(val), 0))
# Format tiền tệ kiểu VN (dấu chấm phân cách hàng nghìn)
return "{:,}".format(num_val).replace(",", ".")
except Exception:
return str(val)
@@ -477,7 +296,6 @@ class DocumentGenerator:
start_date = self._resolve_lookup_value(fmt.get("start_date_from"))
end_date = self._resolve_lookup_value(fmt.get("end_date_from"))
if start_date and end_date:
# Thêm kiểm tra type để tránh lỗi
import datetime
if not isinstance(start_date, datetime.date): return ""
if not isinstance(end_date, datetime.date): return ""
@@ -499,32 +317,21 @@ class DocumentGenerator:
try:
val_str = str(val2) if val2 is not None else ""
val_to_parse = val_str.replace(',', '')
# Nếu val_to_parse là chuỗi rỗng, đặt bằng 0
local_dict[token] = float(val_to_parse) if val_to_parse else 0
except Exception:
local_dict[token] = 0
try:
result = eval(expr, {"__builtins__": None}, local_dict)
# Làm tròn kết quả về số nguyên (theo yêu cầu trước đó)
# Trả về kết quả số để formatter 'next' (currency) xử lý tiếp
return int(round(result, 0)) if isinstance(result, (int, float)) else result
except Exception as e:
return 0
return val
# Áp dụng lồng định dạng qua khóa 'next'
cur_fmt = format_config
val = value
# Vòng lặp này sẽ đảm bảo định dạng 'expression' được thực thi,
# sau đó kết quả số (chưa format) sẽ được truyền sang định dạng 'currency'
while isinstance(cur_fmt, dict) and cur_fmt.get("next") is not None:
val = apply_format(val, cur_fmt, obj)
cur_fmt = cur_fmt["next"]
@@ -533,30 +340,59 @@ class DocumentGenerator:
return str(val)
def _scan_placeholders_in_doc(self, doc):
"""Scans the entire document and returns a set of unique placeholders."""
"""
Scans the entire document and returns a set of unique placeholders.
FIXED: Now handles nested brackets correctly (e.g., [Transaction.payment_plan[0].amount(type:currency)])
"""
placeholders = set()
pattern = re.compile(r'\[([^\[\]]+)\]')
def scan_paragraph(para):
full_text = ''.join(run.text for run in para.runs)
for match in pattern.finditer(full_text):
placeholders.add(f"[{match.group(1)}]")
def scan_paragraphs(paragraphs):
for para in paragraphs:
if any("PAGE" in run._element.xml for run in para.runs):
continue
for para in doc.paragraphs:
scan_paragraph(para)
text_buffer = ""
bracket_depth = 0 # Track nested bracket depth
for run in para.runs:
for char in run.text:
if char == '[':
if bracket_depth == 0:
# Start of new placeholder
text_buffer = ""
else:
# Nested bracket - keep it in buffer
text_buffer += char
bracket_depth += 1
elif char == ']':
bracket_depth -= 1
if bracket_depth == 0:
# End of placeholder
if text_buffer:
placeholders.add(f"[{text_buffer}]")
text_buffer = ""
elif bracket_depth > 0:
# Still inside placeholder
text_buffer += char
# else: bracket_depth < 0 means unmatched ] - ignore
else:
if bracket_depth > 0:
text_buffer += char
scan_paragraphs(doc.paragraphs)
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for para in cell.paragraphs:
scan_paragraph(para)
scan_paragraphs(cell.paragraphs)
for section in doc.sections:
footer = section.footer
for para in footer.paragraphs:
if any("PAGE" in run._element.xml for run in para.runs):
continue
scan_paragraph(para)
if section.header:
scan_paragraphs(section.header.paragraphs)
if section.footer:
scan_paragraphs(section.footer.paragraphs)
return placeholders
@@ -577,6 +413,7 @@ class DocumentGenerator:
Prepares all replacements by implementing a hybrid approach:
1. Prioritizes manual configuration from 'fields'.
2. Automatically handles any remaining dynamic placeholders.
FIXED: Better regex pattern for nested brackets
"""
today = datetime.now()
self.replacements['[day]'] = str(today.day)
@@ -585,6 +422,7 @@ class DocumentGenerator:
self.replacements['[date]'] = today.strftime("%d/%m/%Y")
placeholders_in_doc = self._scan_placeholders_in_doc(doc)
#print(f"FINAL DEBUG: Placeholders found in document: {placeholders_in_doc}")
# PASS 1: Handle manual/explicit configuration (backward compatibility)
if isinstance(self.config.mappings, list):
@@ -633,20 +471,26 @@ class DocumentGenerator:
placeholders_in_doc.discard(placeholder)
# PASS 2: Handle remaining dynamic placeholders
dynamic_pattern = re.compile(r'\[([a-zA-Z0-9_]+\.[a-zA-Z0-9_.]*)(?:\((.*?)\))?\]')
# FIXED: Better regex that properly handles nested brackets
dynamic_pattern = re.compile(r'\[([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_\[\]]+)*)(?:\((.*?)\))?\]')
for placeholder in list(placeholders_in_doc):
match = dynamic_pattern.fullmatch(placeholder)
if not match:
#print(f"WARNING: Could not parse placeholder: {placeholder}")
continue
data_path, format_args_str = match.groups()
if '.' not in data_path:
continue
try:
alias, field_path = data_path.split('.', 1)
if alias not in self.data_context:
self.replacements[placeholder] = f"[ALIAS_NOT_FOUND: {alias}]"
#print(f"WARNING: Alias '{alias}' not found for placeholder: {placeholder}")
continue
source_object = self.data_context.get(alias)
@@ -658,8 +502,11 @@ class DocumentGenerator:
else:
self.replacements[placeholder] = str(value) if value is not None else ""
#print(f"DEBUG: Resolved {placeholder} = {self.replacements[placeholder]}")
except Exception as e:
self.replacements[placeholder] = f"[ERROR: {e}]"
#print(f"ERROR resolving placeholder {placeholder}: {e}")
def generate(self, signature_info=None, output_filename=None):
self.fetch_data()
@@ -686,7 +533,9 @@ class DocumentGenerator:
output_path = os.path.join(output_dir, output_filename)
pdf_filename = output_filename.replace(".docx", ".pdf")
#print(f"\n=== REPLACEMENTS TO BE APPLIED ===")
for old_text, new_text in self.replacements.items():
#print(f"{old_text} -> {new_text}")
replace_text_in_doc(doc, old_text, new_text)
if signature_info: