131 lines
4.8 KiB
Python
131 lines
4.8 KiB
Python
import json
|
|
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
|
from channels.db import database_sync_to_async
|
|
from django.core.serializers.json import DjangoJSONEncoder
|
|
from .basic import execute_data_query
|
|
|
|
class DataConsumer(AsyncJsonWebsocketConsumer):
    """WebSocket consumer serving model data plus realtime change notifications.

    Messages handled / produced by this class:

    * client -> server:
      ``{"action": "subscribe", "request_id": ..., "payload": {"name": <model>, "params": {...}}}``
    * server -> client:
      ``{"type": "subscription_response", "request_id": ..., "data": ...}``,
      ``{"type": "realtime_update", "payload": {"name", "change_type", "record"}}``,
      ``{"type": "error", "request_id": ..., "message": ...}``.

    Per-connection state (created in ``connect``):

    * ``subscribed_groups`` - channel-layer group names this connection joined.
    * ``subscription_params`` - maps the model name exactly as the client sent
      it to the params dict of its latest subscription for that model.
    """

    @classmethod
    async def encode_json(cls, content):
        """Serialize ``content`` to a JSON string.

        Uses Django's encoder so dates, decimals, etc. are handled. Declared
        as a classmethod to match the hook it overrides on the base consumer.
        """
        return json.dumps(content, cls=DjangoJSONEncoder)

    async def connect(self):
        """Initialise the per-connection subscription state and accept the socket."""
        self.subscribed_groups = set()
        self.subscription_params = {}
        await self.accept()

    async def disconnect(self, close_code):
        """Leave every channel-layer group this connection joined."""
        # Iterate over a snapshot so the set can be emptied afterwards.
        for group_name in list(self.subscribed_groups):
            await self.channel_layer.group_discard(group_name, self.channel_name)
        self.subscribed_groups.clear()

    async def receive_json(self, content, **kwargs):
        """Dispatch a decoded client message by its ``action`` field.

        Only ``"subscribe"`` is handled; other actions are ignored. A message
        that is not a JSON object is answered with an ``error`` message
        instead of raising inside the consumer.
        """
        if not isinstance(content, dict):
            await self.send_json({
                "type": "error",
                "request_id": None,
                "message": "Message must be a JSON object.",
            })
            return

        action = content.get("action")
        if action == "subscribe":
            await self.handle_subscribe(content.get("payload", {}), content.get("request_id"))

    async def handle_subscribe(self, payload, request_id):
        """Register a subscription and send the initial query result.

        Args:
            payload: Client payload; ``name`` is the model name and ``params``
                the query parameters handed to ``execute_data_query``.
            request_id: Echoed back so the client can correlate the response.
        """
        # An explicit ``"payload": null`` must not crash the consumer.
        if not isinstance(payload, dict):
            payload = {}

        model_name = payload.get("name")
        params = payload.get("params")
        if params is None:
            params = {}

        if not model_name or not isinstance(model_name, str):
            await self.send_json({"type": "error", "request_id": request_id, "message": "Model name is required."})
            return

        # realtime_update() calls .get()/.copy() on the stored params.
        if not isinstance(params, dict):
            await self.send_json({"type": "error", "request_id": request_id, "message": "Params must be an object."})
            return

        model_key = model_name.lower()

        # Keep one entry per model: drop entries that differ only by letter
        # case so the most recent subscription is the one used for updates.
        stale_names = [
            name for name in self.subscription_params
            if name != model_name and name.lower() == model_key
        ]
        for name in stale_names:
            del self.subscription_params[name]

        # Store subscription params for this client.
        self.subscription_params[model_name] = params

        # Join the group (lowercase model name, to match the signal) BEFORE
        # running the query. Group messages are queued until this handler
        # returns, so changes made while the initial query runs are delivered
        # afterwards instead of being lost.
        group_name = f"model_{model_key}_updates"
        if group_name not in self.subscribed_groups:
            await self.channel_layer.group_add(group_name, self.channel_name)
            self.subscribed_groups.add(group_name)

        # Run the initial data query and send the result back to the client.
        data = await database_sync_to_async(execute_data_query)(model_name, params)
        await self.send_json({
            "type": "subscription_response",
            "request_id": request_id,
            "data": data,
        })

    def _find_subscription(self, model_name):
        """Return ``(subscribed_name, params)`` for ``model_name``, ignoring case.

        Returns ``(None, None)`` when this connection has no subscription for
        the model.
        """
        wanted = model_name.lower()
        for subscribed_name, params in self.subscription_params.items():
            if subscribed_name.lower() == wanted:
                return subscribed_name, params
        return None, None

    async def _send_deleted(self, model_name, record_id):
        """Tell the client to drop ``record_id`` of ``model_name``."""
        await self.send_json({
            "type": "realtime_update",
            "payload": {
                "name": model_name,
                "change_type": "deleted",
                "record": {"id": record_id},
            },
        })

    async def realtime_update(self, event):
        """Forward a model change event to the client if it is subscribed.

        ``event["payload"]`` carries ``name`` (model name), ``change_type``
        and ``record`` (a dict with at least ``id``). Deletions are forwarded
        as-is. For other change types the record is re-queried with the
        client's own params; if it does not match the client's filter, a
        ``deleted`` notification is sent instead.
        """
        # Imported here to prevent AppRegistryNotReady errors on startup.
        from django.db.models import Q
        from .views import get_serializer

        payload = event["payload"]
        change_type = payload.get("change_type")
        record_id = (payload.get("record") or {}).get("id")
        model_name_lower = payload.get("name")

        if not record_id or not change_type or not model_name_lower:
            return

        # 1. Get this client's subscription for the model. The lookup is
        #    case-insensitive because the key is whatever the client sent.
        subscribed_name, client_params = self._find_subscription(model_name_lower)
        if client_params is None:
            # Not subscribed. An empty params dict IS a valid subscription.
            return

        # 2. Handle DELETION event.
        if change_type == "deleted":
            await self._send_deleted(model_name_lower, record_id)
            return

        # 3. Handle CREATION and UPDATE events.
        Model, _ = get_serializer(model_name_lower)
        if not Model:
            return

        # Build a Q object from the client's filter dictionary.
        # NOTE(review): the filter comes from the client's subscribe payload
        # and goes straight into the ORM - confirm the allowed lookups are
        # restricted somewhere (e.g. inside execute_data_query).
        filter_dict = client_params.get("filter")
        filter_q = Q(**filter_dict) if isinstance(filter_dict, dict) else Q()

        # Check whether the record exists AND matches the client's filter.
        record_exists = await database_sync_to_async(
            Model.objects.filter(Q(pk=record_id) & filter_q).exists
        )()

        if not record_exists:
            # The record does not match this client's filter (or no longer
            # exists): treat it as a deletion for this client.
            await self._send_deleted(model_name_lower, record_id)
            return

        # Re-run the client's query narrowed to this record to get correctly
        # shaped data. Use the name the client subscribed with - the same one
        # the initial query ran with.
        single_record_params = {**client_params, "filter": {"pk": record_id}}
        data = await database_sync_to_async(execute_data_query)(subscribed_name, single_record_params)

        if data and data.get("rows"):
            await self.send_json({
                "type": "realtime_update",
                "payload": {
                    "name": model_name_lower,
                    "change_type": change_type,
                    "record": data["rows"][0],
                },
            })
|