"""Validation, dict-lookup and CSV helpers, plus a reusable data-query runner."""

import ast
import csv
import json
from datetime import datetime

# Only the leading ``YYYY-MM-DD`` part of a value is ever parsed as a date.
_DATE_FORMAT = '%Y-%m-%d'
_DATE_LENGTH = 10


def empty(value):
    """Return True when *value* is ``None`` or the empty string, else False."""
    return bool(value is None or value == '')


def isnumber(value):
    """Return True when ``float(value)`` succeeds on a non-empty *value*."""
    if empty(value):
        return False
    try:
        float(value)
    except Exception:
        # A predicate must not raise on arbitrary input; unlike a bare
        # ``except`` this still lets KeyboardInterrupt/SystemExit propagate.
        return False
    return True


def _parse_date(value):
    """Parse the first ten characters of *value* as ``YYYY-MM-DD``.

    Returns a ``datetime.date``, or ``None`` when *value* cannot be sliced or
    does not start with a valid date.
    """
    try:
        return datetime.strptime(value[:_DATE_LENGTH], _DATE_FORMAT).date()
    except Exception:
        # Covers non-sliceable values (None, numbers) as well as bad dates.
        return None


def isdate(value):
    """Return True when *value* starts with a valid ``YYYY-MM-DD`` date."""
    return _parse_date(value) is not None


def formatdate(value):
    """Return the leading date of *value* as ``'YYYY-MM-DD'``, or ``None``."""
    parsed = _parse_date(value)
    return None if parsed is None else str(parsed)


# NOTE: this intentionally keeps its historical name even though it shadows
# the builtin ``getattr`` inside this module; it is a key lookup, not an
# attribute lookup.
def getattr(obj, attr):
    """Return ``obj[attr]`` when *attr* is in *obj*, otherwise ``None``."""
    return obj[attr] if attr in obj else None


def isvalid(ele, obj):
    """Return True when every key/value pair of *obj* matches *ele*.

    A key missing from *ele* is treated as ``None``. An empty *obj* matches
    everything.
    """
    return all(getattr(ele, key) == value for key, value in obj.items())


def find(arr, obj, attr=None):
    """Return the first element of *arr* matching *obj*, or ``None``.

    *attr* is unused; it is kept so existing callers keep working.
    """
    return next((ele for ele in arr if isvalid(ele, obj)), None)


def find_index(arr, obj, attr=None):
    """Return the index of the first element of *arr* matching *obj*, or ``None``.

    *attr* is unused; it is kept so existing callers keep working.
    """
    for index, ele in enumerate(arr):
        if isvalid(ele, obj):
            return index
    return None


# NOTE: keeps its historical name even though it shadows the builtin
# ``filter`` inside this module.
def filter(arr, obj, attr=None):
    """Return the list of elements of *arr* matching *obj*.

    *attr* is unused; it is kept so existing callers keep working.
    """
    return [ele for ele in arr if isvalid(ele, obj)]


def write_csv(file, data, header):
    """Write *data* (an iterable of mappings) to *file* as UTF-8 CSV.

    *header* is written as the first row and also selects, in order, which
    keys of every row are written. A row lacking one of the header keys
    raises ``KeyError``.
    """
    # newline='' is what the csv module requires; without it the writer's
    # '\r\n' terminator is translated again on Windows and yields blank rows.
    with open(file, 'w', encoding='UTF8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows([row[column] for column in header] for row in data)


def execute_data_query(name, params):
    """Run a data query for the model called *name* using *params*.

    Reuses the query helpers of the API view (``app.views``).

    *params* is a dict; the keys read are ``filter``, ``filter_or``,
    ``exclude``, ``final_filter``, ``final_exclude`` (each a dict or the
    string form of a dict literal), ``values`` and ``sort`` (comma-separated
    strings), ``summary``, ``distinct_values``, ``calculation``, ``page``
    (defaults to 1) and ``perpage``.

    Returns ``None`` when ``get_serializer`` yields no model, otherwise a
    dict with the keys ``total_rows``, ``full_data`` and ``rows``.
    """
    # These imports were function-local in the original code and stay so.
    # NOTE(review): presumably this avoids a circular import - confirm.
    from app.views import get_serializer, base_query, final_result, get_limit_rows
    from django.db.models import Q, F

    Model, serializer_class = get_serializer(name)
    if Model is None:
        return None

    def parse_param_to_dict(param):
        """Return *param* as a dict; anything unparsable becomes ``{}``."""
        if isinstance(param, dict):
            return param
        if isinstance(param, str):
            try:
                parsed = ast.literal_eval(param)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # The full set of errors literal_eval documents for
                # malformed input.
                return {}
            # A valid literal that is not a dict (e.g. "[1]") would crash
            # later on ``.items()``; treat it like any other bad input.
            return parsed if isinstance(parsed, dict) else {}
        return {}

    def add_and_conditions(target, conditions):
        """AND every condition into *target*; ``{'type': 'F'}`` values become F()."""
        for key, value in conditions.items():
            if isinstance(value, dict) and value.get('type') == 'F':
                value = F(value['field'])
            target.add(Q(**{key: value}), Q.AND)

    # Read the parameters from the `params` dict.
    values = params.get('values')
    if values is not None:
        values = values.split(',')
    summary = params.get('summary')
    page = int(params.get('page', 1))
    perpage = params.get('perpage')
    onpage = int(perpage) if perpage is not None else None
    sort = params.get('sort')
    if sort is not None:
        sort = sort.split(',')
    distinct_values = params.get('distinct_values')
    calculation = params.get('calculation')

    # Build filter_list: OR conditions first, then AND conditions, in the
    # same order as before because Q.add() is order-sensitive.
    filter_list = Q()
    for key, value in parse_param_to_dict(params.get('filter_or')).items():
        filter_list.add(Q(**{key: value}), Q.OR)
    add_and_conditions(filter_list, parse_param_to_dict(params.get('filter')))

    # Execute the query.
    if len(filter_list) == 0:
        rows = Model.objects.all()
    else:
        rows = Model.objects.filter(filter_list)

    exclude_dict = parse_param_to_dict(params.get('exclude'))
    if exclude_dict:
        exclude_list = Q()
        add_and_conditions(exclude_list, exclude_dict)
        rows = rows.exclude(exclude_list)

    rows, need_serializer = base_query(rows, values, summary, distinct_values)

    # final_filter and final_exclude are parsed here because they are applied
    # inside final_result.
    final_filter_dict = parse_param_to_dict(params.get('final_filter'))
    final_exclude_dict = parse_param_to_dict(params.get('final_exclude'))
    rows = final_result(rows, calculation, final_filter_dict, final_exclude_dict, sort)

    if summary == 'count':
        # The query result itself is reported as the total.
        total_rows = rows
        full_data = True
    elif summary == 'aggregate':
        total_rows = 1
        full_data = True
    else:
        total_rows, full_data, rows = get_limit_rows(rows, page, onpage)
        # NOTE(review): the original layout was flattened, so the nesting of
        # this serialization step is reconstructed. It is placed in this
        # branch because list() on a count/aggregate result would not
        # produce rows - confirm against the API view.
        if need_serializer:
            rows = serializer_class(rows, many=True).data
        else:
            from django.core.serializers.json import DjangoJSONEncoder
            # Round-trip through JSON so the rows hold only plain JSON types.
            rows = json.loads(json.dumps(list(rows), cls=DjangoJSONEncoder))

    return {'total_rows': total_rows, 'full_data': full_data, 'rows': rows}