Files
api/app/basic.py
anhduy-tech bfbe0a4061 changes
2025-12-31 09:22:13 +07:00

176 lines
4.9 KiB
Python
Executable File

from datetime import datetime
import csv
def empty(value):
    """Return True when *value* is ``None`` or equal to the empty string.

    Other falsy values such as ``0``, ``False`` or ``[]`` are not treated as
    empty, because they do not compare equal to ``''``.
    """
    # Identity test for None (PEP 8). bool() keeps the result a strict bool
    # even if value's __eq__ returns something other than True/False.
    return value is None or bool(value == '')
def isnumber(value):
    """Return True when *value* can be converted with ``float()``.

    ``None`` and the empty string are rejected (``float()`` raises for both),
    so no separate emptiness check is needed. Anything ``float()`` accepts
    counts as a number, including ints, bools and strings such as ``'1e3'``.
    """
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (None, list, ...).
        # ValueError: string that is not a numeric literal (including '').
        # OverflowError: integer too large to be represented as a float.
        return False
    return True
def isdate(value):
    """Tell whether the first ten characters of *value* parse as ``%Y-%m-%d``.

    Only the leading slice is inspected, so longer strings such as
    ``'2024-01-15T08:30:00'`` are accepted. Any error raised while slicing or
    parsing is printed and reported as ``False``.
    """
    try:
        leading = value[0:10]
        datetime.strptime(leading, '%Y-%m-%d')
    except Exception as err:
        print(err)
        return False
    else:
        return True
def formatdate(value):
    """Normalise *value* to a ``YYYY-MM-DD`` string, or return ``None``.

    The first ten characters of *value* are parsed with the ``%Y-%m-%d``
    format; anything after them (for example a time component) is ignored.

    Returns:
        The ISO date string on success, ``None`` when *value* cannot be
        sliced or parsed (wrong type, wrong format, invalid calendar date).
    """
    try:
        parsed = datetime.strptime(value[0:10], '%Y-%m-%d')
    except Exception:
        # Best-effort conversion: any ordinary failure means "not a date".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        return None
    return str(parsed.date())
def getattr(obj, attr):
    """Look up *attr* in the container *obj*, returning ``None`` if absent.

    NOTE: this module-level name shadows the builtin ``getattr``; inside this
    module, ``getattr(x, y)`` means "``x[y]`` if ``y in x``", not attribute
    access.
    """
    if attr not in obj:
        return None
    return obj[attr]
def isvalid(ele, obj):
    """Return True when *ele* matches every key/value pair of *obj*.

    Args:
        ele: Container to inspect; each key is looked up with this module's
            ``getattr`` helper (item lookup, ``None`` when the key is absent).
        obj: Mapping of key -> expected value. An empty mapping matches
            everything.

    Returns:
        False as soon as one looked-up value differs from the expected one,
        True otherwise.
    """
    # any() stops at the first mismatch instead of scanning every key.
    # The original ``!=`` comparison is kept as-is.
    return not any(getattr(ele, key) != value for key, value in obj.items())
def find(arr, obj, attr=None):
    """Return the first element of *arr* that satisfies ``isvalid(ele, obj)``.

    Returns ``None`` when no element matches. The *attr* parameter is
    accepted but not used.
    """
    matching = (candidate for candidate in arr if isvalid(candidate, obj))
    return next(matching, None)
def find_index(arr, obj, attr=None):
    """Return the position of the first element of *arr* matching *obj*.

    Matching is decided by ``isvalid(ele, obj)``. Returns ``None`` when no
    element matches. The *attr* parameter is accepted but not used.
    """
    for position, candidate in enumerate(arr):
        if isvalid(candidate, obj):
            return position
    return None
def filter(arr, obj, attr=None):
    """Return a list of the elements of *arr* that satisfy ``isvalid(ele, obj)``.

    Order is preserved; an empty list is returned when nothing matches. The
    *attr* parameter is accepted but not used.

    NOTE: this module-level name shadows the builtin ``filter``.
    """
    return [candidate for candidate in arr if isvalid(candidate, obj)]
def write_csv(file, data, header):
    """Write *data* to *file* as UTF-8 CSV, preceded by a header row.

    Args:
        file: Path of the file to create or overwrite.
        data: Iterable of rows; each row is indexed by column name
            (``row[column]``), e.g. a dict.
        header: Sequence of column names. It is written as the first line and
            decides which values are taken from each row, and in what order.

    Raises:
        KeyError: If a dict row lacks one of the columns named in *header*.
    """
    # newline='' is what the csv module expects: the writer emits its own
    # line terminators, and without it platforms that translate '\n' produce
    # an extra blank line after every row.
    with open(file, 'w', encoding='UTF8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows([row[column] for column in header] for row in data)
    # The ``with`` block closes the file; no explicit close() is needed.
def execute_data_query(name, params):
    """
    Execute a data query for the model registered under ``name``.

    Reuses the query helpers of the API view (``app.views``):
    ``get_serializer``, ``base_query``, ``final_result`` and
    ``get_limit_rows``.

    Args:
        name: Model name, passed to ``get_serializer`` to obtain the model and
            its serializer class.
        params: Mapping of query options, read with ``params.get``. Keys used:
            ``filter``, ``filter_or``, ``exclude``, ``final_filter``,
            ``final_exclude`` (a dict, or a string holding a Python literal),
            ``values`` and ``sort`` (comma-separated strings), ``summary``,
            ``page``, ``perpage``, ``distinct_values`` and ``calculation``.

    Returns:
        ``None`` when ``get_serializer`` yields no model; otherwise a dict
        with the keys ``total_rows``, ``full_data`` and ``rows``.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid a circular import with app.views -- confirm).
    from app.views import get_serializer, base_query, final_result, get_limit_rows
    from django.db.models import Q, F
    import ast
    Model, serializer_class = get_serializer(name)
    if Model is None:
        return None
    def parse_param_to_dict(param):
        # Accept an already-built dict, or a string holding a Python literal.
        # Anything else, or a string that fails to parse, becomes {}.
        # NOTE(review): literal_eval can return a non-dict literal (list,
        # number, ...); the callers below would then fail on .items().
        if isinstance(param, dict):
            return param
        if isinstance(param, str):
            try:
                return ast.literal_eval(param)
            except (ValueError, SyntaxError):
                return {}
        return {}
    # Read the query options from the `params` dict
    filter_param = params.get('filter')
    values = params.get('values')
    values = values if values==None else values.split(',')
    summary = params.get('summary')
    # NOTE(review): int() raises if 'page' is present but None or non-numeric.
    page = int(params.get('page', 1))
    onpage = int(params.get('perpage')) if params.get('perpage') != None else None
    sort = params.get('sort')
    sort = None if sort==None else sort.split(',')
    distinct_values = params.get('distinct_values')
    filter_or_param = params.get('filter_or')
    exclude_param = params.get('exclude')
    calculation = params.get('calculation')
    final_filter_param = params.get('final_filter')
    final_exclude_param = params.get('final_exclude')
    # Build filter_list: `filter_or` entries are OR-ed in first, then the
    # `filter` entries are AND-ed onto the same Q object.
    filter_list = Q()
    filter_or_dict = parse_param_to_dict(filter_or_param)
    if filter_or_dict:
        for key, value in filter_or_dict.items():
            filter_list.add(Q(**{key: value}), Q.OR)
    filter_dict = parse_param_to_dict(filter_param)
    if filter_dict:
        for key, value in filter_dict.items():
            # A value shaped like {'type': 'F', 'field': <name>} is compared
            # against another model field through an F() expression.
            if isinstance(value, dict) and value.get('type') == 'F':
                filter_list.add(Q(**{key: F(value['field'])}), Q.AND)
            else:
                filter_list.add(Q(**{key: value}), Q.AND)
    # Execute the query (no conditions collected -> all rows)
    rows = Model.objects.all() if len(filter_list) == 0 else Model.objects.filter(filter_list)
    exclude_dict = parse_param_to_dict(exclude_param)
    if exclude_dict:
        # Exclusion conditions are AND-ed together and support the same
        # {'type': 'F', ...} form as the filters above.
        exclude_list = Q()
        for key, value in exclude_dict.items():
            if isinstance(value, dict) and value.get('type') == 'F':
                exclude_list.add(Q(**{key: F(value['field'])}), Q.AND)
            else:
                exclude_list.add(Q(**{key: value}), Q.AND)
        rows = rows.exclude(exclude_list)
    rows, need_serializer = base_query(rows, values, summary, distinct_values)
    # We need to parse final_filter and final_exclude here as they are applied in final_result
    final_filter_dict = parse_param_to_dict(final_filter_param)
    final_exclude_dict = parse_param_to_dict(final_exclude_param)
    rows = final_result(rows, calculation, final_filter_dict, final_exclude_dict, sort)
    # Initialize total_rows and full_data
    total_rows = 0
    full_data = False
    if summary == 'count':
        # rows itself is reported as the total (presumably already reduced to
        # a count by the helpers above -- confirm).
        total_rows = rows
        full_data = True
        rows = total_rows
    elif summary == 'aggregate':
        total_rows = 1
        full_data = True
    else:
        total_rows, full_data, rows = get_limit_rows(rows, page, onpage)
        # NOTE(review): serialization is assumed to apply only to this
        # paginated branch; for summary == 'count' a list(rows) call would
        # fail on a plain number. Confirm against the intended nesting.
        if need_serializer == True:
            rows = serializer_class(rows, many=True).data
        else:
            # Round-trip through DjangoJSONEncoder so the rows come back as
            # plain JSON-compatible Python values.
            import json
            from django.core.serializers.json import DjangoJSONEncoder
            rows = json.loads(json.dumps(list(rows), cls=DjangoJSONEncoder))
    return {'total_rows': total_rows, 'full_data': full_data, 'rows': rows}