import json from channels.generic.websocket import AsyncJsonWebsocketConsumer from channels.db import database_sync_to_async from django.core.serializers.json import DjangoJSONEncoder from .basic import execute_data_query class DataConsumer(AsyncJsonWebsocketConsumer): async def encode_json(self, content): """ Encode the given content as JSON, using Django's encoder to handle dates, decimals, etc. """ return json.dumps(content, cls=DjangoJSONEncoder) async def connect(self): self.subscribed_groups = set() self.subscription_params = {} await self.accept() async def disconnect(self, close_code): for group_name in self.subscribed_groups: await self.channel_layer.group_discard(group_name, self.channel_name) async def receive_json(self, content): action = content.get("action") if action == "subscribe": await self.handle_subscribe(content.get("payload", {}), content.get("request_id")) async def handle_subscribe(self, payload, request_id): model_name = payload.get("name") params = payload.get("params", {}) if not model_name: await self.send_json({"type": "error", "request_id": request_id, "message": "Model name is required."}) return # Store subscription params for this client self.subscription_params[model_name] = params # Run the initial data query data = await database_sync_to_async(execute_data_query)(model_name, params) # Send the initial result back to the client await self.send_json({ "type": "subscription_response", "request_id": request_id, "data": data }) # Join the group using a lowercase model name to match the signal group_name = f"model_{model_name.lower()}_updates" if group_name not in self.subscribed_groups: await self.channel_layer.group_add(group_name, self.channel_name) self.subscribed_groups.add(group_name) async def realtime_update(self, event): # Bước gỡ lỗi cuối cùng: xác nhận phương thức này được gọi và bắt bất kỳ lỗi nào print(f"--- CONSUMER: realtime_update invoked for event name: {event['payload']['name']} ---") try: # Move imports inside the method to prevent AppRegistryNotReady error on startup from django.db.models import Q from .views import get_serializer payload = event["payload"] change_type = payload.get("change_type") record_id = payload["record"].get("id") if not record_id or not change_type: return model_name_lower = payload["name"] # Chuyển đổi snake_case (từ signal) thành PascalCase với dấu gạch dưới (được sử dụng trong subscription) # Ví dụ: 'product_note' -> 'Product_Note' model_name_pascal_case = '_'.join(word.capitalize() for word in model_name_lower.split('_')) # 1. Lấy các tham số đăng ký của client này cho model cụ thể client_params = self.subscription_params.get(model_name_pascal_case) if not client_params: return # Client này không đăng ký model này. # 2. Xử lý sự kiện DELETION if change_type == 'deleted': await self.send_json({ "type": "realtime_update", "payload": { "name": model_name_lower, "change_type": "deleted", "record": {"id": record_id} } }) return # 3. Xử lý sự kiện CREATION và UPDATE Model, _ = get_serializer(model_name_lower) if not Model: return # Xây dựng một đối tượng Q từ từ điển bộ lọc của client filter_q = Q() filter_dict = client_params.get('filter') if isinstance(filter_dict, dict): filter_q = Q(**filter_dict) # Kiểm tra xem đối tượng có thực sự tồn tại với bộ lọc kết hợp không record_exists = await database_sync_to_async( Model.objects.filter(Q(pk=record_id) & filter_q).exists )() if record_exists: # Nếu bản ghi khớp với bộ lọc, chạy lại truy vấn để lấy dữ liệu có định dạng đúng single_record_params = client_params.copy() single_record_params['filter'] = {'pk': record_id} data = await database_sync_to_async(execute_data_query)(model_name_pascal_case, single_record_params) if data and data.get('rows'): payload_for_client = { "name": model_name_lower, "change_type": change_type, "record": data['rows'][0] } await self.send_json({ "type": "realtime_update", "payload": payload_for_client }) else: # Nếu bản ghi tồn tại nhưng không còn khớp với bộ lọc, coi như là một thao tác xóa đối với client này await self.send_json({ "type": "realtime_update", "payload": { "name": model_name_lower, "change_type": "deleted", "record": {"id": record_id} } }) except Exception as e: import traceback print(f"!!!!!!!!!! EXCEPTION in realtime_update: {e} !!!!!!!!!!!") traceback.print_exc()