This commit is contained in:
anhduy-tech
2025-12-31 09:22:13 +07:00
parent ef48c93de0
commit bfbe0a4061
9 changed files with 83 additions and 60 deletions

View File

@@ -14,7 +14,7 @@ class DataConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
self.subscribed_groups = set()
self.subscription_params = {} # e.g., {'Product': {'filter': '...', 'values': '...'}}
self.subscription_params = {}
await self.accept()
async def disconnect(self, close_code):
@@ -55,12 +55,14 @@ class DataConsumer(AsyncJsonWebsocketConsumer):
async def realtime_update(self, event):
# Move imports inside the method to prevent AppRegistryNotReady error on startup
import ast
from django.db.models import Q
from .views import get_serializer
payload = event["payload"]
record = payload["record"]
record_id = payload["record"].get("id")
if not record_id:
return
model_name_lower = payload["name"]
model_name_capitalized = model_name_lower.capitalize()
@@ -69,38 +71,44 @@ class DataConsumer(AsyncJsonWebsocketConsumer):
if not client_params:
return # This client is not subscribed to this model.
# 2. Check if the updated record matches the client's filter
filter_str = client_params.get('filter')
if filter_str:
try:
Model, _ = get_serializer(model_name_lower)
if not Model:
return
# 2. Check if the updated record ID could possibly match the client's filter
Model, _ = get_serializer(model_name_lower)
if not Model:
return
filter_q = Q()
filter_dict = ast.literal_eval(filter_str)
for key, value in filter_dict.items():
filter_q.add(Q(**{key: value}), Q.AND)
# Build a Q object from the client's filter dictionary
filter_q = Q()
filter_dict = client_params.get('filter')
if isinstance(filter_dict, dict):
for key, value in filter_dict.items():
filter_q.add(Q(**{key: value}), Q.AND)
# Combine the client's filter with the specific record's ID
combined_filter = Q(pk=record_id) & filter_q
matches = await database_sync_to_async(
Model.objects.filter(pk=record["id"]).filter(filter_q).exists
)()
# Check if the object actually exists with the combined filter
record_exists = await database_sync_to_async(Model.objects.filter(combined_filter).exists)()
if not record_exists:
return # The record does not match the client's filter, so don't send.
if not matches:
return # Record does not match the client's filter, so don't send.
except Exception:
return # Fail silently if filter is invalid or DB check fails.
# 3. Re-run the original query for just this single object.
# This correctly applies 'values', 'distinct_values', 'summary', etc.
single_record_params = client_params.copy()
single_record_params['filter'] = {'pk': record_id}
data = await database_sync_to_async(execute_data_query)(model_name_capitalized, single_record_params)
# 3. Create a tailored payload, respecting the 'values' parameter
payload_for_client = payload.copy()
values_str = client_params.get('values')
if values_str:
requested_values = values_str.split(',')
# The record from the signal contains all fields. Filter it down.
filtered_record = {key: record.get(key) for key in requested_values if key in record}
payload_for_client['record'] = filtered_record
# If the query returns no data (e.g., record was deleted or no longer matches), do nothing.
if not data or not data.get('rows'):
return
# 4. Send the final, tailored payload to the client
# 4. Build the final payload with the correctly-shaped record
payload_for_client = {
"name": model_name_lower,
"record": data['rows'][0]
}
# 5. Send the final, tailored payload to the client
await self.send_json({
"type": "realtime_update",
"payload": payload_for_client