# FastAPI service that records timed user sessions in SQLite and exposes per-user stats and leaderboards.
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
import sqlite3
|
|
import time
|
|
|
|
# Path of the SQLite database file opened by get_db_conn().
DB_FILE = "cosmic_timer.db"
|
|
|
|
# FastAPI application instance; the route decorators in this file register handlers on it.
app = FastAPI()
|
|
|
|
def get_db_conn():
    """Open a new connection to the SQLite file named by ``DB_FILE``.

    Rows fetched through the returned connection are ``sqlite3.Row`` objects,
    so columns can be read by name as well as by position. The caller is
    responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_FILE)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
class UserModel(BaseModel):
    """Request body accepted by the session-start endpoint."""

    # Name supplied by the client; start_session strips surrounding
    # whitespace and substitutes "Anonymous" when nothing is left.
    username: str
|
|
|
|
@app.post("/session/start")
def start_session(user: UserModel):
    """Start a new session for a user, creating the user row if it is missing.

    Args:
        user: Request body carrying the username. Surrounding whitespace is
            stripped; a blank name becomes "Anonymous".

    Returns:
        A dict with the new ``session_id``, the normalized ``username`` and a
        human-readable ``message``.
    """
    username = user.username.strip() or "Anonymous"
    conn = get_db_conn()
    try:
        c = conn.cursor()

        # Ensure user exists
        c.execute("SELECT 1 FROM users WHERE username = ?", (username,))
        if c.fetchone() is None:
            c.execute("INSERT INTO users (username) VALUES (?)", (username,))

        # Insert session start. No start_time is passed here, so the column
        # presumably has a schema default — TODO confirm against the schema.
        c.execute("INSERT INTO sessions (username) VALUES (?)", (username,))
        session_id = c.lastrowid

        conn.commit()
    finally:
        # Close even when a statement raises so a failed request does not
        # leak the connection (uncommitted work is discarded on close).
        conn.close()

    return {"session_id": session_id, "username": username, "message": f"Session started for {username}"}
|
|
|
|
def _calculate_entropy_score(trivia_events_count, total_seconds):
    """Return the binary entropy of the trivia-event rate, scaled by total seconds.

    The rate ``p = trivia_events_count / total_seconds`` is treated as a
    probability. The score is zero when there are no seconds, no events, or
    ``p`` falls outside the open interval (0, 1), where the entropy formula
    is undefined.
    """
    import math

    if total_seconds == 0 or trivia_events_count == 0:
        return 0.0
    p = trivia_events_count / total_seconds
    entropy = -p * math.log2(p) - (1 - p) * math.log2(1 - p) if 0 < p < 1 else 0
    return entropy * total_seconds


@app.post("/session/{session_id}/stop")
def stop_session(session_id: int):
    """Stop a session and refresh the owning user's aggregate statistics.

    Computes the session duration from its stored ``start_time``, writes the
    end time and duration to the session row, then updates the user's
    lifetime seconds, carbon footprint and entropy score.

    Args:
        session_id: Primary key of the session to stop.

    Returns:
        A dict with ``session_id``, ``username``, ``duration_seconds``,
        ``carbon_footprint_kg``, ``entropy_score`` and a ``message``.

    Raises:
        HTTPException: 404 when the session does not exist; 500 when the
            stored ``start_time`` cannot be parsed or the session's user row
            is missing.

    NOTE(review): a session that was already stopped can be stopped again,
    which adds its duration to the user's lifetime a second time — confirm
    whether that is intended.
    """
    import calendar

    conn = get_db_conn()
    try:
        c = conn.cursor()

        c.execute("SELECT start_time, username FROM sessions WHERE session_id = ?", (session_id,))
        row = c.fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="Session not found")

        start_time_str = row["start_time"]
        username = row["username"]

        # Calculate duration
        try:
            start_struct = time.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError):
            # TypeError covers a NULL start_time, ValueError a malformed string.
            raise HTTPException(status_code=500, detail="Invalid start_time format in DB")

        # SQLite's CURRENT_TIMESTAMP is UTC, so the parsed value must be read
        # as UTC (timegm) rather than local time (mktime); otherwise the
        # duration is off by the server's UTC offset.
        # Assumes start_time is filled by a CURRENT_TIMESTAMP default — TODO confirm.
        start_time = calendar.timegm(start_struct)
        end_time = time.time()
        # Clamp so clock adjustments can never subtract from a user's lifetime.
        duration_seconds = max(0, int(end_time - start_time))

        # Update session end time and duration
        c.execute(
            "UPDATE sessions SET end_time = CURRENT_TIMESTAMP, duration_seconds = ? WHERE session_id = ?",
            (duration_seconds, session_id),
        )

        # Update user lifetime_seconds (add duration)
        c.execute("SELECT lifetime_seconds FROM users WHERE username = ?", (username,))
        user_row = c.fetchone()
        if user_row is None:
            # Previously an unhandled TypeError; report the broken reference
            # explicitly. Nothing has been committed, so the session update
            # above is discarded when the connection closes.
            raise HTTPException(status_code=500, detail="Session references unknown user")
        current_lifetime = user_row[0] or 0
        new_lifetime = current_lifetime + duration_seconds

        # Recalculate carbon footprint and entropy score (simplified here)
        CARBON_PER_SECOND_KG = 0.0000005
        carbon_footprint = new_lifetime * CARBON_PER_SECOND_KG

        # Get trivia event count for entropy calculation
        c.execute("SELECT COUNT(*) FROM trivia_events WHERE username = ?", (username,))
        trivia_events_count = c.fetchone()[0] or 0

        entropy_score = _calculate_entropy_score(trivia_events_count, new_lifetime)

        c.execute('''
            UPDATE users SET lifetime_seconds = ?, carbon_footprint_kg = ?, entropy_score = ? WHERE username = ?
        ''', (new_lifetime, carbon_footprint, entropy_score, username))

        conn.commit()
    finally:
        # Single cleanup point: runs on success, on HTTPException and on
        # unexpected database errors alike.
        conn.close()

    return {
        "session_id": session_id,
        "username": username,
        "duration_seconds": duration_seconds,
        "carbon_footprint_kg": carbon_footprint,
        "entropy_score": entropy_score,
        "message": f"Session stopped and stats updated for {username}"
    }
|
|
|
|
@app.get("/leaderboard")
def get_leaderboard():
    """Return the top-10 users by lifetime seconds and by trivia points.

    Returns:
        A dict with two lists: ``top_lifetime_users`` (items with
        ``username`` and ``lifetime_seconds``) and ``top_trivia_users``
        (items with ``username`` and ``trivia_points``), each ordered from
        highest to lowest.
    """
    conn = get_db_conn()
    try:
        c = conn.cursor()

        # Top 10 users by lifetime_seconds
        c.execute("SELECT username, lifetime_seconds FROM users ORDER BY lifetime_seconds DESC LIMIT 10")
        lifetime_lb = [
            {"username": r["username"], "lifetime_seconds": r["lifetime_seconds"]}
            for r in c.fetchall()
        ]

        # Top 10 users by trivia_points
        c.execute("SELECT username, trivia_points FROM users ORDER BY trivia_points DESC LIMIT 10")
        trivia_lb = [
            {"username": r["username"], "trivia_points": r["trivia_points"]}
            for r in c.fetchall()
        ]
    finally:
        # Close even if a query raises so the connection is not leaked.
        conn.close()

    return {
        "top_lifetime_users": lifetime_lb,
        "top_trivia_users": trivia_lb,
    }
|
|
|
|
@app.get("/user/{username}/stats")
def get_user_stats(username: str):
    """Return the stored statistics for one user.

    Args:
        username: Exact username to look up.

    Returns:
        A dict with ``username``, ``lifetime_seconds``, ``trivia_points``,
        ``carbon_footprint_kg`` and ``entropy_score`` as stored in the
        ``users`` table.

    Raises:
        HTTPException: 404 when no user with that name exists.
    """
    conn = get_db_conn()
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM users WHERE username = ?", (username,))
        row = c.fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="User not found")

        # Copy the values out before the connection is closed.
        stats = {
            "username": row["username"],
            "lifetime_seconds": row["lifetime_seconds"],
            "trivia_points": row["trivia_points"],
            "carbon_footprint_kg": row["carbon_footprint_kg"],
            "entropy_score": row["entropy_score"],
        }
    finally:
        # One cleanup point for the success, not-found and error paths.
        conn.close()

    return stats
|
|
|