"""Fill .docx templates from configured data mappings and convert them to PDF."""

import os
import re
import subprocess
from datetime import date, datetime

import numpy as np
from django.apps import apps
from django.conf import settings
from django.db import models
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.shared import Inches, Pt
from num2words import num2words

from app.models import Document_Configuration

# =============================================================================
# Constants
# =============================================================================
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
static_folder = os.path.join(settings.BASE_DIR, "static")

# Tokens that an "expression" format may reference (attribute names, dotted
# paths, and numeric literals all match this pattern).
_EXPR_TOKEN_RE = re.compile(r"[a-zA-Z0-9_\.]+")


# =============================================================================
# Utility Functions
# =============================================================================
def _iter_table_cell_paragraphs(doc):
    """Yield ``(cell, paragraph)`` for every paragraph in ``doc.tables``."""
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                for para in cell.paragraphs:
                    yield cell, para


def replace_text_in_doc(doc, old_text, new_text):
    """Replace every occurrence of ``old_text`` with ``new_text`` in ``doc``.

    Body paragraphs, paragraphs of the cells in ``doc.tables`` and the footer
    paragraphs of every section are processed. A match may be split across
    several runs; the replacement text is inserted into the first run that
    overlaps the match and the matched characters are removed from the others.

    Args:
        doc: A python-docx document object.
        old_text: Placeholder text to look for. An empty string is a no-op.
        new_text: Replacement value; converted with ``str()``, ``None``
            becomes an empty string.
    """
    new_text = str(new_text) if new_text is not None else ""

    def replace_in_paragraph(para):
        if not old_text:
            return
        search_from = 0
        while True:
            runs = list(para.runs)
            full_text = "".join(run.text for run in runs)
            start_idx = full_text.find(old_text, search_from)
            if start_idx == -1:
                return
            end_idx = start_idx + len(old_text)

            position = 0
            inserted = False
            for run in runs:
                text = run.text
                run_start = position
                run_end = position + len(text)
                position = run_end
                if run_end <= start_idx or run_start >= end_idx:
                    continue  # this run does not overlap the match
                cut_from = max(start_idx - run_start, 0)
                cut_to = min(end_idx - run_start, len(text))
                # Only the first overlapping run receives the new text.
                middle = "" if inserted else new_text
                run.text = text[:cut_from] + middle + text[cut_to:]
                inserted = True

            # Resume after the inserted text so that a replacement which
            # itself contains ``old_text`` cannot loop forever.
            search_from = start_idx + len(new_text)

    for para in doc.paragraphs:
        replace_in_paragraph(para)

    for _cell, para in _iter_table_cell_paragraphs(doc):
        replace_in_paragraph(para)

    for section in doc.sections:
        for para in section.footer.paragraphs:
            # Paragraphs whose run XML mentions "PAGE" are left untouched
            # (presumably page-number fields -- confirm against templates).
            if any("PAGE" in run._element.xml for run in para.runs):
                continue
            replace_in_paragraph(para)


def docx_to_pdf(input_path, output_dir=None):
    """Convert a .docx file to PDF by running ``libreoffice --headless``.

    Args:
        input_path: Path of the .docx file to convert.
        output_dir: Directory passed to ``--outdir``. Defaults to the
            directory that contains ``input_path``.

    Raises:
        FileNotFoundError: The ``libreoffice`` executable was not found.
        subprocess.TimeoutExpired: The conversion took longer than 60 seconds.
        Exception: LibreOffice exited with a non-zero code and the expected
            PDF is missing or empty.
    """
    if output_dir is None:
        output_dir = os.path.dirname(os.path.abspath(input_path))

    # Swap only the final extension; str.replace would also rewrite a
    # ".docx" that happens to appear earlier in the file name.
    pdf_name = os.path.splitext(os.path.basename(input_path))[0] + ".pdf"
    pdf_path = os.path.join(output_dir, pdf_name)

    try:
        result = subprocess.run(
            [
                "libreoffice",
                "--headless",
                "--convert-to",
                "pdf",
                "--outdir",
                output_dir,
                input_path,
            ],
            timeout=60,
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            # Log the warning/error from LibreOffice
            print(f"WARNING: libreoffice command returned non-zero exit code ({result.returncode}) for {input_path}.")
            print(f"  STDOUT: {result.stdout}")
            print(f"  STDERR: {result.stderr}")

            # A non-zero exit code is tolerated when the PDF exists anyway.
            if not os.path.exists(pdf_path) or os.path.getsize(pdf_path) == 0:
                raise RuntimeError(
                    f"PDF conversion failed and output file was not created. STDERR: {result.stderr}"
                )
            print("INFO: PDF file was created successfully despite the non-zero exit code.")

    except FileNotFoundError:
        print("ERROR: libreoffice command not found. Please ensure it is installed and in your PATH.")
        raise
    except Exception as e:
        # Re-raise other exceptions (like timeout) after logging them
        print(f"ERROR: An unexpected error occurred during PDF conversion for {input_path}. Error: {e}")
        raise


def insert_image_after_keyword(doc, keywords, image_path, full_name, time):
    """Append a signature image, name and time to the first matching table cell.

    The cells of ``doc.tables`` are scanned in order; in the first cell that
    has a paragraph containing any of ``keywords``, three centred paragraphs
    are appended (the picture at 1.5 inches wide, ``full_name`` in bold, and
    ``time``) and the function returns. Errors are printed, not raised.

    Args:
        doc: A python-docx document object.
        keywords: Iterable of strings to look for in cell paragraphs.
        image_path: Path of the image file; nothing is done if it is missing.
        full_name: Text written in bold below the image.
        time: Text written below the name.
    """
    if not os.path.exists(image_path):
        print(f"==INSERT IMAGE ERROR== File not found: {image_path}")
        return

    try:
        for cell, para in _iter_table_cell_paragraphs(doc):
            if not any(keyword in para.text for keyword in keywords):
                continue

            p_img = cell.add_paragraph()
            p_img.alignment = WD_ALIGN_PARAGRAPH.CENTER
            p_img.add_run().add_picture(image_path, width=Inches(1.5))

            p_name = cell.add_paragraph()
            p_name.alignment = WD_ALIGN_PARAGRAPH.CENTER
            p_name.add_run(full_name).bold = True

            p_time = cell.add_paragraph()
            p_time.alignment = WD_ALIGN_PARAGRAPH.CENTER
            p_time.add_run(time)
            return
    except Exception as e:
        print(f"==INSERT IMAGE ERROR== {e}")


# =============================================================================
# Document Generator Class
# =============================================================================
class DocumentGenerator:
    """Build a document from a ``Document_Configuration`` and context keys.

    ``generate()`` loads the data described by ``config.mappings``, turns it
    into placeholder replacements, fills the .docx template, and converts the
    result to PDF.
    """

    def __init__(self, document_code, context_pks: dict):
        """Load the configuration for ``document_code``.

        Args:
            document_code: Value matched against ``Document_Configuration.code``.
            context_pks: Named lookup values that mappings can refer to via
                ``lookup_value_from``.

        Raises:
            ValueError: No configuration with that code exists.
        """
        self.document_code = document_code
        self.context_pks = context_pks
        self.config = self._get_config()
        self.data_context = {}
        self.replacements = {}

    def _get_config(self):
        """Return the configuration row for ``self.document_code``."""
        try:
            return Document_Configuration.objects.get(code=self.document_code)
        except Document_Configuration.DoesNotExist:
            raise ValueError(f"Document configuration '{self.document_code}' not found.")

    def _get_model(self, model_string):
        """Return the model class named by an ``"app_label.ModelName"`` string."""
        app_label, model_name = model_string.split(".")
        return apps.get_model(app_label, model_name)

    def _resolve_lookup_value(self, lookup_from):
        """Resolve a lookup reference to a concrete value.

        ``lookup_from`` is either a key of ``self.context_pks`` or an
        ``"alias.field.path"`` reference into data that was already fetched.

        Raises:
            ValueError: The reference is neither of the two.
        """
        if lookup_from in self.context_pks:
            return self.context_pks[lookup_from]

        error = ValueError(
            f"Could not resolve '{lookup_from}'. It is not a valid API parameter or a reference to another data source."
        )
        try:
            alias, field_path = lookup_from.split(".", 1)
        except (AttributeError, ValueError):
            # Not a string, or no "." separating alias and field path.
            raise error from None
        if alias not in self.data_context:
            raise error
        return self._get_value_from_object(self.data_context[alias], field_path)

    def _get_value_from_object(self, obj, field_path):
        """Follow a dotted attribute path starting at ``obj``.

        Missing attributes yield ``None``. When an intermediate value has an
        ``all`` attribute and is not a model instance (as Django related
        managers do), its ``.first()`` result is used instead.
        """
        if not obj:
            return None

        value = obj
        for part in field_path.split("."):
            if value is None:
                break
            value = getattr(value, part, None)
            # Reverse ForeignKey / ManyToMany access returns a manager:
            # take the first related record automatically.
            if hasattr(value, "all") and not isinstance(value, models.Model):
                value = value.first()
        return value

    def fetch_data(self):
        """Run every configured mapping and store results in ``data_context``.

        Mappings of type ``"object"`` store the first matching row (or
        ``None``); mappings of type ``"list"`` store a list of all rows.

        Raises:
            TypeError: ``config.mappings`` is not a list.
        """
        if not isinstance(self.config.mappings, list):
            raise TypeError("Document configuration 'mappings' must be a list.")

        for mapping in self.config.mappings:
            model_cls = self._get_model(mapping["model"])
            lookup_field = mapping["lookup_field"]
            lookup_value = self._resolve_lookup_value(mapping["lookup_value_from"])
            alias = mapping["alias"]
            kind = mapping["type"]

            if lookup_value is None:
                self.data_context[alias] = None if kind == "object" else []
                continue

            queryset = model_cls.objects.filter(**{lookup_field: lookup_value})
            if kind == "object":
                self.data_context[alias] = queryset.first()
            elif kind == "list":
                self.data_context[alias] = list(queryset)

    @staticmethod
    def _to_number(raw):
        """Convert ``raw`` to ``float`` (commas stripped); 0 when not possible."""
        if raw is None:
            return 0
        text = str(raw).replace(",", "")
        if not text:
            return 0
        try:
            return float(text)
        except ValueError:
            return 0

    def _evaluate_expression(self, expr, obj):
        """Evaluate an arithmetic expression over attributes of ``obj``.

        Each token of ``expr`` is looked up on ``obj`` and converted to a
        number (0 when missing or non-numeric). Returns the result rounded to
        an int when it is numeric, or 0 when evaluation fails.
        """
        namespace = {}

        def bind(match):
            token = match.group(0)
            if token[0].isdigit() or token[0] == ".":
                return token  # numeric literal, not an attribute reference
            if "__" in token or "." in token:
                raw = self._get_value_from_object(obj, token)
            else:
                raw = getattr(obj, token, None)
            # eval() parses "a.b" as attribute access on the name "a", so a
            # dotted path has to be bound to a plain identifier instead.
            name = f"_path_{len(namespace)}" if "." in token else token
            namespace[name] = self._to_number(raw)
            return name

        safe_expr = _EXPR_TOKEN_RE.sub(bind, expr)
        try:
            # SECURITY: eval() with builtins disabled is not a sandbox.
            # ``expr`` must only ever come from trusted configuration.
            result = eval(safe_expr, {"__builtins__": None}, namespace)
            # Return a number so that a chained formatter (e.g. currency)
            # can process it further.
            return int(round(result, 0)) if isinstance(result, (int, float)) else result
        except Exception:
            return 0

    def _apply_format(self, val, fmt, obj):
        """Apply one format step to ``val``.

        ``fmt`` is either a dict with a ``"type"`` key plus options, or the
        type name itself. Unknown types return ``val`` unchanged.
        """
        if isinstance(fmt, dict):
            fmt_type, opts = fmt.get("type"), fmt
        else:
            # Shorthand: the format is just the type name, no options.
            fmt_type, opts = fmt, {}

        if fmt_type == "currency":
            try:
                # Round to a whole number and use "." as the thousands
                # separator (Vietnamese currency style).
                return "{:,}".format(int(round(float(val), 0))).replace(",", ".")
            except Exception:
                return str(val)

        if fmt_type == "date":
            date_format = (
                (opts.get("format") or "%d/%m/%Y")
                .replace("dd", "%d")
                .replace("mm", "%m")
                .replace("YYYY", "%Y")
            )
            try:
                return val.strftime(date_format)
            except Exception:
                return str(val)

        if fmt_type == "number_to_words":
            try:
                return num2words(val, lang=opts.get("lang", "en"))
            except Exception:
                return str(val)

        if fmt_type == "conditional":
            return opts.get("true_value") if val else opts.get("false_value")

        if fmt_type == "computed_months":
            start_key = opts.get("start_date_from")
            end_key = opts.get("end_date_from")
            if not start_key or not end_key:
                return ""
            start_date = self._resolve_lookup_value(start_key)
            end_date = self._resolve_lookup_value(end_key)
            if not (isinstance(start_date, date) and isinstance(end_date, date)):
                return ""
            if isinstance(start_date, datetime) != isinstance(end_date, datetime):
                # Subtracting a date from a datetime (or vice versa) raises
                # TypeError; compare calendar dates in that case.
                if isinstance(start_date, datetime):
                    start_date = start_date.date()
                else:
                    end_date = end_date.date()
            return str(int(round((end_date - start_date).days / 30, 0)))

        if fmt_type == "expression":
            expr = opts.get("expr")
            if not expr:
                return ""
            return self._evaluate_expression(expr, obj)

        return val

    def _format_value(self, value, format_config, obj=None):
        """Format ``value`` according to ``format_config`` and return a string.

        Formats can be chained through a ``"next"`` key: each step receives
        the previous step's result (e.g. ``expression`` followed by
        ``currency``). ``None`` values yield an empty string without any
        formatting being applied.
        """
        if value is None:
            return ""

        val = value
        cur_fmt = format_config
        while isinstance(cur_fmt, dict) and cur_fmt.get("next") is not None:
            val = self._apply_format(val, cur_fmt, obj)
            cur_fmt = cur_fmt["next"]
        val = self._apply_format(val, cur_fmt, obj)
        return str(val)

    def _render_field(self, source_obj, field_config):
        """Return the replacement string for one configured field.

        ``field_config`` is either a field path string or a dict with a
        ``"source"`` path and an optional ``"format"``.
        """
        if isinstance(field_config, dict):
            value = self._get_value_from_object(source_obj, field_config["source"])
            return self._format_value(value, field_config.get("format"), source_obj)
        value = self._get_value_from_object(source_obj, field_config)
        return str(value) if value is not None else ""

    def prepare_replacements(self):
        """Fill ``self.replacements`` with placeholder -> text pairs.

        Adds ``[day]``, ``[month]``, ``[year]`` and ``[date]`` for the current
        date, then one entry per configured field. List mappings expand
        ``{index}`` in the placeholder to 1..``max_items`` (default 4);
        positions without data get an empty string.
        """
        today = datetime.now()
        self.replacements['[day]'] = str(today.day)
        self.replacements['[month]'] = str(today.month)
        self.replacements['[year]'] = str(today.year)
        self.replacements['[date]'] = today.strftime("%d/%m/%Y")

        for mapping in self.config.mappings:
            data = self.data_context.get(mapping["alias"])
            fields = mapping["fields"]

            if mapping["type"] == "object":
                for placeholder, config in fields.items():
                    self.replacements[placeholder] = (
                        "" if data is None else self._render_field(data, config)
                    )

            elif mapping["type"] == "list":
                items = data or []
                for i in range(mapping.get("max_items", 4)):
                    item = items[i] if i < len(items) else None
                    for p_template, config in fields.items():
                        placeholder = p_template.replace("{index}", str(i + 1))
                        self.replacements[placeholder] = (
                            "" if item is None else self._render_field(item, config)
                        )

    def generate(self, signature_info=None, output_filename=None):
        """Produce the filled .docx and its PDF under ``static/contract``.

        Args:
            signature_info: Optional dict with ``keywords``, ``file_path``,
                ``full_name`` and ``timestamp`` used to insert a signature.
            output_filename: Optional file name; its extension is forced to
                ``.docx``. When omitted a name is built from the document
                code, the context values and the current timestamp.

        Returns:
            dict with ``code``, ``name``, ``file`` (.docx name) and ``pdf``.

        Raises:
            FileNotFoundError: The template file does not exist.
        """
        self.fetch_data()
        self.prepare_replacements()

        # Remove leading slashes from template_path to prevent os.path.join issues
        clean_template_path = self.config.template_path.lstrip('/')
        template_full_path = os.path.join(static_folder, clean_template_path)
        if not os.path.exists(template_full_path):
            raise FileNotFoundError(f"Template file not found at: {template_full_path}")

        # --- FILENAME LOGIC ---
        if output_filename:
            # NOTE(review): the name is joined to the output directory as-is;
            # if it can come from an untrusted caller, path components such
            # as "../" should be rejected upstream.
            if not output_filename.endswith(".docx"):
                base_name = os.path.splitext(output_filename)[0]
                output_filename = f"{base_name}.docx"
        else:
            # Context values may be ints (primary keys), so stringify them.
            pk_values = "_".join(str(v) for v in self.context_pks.values())
            output_filename = f"{self.document_code}_{pk_values}_{int(datetime.now().timestamp())}.docx"

        output_dir = os.path.join(static_folder, "contract")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, output_filename)
        # Swap only the trailing extension, matching docx_to_pdf's output name.
        pdf_filename = os.path.splitext(output_filename)[0] + ".pdf"

        doc = Document(template_full_path)
        for old_text, new_text in self.replacements.items():
            replace_text_in_doc(doc, old_text, new_text)

        if signature_info:
            insert_image_after_keyword(
                doc,
                signature_info["keywords"],
                signature_info["file_path"],
                signature_info["full_name"],
                signature_info["timestamp"],
            )

        doc.save(output_path)
        docx_to_pdf(output_path, output_dir)

        return {
            "code": self.document_code,
            "name": self.config.name,
            "file": output_filename,
            "pdf": pdf_filename,
        }