"""Expression resolver: literals, template strings, dotted paths and ``$function`` calls."""

import calendar
import json
import logging
import math
import re
from datetime import datetime, date, timedelta
from decimal import Decimal

from django.apps import apps
from django.db import models

logger = logging.getLogger(__name__)

# Sentinel returned by call handlers that produced no value. ``None`` cannot be
# used for this because several functions legitimately return ``None``.
_UNHANDLED = object()

# "$name(" at the start of an expression; group 1 is the function name.
_CALL_HEAD_RE = re.compile(r'^\$(\w+)\(')
# Full "$name(...)" call; group 1 is the raw, unsplit argument text.
_CALL_BODY_RE = re.compile(r'^\$\w+\((.*)\)$')
# $append(list, element): the element may span lines and contain commas.
_APPEND_RE = re.compile(r"^\$append\(([^,]+),\s*(.+)\)$", re.DOTALL | re.IGNORECASE)
# $agg(list, 'operation', 'field'?)
_AGG_RE = re.compile(
    r'^\$agg\(([^,]+),\s*[\'"]([^\'\"]+)[\'"](?:,\s*[\'"]?([^\'\")]+)[\'"]?)?\)$',
    re.IGNORECASE,
)
# {key} / {customer.address.city} placeholders.
_TEMPLATE_RE = re.compile(r"\{(\w+(\.\w+)*)\}")
# Plain numeric literal such as 42, -3 or 1.5.
_NUMBER_RE = re.compile(r"^-?\d+(\.\d+)?$")
# Path segment with a subscript, e.g. items[0].
_INDEXED_SEGMENT_RE = re.compile(r"(\w+)\[(\d+)\]")

# Two-argument math functions; division and modulo by zero yield 0.
_BINARY_MATH = {
    'add': lambda a, b: a + b,
    'sub': lambda a, b: a - b,
    'subtract': lambda a, b: a - b,
    'multiply': lambda a, b: a * b,
    'mul': lambda a, b: a * b,
    'divide': lambda a, b: a / b if b != 0 else 0,
    'div': lambda a, b: a / b if b != 0 else 0,
    'mod': lambda a, b: a % b if b != 0 else 0,
    'power': lambda a, b: a ** b,
    'pow': lambda a, b: a ** b,
}

# One-argument math functions ($round optionally takes a decimals argument).
_UNARY_MATH = {
    'round': lambda x, d=0: round(x, int(d)),
    'abs': abs,
    'ceil': math.ceil,
    'floor': math.floor,
    'sqrt': lambda x: math.sqrt(x) if x >= 0 else 0,
}

# One-argument string functions; the argument is coerced with str().
_STRING_UNARY = {
    'upper': lambda s: str(s).upper(),
    'lower': lambda s: str(s).lower(),
    'trim': lambda s: str(s).strip(),
    'length': lambda s: len(str(s)),
}

# Two-argument comparison operators.
_COMPARISONS = {
    'eq': lambda a, b: a == b,
    'ne': lambda a, b: a != b,
    'gt': lambda a, b: a > b,
    'gte': lambda a, b: a >= b,
    'lt': lambda a, b: a < b,
    'lte': lambda a, b: a <= b,
    'in': lambda a, b: a in b,
    'contains': lambda a, b: b in a,
}

_DATE_FUNCTIONS = ('date_diff', 'date_add', 'date_format', 'date_parse')


# =============================================
# CORE RESOLVER
# =============================================

def resolve_value(expr, context):
    """
    Resolve an expression against ``context`` and return the resulting value.

    Supported forms, tried in this order:
    - Non-string input (None, numbers, bools, Decimals, any object): returned as-is
    - System variables: $now, $today, $now_iso, $timestamp
    - Function calls ``$name(arg, ...)`` (name is case-insensitive, arguments
      may themselves be expressions, including nested calls):
        * Math: $add, $sub/$subtract, $multiply/$mul, $divide/$div, $mod,
          $power/$pow, $round, $abs, $ceil, $floor, $sqrt, $min, $max
        * Date: $date_diff, $date_add, $date_format, $date_parse
        * String: $concat, $upper, $lower, $trim, $length, $replace,
          $substring, $split
        * Logic: $if, $switch, $and, $or, $not
        * List: $append, $first, $last, $count, $agg
        * Lookup: $vlookup, $lookup, $get
        * Comparison: $eq, $ne, $gt, $gte, $lt, $lte, $in, $contains
    - Template strings: "{key}" returns the raw context value; "text {key} text"
      returns a string with every placeholder substituted (None becomes "")
    - $last_result and $last_result.field
    - Quoted literals ('...' or "..."): returned unchanged, quotes included
    - Numeric literals: returned as float
    - Dotted paths (customer.address.city, items.lines[0].price) and plain
      context keys
    - Anything else: the stripped string itself

    A call that is not recognised, has an unsupported number of arguments, or
    produces no result is not an error: evaluation falls through to the
    template / path / literal handling below the function stage.

    Args:
        expr: The expression to resolve.
        context: Mapping of variable names to values; read with ``get`` and ``in``.

    Returns:
        The resolved value; its type depends on the expression.
    """
    # None, numbers, bools, Decimals and arbitrary objects are already values.
    if not isinstance(expr, str):
        return expr

    expr = expr.strip()

    # ---------------------------------------------
    # 1. System variables
    # ---------------------------------------------
    if expr == "$now":
        return context.get("now", datetime.now())
    if expr == "$today":
        if "today" in context:
            return context["today"]
        pinned_now = context.get("now")
        if isinstance(pinned_now, datetime):
            return pinned_now.date()
        return date.today()
    if expr == "$now_iso":
        return datetime.now().isoformat(timespec='seconds')
    if expr == "$timestamp":
        return int(datetime.now().timestamp())

    # ---------------------------------------------
    # 2. Function calls
    # ---------------------------------------------
    head = _CALL_HEAD_RE.match(expr)
    if head is not None:
        outcome = _evaluate_call(head.group(1).lower(), expr, context)
        if outcome is not _UNHANDLED:
            return outcome

    # ---------------------------------------------
    # 3. Template strings
    # ---------------------------------------------
    if _TEMPLATE_RE.search(expr):
        whole = _TEMPLATE_RE.fullmatch(expr)
        if whole:
            # A lone placeholder keeps the value's native type.
            return _get_context_value(whole.group(1), context)

        def fill(placeholder):
            value = _get_context_value(placeholder.group(1), context)
            return "" if value is None else str(value)

        return _TEMPLATE_RE.sub(fill, expr)

    # ---------------------------------------------
    # 4. $last_result / $last_result.field
    # ---------------------------------------------
    if expr.startswith("$last_result"):
        _, _, field = expr.partition(".")
        last_result = context.get("last_result")
        if not field:
            return last_result
        if last_result is None:
            return None
        if isinstance(last_result, dict):
            return last_result.get(field)
        return getattr(last_result, field, None)

    # ---------------------------------------------
    # 5. Literals, dotted paths and context keys
    # ---------------------------------------------
    # A quoted literal may contain dots ('%d.%m.%Y'); it must not be mistaken
    # for a dotted context path, which would resolve it to None.
    if _is_quoted(expr) and expr not in context:
        return expr
    if _NUMBER_RE.match(expr):
        return float(expr)
    if "." in expr or expr in context:
        return _get_context_value(expr, context)
    return expr


# =============================================
# PRIVATE HELPERS
# =============================================

def _is_quoted(text):
    """Return True if ``text`` is wrapped in a matching pair of quotes."""
    return len(text) >= 2 and text[0] == text[-1] and text[0] in ('"', "'")


def _resolve_text(arg, context):
    """Resolve ``arg`` and return it as text with surrounding quotes removed."""
    return str(resolve_value(arg, context)).strip('\'"')


def _resolve_unit(args, context):
    """Return the normalised unit from the optional third argument (default 'days')."""
    if len(args) <= 2:
        return 'days'
    return _resolve_text(args[2], context).strip().lower()


def _shift_months(moment, months):
    """Return ``moment`` moved by ``months``, clamping the day to the target month."""
    month_index = moment.month - 1 + months
    year = moment.year + month_index // 12
    month = month_index % 12 + 1
    # Jan 31 + 1 month has no 31st to land on; use the last day of the month.
    day = min(moment.day, calendar.monthrange(year, month)[1])
    return moment.replace(year=year, month=month, day=day)


def _item_field(item, field):
    """Read ``field`` from a dict (missing -> None) or an object (missing -> 0)."""
    if isinstance(item, dict):
        return item.get(field)
    return getattr(item, field, 0)


def _get_context_value(key_path, context):
    """
    Look up ``key_path`` in ``context``.

    Handles numeric literals (returned as float), plain keys, and dotted paths
    whose segments are read as dict keys or attributes. A segment may carry a
    subscript (``lines[0]``). Decimal results are converted to float.
    Returns None when any step of the path is missing.
    """
    if not key_path:
        return None

    if _NUMBER_RE.match(key_path):
        return float(key_path)

    if "." not in key_path:
        value = context.get(key_path)
        return float(value) if isinstance(value, Decimal) else value

    root, *segments = key_path.split(".")
    value = context.get(root)

    for segment in segments:
        if value is None:
            return None

        indexed = _INDEXED_SEGMENT_RE.match(segment)
        if indexed:
            attr_name = indexed.group(1)
            index = int(indexed.group(2))
            if isinstance(value, dict):
                value = value.get(attr_name)
            else:
                value = getattr(value, attr_name, None)
            try:
                value = value[index]
            except (IndexError, KeyError, TypeError):
                return None
        elif isinstance(value, dict):
            value = value.get(segment)
        else:
            value = getattr(value, segment, None)

        # Anything exposing ``all`` that is not a model instance is treated as
        # a manager/QuerySet and collapsed to its first row.
        if hasattr(value, 'all') and not isinstance(value, models.Model):
            value = value.first()

    return float(value) if isinstance(value, Decimal) else value


def _evaluate_call(name, expr, context):
    """Evaluate the ``$name(...)`` call in ``expr``; return ``_UNHANDLED`` if it yields nothing."""
    # $append and $agg parse the whole expression with their own patterns.
    if name == 'append':
        return _append_call(expr, context)
    if name == 'agg':
        return _agg_call(expr, context)

    body = _CALL_BODY_RE.match(expr)
    if body is None:
        return _UNHANDLED

    raw = body.group(1)
    for handler in _CALL_HANDLERS:
        outcome = handler(name, raw, context)
        if outcome is not _UNHANDLED:
            return outcome
    return _UNHANDLED


def _math_call(name, raw, context):
    """Handle the math functions; arguments are coerced with ``to_number``."""
    if name in _BINARY_MATH:
        args = split_args(raw)
        if len(args) != 2:
            return _UNHANDLED
        left = to_number(resolve_value(args[0], context))
        right = to_number(resolve_value(args[1], context))
        return _BINARY_MATH[name](left, right)

    if name in _UNARY_MATH:
        args = split_args(raw)
        if not args:
            return _UNHANDLED
        value = to_number(resolve_value(args[0], context))
        if name == 'round' and len(args) == 2:
            decimals = to_number(resolve_value(args[1], context))
            return _UNARY_MATH[name](value, decimals)
        return _UNARY_MATH[name](value)

    if name in ('min', 'max'):
        values = [to_number(resolve_value(arg, context)) for arg in split_args(raw)]
        if not values:
            # min()/max() of nothing would raise; treat it as an arity mismatch.
            return _UNHANDLED
        return min(values) if name == 'min' else max(values)

    return _UNHANDLED


def _date_call(name, raw, context):
    """Handle $date_diff, $date_add, $date_format and $date_parse."""
    if name not in _DATE_FUNCTIONS:
        return _UNHANDLED
    args = split_args(raw)

    # $date_diff(date1, date2, unit?) -> date1 - date2, time of day ignored.
    if name == 'date_diff':
        if len(args) < 2:
            return _UNHANDLED
        first = to_date(resolve_value(args[0], context))
        second = to_date(resolve_value(args[1], context))
        unit = _resolve_unit(args, context)
        if not (first and second):
            return 0
        first_day = first.date()
        second_day = second.date()
        if unit == 'days':
            return (first_day - second_day).days
        if unit == 'months':
            return (first_day.year - second_day.year) * 12 + first_day.month - second_day.month
        if unit == 'years':
            # Difference of calendar years, not of completed years.
            return first_day.year - second_day.year
        return 0

    # $date_add(date, amount, unit?) -> ISO-8601 string.
    if name == 'date_add':
        if len(args) < 2:
            return _UNHANDLED
        base = to_date(resolve_value(args[0], context))
        amount = to_number(resolve_value(args[1], context))
        unit = _resolve_unit(args, context)
        if not base:
            return _UNHANDLED
        step = int(amount)
        if unit == 'months':
            shifted = _shift_months(base, step)
        elif unit == 'years':
            shifted = _shift_months(base, 12 * step)
        elif unit == 'hours':
            shifted = base + timedelta(hours=step)
        else:
            # 'days' and any unrecognised unit.
            shifted = base + timedelta(days=step)
        return shifted.isoformat()

    # $date_format(date, format)
    if name == 'date_format':
        if len(args) != 2:
            return _UNHANDLED
        moment = to_date(resolve_value(args[0], context))
        fmt = resolve_value(args[1], context)
        if moment and isinstance(fmt, str):
            return moment.strftime(fmt.strip('\'"'))
        return _UNHANDLED

    # $date_parse(string, format?) -> "YYYY-MM-DD", or None if parsing fails.
    if not args:
        return _UNHANDLED
    date_text = str(resolve_value(args[0], context))
    fmt = resolve_value(args[1], context) if len(args) > 1 else "%Y-%m-%d"
    if not isinstance(fmt, str):
        return None
    try:
        return datetime.strptime(date_text, fmt.strip('\'"')).strftime("%Y-%m-%d")
    except ValueError:
        return None


def _string_call(name, raw, context):
    """Handle the string functions."""
    # $concat(str1, str2, ...): None contributes nothing; 0 and False are kept.
    if name == 'concat':
        parts = (resolve_value(arg, context) for arg in split_args(raw))
        return ''.join('' if part is None else str(part) for part in parts)

    # $upper, $lower, $trim, $length
    if name in _STRING_UNARY:
        return _STRING_UNARY[name](resolve_value(raw.strip(), context))

    # $replace(text, old, new)
    if name == 'replace':
        args = split_args(raw)
        if len(args) != 3:
            return _UNHANDLED
        text = str(resolve_value(args[0], context))
        old = _resolve_text(args[1], context)
        new = _resolve_text(args[2], context)
        return text.replace(old, new)

    # $substring(text, start, length?)
    if name == 'substring':
        args = split_args(raw)
        if len(args) < 2:
            return _UNHANDLED
        text = str(resolve_value(args[0], context))
        start = int(to_number(resolve_value(args[1], context)))
        length = int(to_number(resolve_value(args[2], context))) if len(args) > 2 else None
        # A zero (or unresolvable, hence zero) length means "to the end".
        return text[start:start + length] if length else text[start:]

    # $split(text, delimiter)
    if name == 'split':
        args = split_args(raw)
        if len(args) != 2:
            return _UNHANDLED
        text = str(resolve_value(args[0], context))
        return text.split(_resolve_text(args[1], context))

    return _UNHANDLED


def _logic_call(name, raw, context):
    """Handle $if, $switch, $and, $or and $not."""
    # $if(condition, true_value, false_value): only the chosen branch is resolved.
    if name == 'if':
        args = split_args(raw)
        if len(args) != 3:
            return _UNHANDLED
        chosen = args[1] if resolve_value(args[0], context) else args[2]
        return resolve_value(chosen, context)

    # $switch(value, case1, result1, case2, result2, ..., default?)
    if name == 'switch':
        args = split_args(raw)
        if len(args) < 2:
            return _UNHANDLED
        value = resolve_value(args[0], context)
        for case_expr, result_expr in zip(args[1:-1:2], args[2::2]):
            if value == resolve_value(case_expr, context):
                return resolve_value(result_expr, context)
        # value + N case/result pairs + default gives an even argument count.
        if len(args) % 2 == 0:
            return resolve_value(args[-1], context)
        return _UNHANDLED

    if name == 'and':
        return all(resolve_value(arg, context) for arg in split_args(raw))
    if name == 'or':
        return any(resolve_value(arg, context) for arg in split_args(raw))
    if name == 'not':
        return not resolve_value(raw.strip(), context)

    return _UNHANDLED


def _append_call(expr, context):
    """Handle $append(list, element); returns a new list, the source is not mutated."""
    match = _APPEND_RE.match(expr)
    if match is None:
        return _UNHANDLED

    target = resolve_value(match.group(1).strip(), context)
    items = [] if target is None else list(target)

    element = resolve_value(match.group(2).strip(), context)
    if isinstance(element, str):
        # A string element is decoded as JSON when possible, else kept verbatim.
        try:
            element = json.loads(element)
        except json.JSONDecodeError:
            pass

    items.append(element)
    return items


def _agg_call(expr, context):
    """Handle $agg(list, 'count'|'sum'|'min'|'max'|'avg', 'field'?)."""
    match = _AGG_RE.match(expr)
    if match is None:
        return _UNHANDLED

    list_expr = match.group(1).strip()
    operation = match.group(2).strip()
    field = match.group(3).strip() if match.group(3) else None

    items = resolve_value(list_expr, context)
    if not isinstance(items, list):
        return 0
    if operation == 'count':
        return len(items)
    if operation not in ('sum', 'min', 'max', 'avg'):
        return _UNHANDLED

    values = [to_number(_item_field(item, field) if field else item) for item in items]
    if operation == 'sum':
        return sum(values)
    if not values:
        return 0
    if operation == 'min':
        return min(values)
    if operation == 'max':
        return max(values)
    return sum(values) / len(values)


def _list_call(name, raw, context):
    """Handle $first, $last and $count (only real ``list`` values are accepted)."""
    if name in ('first', 'last'):
        items = resolve_value(raw.strip(), context)
        if isinstance(items, list) and items:
            return items[0] if name == 'first' else items[-1]
        return _UNHANDLED

    if name == 'count':
        items = resolve_value(raw.strip(), context)
        return len(items) if isinstance(items, list) else 0

    return _UNHANDLED


def _lookup_call(name, raw, context):
    """Handle $vlookup, $lookup and $get. Models are looked up under app label 'app'."""
    # $vlookup(lookup_value, model_name, lookup_field, return_field)
    if name == 'vlookup':
        args = split_args(raw)
        if len(args) != 4:
            return _UNHANDLED
        lookup_value = resolve_value(args[0], context)
        model_name = _resolve_text(args[1], context)
        lookup_field = _resolve_text(args[2], context)
        return_field = _resolve_text(args[3], context)
        try:
            model = apps.get_model('app', model_name)
            row = model.objects.filter(**{lookup_field: lookup_value}).first()
            if row:
                return getattr(row, return_field, None)
        except Exception:
            # Best-effort lookup: a bad model/field name must not abort resolution.
            logger.warning("$vlookup on model %r failed", model_name, exc_info=True)
        return _UNHANDLED

    # $lookup(model_name, field, value) -> first matching instance or None
    if name == 'lookup':
        args = split_args(raw)
        if len(args) != 3:
            return _UNHANDLED
        model_name = _resolve_text(args[0], context)
        field = _resolve_text(args[1], context)
        value = resolve_value(args[2], context)
        try:
            model = apps.get_model('app', model_name)
            return model.objects.filter(**{field: value}).first()
        except Exception:
            # Best-effort lookup: a bad model/field name must not abort resolution.
            logger.warning("$lookup on model %r failed", model_name, exc_info=True)
        return _UNHANDLED

    # $get(dict_or_object, key, default?)
    if name == 'get':
        args = split_args(raw)
        if len(args) < 2:
            return _UNHANDLED
        source = resolve_value(args[0], context)
        key = resolve_value(args[1], context)
        default = resolve_value(args[2], context) if len(args) > 2 else None
        if isinstance(source, dict):
            return source.get(key, default)
        # getattr() only accepts string names.
        if not isinstance(key, str):
            return default
        return getattr(source, key, default)

    return _UNHANDLED


def _comparison_call(name, raw, context):
    """Handle $eq, $ne, $gt, $gte, $lt, $lte, $in and $contains."""
    compare = _COMPARISONS.get(name)
    if compare is None:
        return _UNHANDLED
    args = split_args(raw)
    if len(args) != 2:
        return _UNHANDLED
    return compare(resolve_value(args[0], context), resolve_value(args[1], context))


# Tried in order by _evaluate_call; function names are unique across handlers.
_CALL_HANDLERS = (
    _math_call,
    _date_call,
    _string_call,
    _logic_call,
    _list_call,
    _lookup_call,
    _comparison_call,
)


# =============================================
# HELPER FUNCTIONS
# =============================================

def split_args(content):
    """
    Split function arguments on top-level commas.

    Commas inside parentheses or inside single/double quotes do not split.
    Each argument is stripped of surrounding whitespace; quotes are kept.

    Example: "a, $add(b, c), 'd'" -> ["a", "$add(b, c)", "'d'"]
    """
    args = []
    current = []
    depth = 0
    in_quote = None

    for char in content:
        if char in ('"', "'") and (not in_quote or in_quote == char):
            # Opens a quoted run, or closes the one opened by the same character.
            in_quote = None if in_quote else char
            current.append(char)
        elif in_quote:
            current.append(char)
        elif char == ',' and depth == 0:
            args.append(''.join(current).strip())
            current = []
        else:
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
            current.append(char)

    if current:
        args.append(''.join(current).strip())

    return args


def to_number(value, default=0):
    """Convert ``value`` to float; return ``default`` for None, '' or unconvertible input."""
    if value is None or value == '':
        return default
    try:
        return float(value)
    except (ValueError, TypeError):
        return default


def to_date(value):
    """
    Convert ``value`` to a datetime.

    Accepts datetime (returned unchanged), date (midnight of that day) and
    strings in one of the formats "%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y"
    or "%Y-%m-%dT%H:%M:%S". Everything after the first "." in a string is
    ignored before parsing, which drops fractional seconds.
    Returns None for anything else.
    """
    if isinstance(value, datetime):
        return value
    if isinstance(value, date):
        return datetime.combine(value, datetime.min.time())
    if isinstance(value, str):
        text = value.split('.')[0]
        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y", "%Y-%m-%dT%H:%M:%S"):
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
    return None