Files
api/app/views.py
anhduy-tech 69937da0b6 changes
2026-01-30 12:03:15 +07:00

1614 lines
70 KiB
Python

import os, ast, re, mimetypes
import cv2, requests, secrets, csv
from django.apps import apps
from rest_framework import status, serializers
from rest_framework.decorators import api_view
from rest_framework.response import Response
from app.models import *
from django.contrib.auth.hashers import make_password, check_password
from django.http import FileResponse
from django.db import models as aggregator
from django.db.models.functions import Concat, RowNumber, Coalesce, Length, ExtractDay, TruncDate, Cast
from django.db.models import F, Value, TextField, CharField, Q, Count, Min, Max, Sum, Func, JSONField, FloatField, Case, When, OuterRef, Subquery, NOT_PROVIDED
from datetime import datetime, date, timedelta
from django.contrib.postgres.aggregates import ArrayAgg
from django.db.models.expressions import Window
from operator import add, sub, mul, truediv
from django.http.response import StreamingHttpResponse, HttpResponse
from wsgiref.util import FileWrapper
from django.core.cache import cache
from django.views.decorators.csrf import csrf_exempt
from django.http.request import QueryDict
from app.querydict import querydict_to_nested_dict
from django.http import JsonResponse
from app.document_generator import DocumentGenerator
from django.core.cache import cache
from concurrent.futures import ThreadPoolExecutor
import json
import io
import pandas as pd
import numpy as np
from num2words import num2words
from django.db import transaction
from django.apps import apps
from django.core.exceptions import FieldDoesNotExist
from django.db.models import fields as models_fields
from django.db.models import CharField, TextField
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.parsers import MultiPartParser, FormParser
import time
# Pagination defaults: result sets larger than `limit_rows` are paged,
# `perpage` rows at a time, unless the client overrides `perpage`.
limit_rows = 2000
perpage = 20
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
static_folder = os.path.join(BASE_DIR, "static")
# Enable `__length` lookups on text columns in query-string filters.
TextField.register_lookup(Length, 'length')
CharField.register_lookup(Length, 'length')
# Hosts that should refuse direct (non-browser-origin) access; currently
# unused because the checks in check_access() are commented out.
BLOCKED_HOST = ["api.utopia.com.vn", "dev.api.utopia.com.vn"]
#=============================================================================
def check_access(request):
    """Gate for direct API access.

    The origin/host/user-agent checks are currently disabled, so every
    request is allowed through. (The previous logic blocked requests
    without an Origin header when the host was in BLOCKED_HOST, and
    whitelisted Dart user agents.)
    """
    return True
# Get limit rows
#=============================================================================
def get_limit_rows(rows, page, onpage):
    """Page a queryset when it is too large to return whole.

    Args:
        rows: queryset (anything with ``count()`` and slice support).
        page: 1-based page number; ``-1`` asks for the full result set.
        onpage: rows per page, or ``None`` to fall back to ``perpage``.

    Returns:
        ``(total_rows, full_data, rows)`` — ``full_data`` tells the caller
        whether ``rows`` is the complete, unpaged result.
    """
    total_rows = rows.count()
    # Small result sets (or an explicit page == -1) come back whole...
    full_data = total_rows <= limit_rows or page == -1
    # ...unless the caller asked for a page size smaller than the total.
    if full_data and onpage is not None and total_rows > onpage:
        full_data = False
    if not full_data:
        onpage = perpage if onpage is None else onpage
        rows = rows[(page - 1) * onpage : page * onpage]
    return total_rows, full_data, rows
#=============================================================================
#--- get ip ---
def get_client_ip(request):
    """Return the client IP, honouring the X-Forwarded-For proxy header."""
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
    if not forwarded:
        return request.META.get('REMOTE_ADDR')
    # First entry in the comma-separated chain is the originating client.
    return forwarded.split(',')[0]
#=============================================================================
def get_serializer(name):
    """Resolve an app model by name and build a generic ModelSerializer.

    Args:
        name: model class name inside the ``app`` application.

    Returns:
        ``(Model, SerializerClass)``, or ``(None, None)`` when no model of
        that name exists (callers treat this as a 400).
    """
    try:
        Model = apps.get_model('app', name)
    except LookupError:
        # apps.get_model raises LookupError for an unknown model; the old
        # bare `except:` also swallowed SystemExit/KeyboardInterrupt.
        return None, None
    class GenericSerializer(serializers.ModelSerializer):
        class Meta:
            model = Model
            fields = '__all__'
        def create(self, validated_data):
            return Model.objects.create(**validated_data)
        def update(self, instance, validated_data):
            for attr, value in validated_data.items():
                setattr(instance, attr, value)
            instance.save()
            return instance
    return Model, GenericSerializer
#=============================================================================
def update_increment(name):
    """Bump the per-day counter cached under ``name``.

    The counter resets to 1 whenever the stored date is not today.
    Entries are kept for 30 days (2592000 seconds).
    """
    value = cache.get(name)
    if value is not None and value['date'] == date.today():
        # Same day: advance the existing counter.
        value['current'] += 1
        cache.set(name, value, timeout=2592000)
    else:
        # Missing or stale entry: start a fresh daily counter.
        cache.set(name, {'date': date.today(), 'current': 1}, timeout=2592000)
#=============================================================================
def current_increment(name, prefix):
    """Preview the next daily sequence code (e.g. ``HD300126001``) without
    consuming it; initialises the cache entry on first use."""
    value = cache.get(name)
    if value is None:
        cache.set(name, {'date': date.today(), 'current': 0}, timeout=2592000)
        seq, seq_date = 1, date.today()
    else:
        seq, seq_date = value['current'] + 1, value['date']
    # Stamp is ddmmyy of the stored counter date.
    stamp = datetime.strptime(str(seq_date), '%Y-%m-%d').strftime('%d%m%y')
    return f"{prefix}{stamp}{seq:03d}"
#=============================================================================
@api_view(['GET'])
def get_increment(request, name, prefix):
    """GET endpoint: preview the next daily sequence code for ``name``
    without incrementing the stored counter."""
    cached = cache.get(name)
    if cached is None:
        cache.set(name, {'date': date.today(), 'current': 0}, timeout=2592000)
        seq, seq_date = 1, date.today()
    else:
        seq, seq_date = cached['current'] + 1, cached['date']
    stamp = datetime.strptime(str(seq_date), '%Y-%m-%d').strftime('%d%m%y')
    return Response(f"{prefix}{stamp}{seq:03d}")
#=============================================================================
@api_view(['GET'])
def get_increment_next(request, name, prefix):
    """GET endpoint: reserve and return the next daily sequence code,
    persisting the incremented counter back to the cache."""
    cached = cache.get(name)
    if cached is None:
        cache.set(name, {'date': date.today(), 'current': 0}, timeout=2592000)
        seq, seq_date = 1, date.today()
    else:
        seq, seq_date = cached['current'] + 1, cached['date']
    # Commit the reservation. Note the date stamp below still reflects the
    # previously stored date while the counter is saved under today.
    cache.set(name, {'date': date.today(), 'current': seq}, timeout=2592000)
    stamp = datetime.strptime(str(seq_date), '%Y-%m-%d').strftime('%d%m%y')
    return Response(f"{prefix}{stamp}{seq:03d}")
#=============================================================================
def subquery(value):
    """Build a correlated subquery from a spec dict.

    ``value['subquery']`` names the target model and the FK column linking
    back to the outer row; ``value['field']``/``value['type']`` give the
    column and SQL function to apply.
    """
    Model, _ = get_serializer(value['subquery']['model'])
    criteria = value['filter'] if 'filter' in value else {}
    # Correlate each subquery row with the outer queryset's primary key.
    criteria[value['subquery']['column']] = OuterRef('pk')
    return (Model.objects.filter(**criteria)
                 .annotate(value_query=Func(F(value['field']), function=value['type']))
                 .values('value_query'))
#=============================================================================
def base_query(rows, values, summary, distinct_values):
    """Apply the generic values/summary pipeline to a queryset.

    ``summary`` selects the shaping step: 'distinct', 'count', 'annotate'
    or 'aggregate'. ``distinct_values`` carries that step's argument —
    either a comma-separated column list (distinct) or a dict literal
    parsed with ast.literal_eval (annotate/aggregate specs).

    Returns:
        (rows, need_serializer) — need_serializer becomes False once
        .values() has turned the queryset into plain dicts.
    """
    need_serializer = True
    funcs = {'Count': Count, 'Min': Min, 'Max': Max, 'Sum': Sum, 'ExtractDay': ExtractDay}
    if values != None:
        rows = rows.values(*values)
        need_serializer = False
    if summary =='distinct':
        distinct_values = '' if distinct_values==None else distinct_values.split(',')
        rows = rows.distinct(*distinct_values)
    elif summary == 'count':
        rows = rows.count()
    elif summary =='annotate':
        ele = {}
        # distinct_values here is a dict literal: {alias: spec, ...}
        for key, value in ast.literal_eval(distinct_values).items():
            if isinstance(value, str) == True:
                # String spec: the name of an aggregate class from
                # django.db.models, applied to the column named like the alias.
                reducer = getattr(aggregator, value)
                ele[key] = reducer(key)
            else:
                if value['type'] == 'Concat':
                    # Join the listed fields with a separator (default ' / ').
                    arr = []
                    char = value['char'] if 'char' in value else ' / '
                    for idx, field in enumerate(value['field']):
                        arr.append(F(field))
                        # Separator goes between fields, not after the last one.
                        arr.append(Value(char)) if idx < len(value['field']) - 1 else False
                    reducer = Concat(*arr, output_field=CharField())
                if value['type'] == 'RowNumber':
                    reducer = Window(expression=RowNumber())
                elif value['type'] == 'ArrayAgg':
                    # Collect each row as a JSON object of the listed fields
                    # (alternating name/value args for jsonb_build_object).
                    arr1 = []
                    for field in value['field']:
                        arr1.append(Value(field))
                        arr1.append(field)
                    reducer = ArrayAgg(Func(*arr1, function="jsonb_build_object", output_field=JSONField()), distinct=True)
                elif value['type'] in funcs:
                    func = funcs[value['type']]
                    arr = Q()
                    if 'subquery' in value:
                        # Correlated subquery instead of a local aggregate.
                        ele[key] = Subquery(subquery(value))
                        continue;
                    if 'filter' in value:
                        # AND every filter pair into the aggregate's filter arg.
                        for fkey in value['filter']:
                            arr.add(Q(**{fkey: value['filter'][fkey]}), Q.AND)
                    if 'formula' in value:
                        # Left-to-right expression builder: operands and
                        # operator symbols alternate in value['formula'].
                        reducer = None; exp = None
                        operator = {'+': add, '-': sub, '*': mul, ':': truediv, 'or': Coalesce}
                        keyword = {'now': Value(datetime.now())}
                        for field in value['formula']:
                            if field in operator:
                                exp = operator[field]
                            else:
                                if isinstance(field, str):
                                    # 'now' is a keyword; anything else is a column.
                                    expression = keyword[field] if field in keyword else F(field)
                                else:
                                    expression = Value(field)
                                if value['type'] == 'ExtractDay':
                                    # Day extraction operates on dates, so
                                    # truncate datetimes first.
                                    expression = TruncDate(expression)
                                reducer = expression if reducer==None else exp(reducer, expression)
                        reducer = func(reducer, filter=arr, output_field=FloatField())
                    else:
                        reducer = func(value['field'], filter=arr, distinct= True if 'distinct' in value else False)
                # reducer
                ele[key] = reducer
        # apply all annotations at once
        rows = rows.annotate(**ele)
    elif summary == 'aggregate':
        ele = {}
        for key, value in ast.literal_eval(distinct_values).items():
            arr = Q()
            if 'filter' in value:
                for fkey in value['filter']:
                    arr.add(Q(**{fkey: value['filter'][fkey]}), Q.AND)
            func = funcs[value['type']]
            reducer = func(value['field'], filter=arr)
            ele[key] = reducer
        rows = rows.aggregate(**ele)
    # return
    return rows, need_serializer
#=============================================================================
def calculate(rows, calculation):
    """Annotate computed columns built from formula specs.

    ``calculation`` is a dict literal {alias: {'formula': [...]}} where the
    formula alternates operands (column names or constants) and operator
    symbols ('+', '-', '*', ':', 'or'), evaluated left to right.
    """
    divcheck = None
    ele = {}
    for key, value in ast.literal_eval(calculation).items():
        reducer = None; exp = None
        operator = {'+': add, '-': sub, '*': mul, ':': truediv, 'or': Coalesce}
        for field in value['formula']:
            if field in operator:
                exp = operator[field]
            else:
                expression = F(field) if isinstance(field, str) else Value(field)
                reducer = expression if reducer==None else exp(reducer, expression)
                if exp == truediv:
                    # Remember the last divisor so the division can be
                    # guarded against divide-by-zero below.
                    divcheck = {field: 0}
        if divcheck:
            # When the divisor is 0, yield NULL instead of raising.
            # NOTE(review): divcheck persists across aliases in this loop,
            # so a later formula without division reuses the previous
            # guard — confirm this is intended.
            reducer = Case(When(**divcheck, then=Value(None)), **{'default': reducer}, output_field=FloatField())
        ele[key] = reducer
    rows = rows.annotate(**ele)
    return rows
#=============================================================================
def _spec_to_q(spec):
    """AND together ``{field: value}`` pairs into a Q object; a value of the
    form ``{'type': 'F', 'field': ...}`` compares against another column."""
    q = Q()
    for key, value in spec.items():
        if isinstance(value, dict) and value.get('type') == 'F':
            q.add(Q(**{key: F(value['field'])}), Q.AND)
        else:
            q.add(Q(**{key: value}), Q.AND)
    return q

def final_result(rows, calculation=None, final_filter=None, final_exclude=None, sort=None):
    """Apply post-aggregation steps to a queryset.

    Args:
        rows: queryset produced by base_query().
        calculation: formula spec string for calculate(), or None.
        final_filter / final_exclude: dict specs applied after annotation.
        sort: list of order_by() field names, or None.

    Returns:
        The transformed queryset.
    """
    if calculation:
        rows = calculate(rows, calculation)
    # The filter and exclude specs share the same shape, so build both
    # through the same helper (previously duplicated inline).
    if final_filter:
        rows = rows.filter(_spec_to_q(final_filter))
    if final_exclude:
        rows = rows.exclude(_spec_to_q(final_exclude))
    if sort:
        rows = rows.order_by(*sort)
    return rows
#=============================================================================
@api_view(['GET', 'POST'])
def data_list(request, name):
    """Generic list/create endpoint for any model in the ``app`` application.

    GET: filter/summarise/sort/paginate via query-string options (filter,
    filter_or, exclude, values, summary, distinct_values, calculation,
    final_filter, final_exclude, sort, page, perpage, cache).
    POST: validate and create one row, optionally re-reading it through the
    same values/summary pipeline before responding.
    """
    Model, serializer_class = get_serializer(name)
    if Model == None:
        return Response(status=status.HTTP_400_BAD_REQUEST)
    # check access
    if check_access(request)==False:
        return JsonResponse({"detail": "Direct access not allowed"}, status=403)
    # --- parse query-string options (all optional) ---
    filter = request.query_params['filter'] if request.query_params.get('filter') != None else None
    values = request.query_params['values'] if request.query_params.get('values') != None else None
    values = values if values==None else values.split(',')
    summary = request.query_params['summary'] if request.query_params.get('summary') != None else None
    page = int(request.query_params['page']) if request.query_params.get('page') != None else 1
    onpage = int(request.query_params['perpage']) if request.query_params.get('perpage') != None else None
    sort = request.query_params['sort'] if request.query_params.get('sort') != None else None
    sort = None if sort==None else sort.split(',')
    distinct_values = request.query_params['distinct_values'] if request.query_params.get('distinct_values') != None else None
    filter_or = request.query_params['filter_or'] if request.query_params.get('filter_or') != None else None
    exclude = request.query_params['exclude'] if request.query_params.get('exclude') != None else None
    calculation = request.query_params['calculation'] if request.query_params.get('calculation') != None else None
    final_filter = request.query_params['final_filter'] if request.query_params.get('final_filter') != None else None
    final_exclude = request.query_params['final_exclude'] if request.query_params.get('final_exclude') != None else None
    cache_info = request.query_params['cache'] if request.query_params.get('cache') != None else None
    # Serve straight from cache when the client supplied a cache key and
    # a previous response is still stored under it.
    if cache_info != None:
        cache_info = ast.literal_eval(cache_info)
        cache_value = cache.get(cache_info["key"])
        if cache_value != None:
            return Response({'total_rows': len(cache_value), 'full_data': True, 'rows': cache_value})
    need_serializer = True
    filter_list = Q()
    # OR-filters are validated against the model's fields: unknown fields,
    # icontains on relations, and icontains on numeric/date columns are
    # silently skipped instead of raising.
    if filter_or != None:
        field_map = {f.name: f for f in Model._meta.get_fields()}
        for key, value in ast.literal_eval(filter_or).items():
            lookup_parts = key.split('__')
            base_field_name = lookup_parts[0]
            if base_field_name not in field_map:
                continue
            field_obj = field_map[base_field_name]
            if field_obj.is_relation and len(lookup_parts) == 2 and lookup_parts[1] == 'icontains':
                continue
            is_numeric = isinstance(field_obj, (aggregator.IntegerField, aggregator.DecimalField, aggregator.FloatField))
            is_date = isinstance(field_obj, (aggregator.DateField, aggregator.DateTimeField))
            if (is_numeric or is_date) and key.endswith('__icontains'):
                continue
            filter_list.add(Q(**{key: value}), Q.OR)
    # AND-filters; a {'type': 'F', 'field': ...} value compares two columns.
    if filter != None:
        for key, value in ast.literal_eval(filter).items():
            if isinstance(value, dict) == True:
                if value['type'] == 'F':
                    filter_list.add(Q(**{key: F(value['field'])}), Q.AND)
            else:
                filter_list.add(Q(**{key: value}), Q.AND)
    if request.method == 'GET':
        rows = Model.objects.all() if len(filter_list) == 0 else Model.objects.filter(filter_list)
        if exclude != None:
            exclude_list = Q()
            for key, value in ast.literal_eval(exclude).items():
                if isinstance(value, dict) == True:
                    if value['type'] == 'F':
                        exclude_list.add(Q(**{key: F(value['field'])}), Q.AND)
                else:
                    exclude_list.add(Q(**{key: value}), Q.AND)
            rows = rows.exclude(exclude_list)
        rows, need_serializer = base_query(rows, values, summary, distinct_values)
        rows = final_result(rows, calculation, final_filter, final_exclude, sort)
        # count/aggregate already collapsed the queryset to a scalar/dict.
        if summary == 'count' or summary == 'aggregate':
            return Response({'total_rows': 1, 'full_data': True, 'rows': rows})
        total_rows, full_data, rows = get_limit_rows(rows, page, onpage)
        if need_serializer == True:
            rows = serializer_class(rows, many=True).data
        if cache_info:
            # Store the rendered rows in the cache for later requests.
            cache.set(cache_info['key'], rows, timeout=cache_info['timeout'])
        return Response({'total_rows': total_rows, 'full_data': full_data, 'rows': rows})
    elif request.method == 'POST':
        serializer = serializer_class(data = request.data)
        if serializer.is_valid():
            serializer.save()
            data = serializer.data
            # advance the per-model daily counter used by get_increment
            update_increment(name)
            # If the caller wants a shaped row back, re-read it through the
            # same pipeline as GET.
            if values != None:
                rows = Model.objects.filter(id=data['id'])
                rows, need_serializer = base_query(rows, values, summary, distinct_values)
                rows = final_result(rows, calculation, final_filter, final_exclude, sort)
                if need_serializer == True:
                    rows = serializer_class(rows, many=True).data
                return Response(rows[0])
            return Response(data, status=status.HTTP_201_CREATED)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
#=============================================================================
@api_view(['GET', 'PUT', 'DELETE', 'PATCH'])
def data_detail(request, name, pk):
    """Generic single-object endpoint: read, replace (PUT), partially
    update (PATCH) or delete one row of model ``name`` by primary key."""
    # check access
    if check_access(request) == False:
        return JsonResponse({"detail": "Direct access not allowed"}, status=403)
    query_params = querydict_to_nested_dict(request.query_params)
    query_params = {} if not query_params else query_params
    values = query_params['values'] if query_params.get('values') != None else None
    values = values if values == None else values.split(',')
    summary = query_params['summary'] if query_params.get('summary') != None else None
    sort = query_params['sort'] if query_params.get('sort') != None else None
    sort = sort if sort == None else sort.split(',')
    distinct_values = query_params['distinct_values'] if query_params.get('distinct_values') != None else None
    calculation = query_params['calculation'] if query_params.get('calculation') != None else None
    Model, serializer_class = get_serializer(name)
    if Model == None:
        return Response(status=status.HTTP_400_BAD_REQUEST)
    try:
        obj = Model.objects.get(pk=pk)
    except Model.DoesNotExist:
        return Response(status=status.HTTP_404_NOT_FOUND)
    if request.method == 'GET':
        serializer = serializer_class(obj)
        return Response(serializer.data)
    elif request.method == 'PUT':
        serializer = serializer_class(obj, data=request.data)
        if serializer.is_valid():
            serializer.save()
            data = serializer.data
            # If the caller asked for shaped output, re-read the updated row
            # through the values/summary pipeline.
            if values != None:
                rows = Model.objects.filter(id=data['id'])
                rows, need_serializer = base_query(rows, values, summary, str(distinct_values))
                rows = final_result(rows, calculation)
                return Response(rows.first())
            else:
                return Response(data)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
    elif request.method == 'PATCH':
        # Same as PUT but with partial=True so missing fields keep their values.
        serializer = serializer_class(obj, data=request.data, partial=True)
        if serializer.is_valid():
            serializer.save()
            data = serializer.data
            if values != None:
                rows = Model.objects.filter(id=data['id'])
                rows, need_serializer = base_query(rows, values, summary, str(distinct_values))
                rows = final_result(rows, calculation)
                return Response(rows.first())
            else:
                return Response(data)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
    elif request.method == 'DELETE':
        # File-backed models also remove the file from disk (files live under
        # static/<name-lowercased>s/, e.g. static/files/).
        if name == 'File' or name == 'Image' or name == 'Video':
            file_name = static_folder + ('/' + name.lower() + 's/') + obj.file
            if os.path.exists(file_name):
                os.remove(file_name)
        try:
            obj.delete()
        except Exception as e:
            # Typically a protected FK; report the reason as a 400.
            print(e)
            return Response(data=str(e), status=status.HTTP_400_BAD_REQUEST)
        return Response(status=status.HTTP_204_NO_CONTENT)
#====================================================================
@api_view(['POST'])
def import_data(request, name):
    """Bulk upsert rows into model ``name``.

    Each item with an ``id`` updates the matching row (or inserts when that
    id does not exist); items without ``id`` are inserted. When any item
    fails, the original payload is echoed back with ``error``/``note`` set
    on the failing rows; otherwise the created/updated data (or shaped rows
    when ``values`` is given) is returned.
    """
    Model, serializer_class = get_serializer(name)
    if Model == None:
        return Response(status=status.HTTP_400_BAD_REQUEST)
    # check access
    if check_access(request)==False:
        return JsonResponse({"detail": "Direct access not allowed"}, status=403)
    # A form-encoded body arrives as a QueryDict holding a single row.
    data = [request.data.dict()] if type(request.data) == QueryDict else request.data
    query_params = querydict_to_nested_dict(request.query_params)
    query_params = {} if not query_params else query_params
    values = query_params['values'] if query_params.get('values') != None else None
    values = values if values==None else values.split(',')
    summary = query_params['summary'] if query_params.get('summary') != None else None
    sort = query_params['sort'] if query_params.get('sort') != None else None
    sort = sort if sort==None else sort.split(',')
    distinct_values = query_params['distinct_values'] if query_params.get('distinct_values') != None else None
    calculation = query_params['calculation'] if query_params.get('calculation') != None else None
    error = False
    return_data = []
    for row in data:
        try:
            if 'id' in row:
                ele = Model.objects.filter(pk=row['id']).first()
                if ele == None:
                    serializer = serializer_class(data = row, partial=True) # insert
                else:
                    serializer = serializer_class(ele, data=row, partial=True) # update
            else:
                serializer = serializer_class(data = row)
            if serializer.is_valid():
                serializer.save()
                return_data.append(serializer.data if values == None else serializer.data['id'])
            else:
                row['error'] = True
                row['note'] = serializer.errors
                error = True
        except Exception as e:
            # Was a bare `except:` that recorded no reason (and also caught
            # SystemExit/KeyboardInterrupt); record str(e) like delete_data.
            row['error'] = True
            row['note'] = str(e)
            error = True
    if error == True:
        return Response(data)
    elif values == None:
        return Response(return_data)
    else:
        # Re-read the affected rows through the shared shaping pipeline.
        rows = Model.objects.filter(id__in = return_data)
        rows, need_serializer = base_query(rows, values, summary, str(distinct_values))
        rows = final_result(rows, calculation)
        return Response(rows)
#=============================================================================
@api_view(['POST'])
def delete_data(request, name):
    """Bulk delete rows of model ``name`` by ``id``.

    Each input row is annotated with the outcome (``deleted`` on success,
    ``error``/``note`` otherwise) and the payload is echoed back.
    """
    Model, serializer_class = get_serializer(name)
    if Model == None:
        return Response(status=status.HTTP_400_BAD_REQUEST)
    # A form-encoded body (QueryDict) carries a single row; JSON carries a
    # list. QueryDict is already imported at module level — the previous
    # function-local re-import was redundant.
    data = [request.data.dict()] if isinstance(request.data, QueryDict) else request.data
    for row in data:
        try:
            if 'id' in row:
                ele = Model.objects.filter(pk=row['id']).first()
                if ele == None:
                    row['error'] = True
                    row['note'] = 'id=' + str(row['id']) + ' not exist'
                else:
                    ele.delete()
                    row['deleted'] = True
            else:
                row['error'] = True
                row['note'] = 'field id not found'
        except Exception as e:
            # Typically a protected FK preventing deletion.
            row['error'] = True
            row['note'] = str(e)
    # return
    return Response(data)
#=============================================================================
@api_view(['POST'])
def get_hash(request):
    """Hash ``text`` from the POST body with Django's password hasher."""
    if request.method != 'POST':
        return Response(status = status.HTTP_400_BAD_REQUEST)
    hashed = make_password(request.data['text'])
    return Response({'total_rows': 1, 'full_data': True , 'rows': [hashed]})
#=============================================================================
@api_view(['GET'])
def login(request):
    """Authenticate a user from query-string credentials.

    ``filter`` is a dict literal holding ``username`` and ``password``;
    ``values`` optionally limits the returned columns.

    NOTE(review): the password travels in the GET query string, where it
    can end up in server/proxy access logs — consider moving this to a
    POST body.
    """
    if request.method == 'GET':
        filter = request.query_params['filter'] if request.query_params.get('filter') != None else None
        filter = ast.literal_eval(filter)
        values = request.query_params['values'] if request.query_params.get('values') != None else None
        values = values if values==None else values.split(',')
        need_serializer = False
        if values == None:
            user = User.objects.filter(username=filter['username']).first()
            need_serializer = True
        else:
            # With an explicit column list the query yields dicts, not model
            # instances, so no serializer is needed.
            user = User.objects.filter(username=filter['username']).values(*values).first()
        if user == None:
            return Response(None)
        result = check_password(filter['password'], user.password if need_serializer == True else user['password'])
        if result == False:
            # Wrong password responds the same as an unknown user.
            return Response(None)
        if need_serializer == True:
            Model, serializer_class = get_serializer('User')
            serializer = serializer_class(user)
            return Response({'total_rows': 1, 'full_data': True , 'rows': serializer.data})
        return Response({'total_rows': 1, 'full_data': True , 'rows': user})
    return Response(status = status.HTTP_400_BAD_REQUEST)
#=============================================================================
@api_view(['POST'])
def signin(request):
    """Verify username/password from the POST body.

    Returns a slim profile dict on success, the string "invalid" on a
    wrong password, and 400 for an unknown username.
    """
    account = User.objects.filter(username=request.data['username']).first()
    if account is None:
        return Response(status = status.HTTP_400_BAD_REQUEST)
    if not check_password(request.data['password'], account.password):
        return Response("invalid")
    info = User.objects.filter(pk=account.id).values(
        'id', 'username', 'avatar', 'fullname',
        'auth_status', 'auth_status__code', 'auth_status__name').first()
    return Response(info)
#=============================================================================
@api_view(['POST'])
def check_pin(request):
    """Verify username/PIN from the POST body (mirror of signin).

    Returns a slim profile dict on success, the string "invalid" on a
    wrong PIN, and 400 for an unknown username.
    """
    account = User.objects.filter(username=request.data['username']).first()
    if account is None:
        return Response(status = status.HTTP_400_BAD_REQUEST)
    if not check_password(request.data['pin'], account.pin):
        return Response("invalid")
    info = User.objects.filter(pk=account.id).values(
        'id', 'username', 'avatar', 'fullname',
        'auth_status', 'auth_status__code', 'auth_status__name').first()
    return Response(info)
#=============================================================================
def convert_webp(name, convert=80):
    """Convert an image stored in static/files to WebP and delete the source.

    Args:
        name: file name inside the ``files`` folder.
        convert: WebP quality (0-100).

    Returns:
        The new ``.webp`` file name, or ``name`` unchanged when the file
        already has a ``.webp`` extension.
    """
    root, ext = os.path.splitext(name)
    if ext == '.webp':
        return name
    src_path = '{}/files/{}'.format(static_folder, name)
    # Replace only the final extension. The previous str.replace() swapped
    # every occurrence, corrupting names like "a.jpg.jpg" -> "a.webp.webp".
    new_name = root + '.webp'
    webp_path = '{}/files/{}'.format(static_folder, new_name)
    img = cv2.imread(src_path)
    cv2.imwrite(webp_path, img, [int(cv2.IMWRITE_WEBP_QUALITY), int(convert)])
    os.remove(src_path)
    return new_name
#=============================================================================
@api_view(['POST'])
def upload(request):
    """Receive a multipart upload, store it under static/files, optionally
    convert images to WebP, and record a File row.

    ``type`` in the body maps to File.type: 1 = file, 2 = image, 3 = video.
    Responds with the saved row, or 400 on any failure.
    """
    upload_folder = static_folder + '/files/'
    Model, serializer_class = get_serializer('File')
    # start
    if request.method == 'POST':
        file = request.data['file']
        filename = request.data['filename']
        convert = request.data['convert'] if 'convert' in request.data else None
        doc_type = request.data['doc_type'] if 'doc_type' in request.data else None
        # map the declared kind to the numeric File.type code
        type = 1
        if request.data['type'] == 'video':
            type = 3
        elif request.data['type'] == 'image':
            type = 2
        # start upload
        try:
            with open(upload_folder + filename, 'wb+') as destination:
                for chunk in file.chunks():
                    destination.write(chunk)
            # convert == '0' lets the client opt out of WebP conversion
            if type == 2 and convert != '0':
                filename = convert_webp(filename, convert)
            # save record
            data = {'type': type, 'user': request.data['user'], 'name': request.data['name'], 'file': filename,
                    'size': request.data['size'], "doc_type": doc_type}
            serializer = serializer_class(data = data)
            if serializer.is_valid():
                serializer.save()
            else:
                print(serializer.errors)
            return Response({'total_rows': 1, 'full_data': True, 'rows': [serializer.data]})
        except Exception as e:
            # The old bare `except:` constructed a Response without returning
            # it; log the cause and fall through to the explicit 400 below.
            print(e)
    # return
    return Response(status = status.HTTP_400_BAD_REQUEST)
#=============================================================================
@csrf_exempt
@api_view(['GET'])
def download(request):
    """Serve a stored file as an attachment.

    ``type=contract`` switches to the contract folder; otherwise files are
    served from static/files. Responds 400 when the name is missing or the
    file does not exist.
    """
    upload_folder = static_folder + '/files/'
    name = request.query_params['name'] if request.query_params.get('name') != None else None
    type = request.query_params['type'] if request.query_params.get('type') != None else None
    if type=='contract':
        upload_folder = static_folder + '/contract/'
    if name == None:
        return Response(status = status.HTTP_400_BAD_REQUEST)
    # `name` is untrusted input: strip any directory components so a
    # crafted "../../" path cannot escape the upload folder.
    name = os.path.basename(name)
    if request.method == 'GET':
        if os.path.exists(upload_folder + name):
            response = FileResponse(open(upload_folder + name, 'rb'), as_attachment=True)
            response["Access-Control-Allow-Origin"] = "*"
            response["Access-Control-Expose-Headers"] = "Content-Disposition"
            return response
    return Response(status = status.HTTP_400_BAD_REQUEST)
#=============================================================================
@csrf_exempt
@api_view(['GET'])
def download_contract(request, name):
    """Serve a generated contract file as an attachment (400 if missing)."""
    upload_folder = static_folder + '/contract/'
    if name == None:
        return Response(status = status.HTTP_400_BAD_REQUEST)
    # `name` comes from the URL: strip directory components so a crafted
    # "../../" path cannot escape the contract folder.
    name = os.path.basename(name)
    if request.method == 'GET':
        if os.path.exists(upload_folder + name):
            response = FileResponse(open(upload_folder + name, 'rb'), as_attachment=True)
            response["Access-Control-Allow-Origin"] = "*"
            response["Access-Control-Expose-Headers"] = "Content-Disposition"
            return response
    return Response(status = status.HTTP_400_BAD_REQUEST)
#=============================================================================
@api_view(['POST'])
def batch_upload(request):
    """Store an uploaded file under static/<folder>/<name>.

    Responds with the stored filename and path, or 400 with the error
    message on failure.
    """
    folder = request.data['folder']
    file = request.data['file']
    filename = request.data['name']
    # Both values are untrusted: keep only the final path component so the
    # write cannot escape the static directory.
    folder = os.path.basename(folder)
    filename = os.path.basename(filename)
    upload_folder = static_folder + '/' + folder + '/'
    try:
        with open(upload_folder + filename, 'wb+') as destination:
            for chunk in file.chunks():
                destination.write(chunk)
        return Response({'filename': filename, 'path': upload_folder + '/' + filename})
    except Exception as e:
        # The old code forgot `return`, so failures fell out of the view
        # returning None (a server error). Return an explicit 400 instead.
        return Response(data=str(e), status = status.HTTP_400_BAD_REQUEST)
#=============================================================================
def write_log(data):
    """Persist ``data`` as a Log row; validation errors are only printed."""
    _, LogSerializer = get_serializer('Log')
    serializer = LogSerializer(data=data)
    if not serializer.is_valid():
        print(serializer.errors)
        return
    serializer.save()
#=============================================================================
@api_view(['POST'])
def auth_token(request):
    """Look up or register a device token.

    A known token returns its stored row; an unknown one is enriched with
    the client IP and ipinfo.io geo data (best effort) before saving.
    """
    data = request.data
    Model, serializer_class = get_serializer('Token')
    row = Model.objects.filter(token=data['token']).first()
    if row:
        serializer = serializer_class(row)
        return Response(serializer.data)
    # new token: record the client IP
    data['ip'] = get_client_ip(request)
    # enrich with geo info from ipinfo.io (best effort, 2s timeout)
    # NOTE(review): the ipinfo API token is hard-coded here — move it to
    # settings/environment rather than source control.
    url = "https://ipinfo.io/{}?token=1cc0a688798cf7".format(data['ip'])
    try:
        rs = requests.get(url, timeout=2)
        obj = rs.json()
        for key in obj:
            data[key] = obj[key]
    except:
        # NOTE(review): bare except — deliberately best-effort, but it also
        # swallows KeyboardInterrupt; `except Exception` would be safer.
        print("An exception occurred")
    # save
    print(data)
    serializer = serializer_class(data = data)
    if serializer.is_valid():
        serializer.save()
        return Response(serializer.data)
    else:
        return Response(status = status.HTTP_400_BAD_REQUEST)
#=============================================================================
class RangeFileWrapper(object):
    """Iterator yielding byte chunks from ``filelike``, starting at
    ``offset`` and stopping after ``length`` bytes (or at EOF when
    ``length`` is None). Used to serve HTTP Range requests."""
    def __init__(self, filelike, blksize=8192, offset=0, length=None):
        self.filelike = filelike
        self.filelike.seek(offset, os.SEEK_SET)
        self.remaining = length  # None => stream to EOF
        self.blksize = blksize
    def close(self):
        # Mirror the file API so StreamingHttpResponse can clean up.
        if hasattr(self.filelike, 'close'):
            self.filelike.close()
    def __iter__(self):
        return self
    def __next__(self):
        if self.remaining is None:
            # Unbounded mode: read until the file is exhausted.
            chunk = self.filelike.read(self.blksize)
            if not chunk:
                raise StopIteration()
            return chunk
        if self.remaining <= 0:
            raise StopIteration()
        chunk = self.filelike.read(min(self.remaining, self.blksize))
        if not chunk:
            raise StopIteration()
        self.remaining -= len(chunk)
        return chunk
#=============================================================================
def stream_video(request, path):
    """Stream a file from static/files with HTTP Range support.

    Range requests get a 206 partial response with Content-Range so
    browsers can seek inside videos; otherwise the whole file streams.

    NOTE(review): ``path`` is concatenated into the filesystem path
    unchecked — confirm the URL pattern cannot pass "../" segments.
    """
    range_re = re.compile(r'bytes\s*=\s*(\d+)\s*-\s*(\d*)', re.I)
    range_header = request.META.get('HTTP_RANGE', '').strip()
    range_match = range_re.match(range_header)
    path = static_folder + '/files/' + path
    size = os.path.getsize(path)
    content_type, encoding = mimetypes.guess_type(path)
    content_type = content_type or 'application/octet-stream'
    if range_match:
        first_byte, last_byte = range_match.groups()
        first_byte = int(first_byte) if first_byte else 0
        # An open-ended range ("bytes=N-") runs to the end of the file.
        last_byte = int(last_byte) if last_byte else size - 1
        if last_byte >= size:
            last_byte = size - 1
        length = last_byte - first_byte + 1
        resp = StreamingHttpResponse(RangeFileWrapper(open(path, 'rb'), offset=first_byte, length=length), status=206, content_type=content_type)
        resp['Content-Length'] = str(length)
        resp['Content-Range'] = 'bytes %s-%s/%s' % (first_byte, last_byte, size)
    else:
        resp = StreamingHttpResponse(FileWrapper(open(path, 'rb')), content_type=content_type)
        resp['Content-Length'] = str(size)
    # Advertise range support either way so players will seek.
    resp['Accept-Ranges'] = 'bytes'
    return resp
#=============================================================================
@api_view(['GET'])
def get_cache(request, name):
    """Return whatever the cache holds under ``name`` (None when absent)."""
    return Response(cache.get(name))
#=============================================================================
@api_view(['GET'])
def delete_cache(request, name):
    """Drop the cache entry ``name``; always responds 200."""
    cache.delete(name)
    return Response(status=status.HTTP_200_OK)
#=============================================================================
@api_view(['GET'])
def get_model(request):
    """Introspect every project model and return its field metadata.

    Each model yields its fields' name, internal type, unique/null flags,
    default, FK target model and relation kind. Django's auth models are
    skipped.
    """
    arr = []
    all_models = apps.get_models()
    for model in all_models:
        # skip Django's built-in auth models
        if str(model._meta).find("auth.")>=0:
            continue
        # next
        fields = []
        for field in model._meta.get_fields():
            info = {'name': field.name, 'type': field.get_internal_type()}
            # Reverse relations lack these attributes, hence the hasattr guards.
            info['unique'] = field.unique if hasattr(field, 'unique') else None
            info['null'] = field.null if hasattr(field, 'null') else None
            if hasattr(field, 'default'):
                info['default'] = str(field.default) if field.default is not NOT_PROVIDED else None
            # foreign keys
            if field.get_internal_type() == 'ForeignKey':
                model_class = field.remote_field.model
                info['model'] = model_class.__name__
                # Use the explicit `aggregator` alias for django.db.models;
                # the bare `models` name only resolved via a star import.
                if isinstance(field, (aggregator.OneToOneRel, aggregator.ManyToOneRel)):
                    info['relation'] = type(field).__name__
                else:
                    info['relation'] = "OneToManyRel"
            # insert
            fields.append(info)
        arr.append({'model': model.__name__, 'fields':fields})
    return Response({'total_rows': len(arr), 'full_data': True, 'rows': arr})
#=============================================================================
@api_view(['GET'])
def get_password(request, text):
    """Hash ``text`` with Django's password hasher and return the digest."""
    return Response(make_password(text))
#=============================================================================
@api_view(['GET'])
def export_csv(request, name):
    """Export rows of the model ``name`` as a UTF-8 (BOM) CSV attachment.

    Query parameters mirror the generic list endpoint: ``filter`` /
    ``filter_or`` / ``exclude`` (Python dict literals), ``values`` and
    ``sort`` (comma separated), ``summary``, ``distinct_values``,
    ``calculation``, ``final_filter``, ``final_exclude``, and ``fields`` —
    a list literal of ``{'name': ..., 'label': ...}`` column specs.
    """
    params = request.query_params
    filter_param = params.get('filter')
    values = params.get('values')
    values = None if values is None else values.split(',')
    summary = params.get('summary')
    sort = params.get('sort')
    sort = None if sort is None else sort.split(',')
    distinct_values = params.get('distinct_values')
    filter_or = params.get('filter_or')
    exclude = params.get('exclude')
    calculation = params.get('calculation')
    final_filter = params.get('final_filter')
    final_exclude = params.get('final_exclude')
    fields = params.get('fields')
    # resolve the model / serializer pair by name
    Model, serializer_class = get_serializer(name)
    if Model is None:
        return Response(status=status.HTTP_400_BAD_REQUEST)
    # build the AND/OR filter tree; dict values with type 'F' compare fields
    filter_list = Q()
    if filter_or is not None:
        for key, value in ast.literal_eval(filter_or).items():
            filter_list.add(Q(**{key: value}), Q.OR)
    if filter_param is not None:
        for key, value in ast.literal_eval(filter_param).items():
            if isinstance(value, dict):
                if value['type'] == 'F':
                    filter_list.add(Q(**{key: F(value['field'])}), Q.AND)
            else:
                filter_list.add(Q(**{key: value}), Q.AND)
    rows = Model.objects.all() if len(filter_list) == 0 else Model.objects.filter(filter_list)
    if exclude is not None:
        exclude_list = Q()
        for key, value in ast.literal_eval(exclude).items():
            if isinstance(value, dict):
                if value['type'] == 'F':
                    exclude_list.add(Q(**{key: F(value['field'])}), Q.AND)
            else:
                exclude_list.add(Q(**{key: value}), Q.AND)
        rows = rows.exclude(exclude_list)
    rows, need_serializer = base_query(rows, values, summary, distinct_values)
    rows = final_result(rows, calculation, final_filter, final_exclude, sort)
    columns = ast.literal_eval(fields)
    # remember which model fields are datetimes so they get formatted below
    dtFields = {}
    for field in Model._meta.get_fields():
        if field.get_internal_type() == 'DateTimeField':
            dtFields[field.name] = 'DateTimeField'
    # materialise the CSV body
    output = []
    for row in rows:
        arr = []
        for o in columns:
            col_name = o.get('name')
            if isinstance(row, dict):
                val = row.get(col_name)
            else:
                val = getattr(row, col_name)
            # BUG FIX: format the already-extracted value. The previous code
            # re-indexed ``row[name]`` here, which raised TypeError whenever
            # ``row`` was a model instance rather than a dict.
            if dtFields.get(col_name) is not None and val is not None:
                val = val.strftime("%d/%m/%Y %H:%M:%S")
            arr.append(val)
        output.append(arr)
    # stream back as an attachment; the BOM makes Excel detect UTF-8
    response = HttpResponse(content_type='text/csv; charset=utf-8')
    response['Content-Disposition'] = 'attachment; filename="data.csv"'
    response.write(b'\xef\xbb\xbf')  # UTF-8-SIG BOM
    writer = csv.writer(response, lineterminator='\n')
    writer.writerow([o.get('label') for o in columns])
    writer.writerows(output)
    return response
#=============================================================================
@api_view(['GET', 'POST'])
def get_otp(request):
    """GET: generate a 6-digit OTP and SMS it to ``phone`` (daily cap applies).
    POST: verify ``phone`` + ``otp``; marks the record consumed and returns it.
    """
    if request.method == 'GET':
        # daily per-phone cap on OTP sends
        max = 10
        phone = request.query_params['phone'] if request.query_params.get('phone') != None else None
        if phone == None:
            return Response(status = status.HTTP_400_BAD_REQUEST)
        # check sent today
        rows = Phone_Otp.objects.filter(phone=phone, create_time__date=datetime.now().strftime("%Y-%m-%d"))
        if rows.count() > max:
            return Response({"status": "error", "text": "The OTP sending limit for today has been reached. Please wait until tomorrow to continue"})
        code = current_increment('Phone_Otp', 'OTP')
        # 6 random digits from the cryptographic RNG
        otp = ''.join(str(secrets.randbelow(10)) for _ in range(6))
        # SMS body comes from the "otp" template row in system settings
        setting = System_Setting.objects.filter(classify="template", code="otp").first()
        content = setting.vi
        content = content.replace("<otp>", otp)
        valid_to = datetime.now() + timedelta(minutes=5)  # OTP lifetime: 5 minutes
        data = {"code": code, "phone": phone, "otp": otp, "status": 1, "sms_content": content, "sms_fee": 1000, "valid_to": valid_to}
        obj = {"phone": phone, "message": content, "shop": 1, "type": 1, "agent": 3}
        url = "https://accountapi.loan247.vn/send-sms/"
        # fire the SMS via the external gateway; result 2 = sent, 3 = send failed
        response = requests.post(url, obj)
        try:
            data['sms_info'] = response.json()
            data['result'] = 2
        except Exception as e:
            print(e)
            data['result'] = 3
        Model, serializer_class = get_serializer("Phone_Otp")
        serializer = serializer_class(data=data)
        if serializer.is_valid():
            serializer.save()
            update_increment('Phone_Otp')
            return Response(serializer.data)
        else:
            print(serializer.errors)
            return Response(serializer.errors)
    elif request.method == 'POST':
        phone = request.data['phone']
        otp = request.data['otp']
        # an OTP can only be consumed once (expiry flag flips on first use)
        row = Phone_Otp.objects.filter(phone=phone, otp=otp, expiry=False).first()
        if row:
            row.expiry = True
            row.status = Auth_Status.objects.filter(code="auth").first()
            row.save()
            info = Phone_Otp.objects.filter(pk=row.id).values('id', 'code', 'phone', 'otp', 'expiry', 'status', 'status__code', 'status__name').first()
            return Response(info)
        else:
            return Response(status = status.HTTP_400_BAD_REQUEST)
    return Response(status = status.HTTP_400_BAD_REQUEST)
#=============================================================================
@api_view(['GET'])
def set_token_expiry(request):
    """Expire auth tokens: every live token of ``username``, or — when
    ``reset=yes`` is passed instead — every live token in the system."""
    username = request.query_params.get('username')
    reset = request.query_params.get('reset')
    if username:
        Token.objects.filter(user__username=username, expiry=False).update(expiry=True)
    elif reset == "yes":
        Token.objects.filter(expiry=False).update(expiry=True)
    return Response(status=status.HTTP_200_OK)
#=============================================================================
class ExcelImportAPIView(APIView):
    """Bulk-import rows from an uploaded Excel/CSV file into an app model.

    POST multipart payload:
      - ``file``:   the .xlsx/.xls/.csv upload
      - ``config``: JSON string with ``model_name``, ``mappings``,
        ``import_mode`` (insert_only | upsert | overwrite),
        ``header_row_index`` (1-based) and ``unique_fields`` (upsert keys).
    """
    parser_classes = (MultiPartParser, FormParser)
    def post(self, request, format=None):
        excel_file = request.FILES.get('file')
        if not excel_file:
            return Response({'error': 'No Excel file provided (key "file" not found)'}, status=status.HTTP_400_BAD_REQUEST)
        config_str = request.data.get('config')
        if not config_str:
            return Response({'error': 'No configuration provided (key "config" not found)'}, status=status.HTTP_400_BAD_REQUEST)
        try:
            config = json.loads(config_str)
        except json.JSONDecodeError:
            return Response({'error': 'Invalid JSON configuration'}, status=status.HTTP_400_BAD_REQUEST)
        model_name = config.get('model_name')
        mappings = config.get('mappings', [])
        import_mode = config.get('import_mode', 'insert_only')
        header_row_excel = config.get('header_row_index', 1)
        # pandas expects a 0-based header index
        header_index = max(0, header_row_excel - 1)
        # Fetch and normalise the unique-key field(s) used by upsert mode
        unique_fields_config = config.get('unique_fields', 'code')
        if isinstance(unique_fields_config, str):
            UNIQUE_KEY_FIELDS = [unique_fields_config]
        elif isinstance(unique_fields_config, list):
            UNIQUE_KEY_FIELDS = unique_fields_config
        else:
            return Response({'error': 'Invalid format for unique_fields. Must be a string or a list of strings.'}, status=status.HTTP_400_BAD_REQUEST)
        if not model_name or not mappings:
            return Response({'error': 'model_name or mappings missing in configuration'}, status=status.HTTP_400_BAD_REQUEST)
        try:
            TargetModel = apps.get_model('app', model_name)
        except LookupError:
            return Response({'error': f'Model "{model_name}" not found in app'}, status=status.HTTP_400_BAD_REQUEST)
        # Resolve every related model referenced by a foreign_key mapping up front
        related_models_cache = {}
        for mapping in mappings:
            if 'foreign_key' in mapping:
                fk_config = mapping['foreign_key']
                related_model_name = fk_config.get('model_name')
                if related_model_name:
                    try:
                        related_models_cache[related_model_name] = apps.get_model('app', related_model_name)
                    except LookupError:
                        return Response({'error': f"Related model '{related_model_name}' not found for mapping '{mapping.get('excel_column')}'"}, status=status.HTTP_400_BAD_REQUEST)
        try:
            file_stream = io.BytesIO(excel_file.read())
            if excel_file.name.lower().endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file_stream, header=header_index)
            else:
                df = pd.read_csv(file_stream, header=header_index)
        except Exception as e:
            return Response({'error': f'Error reading file: {str(e)}'}, status=status.HTTP_400_BAD_REQUEST)
        # Normalise header names: strip newlines, "(...)" suffixes and extra spaces
        cleaned_columns = []
        for col in df.columns:
            col_str = str(col).strip()
            col_str = col_str.replace('\n', ' ').strip()
            col_str = re.sub(r'\s*\([^)]*\)', '', col_str).strip()
            col_str = ' '.join(col_str.split())
            cleaned_columns.append(col_str)
        df.columns = cleaned_columns
        df.reset_index(drop=True, inplace=True)
        # Caching Foreign Key objects: map lowercased lookup value -> instance
        related_obj_cache = {}
        for related_name, RelatedModel in related_models_cache.items():
            lookup_field = next((m['foreign_key']['lookup_field'] for m in mappings if 'foreign_key' in m and m['foreign_key']['model_name'] == related_name), None)
            if lookup_field:
                try:
                    related_obj_cache[related_name] = {
                        str(getattr(obj, lookup_field)).strip().lower(): obj
                        for obj in RelatedModel.objects.all()
                    }
                    # also allow lookups by primary key string
                    if 'pk' not in related_obj_cache[related_name]:
                        related_obj_cache[related_name].update({
                            str(obj.pk): obj for obj in RelatedModel.objects.all()
                        })
                except Exception as e:
                    return Response({'error': f"Error caching related model {related_name}: {e}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        objects_to_create = []
        errors = []
        for index, row in df.iterrows():
            instance_data = {}
            row_errors = []
            is_valid_for_db = True
            for mapping in mappings:
                excel_column = mapping.get('excel_column')
                model_field = mapping.get('model_field')
                default_value = mapping.get('default_value')
                excel_value = None
                is_static_default = False
                # 1. Determine the value source (static default or the Excel cell)
                if not excel_column and default_value is not None:
                    # Case 1: no Excel column mapped; always use the static default
                    excel_value = default_value
                    is_static_default = True
                elif excel_column and excel_column in row:
                    # Case 2: an Excel column is mapped
                    excel_value = row[excel_column]
                    is_static_default = False
                    # === ADDITION: fall back to default_value when the cell is empty ===
                    # If the Excel value is blank AND the mapping supplies a default_value
                    if (pd.isna(excel_value) or (isinstance(excel_value, str) and str(excel_value).strip() == '')) and default_value is not None:
                        excel_value = default_value
                        is_static_default = True # treat as static to bypass section 2 (the NULL check)
                    # === END ADDITION ===
                elif excel_column and excel_column not in row:
                    row_errors.append(f"Excel column '{excel_column}' not found (Header index: {header_row_excel})")
                    is_valid_for_db = False
                    continue
                elif excel_column is None and default_value is None:
                    continue
                else:
                    row_errors.append(f"Invalid mapping entry: {mapping} - requires excel_column or default_value")
                    is_valid_for_db = False
                    continue
                # 2. Handle NULL/empty values (only when the value came from Excel, NOT a static default)
                if not is_static_default and (pd.isna(excel_value) or (isinstance(excel_value, str) and str(excel_value).strip() == '')):
                    try:
                        field_obj = TargetModel._meta.get_field(model_field)
                    except FieldDoesNotExist:
                        row_errors.append(f"Model field '{model_field}' not found in model '{model_name}'")
                        is_valid_for_db = False
                        continue
                    # Field allows NULL
                    if field_obj.null:
                        instance_data[model_field] = None
                        continue
                    # Field has a default value (declared on the model)
                    elif field_obj.default is not models_fields.NOT_PROVIDED:
                        instance_data[model_field] = field_obj.default
                        continue
                    # Field does NOT allow NULL (non-nullable field)
                    else:
                        # === START: extra logic for allow_empty_excel_non_nullable ===
                        allow_empty_non_nullable = mapping.get('allow_empty_excel_non_nullable', False)
                        # Only bypass for CharField/TextField ("" satisfies NOT NULL)
                        if allow_empty_non_nullable and isinstance(field_obj, (CharField, TextField)):
                            instance_data[model_field] = ""
                            continue # accept the empty string and move on
                        # Not allowed to bypass OR not a CharField/TextField
                        row_errors.append(f"Non-nullable field '{model_field}' has empty value in row {index + 1}")
                        is_valid_for_db = False
                        instance_data[model_field] = "" if isinstance(field_obj, (CharField, TextField)) else None
                        continue
                        # === END: extra logic for allow_empty_excel_non_nullable ===
                # 3. Resolve foreign keys
                if 'foreign_key' in mapping:
                    fk_config = mapping['foreign_key']
                    related_model_name = fk_config.get('model_name')
                    key_to_lookup = str(excel_value).strip().lower()
                    RelatedModelCache = related_obj_cache.get(related_model_name, {})
                    related_obj = RelatedModelCache.get(key_to_lookup)
                    # Fallback: look up by ID when a static numeric value was given
                    if not related_obj and is_static_default and str(excel_value).isdigit():
                        related_obj = RelatedModelCache.get(str(excel_value))
                    if related_obj:
                        instance_data[model_field] = related_obj
                    else:
                        # Re-check: a blank/"0" lookup value is acceptable when the field allows NULL
                        if (pd.isna(excel_value) or str(excel_value).strip() == '' or str(excel_value).strip() == '0') and TargetModel._meta.get_field(model_field).null:
                            instance_data[model_field] = None
                            continue
                        # Record an error; mark the row invalid if the FK is required
                        row_errors.append(f"Related object for '{model_field}' with value '{excel_value}' not found in model '{related_model_name}' (row {index + 1})")
                        if not TargetModel._meta.get_field(model_field).null:
                            is_valid_for_db = False
                        instance_data[model_field] = None
                    continue
                else:
                    instance_data[model_field] = excel_value
            if row_errors:
                errors.append({'row': index + 1, 'messages': row_errors})
            if is_valid_for_db:
                try:
                    objects_to_create.append(TargetModel(**instance_data))
                except Exception as e:
                    errors.append({'row': index + 1, 'messages': [f"Critical error creating model instance: {str(e)}"]})
        successful_row_count = len(objects_to_create)
        try:
            with transaction.atomic():
                # === Import-mode handling ===
                if import_mode == 'overwrite':
                    TargetModel.objects.all().delete()
                    TargetModel.objects.bulk_create(objects_to_create)
                    message = f'{successful_row_count} records imported successfully after full **overwrite**.'
                elif import_mode == 'upsert':
                    for field in UNIQUE_KEY_FIELDS:
                        try:
                            TargetModel._meta.get_field(field)
                        except FieldDoesNotExist:
                            return Response({'error': f"Unique field '{field}' not found in model '{model_name}'. Cannot perform upsert."}, status=status.HTTP_400_BAD_REQUEST)
                    # Map existing rows by their unique-key tuple
                    existing_objects_query = TargetModel.objects.only('pk', *UNIQUE_KEY_FIELDS)
                    existing_map = {}
                    for obj in existing_objects_query:
                        key_tuple = tuple(getattr(obj, field) for field in UNIQUE_KEY_FIELDS)
                        existing_map[key_tuple] = obj
                    to_update = []
                    to_insert = []
                    for new_instance in objects_to_create:
                        try:
                            lookup_key = tuple(getattr(new_instance, field) for field in UNIQUE_KEY_FIELDS)
                        except AttributeError:
                            continue
                        if lookup_key in existing_map:
                            # adopt the existing pk so bulk_update hits that row
                            new_instance.pk = existing_map[lookup_key].pk
                            to_update.append(new_instance)
                        else:
                            to_insert.append(new_instance)
                    # update every mapped field except the pk and the unique keys
                    update_fields = [
                        m['model_field']
                        for m in mappings
                        if m['model_field'] not in ['pk'] and m['model_field'] not in UNIQUE_KEY_FIELDS
                    ]
                    TargetModel.objects.bulk_update(to_update, update_fields)
                    TargetModel.objects.bulk_create(to_insert)
                    message = f'{len(to_insert)} records inserted, {len(to_update)} records updated successfully (Upsert mode).'
                elif import_mode == 'insert_only':
                    TargetModel.objects.bulk_create(objects_to_create)
                    message = f'{successful_row_count} records imported successfully (Insert Only mode).'
                else:
                    return Response({'error': f"Invalid import_mode specified: {import_mode}"}, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            return Response({'error': f'Database error during bulk operation (Rollback occurred): {str(e)}', 'rows_attempted': successful_row_count}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        if errors:
            return Response({'status': 'partial_success', 'message': f'{message} Invalid rows were skipped.', 'errors': errors}, status=status.HTTP_207_MULTI_STATUS)
        return Response({'status': 'success', 'message': message}, status=status.HTTP_201_CREATED)
#=============================================================================
executor = ThreadPoolExecutor(max_workers=10)
def background_generate(doc_code, context_pks, output_filename, uid):
    """Thread-pool worker: generate a document and publish progress/result
    under the ``doc_log_<uid>`` cache key for the polling request handler."""
    started = datetime.now()
    log_key = f'doc_log_{uid}'
    # the "executing" marker expires after 1h in case the worker dies silently
    cache.set(log_key, {'status': 'executing', 'start_time': started.isoformat()}, timeout=3600)
    try:
        generator = DocumentGenerator(document_code=doc_code, context_pks=context_pks)
        result = generator.generate(signature_info=None, output_filename=output_filename)
        elapsed = int((datetime.now() - started).total_seconds())
        cache.set(log_key, {'status': 'done', 'result': result, 'duration': elapsed})
    except Exception as e:
        cache.set(log_key, {'status': 'error', 'error': str(e)})
@api_view(["GET"])
def generate_document(request):
    """Kick off document generation on the shared thread pool and block
    (polling the cache) until the worker reports done/error or a timeout.

    Query params: ``doc_code`` (required), optional ``output_filename``;
    every other query parameter is forwarded to the worker as a context pk.
    """
    # BUG FIX: ``uuid`` is used below but never imported at module level,
    # which raises NameError at request time; import it locally here.
    import uuid
    doc_code = request.query_params.get("doc_code")
    if not doc_code:
        return Response({"error": "Tham số 'doc_code' bắt buộc."}, status=status.HTTP_400_BAD_REQUEST)
    custom_filename = request.query_params.get("output_filename")
    context_pks = {k: v for k, v in request.query_params.items() if k not in ['doc_code', 'output_filename']}
    uid = uuid.uuid4().hex  # already a str; no str() wrapper needed
    executor.submit(background_generate, doc_code, context_pks, custom_filename, uid)
    time.sleep(1)  # give the worker a head start before the first poll
    max_attempts = 60  # 60 polls * 5s sleep ≈ 5 minutes before giving up
    for _ in range(max_attempts):
        log = cache.get(f'doc_log_{uid}')
        if log and log.get('status') in ("done", "error"):
            if log['status'] == "error":
                return Response({"error": log['error']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
            return Response(log['result'], status=status.HTTP_200_OK)
        time.sleep(5)
    return Response({"error": "Timeout chờ generate."}, status=status.HTTP_408_REQUEST_TIMEOUT)
#=============================================================================
# EMAIL TEMPLATE PREVIEW
#=============================================================================
class EmailTemplatePreview:
    """
    Preview the content of an email template with its data already mapped in.
    Sends no email — only renders and returns the substituted content.
    """
    def __init__(self, template, context_pks: dict):
        # template: Email_Template row; template.content holds the JSON config
        self.template = template
        # primary keys supplied by the caller, used to resolve lookups
        self.context_pks = context_pks
        self.config = self.template.content
        # alias -> fetched object/list, filled by fetch_data()
        self.data_context = {}
        # placeholder string -> replacement text, filled by prepare_replacements()
        self.replacements = {}
    def _get_model(self, model_string):
        """Resolve a model class from an 'app.Model' string."""
        app_label, model_name = model_string.split(".")
        return apps.get_model(app_label, model_name)
    def _get_value_from_object(self, obj, field_path):
        """Read a value off ``obj`` following a field path (supports nesting:
        'user.profile.name' or Django-style 'user__profile__name')."""
        if obj is None:
            return None
        value = obj
        for part in field_path.replace("__", ".").split("."):
            if value is None:
                return None
            value = getattr(value, part, None)
        return value
    def _resolve_lookup_value(self, lookup_from):
        """Resolve a lookup value from context_pks or from the data context."""
        if lookup_from in self.context_pks:
            return self.context_pks[lookup_from]
        try:
            # otherwise interpret as "<alias>.<field path>" into fetched data
            alias, field_path = lookup_from.split(".", 1)
            if alias not in self.data_context:
                raise ValueError(f"Alias '{alias}' not found in data context.")
            source_object = self.data_context.get(alias)
            return self._get_value_from_object(source_object, field_path)
        except ValueError:
            raise ValueError(f"Could not resolve '{lookup_from}'.")
    def fetch_data(self):
        """Fetch data from the database according to the 'mappings' config."""
        mappings = self.config.get("mappings", [])
        if not isinstance(mappings, list):
            raise TypeError("Email template 'mappings' must be a list.")
        # Process trigger object first
        trigger_model_mapping = next((m for m in mappings if m.get("is_trigger_object", False)), None)
        if trigger_model_mapping:
            model_cls = self._get_model(trigger_model_mapping["model"])
            lookup_field = trigger_model_mapping["lookup_field"]
            lookup_value = self._resolve_lookup_value(trigger_model_mapping["lookup_value_from"])
            alias = trigger_model_mapping["alias"]
            if lookup_value is not None:
                self.data_context[alias] = model_cls.objects.filter(**{lookup_field: lookup_value}).first()
            else:
                self.data_context[alias] = None
        # Process other mappings
        for mapping in mappings:
            if mapping.get("is_trigger_object", False):
                continue
            model_cls = self._get_model(mapping["model"])
            lookup_field = mapping["lookup_field"]
            lookup_value = self._resolve_lookup_value(mapping["lookup_value_from"])
            alias = mapping["alias"]
            if lookup_value is None:
                # no lookup value: objects become None, lists become []
                self.data_context[alias] = None if mapping.get("type") == "object" else []
                continue
            queryset = model_cls.objects.filter(**{lookup_field: lookup_value})
            if mapping.get("type") == "object":
                self.data_context[alias] = queryset.first()
            elif mapping.get("type") == "list":
                self.data_context[alias] = list(queryset)
    def _format_value(self, value, format_config):
        """Format a value per config (currency, date, number_to_words, conditional).

        Falls back to str(value); formatting errors yield "" (logged to stdout).
        """
        if value is None:
            return ""
        format_type = format_config.get("type")
        if not format_type:
            return str(value)
        try:
            if format_type == "currency":
                # thousands separated with "." (Vietnamese convention)
                return "{:,}".format(np.int64(value)).replace(",", ".")
            if format_type == "date":
                date_format = format_config.get("format", "dd/mm/YYYY").replace("dd", "%d").replace("mm", "%m").replace("YYYY", "%Y")
                return value.strftime(date_format)
            if format_type == "number_to_words":
                return num2words(value, lang=format_config.get("lang", "vi"))
            if format_type == "conditional":
                return format_config["true_value"] if value else format_config["false_value"]
        except Exception as e:
            print(f"Error formatting value '{value}' with config '{format_config}': {e}")
            return ""
        return str(value)
    def prepare_replacements(self):
        """Build the placeholder -> replacement dict for substitution."""
        # Add date placeholders
        today = datetime.now()
        self.replacements['[day]'] = str(today.day)
        self.replacements['[month]'] = str(today.month)
        self.replacements['[year]'] = str(today.year)
        self.replacements['[date]'] = today.strftime("%d/%m/%Y")
        # Process field mappings (only "object"-typed mappings are substituted)
        mappings = self.config.get("mappings", [])
        for mapping in mappings:
            alias = mapping["alias"]
            data = self.data_context.get(alias)
            fields = mapping.get("fields", {})
            if mapping.get("type") == "object":
                if data is None:
                    # missing object: blank out all of its placeholders
                    for placeholder in fields:
                        self.replacements[placeholder] = ""
                    continue
                for placeholder, config in fields.items():
                    if isinstance(config, dict):
                        # dict config: {"source": <field path>, "format": {...}}
                        value = self._get_value_from_object(data, config["source"])
                        self.replacements[placeholder] = self._format_value(value, config.get("format", {}))
                    else:
                        # plain string config: a bare field path, no formatting
                        value = self._get_value_from_object(data, config)
                        self.replacements[placeholder] = str(value) if value is not None else ""
    def get_preview(self):
        """
        Main entry point: render the preview with all placeholders substituted.
        Returns:
            dict: {
                'subject': subject with placeholders substituted,
                'content': body content with placeholders substituted
                           (original formatting/alignment preserved),
                'recipient_email': the recipient's email address,
                'replacements': dict of all replacements that were applied
            }
            or None when rendering fails (the exception is printed).
        """
        try:
            print(f"Generating preview for template: {self.template.name}")
            # Fetch data and prepare replacements
            self.fetch_data()
            self.prepare_replacements()
            # Get templates from config
            subject_template = self.config.get("subject", "")
            body_template = self.config.get("content", "")
            recipient_placeholder = self.config.get("recipient_placeholder", "[customer.email]")
            # Apply replacements - keeps the original template's formatting/alignment
            final_subject = subject_template
            final_content = body_template
            for key, value in self.replacements.items():
                final_subject = final_subject.replace(key, str(value))
                final_content = final_content.replace(key, str(value))
            recipient_email = self.replacements.get(recipient_placeholder, "")
            result = {
                'subject': final_subject,
                'content': final_content, # content returned exactly as substituted
                'recipient_email': recipient_email,
                'replacements': self.replacements.copy()
            }
            print(f"Preview generated successfully for '{self.template.name}'")
            return result
        except Exception as e:
            print(f"Error generating preview for template '{self.template.name}': {e}")
            import traceback
            traceback.print_exc()
            return None
@api_view(['POST'])
def preview_email_template(request):
    """Render an email-template preview without sending any mail.

    POST body: ``{"template_id": <pk>, "context_pks": {...}}``.
    On success returns the substituted ``subject``, ``content``,
    ``recipient_email`` and the full ``replacements`` map; otherwise an
    ``{'error': ...}`` payload with the matching HTTP status (400/404/500).
    """
    try:
        # -- validate input ---------------------------------------------
        template_id = request.data.get('template_id')
        if not template_id:
            return Response({'error': 'template_id is required'},
                            status=status.HTTP_400_BAD_REQUEST)
        context_pks = request.data.get('context_pks', {})
        if not isinstance(context_pks, dict):
            return Response({'error': 'context_pks must be a dictionary'},
                            status=status.HTTP_400_BAD_REQUEST)
        # -- load the template ------------------------------------------
        try:
            template = Email_Template.objects.get(pk=template_id)
        except Email_Template.DoesNotExist:
            return Response({'error': f'Email_Template with id={template_id} does not exist'},
                            status=status.HTTP_404_NOT_FOUND)
        # -- render -----------------------------------------------------
        preview = EmailTemplatePreview(template, context_pks).get_preview()
        if preview:
            return Response(preview, status=status.HTTP_200_OK)
        return Response({'error': 'Failed to generate preview'},
                        status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    except Exception as e:
        import traceback
        traceback.print_exc()
        return Response({'error': f'Unexpected error: {str(e)}'},
                        status=status.HTTP_500_INTERNAL_SERVER_ERROR)