"""FastAPI service that records timed sessions in SQLite.

Exposes endpoints to start and stop a session, to read a user's aggregated
stats, and to read the top-10 leaderboards.
"""
import calendar
import math
import sqlite3
import time
from contextlib import closing

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

DB_FILE = "cosmic_timer.db"

# Multiplier applied to a user's lifetime seconds to get carbon_footprint_kg.
CARBON_PER_SECOND_KG = 0.0000005

# Layout used to parse sessions.start_time; this is the text format SQLite
# produces for CURRENT_TIMESTAMP.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

app = FastAPI()


def get_db_conn():
    """Open a new connection to DB_FILE whose rows can be indexed by column name."""
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    return conn


class UserModel(BaseModel):
    """Request body for starting a session."""

    username: str


def _calculate_entropy_score(trivia_events_count, total_seconds):
    """Return the binary entropy of the trivia-event rate, scaled by total_seconds.

    The rate is p = trivia_events_count / total_seconds. The score is 0.0 when
    either input is zero or when p falls outside the open interval (0, 1),
    where the binary entropy formula is undefined.
    """
    if total_seconds == 0 or trivia_events_count == 0:
        return 0.0
    p = trivia_events_count / total_seconds
    if not 0 < p < 1:
        return 0.0
    entropy = -p * math.log2(p) - (1 - p) * math.log2(1 - p)
    return entropy * total_seconds


@app.post("/session/start")
def start_session(user: UserModel):
    """Create a session row for the user, creating the user row if needed.

    A blank (or whitespace-only) username is replaced by "Anonymous".
    Returns the new session id, the effective username and a message.
    """
    username = user.username.strip() or "Anonymous"

    # closing() guarantees the connection is released even if a query raises.
    with closing(get_db_conn()) as conn:
        c = conn.cursor()

        # Ensure user exists
        c.execute("SELECT 1 FROM users WHERE username = ?", (username,))
        if c.fetchone() is None:
            c.execute("INSERT INTO users (username) VALUES (?)", (username,))

        # Insert session start
        c.execute("INSERT INTO sessions (username) VALUES (?)", (username,))
        session_id = c.lastrowid
        conn.commit()

    return {
        "session_id": session_id,
        "username": username,
        "message": f"Session started for {username}",
    }


@app.post("/session/{session_id}/stop")
def stop_session(session_id: int):
    """Close a session and fold its duration into the owning user's stats.

    Updates the session's end_time and duration_seconds, then recomputes the
    user's lifetime_seconds, carbon_footprint_kg and entropy_score.

    Raises:
        HTTPException: 404 if the session does not exist; 500 if its stored
            start_time cannot be parsed.
    """
    # NOTE(review): nothing here checks whether the session was already
    # stopped, so stopping the same session twice adds its duration to
    # lifetime_seconds again.
    with closing(get_db_conn()) as conn:
        c = conn.cursor()
        c.execute(
            "SELECT start_time, username FROM sessions WHERE session_id = ?",
            (session_id,),
        )
        row = c.fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="Session not found")

        username = row["username"]

        # Calculate duration. end_time below is written with CURRENT_TIMESTAMP,
        # which SQLite reports in UTC, so start_time is parsed as UTC as well
        # (timegm) rather than as local time (mktime), which would skew the
        # duration by the server's UTC offset.
        # NOTE(review): assumes start_time is also a CURRENT_TIMESTAMP-style
        # UTC default - confirm against the sessions schema.
        try:
            start_time = calendar.timegm(
                time.strptime(row["start_time"], _TIMESTAMP_FORMAT)
            )
        except (TypeError, ValueError) as err:
            raise HTTPException(
                status_code=500, detail="Invalid start_time format in DB"
            ) from err

        # Clamp at zero so a clock adjustment can never subtract lifetime.
        duration_seconds = max(0, int(time.time() - start_time))

        # Update session end time and duration
        c.execute(
            "UPDATE sessions SET end_time = CURRENT_TIMESTAMP, duration_seconds = ? "
            "WHERE session_id = ?",
            (duration_seconds, session_id),
        )

        # Update user lifetime_seconds (add duration). A missing user row or a
        # NULL lifetime both count as zero instead of raising.
        c.execute("SELECT lifetime_seconds FROM users WHERE username = ?", (username,))
        user_row = c.fetchone()
        current_lifetime = (user_row[0] if user_row is not None else 0) or 0
        new_lifetime = current_lifetime + duration_seconds

        # Recalculate carbon footprint and entropy score (simplified here)
        carbon_footprint = new_lifetime * CARBON_PER_SECOND_KG

        # Get trivia event count for entropy calculation
        c.execute("SELECT COUNT(*) FROM trivia_events WHERE username = ?", (username,))
        trivia_events_count = c.fetchone()[0] or 0
        entropy_score = _calculate_entropy_score(trivia_events_count, new_lifetime)

        c.execute(
            """
            UPDATE users
            SET lifetime_seconds = ?, carbon_footprint_kg = ?, entropy_score = ?
            WHERE username = ?
            """,
            (new_lifetime, carbon_footprint, entropy_score, username),
        )
        conn.commit()

    return {
        "session_id": session_id,
        "username": username,
        "duration_seconds": duration_seconds,
        "carbon_footprint_kg": carbon_footprint,
        "entropy_score": entropy_score,
        "message": f"Session stopped and stats updated for {username}",
    }


@app.get("/leaderboard")
def get_leaderboard():
    """Return the top 10 users by lifetime_seconds and the top 10 by trivia_points."""
    with closing(get_db_conn()) as conn:
        c = conn.cursor()

        # Top 10 users by lifetime_seconds
        c.execute(
            "SELECT username, lifetime_seconds FROM users "
            "ORDER BY lifetime_seconds DESC LIMIT 10"
        )
        lifetime_lb = [
            {"username": r["username"], "lifetime_seconds": r["lifetime_seconds"]}
            for r in c.fetchall()
        ]

        # Top 10 users by trivia_points
        c.execute(
            "SELECT username, trivia_points FROM users "
            "ORDER BY trivia_points DESC LIMIT 10"
        )
        trivia_lb = [
            {"username": r["username"], "trivia_points": r["trivia_points"]}
            for r in c.fetchall()
        ]

    return {
        "top_lifetime_users": lifetime_lb,
        "top_trivia_users": trivia_lb,
    }


@app.get("/user/{username}/stats")
def get_user_stats(username: str):
    """Return the stored stats for one user.

    Raises:
        HTTPException: 404 if no user with that username exists.
    """
    with closing(get_db_conn()) as conn:
        c = conn.cursor()
        c.execute("SELECT * FROM users WHERE username = ?", (username,))
        row = c.fetchone()
        if row is None:
            raise HTTPException(status_code=404, detail="User not found")

        # Copy the values out while the connection is still open.
        stats = {
            "username": row["username"],
            "lifetime_seconds": row["lifetime_seconds"],
            "trivia_points": row["trivia_points"],
            "carbon_footprint_kg": row["carbon_footprint_kg"],
            "entropy_score": row["entropy_score"],
        }

    return stats