"""Generate .docx/.pdf documents from templates using configured data mappings."""

import os
import re
import subprocess
from datetime import date, datetime

import numpy as np
from django.apps import apps
from django.conf import settings
from django.db import models
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Inches, Pt
from num2words import num2words

from app.models import Document_Configuration

# =============================================================================
# Constants
# =============================================================================
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
static_folder = os.path.join(settings.BASE_DIR, "static")


# =============================================================================
# Utility Functions
# =============================================================================
def _replace_in_paragraph(para, old_text, new_text):
    """Replace every occurrence of old_text in one paragraph, across runs.

    The match may be split over several runs. The replacement is written
    into the first run that overlaps the match; the matched characters are
    removed from all other overlapping runs.
    """
    search_from = 0
    while True:
        runs = list(para.runs)
        full_text = "".join(run.text for run in runs)
        start = full_text.find(old_text, search_from)
        if start == -1:
            return
        end = start + len(old_text)

        position = 0
        inserted = False
        for run in runs:
            text = run.text
            run_start = position
            run_end = position + len(text)
            position = run_end
            if run_end <= start:
                continue  # run lies entirely before the match
            if run_start >= end:
                break  # run lies entirely after the match
            cut_from = max(start, run_start) - run_start
            cut_to = min(end, run_end) - run_start
            if inserted:
                run.text = text[:cut_from] + text[cut_to:]
            else:
                run.text = text[:cut_from] + new_text + text[cut_to:]
                inserted = True

        # Resume after the inserted text so that a replacement which itself
        # contains old_text is not matched again (this used to loop forever).
        search_from = start + len(new_text)


def replace_text_in_doc(doc, old_text, new_text):
    """Replace all occurrences of old_text with new_text throughout the document.

    Handles placeholders that are split across several runs. Body
    paragraphs, top-level table cells, section headers and section footers
    are processed. Footer paragraphs whose run XML contains "PAGE" are
    skipped and left untouched.

    Args:
        doc: The python-docx document to modify in place.
        old_text: The text to search for. An empty value is a no-op.
        new_text: The replacement; converted with ``str()``, ``None``
            becomes the empty string.
    """
    if not old_text:
        return
    new_text = str(new_text) if new_text is not None else ""

    for para in doc.paragraphs:
        _replace_in_paragraph(para, old_text, new_text)

    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                for para in cell.paragraphs:
                    _replace_in_paragraph(para, old_text, new_text)

    for section in doc.sections:
        if section.header:
            for para in section.header.paragraphs:
                _replace_in_paragraph(para, old_text, new_text)
        if section.footer:
            for para in section.footer.paragraphs:
                if any("PAGE" in run._element.xml for run in para.runs):
                    continue
                _replace_in_paragraph(para, old_text, new_text)


def docx_to_pdf(input_path, output_dir=None):
    """Convert a .docx file to .pdf by invoking LibreOffice in headless mode.

    A non-zero exit code is tolerated when a non-empty PDF was nevertheless
    produced.

    Args:
        input_path: Path of the .docx file to convert.
        output_dir: Directory for the PDF; defaults to the input's directory.

    Returns:
        The expected path of the generated PDF.

    Raises:
        FileNotFoundError: If the ``libreoffice`` executable is not found.
        Exception: Any other failure (timeout, failed conversion) is
            reported and re-raised.
    """
    if output_dir is None:
        output_dir = os.path.dirname(os.path.abspath(input_path))

    # splitext swaps only the final extension, unlike str.replace which
    # rewrites every ".docx" occurrence and ignores other casings.
    stem = os.path.splitext(os.path.basename(input_path))[0]
    pdf_path = os.path.join(output_dir, f"{stem}.pdf")

    try:
        result = subprocess.run(
            [
                "libreoffice",
                "--headless",
                "--convert-to",
                "pdf",
                "--outdir",
                output_dir,
                input_path,
            ],
            timeout=60,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            if not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0:
                raise RuntimeError(
                    f"PDF conversion failed and output file was not created. STDERR: {result.stderr}"
                )
            print("INFO: PDF file was created successfully despite the non-zero exit code.")
    except FileNotFoundError:
        print("ERROR: libreoffice command not found. Please ensure it is installed and in your PATH.")
        raise
    except Exception as e:
        print(f"ERROR: An unexpected error occurred during PDF conversion for {input_path}. Error: {e}")
        raise

    return pdf_path


def insert_image_after_keyword(doc, keywords, image_path, full_name, time):
    """Insert an image, a bold name and a time string into a table cell.

    Searches the document's top-level tables for the first cell paragraph
    containing any of the keywords, appends three centred paragraphs to
    that cell (image, name, time) and stops. Does nothing when the image
    file does not exist. Errors are printed, not raised.

    Args:
        doc: The python-docx document to modify in place.
        keywords: Iterable of strings to look for in cell paragraphs.
        image_path: Path of the image file to insert.
        full_name: Text written in bold under the image.
        time: Text written under the name.
    """
    if not os.path.exists(image_path):
        return

    try:
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    for para in cell.paragraphs:
                        for keyword in keywords:
                            if keyword not in para.text:
                                continue

                            p_img = cell.add_paragraph()
                            p_img.alignment = WD_ALIGN_PARAGRAPH.CENTER
                            p_img.add_run().add_picture(image_path, width=Inches(1.5))

                            p_name = cell.add_paragraph()
                            p_name.alignment = WD_ALIGN_PARAGRAPH.CENTER
                            run_name = p_name.add_run(full_name)
                            run_name.bold = True

                            p_time = cell.add_paragraph()
                            p_time.alignment = WD_ALIGN_PARAGRAPH.CENTER
                            p_time.add_run(time)
                            return
    except Exception as e:
        print(f"==INSERT IMAGE ERROR== {e}")


# =============================================================================
# Document Generator Class
# =============================================================================
class DocumentGenerator:
    """Fill a .docx template with data described by a Document_Configuration.

    ``context_pks`` maps parameter names to values used to look up the
    records listed in the configuration's ``mappings``.
    """

    # Matches path parts with an array index, e.g. "payment_plan[0]".
    _INDEXED_PART_RE = re.compile(r"(\w+)\[(\d+)\]")

    # Matches dynamic placeholders such as "[Alias.field.sub(type:currency)]".
    _DYNAMIC_PLACEHOLDER_RE = re.compile(
        r'\[([a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_\[\]]+)*)(?:\((.*?)\))?\]'
    )

    def __init__(self, document_code, context_pks: dict):
        self.document_code = document_code
        self.context_pks = context_pks
        self.config = self._get_config()
        self.data_context = {}
        self.replacements = {}

    def _get_config(self):
        """Load the configuration for ``document_code``; ValueError if absent."""
        try:
            return Document_Configuration.objects.get(code=self.document_code)
        except Document_Configuration.DoesNotExist:
            raise ValueError(f"Document configuration '{self.document_code}' not found.")

    def _get_model(self, model_string):
        """Return the model class for an "app_label.ModelName" string."""
        app_label, model_name = model_string.split(".")
        return apps.get_model(app_label, model_name)

    def _resolve_lookup_value(self, lookup_from):
        """Resolve a lookup reference to a concrete value.

        ``lookup_from`` is either a key of ``context_pks`` or an
        "alias.field.path" reference into already fetched data.

        Raises:
            ValueError: If the reference cannot be resolved.
        """
        if lookup_from in self.context_pks:
            return self.context_pks[lookup_from]

        error_message = (
            f"Could not resolve '{lookup_from}'. It is not a valid API parameter "
            f"or a reference to another data source."
        )
        # A missing or non-string reference used to surface as AttributeError.
        if not isinstance(lookup_from, str):
            raise ValueError(error_message)

        try:
            alias, field_path = lookup_from.split(".", 1)
            if alias not in self.data_context:
                raise ValueError(f"Alias '{alias}' not found in data context.")
            source_object = self.data_context.get(alias)
            return self._get_value_from_object(source_object, field_path)
        except ValueError as err:
            raise ValueError(error_message) from err

    @staticmethod
    def _get_member(container, name):
        """Return ``container[name]`` for dicts, else the attribute; None if absent."""
        if isinstance(container, dict):
            return container.get(name)
        return getattr(container, name, None)

    def _get_value_from_object(self, obj, field_path):
        """Walk a dotted path such as "customer.items[0].amount" from obj.

        Each part is read as a dict key or an attribute. A part of the form
        "name[i]" additionally indexes into a list/tuple, or into the
        records of an object exposing ``all()``. An unindexed value exposing
        ``all()`` that is not a model instance is replaced by ``.first()``.

        Returns:
            The resolved value, or None when obj is falsy, a part is
            missing, or an index is out of range.
        """
        if not obj:
            return None

        value = obj
        for part in field_path.split("."):
            if value is None:
                break

            indexed = self._INDEXED_PART_RE.match(part)
            if indexed:
                attr_name = indexed.group(1)
                index = int(indexed.group(2))
                value = self._get_member(value, attr_name)
                try:
                    if isinstance(value, (list, tuple)):
                        value = value[index]
                    elif hasattr(value, "all"):
                        # Managers are not iterable themselves; go through all().
                        value = list(value.all())[index]
                except (IndexError, TypeError):
                    return None
            else:
                value = self._get_member(value, part)

            # Take the first record when the value is a manager-like object.
            if hasattr(value, "all") and not isinstance(value, models.Model):
                value = value.first()

        return value

    def fetch_data(self):
        """Populate ``data_context`` by running every configured mapping.

        Raises:
            TypeError: If the configuration's ``mappings`` is not a list.
        """
        if not isinstance(self.config.mappings, list):
            raise TypeError("Document configuration 'mappings' must be a list.")

        for mapping in self.config.mappings:
            model_cls = self._get_model(mapping["model"])
            lookup_field = mapping["lookup_field"]
            lookup_value = self._resolve_lookup_value(mapping["lookup_value_from"])
            alias = mapping["alias"]
            mapping_type = mapping["type"]

            if lookup_value is None:
                self.data_context[alias] = None if mapping_type == "object" else []
                continue

            queryset = model_cls.objects.filter(**{lookup_field: lookup_value})
            if mapping_type == "object":
                self.data_context[alias] = queryset.first()
            elif mapping_type == "list":
                self.data_context[alias] = list(queryset)

    def _apply_format(self, val, fmt, obj):
        """Apply a single format step to val and return the result.

        ``fmt`` is either a dict with a "type" key plus options, or a bare
        type name. Unknown types return val unchanged.
        """
        if isinstance(fmt, dict):
            fmt_type = fmt.get("type")
            options = fmt
        else:
            # A bare type name carries no options; use defaults rather than
            # failing on ``str.get``.
            fmt_type = fmt
            options = {}

        if fmt_type == "currency":
            try:
                num_val = int(round(float(val), 0))
                return "{:,}".format(num_val).replace(",", ".")
            except Exception:
                return str(val)

        if fmt_type == "date":
            date_format = (
                options.get("format", "%d/%m/%Y")
                .replace("dd", "%d")
                .replace("mm", "%m")
                .replace("YYYY", "%Y")
            )
            try:
                return val.strftime(date_format)
            except Exception:
                return str(val)

        if fmt_type == "number_to_words":
            try:
                return num2words(val, lang=options.get("lang", "en"))
            except Exception:
                return str(val)

        if fmt_type == "conditional":
            return options.get("true_value") if val else options.get("false_value")

        if fmt_type == "computed_months":
            start_date = self._resolve_lookup_value(options.get("start_date_from"))
            end_date = self._resolve_lookup_value(options.get("end_date_from"))
            if start_date and end_date:
                if not isinstance(start_date, date):
                    return ""
                if not isinstance(end_date, date):
                    return ""
                # Months are approximated as 30-day periods.
                return str(int(round(((end_date - start_date).days) / 30, 0)))
            return ""

        if fmt_type == "expression":
            expr = options.get("expr")
            if not expr:
                return ""
            tokens = re.findall(r"[a-zA-Z0-9_\.]+", expr)
            local_dict = {}
            for token in tokens:
                if "__" in token or "." in token:
                    token_value = self._get_value_from_object(obj, token)
                else:
                    token_value = getattr(obj, token, None)
                try:
                    as_text = str(token_value) if token_value is not None else ""
                    as_text = as_text.replace(',', '')
                    local_dict[token] = float(as_text) if as_text else 0
                except Exception:
                    local_dict[token] = 0
            # SECURITY NOTE(review): eval() with builtins disabled is not a
            # sandbox. ``expr`` must only come from trusted configuration or
            # templates - confirm who can author it.
            # NOTE(review): dotted tokens are stored under their dotted name,
            # which eval cannot look up as a variable, so such expressions
            # fall into the error path and yield 0.
            try:
                result = eval(expr, {"__builtins__": None}, local_dict)
                return int(round(result, 0)) if isinstance(result, (int, float)) else result
            except Exception:
                return 0

        return val

    def _format_value(self, value, format_config, obj=None):
        """Format value through a chain of format steps and return a string.

        Steps are linked through the "next" key of each format dict.
        Returns the empty string when value is None.
        """
        if value is None:
            return ""

        cur_fmt = format_config
        val = value
        while isinstance(cur_fmt, dict) and cur_fmt.get("next") is not None:
            val = self._apply_format(val, cur_fmt, obj)
            cur_fmt = cur_fmt["next"]
        val = self._apply_format(val, cur_fmt, obj)
        return str(val)

    def _scan_placeholders_in_doc(self, doc):
        """Return the set of unique "[...]" placeholders found in the document.

        Nested brackets are kept inside the enclosing placeholder, e.g.
        "[Transaction.payment_plan[0].amount(type:currency)]". Paragraphs
        whose run XML contains "PAGE" are skipped. Body paragraphs,
        top-level table cells, headers and footers are scanned.
        """
        placeholders = set()

        def scan_paragraphs(paragraphs):
            for para in paragraphs:
                if any("PAGE" in run._element.xml for run in para.runs):
                    continue

                text_buffer = ""
                bracket_depth = 0
                for run in para.runs:
                    for char in run.text:
                        if char == '[':
                            if bracket_depth == 0:
                                text_buffer = ""  # start of a new placeholder
                            else:
                                text_buffer += char  # nested bracket
                            bracket_depth += 1
                        elif char == ']':
                            if bracket_depth == 0:
                                # Unmatched "]": ignore it. Letting the depth go
                                # negative hid every later placeholder in the
                                # paragraph.
                                continue
                            bracket_depth -= 1
                            if bracket_depth == 0:
                                if text_buffer:
                                    placeholders.add(f"[{text_buffer}]")
                                text_buffer = ""
                            else:
                                text_buffer += char
                        elif bracket_depth > 0:
                            text_buffer += char

        scan_paragraphs(doc.paragraphs)
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    scan_paragraphs(cell.paragraphs)
        for section in doc.sections:
            if section.header:
                scan_paragraphs(section.header.paragraphs)
            if section.footer:
                scan_paragraphs(section.footer.paragraphs)

        return placeholders

    def _parse_format_args(self, args_string):
        """Parse a string like 'lang:vi, type:number_to_words' into a dict."""
        if not args_string:
            return {}

        format_config = {}
        for arg in args_string.split(','):
            if ':' in arg:
                key, value = arg.split(':', 1)
                format_config[key.strip()] = value.strip()
        return format_config

    def _render_field(self, source_obj, config):
        """Return the replacement text for one explicitly configured field.

        ``config`` is either a field path string, or a dict with a "source"
        path and an optional "format" specification.
        """
        if isinstance(config, dict):
            value = self._get_value_from_object(source_obj, config["source"])
            # "format" is optional; a missing key leaves the value unformatted.
            return self._format_value(value, config.get("format"), source_obj)
        value = self._get_value_from_object(source_obj, config)
        return str(value) if value is not None else ""

    def prepare_replacements(self, doc):
        """Fill ``self.replacements`` for every placeholder found in doc.

        Built-in date placeholders are always set. Placeholders listed in a
        mapping's "fields" are resolved first; remaining placeholders of the
        form "[alias.path(args)]" are then resolved dynamically.
        """
        today = datetime.now()
        self.replacements['[day]'] = str(today.day)
        self.replacements['[month]'] = str(today.month)
        self.replacements['[year]'] = str(today.year)
        self.replacements['[date]'] = today.strftime("%d/%m/%Y")

        placeholders_in_doc = self._scan_placeholders_in_doc(doc)

        # PASS 1: explicit configuration from "fields" (backward compatibility).
        if isinstance(self.config.mappings, list):
            for mapping in self.config.mappings:
                if "fields" not in mapping:
                    continue
                data = self.data_context.get(mapping["alias"])

                if mapping["type"] == "list":
                    items = data or []
                    max_items = mapping.get("max_items", 4)
                    for i in range(max_items):
                        item = items[i] if i < len(items) else None
                        for p_template, config in mapping["fields"].items():
                            placeholder = p_template.replace("{index}", str(i + 1))
                            if placeholder not in placeholders_in_doc:
                                continue
                            if item is None:
                                self.replacements[placeholder] = ""
                            else:
                                self.replacements[placeholder] = self._render_field(item, config)
                            placeholders_in_doc.discard(placeholder)

                elif mapping["type"] == "object":
                    for placeholder, config in mapping["fields"].items():
                        if placeholder not in placeholders_in_doc:
                            continue
                        if data is None:
                            self.replacements[placeholder] = ""
                        else:
                            self.replacements[placeholder] = self._render_field(data, config)
                        placeholders_in_doc.discard(placeholder)

        # PASS 2: remaining dynamic placeholders.
        for placeholder in list(placeholders_in_doc):
            match = self._DYNAMIC_PLACEHOLDER_RE.fullmatch(placeholder)
            if not match:
                continue

            data_path, format_args_str = match.groups()
            if '.' not in data_path:
                continue

            try:
                alias, field_path = data_path.split('.', 1)
                if alias not in self.data_context:
                    self.replacements[placeholder] = f"[ALIAS_NOT_FOUND: {alias}]"
                    continue

                source_object = self.data_context.get(alias)
                value = self._get_value_from_object(source_object, field_path)
                if format_args_str:
                    format_config = self._parse_format_args(format_args_str)
                    self.replacements[placeholder] = self._format_value(
                        value, format_config, source_object
                    )
                else:
                    self.replacements[placeholder] = str(value) if value is not None else ""
            except Exception as e:
                self.replacements[placeholder] = f"[ERROR: {e}]"

    def generate(self, signature_info=None, output_filename=None):
        """Produce the filled .docx and its PDF under ``static/contract``.

        Args:
            signature_info: Optional dict with "keywords", "file_path",
                "full_name" and "timestamp" used to insert a signature.
            output_filename: Optional output name; a ".docx" extension is
                enforced. Defaults to a name built from the document code,
                the context values and the current timestamp.

        Returns:
            A dict with the document "code" and "name" and the generated
            "file" and "pdf" file names.

        Raises:
            FileNotFoundError: If the template file does not exist.
        """
        self.fetch_data()

        clean_template_path = self.config.template_path.lstrip('/')
        template_full_path = os.path.join(static_folder, clean_template_path)
        if not os.path.exists(template_full_path):
            raise FileNotFoundError(f"Template file not found at: {template_full_path}")

        doc = Document(template_full_path)
        self.prepare_replacements(doc)

        if output_filename:
            if not output_filename.endswith(".docx"):
                base_name = os.path.splitext(output_filename)[0]
                output_filename = f"{base_name}.docx"
        else:
            pk_values = "_".join(str(v) for v in self.context_pks.values())
            output_filename = (
                f"{self.document_code}_{pk_values}_{int(datetime.now().timestamp())}.docx"
            )

        output_dir = os.path.join(static_folder, "contract")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, output_filename)
        # Swap only the final extension so the name matches the converter's output.
        pdf_filename = f"{os.path.splitext(output_filename)[0]}.pdf"

        for old_text, new_text in self.replacements.items():
            replace_text_in_doc(doc, old_text, new_text)

        if signature_info:
            insert_image_after_keyword(
                doc,
                signature_info["keywords"],
                signature_info["file_path"],
                signature_info["full_name"],
                signature_info["timestamp"],
            )

        doc.save(output_path)
        docx_to_pdf(output_path, output_dir)

        return {
            "code": self.document_code,
            "name": self.config.name,
            "file": output_filename,
            "pdf": pdf_filename,
        }