project/shmuliar-FIX-01-store-websocket-push #27
@@ -32,7 +32,7 @@ class MapViewApp(App):
|
|||||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
"""
|
"""
|
||||||
self.update()
|
self.update()
|
||||||
Clock.schedule_interval(self.update, 5)
|
Clock.schedule_interval(self.update, 0.1)
|
||||||
|
|
||||||
def update(self, *args):
|
def update(self, *args):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ processed_agent_data = Table(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# WebSocket subscriptions
|
# WebSocket subscriptions
|
||||||
subscriptions: Dict[int, Set[WebSocket]] = {}
|
subscriptions: Set[WebSocket] = set()
|
||||||
|
|
||||||
|
|
||||||
# FastAPI WebSocket endpoint
|
# FastAPI WebSocket endpoint
|
||||||
@@ -41,10 +41,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
|
|||||||
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
|
|
||||||
if user_id not in subscriptions:
|
subscriptions.add(websocket)
|
||||||
subscriptions[user_id] = set()
|
|
||||||
|
|
||||||
subscriptions[user_id].add(websocket)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# send already available data
|
# send already available data
|
||||||
@@ -55,20 +52,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
|||||||
for i in jsonable_data:
|
for i in jsonable_data:
|
||||||
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
|
||||||
await websocket.send_json(json.dumps(jsonable_data))
|
for i in jsonable_data:
|
||||||
|
await websocket.send_json(json.dumps([i]))
|
||||||
|
|
||||||
# receive forever
|
# receive forever
|
||||||
while True:
|
while True:
|
||||||
await websocket.receive_text()
|
await websocket.receive_text()
|
||||||
except WebSocketDisconnect:
|
except WebSocketDisconnect:
|
||||||
subscriptions[user_id].remove(websocket)
|
subscriptions.remove(websocket)
|
||||||
|
|
||||||
|
|
||||||
# Function to send data to subscribed users
|
# Function to send data to subscribed users
|
||||||
async def send_data_to_subscribers(user_id: int, data):
|
async def send_data_to_subscribers(data):
|
||||||
if user_id in subscriptions:
|
for websocket in subscriptions:
|
||||||
for websocket in subscriptions[user_id]:
|
await websocket.send_json(json.dumps([data]))
|
||||||
await websocket.send_json(json.dumps(data))
|
|
||||||
|
|
||||||
|
|
||||||
# FastAPI CRUDL endpoints
|
# FastAPI CRUDL endpoints
|
||||||
@@ -100,7 +97,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
|||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
for record in created_records:
|
for record in created_records:
|
||||||
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
await send_data_to_subscribers(jsonable_encoder(record))
|
||||||
return created_records
|
return created_records
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
session.rollback()
|
session.rollback()
|
||||||
|
|||||||
Reference in New Issue
Block a user