|
|
|
|
@@ -33,7 +33,7 @@ processed_agent_data = Table(
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# WebSocket subscriptions
|
|
|
|
|
subscriptions: Dict[int, Set[WebSocket]] = {}
|
|
|
|
|
subscriptions: Set[WebSocket] = set()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# FastAPI WebSocket endpoint
|
|
|
|
|
@@ -41,10 +41,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
|
|
|
|
|
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
|
|
|
|
await websocket.accept()
|
|
|
|
|
|
|
|
|
|
if user_id not in subscriptions:
|
|
|
|
|
subscriptions[user_id] = set()
|
|
|
|
|
|
|
|
|
|
subscriptions[user_id].add(websocket)
|
|
|
|
|
subscriptions.add(websocket)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# send already available data
|
|
|
|
|
@@ -55,20 +52,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
|
|
|
|
for i in jsonable_data:
|
|
|
|
|
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
|
|
|
|
await websocket.send_json(json.dumps(jsonable_data))
|
|
|
|
|
for i in jsonable_data:
|
|
|
|
|
await websocket.send_json(json.dumps([i]))
|
|
|
|
|
|
|
|
|
|
# receive forever
|
|
|
|
|
while True:
|
|
|
|
|
await websocket.receive_text()
|
|
|
|
|
except WebSocketDisconnect:
|
|
|
|
|
subscriptions[user_id].remove(websocket)
|
|
|
|
|
subscriptions.remove(websocket)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Function to send data to subscribed users
|
|
|
|
|
async def send_data_to_subscribers(user_id: int, data):
|
|
|
|
|
if user_id in subscriptions:
|
|
|
|
|
for websocket in subscriptions[user_id]:
|
|
|
|
|
await websocket.send_json(json.dumps(data))
|
|
|
|
|
async def send_data_to_subscribers(data):
|
|
|
|
|
for websocket in subscriptions:
|
|
|
|
|
await websocket.send_json(json.dumps([data]))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# FastAPI CRUDL endpoints
|
|
|
|
|
@@ -100,7 +97,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
for record in created_records:
|
|
|
|
|
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
|
|
|
|
await send_data_to_subscribers(jsonable_encoder(record))
|
|
|
|
|
return created_records
|
|
|
|
|
except Exception as err:
|
|
|
|
|
session.rollback()
|
|
|
|
|
|