Creation d'un module http_client pour éviter la redondance et faciliter le logging. Ré-écriture en utilisant ce module

This commit is contained in:
2026-03-24 10:30:40 +01:00
parent bad0cb43bf
commit 177f224760
4 changed files with 145 additions and 37 deletions

View File

@@ -1,20 +1,21 @@
import base64
import requests
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from config.constants import Urls
from tools.http_client import http_post, ApiError
class GetServerTokenForDiscord:
derived_key: bytes | None = None
@staticmethod
def authenticate(server = Urls.API_URL.value):
def authenticate(server: str | None = Urls.API_URL.value) -> str:
if server is None:
server = Urls.API_URL.value
# ==========================
# Génération clé ECDH client
# ==========================
@@ -29,14 +30,22 @@ class GetServerTokenForDiscord:
# ==========================
# AUTH
# ==========================
auth = requests.post(server + "/api_v2/auth", verify=False, json={
"client_pub": base64.b64encode(client_pub_pem).decode()
}).json()
try:
auth = http_post(
f"{server}/api_v2/auth",
json={"client_pub": base64.b64encode(client_pub_pem).decode()},
).json()
except ApiError:
raise
except ValueError as exc:
raise ApiError("Réponse JSON invalide lors de l'authentification.", url=f"{server}/api_v2/auth") from exc
server_pub = serialization.load_pem_public_key(
base64.b64decode(auth["server_pub"])
)
server_pub_b64 = auth.get("server_pub")
session_id = auth.get("session_id")
if not server_pub_b64 or not session_id:
raise ApiError("Réponse d'authentification invalide : données manquantes.", url=f"{server}/api_v2/auth")
server_pub = serialization.load_pem_public_key(base64.b64decode(server_pub_b64))
shared_key = client_private.exchange(ec.ECDH(), server_pub)
GetServerTokenForDiscord.derived_key = HKDF(
@@ -49,31 +58,56 @@ class GetServerTokenForDiscord:
return auth["session_id"]
@staticmethod
def get_token(session_id: bytes, server = Urls.API_URL.value):
def get_token(session_id: bytes, server: str | None = Urls.API_URL.value) -> bytes:
# ==========================
# DISCORD TOKEN
# ==========================
if server is None:
server = Urls.API_URL.value
download = requests.post(server + "/api_v2/tkn_auth", verify=False, headers={
"x-session-id": session_id
}).json()
nonce = base64.b64decode(download["nonce"])
encrypted_data = base64.b64decode(download["data"])
if GetServerTokenForDiscord.derived_key is None:
raise ApiError("Clé dérivée manquante : appelez authenticate() avant get_token().")
try:
download = http_post(
f"{server}/api_v2/tkn_auth",
headers={"x-session-id": session_id},
).json()
except ApiError:
raise
except ValueError as exc:
raise ApiError("Réponse JSON invalide lors de la récupération du token.",
url=f"{server}/api_v2/tkn_auth") from exc
nonce_b64 = download.get("nonce")
encrypted_data_b64 = download.get("data")
if not nonce_b64 or not encrypted_data_b64:
raise ApiError("Réponse de token invalide : nonce ou data manquant.", url=f"{server}/api_v2/tkn_auth")
nonce = base64.b64decode(nonce_b64)
encrypted_data = base64.b64decode(encrypted_data_b64)
aesgcm = AESGCM(GetServerTokenForDiscord.derived_key) # type: ignore[arg-type]
return aesgcm.decrypt(nonce, encrypted_data, None)
@staticmethod
def register_discord_user(discord_user_id: str, session_id: str, server = Urls.API_URL.value) -> bool:
def register_discord_user(
discord_user_id: str, session_id: str, server: str | None = Urls.API_URL.value) -> bool:
if server is None:
server = Urls.API_URL.value
registeredId = requests.post(server + "/api_v2/connection/register", verify=False, headers={
"x-session-id": session_id,
},
json={
"discord_id": discord_user_id
}).json()
return registeredId["success"]
try:
registration_data = http_post(
f"{server}/api_v2/connection/register",
headers={"x-session-id": session_id},
json={"discord_id": discord_user_id},
).json()
except ApiError:
raise
except ValueError as exc:
raise ApiError(
"Réponse JSON invalide lors de l'enregistrement Discord.",
url=f"{server}/api_v2/connection/register",
) from exc
return bool(registration_data.get("success", False))

View File

@@ -1,21 +1,22 @@
import requests
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
from tools.http_client import ApiError, http_get
from config.constants import PlayerServerInfo
# Supress only InsecureRequestWarning
disable_warnings(InsecureRequestWarning)
WHITELIST_URL_ENDPOINT = "iswhitelist/"
WHITELIST_URL_ENDPOINT = f'iswhitelist/'
class WhiteList:
@staticmethod
def checkwhitelist(url, discord_user_id: str) -> None:
def check_whitelist(url: str, discord_user_id: str) -> None:
try:
api_data = http_get(f"{url}{WHITELIST_URL_ENDPOINT}{discord_user_id}").json()
except ApiError:
raise
except ValueError as exc:
raise ApiError(
"Réponse JSON invalide lors de la vérification whitelist.",
url=f"{url}{WHITELIST_URL_ENDPOINT}{discord_user_id}",
) from exc
response = requests.get(url + WHITELIST_URL_ENDPOINT + discord_user_id, verify=False)
api_data = response.json()
PlayerServerInfo.is_whitelist = api_data.get('whitelisted', False)
PlayerServerInfo.is_staff = api_data.get('isStaff', False)
#PlayerServerInfo.is_staff = True
PlayerServerInfo.is_whitelist = api_data.get("whitelisted", False)
PlayerServerInfo.is_staff = api_data.get("isStaff", False)