# File: PyQt6_LaTaniere/src/fivemserver/queuemanager.py
# Original listing metadata: 90 lines, 2.9 KiB, Python

import requests
import time
from typing import Callable
from config.constants import Urls, ApiEndPoints
from config.constants import PlayerServerInfo # ← à ajouter si pas déjà importé
class QueueManager:
    """Join the server waiting queue and poll it until the player may connect.

    Progress is reported through the ``on_update`` callback as plain strings:

    * ``"ok"``    - the join request itself returned position 0;
    * ``"ready"`` - a later status poll returned position 0;
    * ``"position:<position>:<queueSize>"`` - still waiting.

    NOTE(review): ``join_queue``, ``leave_queue`` and ``refresh_session``
    disable TLS certificate verification (``verify=False``) while
    ``check_status`` does not. This is kept exactly as found; confirm which
    behaviour is intended for the API host.
    """

    # Seconds any single HTTP call may take before requests raises a timeout,
    # so a stalled server cannot block the polling loop forever.
    REQUEST_TIMEOUT = 5
    # Seconds to sleep between two queue-status polls.
    POLL_INTERVAL = 5
    # Consecutive failed polls tolerated before the error is re-raised.
    MAX_POLL_FAILURES = 3

    def __init__(self, user_id: str, on_update: Callable[[str], None]):
        """Store the player id and the progress callback.

        Args:
            user_id: Identifier sent to the API as ``uuid``.
            on_update: Called with one status string per queue event.
        """
        self.user_id = user_id
        self.on_update = on_update
        self._running = True
        self._last_refresh = 0  # time.monotonic() value of the last refresh
        self.REFRESH_INTERVAL = 30  # seconds between two session refreshes

    def stop(self):
        """Ask the polling loop in ``start`` to exit after its current sleep."""
        self._running = False

    def join_queue(self) -> dict:
        """POST the join request and return the decoded JSON reply.

        Raises:
            requests.RequestException: On network failure or timeout.
            ValueError: If the reply body is not valid JSON.
        """
        res = requests.post(
            f"{Urls.API_URL.value}{ApiEndPoints.QUEUE_JOIN.value}",
            json={"uuid": self.user_id},
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )
        return res.json()

    def check_status(self) -> dict:
        """GET the current queue status for this user and return the JSON reply.

        Raises:
            requests.RequestException: On network failure or timeout.
            ValueError: If the reply body is not valid JSON.
        """
        res = requests.get(
            f"{Urls.API_URL.value}{ApiEndPoints.QUEUE_STATUS.value}/{self.user_id}",
            timeout=self.REQUEST_TIMEOUT,
        )
        return res.json()

    def leave_queue(self):
        """POST the leave request; the reply is ignored.

        Raises:
            requests.RequestException: On network failure or timeout.
        """
        requests.post(
            f"{Urls.API_URL.value}{ApiEndPoints.QUEUE_LEAVE.value}",
            json={"uuid": self.user_id},
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )

    def refresh_session(self):
        """Tell the server the client is still present, without changing the session_id.

        Does nothing when ``PlayerServerInfo.session_id`` is empty. Request
        errors are swallowed on purpose (best effort).
        """
        session_id = PlayerServerInfo.session_id
        if not session_id:
            return
        try:
            requests.post(
                f"{Urls.API_URL.value}{ApiEndPoints.QUEUE_REFRESH.value}",
                json={
                    "uuid": self.user_id,
                    "session_id": session_id,
                },
                verify=False,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            # Deliberately ignored: the server expires the session on its own.
            pass

    def start(self):
        """Join the queue, then poll until position 0 or until ``stop`` is called.

        Blocks the calling thread. Emits ``"ok"`` and returns immediately when
        the join reply already reports position 0; otherwise emits a
        ``"position:..."`` update after the join and after every successful
        poll, and ``"ready"`` once a poll reports position 0.

        Raises:
            requests.RequestException, ValueError: If the join request fails,
                or if ``MAX_POLL_FAILURES`` status polls fail in a row.
        """
        join = self.join_queue()
        if join.get("position") == 0:
            self.on_update("ok")
            return
        self.on_update(f"position:{join.get('position')}:{join.get('queueSize')}")

        # time.monotonic() is unaffected by system clock changes (DST, NTP,
        # ...), which makes it reliable for measuring intervals.
        self._last_refresh = time.monotonic()  # start counting after the join
        failed_polls = 0

        while self._running:
            time.sleep(self.POLL_INTERVAL)
            if not self._running:
                break

            # Refresh the session every REFRESH_INTERVAL seconds.
            now = time.monotonic()
            if now - self._last_refresh >= self.REFRESH_INTERVAL:
                self.refresh_session()
                self._last_refresh = now

            # Check the position in the queue. A transient failure must not
            # cost the player their place, so retry on the next tick; a
            # persistent failure is still surfaced to the caller.
            try:
                status = self.check_status()
            except (requests.RequestException, ValueError):
                failed_polls += 1
                if failed_polls >= self.MAX_POLL_FAILURES:
                    raise
                continue
            failed_polls = 0

            if status.get("position") == 0:
                self.on_update("ready")
                return
            self.on_update(f"position:{status.get('position')}:{status.get('queueSize')}")