Files
api/app/jobemail.py
2025-12-30 11:27:14 +07:00

194 lines
7.9 KiB
Python

import os
from django.conf import settings
import numpy as np
from datetime import datetime
from django.apps import apps
from num2words import num2words
from app.models import Email_Template
from app.email import send_via_zeptomail
class EmailJobRunner:
def __init__(self, template: Email_Template, context_pks: dict):
self.template = template
self.context_pks = context_pks
self.config = self.template.content
self.data_context = {}
self.replacements = {}
def _get_model(self, model_string):
app_label, model_name = model_string.split(".")
return apps.get_model(app_label, model_name)
def _get_value_from_object(self, obj, field_path):
if obj is None:
return None
value = obj
for part in field_path.replace("__", ".").split("."):
if value is None:
return None
value = getattr(value, part, None)
return value
def _resolve_lookup_value(self, lookup_from):
if lookup_from in self.context_pks:
return self.context_pks[lookup_from]
try:
alias, field_path = lookup_from.split(".", 1)
if alias not in self.data_context:
raise ValueError(f"Alias '{alias}' not found in data context.")
source_object = self.data_context.get(alias)
return self._get_value_from_object(source_object, field_path)
except ValueError:
raise ValueError(f"Could not resolve '{lookup_from}'. It is not a valid API parameter or a reference to another data source.")
def fetch_data(self):
mappings = self.config.get("mappings", [])
if not isinstance(mappings, list):
raise TypeError("Email template 'mappings' must be a list.")
trigger_model_mapping = next((m for m in mappings if m.get("is_trigger_object", False)), None)
if trigger_model_mapping:
model_cls = self._get_model(trigger_model_mapping["model"])
lookup_field = trigger_model_mapping["lookup_field"]
lookup_value = self._resolve_lookup_value(trigger_model_mapping["lookup_value_from"])
alias = trigger_model_mapping["alias"]
if lookup_value is not None:
self.data_context[alias] = model_cls.objects.filter(**{lookup_field: lookup_value}).first()
else:
self.data_context[alias] = None
for mapping in mappings:
if mapping.get("is_trigger_object", False):
continue
model_cls = self._get_model(mapping["model"])
lookup_field = mapping["lookup_field"]
lookup_value = self._resolve_lookup_value(mapping["lookup_value_from"])
alias = mapping["alias"]
if lookup_value is None:
self.data_context[alias] = None if mapping.get("type") == "object" else []
continue
queryset = model_cls.objects.filter(**{lookup_field: lookup_value})
if mapping.get("type") == "object":
self.data_context[alias] = queryset.first()
elif mapping.get("type") == "list":
self.data_context[alias] = list(queryset)
def _format_value(self, value, format_config):
"""Applies formatting to a value based on configuration."""
if value is None:
return ""
format_type = format_config.get("type")
if not format_type:
return str(value)
try:
if format_type == "currency":
return "{:,}".format(np.int64(value)).replace(",", ".")
if format_type == "date":
date_format = format_config.get("format", "dd/mm/YYYY").replace("dd", "%d").replace("mm", "%m").replace("YYYY", "%Y")
return value.strftime(date_format)
if format_type == "number_to_words":
return num2words(value, lang=format_config.get("lang", "vi"))
if format_type == "conditional":
return format_config["true_value"] if value else format_config["false_value"]
except Exception as e:
print(f"Error formatting value '{value}' with config '{format_config}': {e}")
return ""
return str(value)
def prepare_replacements(self):
today = datetime.now()
self.replacements['[day]'] = str(today.day)
self.replacements['[month]'] = str(today.month)
self.replacements['[year]'] = str(today.year)
self.replacements['[date]'] = today.strftime("%d/%m/%Y")
mappings = self.config.get("mappings", [])
for mapping in mappings:
alias = mapping["alias"]
data = self.data_context.get(alias)
fields = mapping.get("fields", {})
if mapping.get("type") == "object":
if data is None:
for placeholder in fields:
self.replacements[placeholder] = ""
continue
for placeholder, config in fields.items():
if isinstance(config, dict):
value = self._get_value_from_object(data, config["source"])
self.replacements[placeholder] = self._format_value(value, config.get("format", {}))
else:
value = self._get_value_from_object(data, config)
self.replacements[placeholder] = str(value) if value is not None else ""
def run(self):
try:
print(f"Running email job for template: {self.template.name}")
self.fetch_data()
self.prepare_replacements()
subject_template = self.config.get("subject", "")
body_template = self.config.get("content", "")
recipient_placeholder = self.config.get("recipient_placeholder", "[customer.email]")
sender_id = self.config.get("sender_id", 1)
image_url = self.config.get("imageUrl", "https://api.bigdatatech.vn/static/files/20251113051227-1.png")
final_subject = subject_template
final_body = body_template
for key, value in self.replacements.items():
final_subject = final_subject.replace(key, str(value))
final_body = final_body.replace(key, str(value))
recipient_email = self.replacements.get(recipient_placeholder)
if not recipient_email:
print(f"Email job '{self.template.name}' failed: Recipient email not found for placeholder '{recipient_placeholder}'.")
return False
# Load the base template
template_path = os.path.join(settings.BASE_DIR, 'app', 'email_base_template.html')
with open(template_path, 'r', encoding='utf-8') as f:
full_email_content = f.read()
# Replace placeholders in the base template
full_email_content = full_email_content.replace('{{subject}}', final_subject)
full_email_content = full_email_content.replace('{{body_content}}', final_body)
full_email_content = full_email_content.replace('{{image_url}}', image_url)
print(f"Sending email to '{recipient_email}' with subject '{final_subject}'")
success = send_via_zeptomail(
receiver=recipient_email,
subject=final_subject,
content=full_email_content,
sender=sender_id
)
if success:
print(f"Email job '{self.template.name}' completed successfully.")
else:
print(f"Email job '{self.template.name}' failed during sending.")
return success
except Exception as e:
print(f"An unexpected error occurred in EmailJobRunner for template '{self.template.name}': {e}")
import traceback
traceback.print_exc()
return False