feat: initialize interactive live-stream game backend MVP

- Add FastAPI backend with WebSocket support
- Implement ConnectionManager for client connections
- Create GameEngine with async game loop (2s tick)
- Add RuleBasedAgent for keyword-based responses
- Define Pydantic schemas for GameEvent protocol
- Create debug frontend dashboard for testing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
empty
2025-12-30 14:58:38 +08:00
commit 714b5824ba
10 changed files with 834 additions and 0 deletions

6
backend/app/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
"""
The Island - Interactive Live-Stream Game Backend
A commercial-grade backend for real-time game interactions.
"""
__version__ = "0.1.0"

82
backend/app/agents.py Normal file
View File

@@ -0,0 +1,82 @@
"""
Agent classes for processing game inputs and generating responses.
Provides base abstraction and rule-based implementation.
"""
from abc import ABC, abstractmethod
import logging
logger = logging.getLogger(__name__)
class BaseAgent(ABC):
"""
Abstract base class for all game agents.
Agents process input text and generate appropriate responses.
Subclasses must implement the process_input method.
"""
def __init__(self, name: str = "BaseAgent") -> None:
"""
Initialize the agent.
Args:
name: Display name for the agent
"""
self.name = name
@abstractmethod
def process_input(self, text: str) -> str:
"""
Process input text and return a response.
Args:
text: The input text to process
Returns:
The agent's response string
"""
pass
class RuleBasedAgent(BaseAgent):
"""
A simple rule-based agent that responds based on keyword matching.
This agent does not use LLMs - it returns static strings based on
detected keywords in the input text.
"""
def __init__(self, name: str = "RuleBot") -> None:
"""Initialize the rule-based agent."""
super().__init__(name)
self._rules: dict[str, str] = {
"attack": "Defending! Shield activated!",
"heal": "Healing spell cast! +50 HP",
"run": "Too slow! You can't escape!",
"help": "Allies are on the way!",
"fire": "Water shield deployed!",
"magic": "Counter-spell activated!",
}
self._default_response = "I heard you! Processing command..."
def process_input(self, text: str) -> str:
"""
Process input and return response based on keyword rules.
Args:
text: The input text to analyze
Returns:
Response string based on matched keywords
"""
text_lower = text.lower()
for keyword, response in self._rules.items():
if keyword in text_lower:
logger.info(f"Agent matched keyword: {keyword}")
return f"{self.name}: {response}"
logger.info("Agent using default response")
return f"{self.name}: {self._default_response}"

145
backend/app/engine.py Normal file
View File

@@ -0,0 +1,145 @@
"""
Core Game Engine - The "Heartbeat" of the game.
Runs the main game loop and coordinates between comments and agents.
"""
import asyncio
import logging
import random
import time
from typing import TYPE_CHECKING
from .schemas import GameEvent, EventType
from .agents import BaseAgent, RuleBasedAgent
if TYPE_CHECKING:
from .server import ConnectionManager
logger = logging.getLogger(__name__)
class GameEngine:
"""
The core game engine that runs the main game loop.
Simulates live comments, processes them through agents,
and broadcasts responses to connected clients.
"""
def __init__(self, connection_manager: "ConnectionManager") -> None:
"""
Initialize the game engine.
Args:
connection_manager: The WebSocket connection manager for broadcasting
"""
self._manager = connection_manager
self._agent: BaseAgent = RuleBasedAgent(name="Guardian")
self._running = False
self._tick_count = 0
self._tick_interval = 2.0 # seconds
# Mock comment templates
self._mock_users = ["User123", "GamerPro", "DragonSlayer", "NightOwl", "StarGazer"]
self._mock_actions = ["Attack!", "Heal me!", "Run away!", "Help!", "Fire spell!", "Magic blast!"]
@property
def is_running(self) -> bool:
"""Check if the engine is currently running."""
return self._running
def _generate_mock_comment(self) -> tuple[str, str]:
"""
Generate a mock live comment.
Returns:
Tuple of (username, comment_text)
"""
user = random.choice(self._mock_users)
action = random.choice(self._mock_actions)
return user, action
async def _broadcast_event(self, event_type: str, data: dict) -> None:
"""
Create and broadcast a game event.
Args:
event_type: Type of the event
data: Event payload data
"""
event = GameEvent(
event_type=event_type,
timestamp=time.time(),
data=data
)
await self._manager.broadcast(event)
async def process_comment(self, user: str, message: str) -> None:
"""
Process a comment (real or mock) through the agent pipeline.
Args:
user: Username of the commenter
message: The comment text
"""
# Broadcast the incoming comment
await self._broadcast_event(
EventType.COMMENT,
{"user": user, "message": message}
)
# Process through agent
full_comment = f"{user}: {message}"
response = self._agent.process_input(full_comment)
# Broadcast agent response
await self._broadcast_event(
EventType.AGENT_RESPONSE,
{"agent": self._agent.name, "response": response}
)
async def _game_loop(self) -> None:
"""
The main game loop - runs continuously while engine is active.
Every tick:
1. Simulates a mock live comment
2. Passes it to the agent
3. Broadcasts the response
"""
logger.info("Game loop started")
while self._running:
self._tick_count += 1
# Broadcast tick event
await self._broadcast_event(
EventType.TICK,
{"tick": self._tick_count}
)
# Generate and process mock comment
user, message = self._generate_mock_comment()
logger.info(f"Tick {self._tick_count}: {user} says '{message}'")
await self.process_comment(user, message)
# Wait for next tick
await asyncio.sleep(self._tick_interval)
logger.info("Game loop stopped")
async def start(self) -> None:
"""Start the game engine loop as a background task."""
if self._running:
logger.warning("Engine already running")
return
self._running = True
asyncio.create_task(self._game_loop())
logger.info("Game engine started")
async def stop(self) -> None:
"""Stop the game engine loop."""
self._running = False
logger.info("Game engine stopping...")

102
backend/app/main.py Normal file
View File

@@ -0,0 +1,102 @@
"""
FastAPI entry point for the interactive live-stream game backend.
Configures the application, WebSocket routes, and lifecycle events.
"""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .server import ConnectionManager
from .engine import GameEngine
from .schemas import GameEvent, ClientMessage, EventType
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Global instances
manager = ConnectionManager()
engine = GameEngine(manager)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan manager.
Starts the game engine on startup and stops it on shutdown.
"""
logger.info("Starting application...")
await engine.start()
yield
logger.info("Shutting down application...")
await engine.stop()
# Create FastAPI application
app = FastAPI(
title="The Island - Live Stream Game Backend",
description="Commercial-grade interactive live-stream game backend",
version="0.1.0",
lifespan=lifespan
)
# Configure CORS for frontend access
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def root():
"""Health check endpoint."""
return {
"status": "running",
"service": "The Island Game Backend",
"connections": manager.connection_count,
"engine_running": engine.is_running
}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for real-time game communication.
Handles client connections and processes incoming messages.
"""
await manager.connect(websocket)
# Send welcome message
welcome = GameEvent(
event_type=EventType.SYSTEM,
data={"message": "Connected to The Island!"}
)
await manager.send_personal(websocket, welcome)
try:
while True:
# Receive and parse client message
data = await websocket.receive_json()
message = ClientMessage(**data)
# Handle mock comment action
if message.action == "send_comment":
user = message.payload.get("user", "Anonymous")
text = message.payload.get("message", "")
await engine.process_comment(user, text)
except WebSocketDisconnect:
manager.disconnect(websocket)
except Exception as e:
logger.error(f"WebSocket error: {e}")
manager.disconnect(websocket)

53
backend/app/schemas.py Normal file
View File

@@ -0,0 +1,53 @@
"""
Pydantic models for the JSON protocol.
Defines standardized structures for game events and messages.
"""
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
import time
class EventType(str, Enum):
"""Enumeration of all possible game event types."""
COMMENT = "comment"
AGENT_RESPONSE = "agent_response"
TICK = "tick"
SYSTEM = "system"
ERROR = "error"
class GameEvent(BaseModel):
"""
Standardized game event structure for WebSocket communication.
Attributes:
event_type: The type of event (comment, agent_response, tick, etc.)
timestamp: Unix timestamp when the event was created
data: Arbitrary payload data for the event
"""
event_type: str = Field(..., description="Type of the game event")
timestamp: float = Field(default_factory=time.time, description="Unix timestamp")
data: dict[str, Any] = Field(default_factory=dict, description="Event payload")
class Config:
json_schema_extra = {
"example": {
"event_type": "comment",
"timestamp": 1704067200.0,
"data": {"user": "User123", "message": "Attack!"}
}
}
class ClientMessage(BaseModel):
"""
Message structure for client-to-server communication.
Attributes:
action: The action the client wants to perform
payload: Data associated with the action
"""
action: str = Field(..., description="Action to perform")
payload: dict[str, Any] = Field(default_factory=dict, description="Action payload")

90
backend/app/server.py Normal file
View File

@@ -0,0 +1,90 @@
"""
WebSocket connection manager and server utilities.
Handles client connections, disconnections, and message broadcasting.
"""
import logging
from typing import Any
from fastapi import WebSocket
from .schemas import GameEvent
logger = logging.getLogger(__name__)
class ConnectionManager:
"""
Manages WebSocket connections for real-time communication.
Handles connection lifecycle and provides broadcast capabilities
to all connected clients.
"""
def __init__(self) -> None:
"""Initialize the connection manager with an empty connection list."""
self._active_connections: list[WebSocket] = []
@property
def connection_count(self) -> int:
"""Return the number of active connections."""
return len(self._active_connections)
async def connect(self, websocket: WebSocket) -> None:
"""
Accept and register a new WebSocket connection.
Args:
websocket: The WebSocket connection to accept
"""
await websocket.accept()
self._active_connections.append(websocket)
logger.info(f"Client connected. Total connections: {self.connection_count}")
def disconnect(self, websocket: WebSocket) -> None:
"""
Remove a WebSocket connection from the active list.
Args:
websocket: The WebSocket connection to remove
"""
if websocket in self._active_connections:
self._active_connections.remove(websocket)
logger.info(f"Client disconnected. Total connections: {self.connection_count}")
async def broadcast(self, event: GameEvent) -> None:
"""
Send a GameEvent to all connected clients.
Args:
event: The GameEvent to broadcast
"""
if not self._active_connections:
return
message = event.model_dump_json()
disconnected: list[WebSocket] = []
for connection in self._active_connections:
try:
await connection.send_text(message)
except Exception as e:
logger.warning(f"Failed to send to client: {e}")
disconnected.append(connection)
# Clean up failed connections
for conn in disconnected:
self.disconnect(conn)
async def send_personal(self, websocket: WebSocket, event: GameEvent) -> None:
"""
Send a GameEvent to a specific client.
Args:
websocket: The target WebSocket connection
event: The GameEvent to send
"""
try:
await websocket.send_text(event.model_dump_json())
except Exception as e:
logger.warning(f"Failed to send personal message: {e}")
self.disconnect(websocket)