feat: implement AI Director & Narrative Voting System (Phase 9)
Add complete AI Director system that transforms the survival simulation into a user-driven interactive story with audience voting. Backend: - Add DirectorService for LLM-powered plot generation with fallback templates - Add VoteManager for dual-channel voting (Twitch + Unity) - Integrate 4-phase game loop: Simulation → Narrative → Voting → Resolution - Add vote command parsing (!1, !2, !A, !B) in Twitch service - Add type-safe LLM output handling with _coerce_int() helper - Normalize voter IDs for case-insensitive duplicate prevention Unity Client: - Add NarrativeUI for cinematic event cards and voting progress bars - Add 7 new event types and data models for director/voting events - Add delayed subscription coroutine for NetworkManager timing - Sync client timer with server's remaining_seconds to prevent drift Documentation: - Update README.md with AI Director features, voting commands, and event types 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
562
backend/app/director_service.py
Normal file
562
backend/app/director_service.py
Normal file
@@ -0,0 +1,562 @@
|
||||
"""
|
||||
AI Director Service - Narrative Control Module (Phase 9).
|
||||
|
||||
The Director acts as the Dungeon Master for the survival drama,
|
||||
generating dramatic plot points and resolving audience votes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GameMode(str, Enum):
    """Operating phases of the game engine's narrative loop.

    The engine cycles SIMULATION -> NARRATIVE -> VOTING -> RESOLUTION and
    then returns to SIMULATION.
    """

    SIMULATION = "simulation"  # agents act autonomously
    NARRATIVE = "narrative"    # the Director presents a plot point
    VOTING = "voting"          # the audience voting window is open
    RESOLUTION = "resolution"  # vote consequences are being applied
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class PlotChoice:
    """A choice option in a plot point.

    Frozen so fields cannot be rebound, which lets instances be shared
    between the fallback templates and the plot points built from them.
    (Note: the effects dict itself is still mutable.)
    """

    choice_id: str  # stable identifier used to match incoming votes
    text: str  # audience-facing option text
    effects: dict[str, Any] = field(default_factory=dict)  # consequence hints applied at resolution
|
||||
|
||||
|
||||
@dataclass
class PlotPoint:
    """A narrative event generated by the Director."""

    plot_id: str
    title: str
    description: str
    choices: list[PlotChoice]
    ttl_seconds: int = 60
    created_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Serialize for broadcasting.

        Choice effects are not included — clients only see the id/text of
        each option.
        """
        payload: dict[str, Any] = {
            "plot_id": self.plot_id,
            "title": self.title,
            "description": self.description,
        }
        payload["choices"] = [
            {"choice_id": c.choice_id, "text": c.text} for c in self.choices
        ]
        payload["ttl_seconds"] = self.ttl_seconds
        return payload
|
||||
|
||||
|
||||
@dataclass
class ResolutionResult:
    """Result of resolving a plot point vote."""

    plot_id: str
    choice_id: str
    message: str
    effects: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize for broadcasting; effects travel as a JSON string."""
        return dict(
            plot_id=self.plot_id,
            choice_id=self.choice_id,
            message=self.message,
            effects_json=json.dumps(self.effects),
        )
|
||||
|
||||
|
||||
# Fallback templates when LLM is unavailable.
# Each entry mirrors the shape the LLM is asked to produce: a title, a short
# cinematic description, and exactly two PlotChoice options carrying effect
# hints that are applied when the vote resolves.
FALLBACK_PLOT_TEMPLATES = [
    {
        "title": "Mysterious Footprints",
        "description": "Strange footprints appear on the beach overnight. Someone - or something - has been watching the camp.",
        "choices": [
            PlotChoice("investigate", "Follow the tracks into the forest", {"risk": "medium", "reward": "discovery"}),
            PlotChoice("fortify", "Strengthen camp defenses and wait", {"safety": "high", "mood_delta": -5}),
        ],
    },
    {
        "title": "Supply Shortage",
        "description": "The food stores are running dangerously low. Tension builds among the survivors.",
        "choices": [
            PlotChoice("ration", "Implement strict rationing for everyone", {"mood_delta": -10, "food_save": 2}),
            PlotChoice("hunt", "Send a group on a risky hunting expedition", {"risk": "high", "food_gain": 3}),
        ],
    },
    {
        "title": "Storm Warning",
        "description": "Dark clouds gather on the horizon. A massive storm approaches the island.",
        "choices": [
            PlotChoice("shelter", "Everyone take shelter immediately", {"safety": "high", "mood_delta": 5}),
            PlotChoice("salvage", "Quickly gather supplies before the storm hits", {"risk": "medium", "resource_gain": 2}),
        ],
    },
    {
        "title": "Trust Crisis",
        "description": "Accusations fly as valuable supplies go missing from the camp.",
        "choices": [
            PlotChoice("accuse", "Hold a trial to find the culprit", {"drama": "high", "relationship_delta": -5}),
            PlotChoice("forgive", "Call for unity and move on together", {"mood_delta": 3, "trust": "restored"}),
        ],
    },
    {
        "title": "Rescue Signal",
        "description": "A faint light flickers on the distant horizon. Could it be a ship?",
        "choices": [
            PlotChoice("signal", "Build a massive signal fire on the beach", {"energy_delta": -15, "hope": "high"}),
            PlotChoice("wait", "Wait and observe - it could be dangerous", {"safety": "medium", "mood_delta": -3}),
        ],
    },
]
|
||||
|
||||
|
||||
class DirectorService:
    """
    AI Director for generating and resolving narrative events.
    Uses LLM to create dramatic plot points based on world state.

    Falls back to curated templates whenever the LLM is unavailable or
    returns output that cannot be parsed.
    """

    def __init__(self, llm_service=None) -> None:
        """
        Initialize the Director service.

        Args:
            llm_service: Optional LLMService instance. If None, uses global instance.
        """
        self._llm_service = llm_service
        self._rng = random.Random()
        self._current_plot: PlotPoint | None = None
        self._plot_history: list[str] = []  # Recent plot titles to avoid repetition

    @property
    def llm(self):
        """Lazy-load LLM service to avoid circular imports."""
        if self._llm_service is None:
            from .llm import llm_service
            self._llm_service = llm_service
        return self._llm_service

    @property
    def current_plot(self) -> PlotPoint | None:
        """Get the current active plot point."""
        return self._current_plot

    def clear_current_plot(self) -> None:
        """Clear the current plot after resolution, remembering its title."""
        if self._current_plot:
            self._plot_history.append(self._current_plot.title)
            # Keep only last 5 titles to avoid repetition
            self._plot_history = self._plot_history[-5:]
        self._current_plot = None

    # -- shared LLM plumbing --------------------------------------------

    def _build_llm_kwargs(
        self,
        system_prompt: str,
        user_prompt: str,
        max_tokens: int,
        temperature: float,
    ) -> dict[str, Any]:
        """Build completion kwargs shared by plot generation and resolution.

        Mirrors the LLM service configuration (model, api_base, api_key,
        extra headers) so the two call sites cannot drift apart.
        """
        kwargs: dict[str, Any] = {
            "model": self.llm._model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "max_tokens": max_tokens,
            "temperature": temperature,
        }
        if self.llm._api_base:
            kwargs["api_base"] = self.llm._api_base
        if self.llm._api_key and not self.llm._api_key_header:
            kwargs["api_key"] = self.llm._api_key
        if self.llm._extra_headers:
            kwargs["extra_headers"] = self.llm._extra_headers
        return kwargs

    @staticmethod
    def _strip_code_fence(content: str) -> str:
        """Strip a markdown code fence (``` or ```json) wrapping LLM output."""
        if content.startswith("```"):
            # split() drops the opening fence and, if present, the closing one.
            content = content.split("```")[1]
            if content.startswith("json"):
                content = content[4:]
        return content.strip()

    # -- plot generation ------------------------------------------------

    async def generate_plot_point(self, world_state: dict[str, Any]) -> PlotPoint:
        """
        Generate a dramatic plot point based on current world state.

        Args:
            world_state: Dictionary containing:
                - day: Current game day
                - weather: Current weather condition
                - time_of_day: dawn/day/dusk/night
                - alive_agents: List of alive agent summaries
                - recent_events: List of recent event descriptions
                - tension_level: low/medium/high (derived from deaths, resources, etc.)

        Returns:
            PlotPoint with title, description, and 2 choices
        """
        # Extract context
        day = world_state.get("day", 1)
        weather = world_state.get("weather", "Sunny")
        alive_count = len(world_state.get("alive_agents", []))
        recent_events = world_state.get("recent_events", [])
        tension_level = world_state.get("tension_level", "medium")
        mood_avg = world_state.get("mood_avg", 50)

        # Build context summary (cap at 5 agents to keep the prompt short)
        agents_summary = ", ".join([
            f"{a.get('name', 'Unknown')} (HP:{a.get('hp', 0)})"
            for a in world_state.get("alive_agents", [])[:5]
        ]) or "No agents alive"

        events_summary = "; ".join(recent_events[-3:]) if recent_events else "Nothing notable recently"

        # Try LLM generation first
        if not self.llm.is_mock_mode:
            try:
                plot = await self._generate_llm_plot(
                    day=day,
                    weather=weather,
                    alive_count=alive_count,
                    agents_summary=agents_summary,
                    events_summary=events_summary,
                    tension_level=tension_level,
                    mood_avg=mood_avg,
                )
                if plot:
                    self._current_plot = plot
                    return plot
            except Exception as e:
                logger.error(f"LLM plot generation failed: {e}")

        # Fallback to template-based generation
        plot = self._generate_fallback_plot(weather, tension_level, mood_avg)
        self._current_plot = plot
        return plot

    async def _generate_llm_plot(
        self,
        day: int,
        weather: str,
        alive_count: int,
        agents_summary: str,
        events_summary: str,
        tension_level: str,
        mood_avg: int,
    ) -> PlotPoint | None:
        """Generate plot point using LLM.

        Returns None when the LLM output is unparseable or carries fewer
        than two choices, so the caller falls back to templates.
        """

        # Build the prompt for the AI Director
        system_prompt = f"""You are the AI Director for a survival drama on a deserted island.
Your role is to create dramatic narrative moments that engage the audience.

CURRENT SITUATION:
- Day {day} on the island
- Weather: {weather}
- Survivors: {alive_count} ({agents_summary})
- Recent events: {events_summary}
- Tension level: {tension_level}
- Average mood: {mood_avg}/100

RECENTLY USED PLOTS (avoid these):
{', '.join(self._plot_history) if self._plot_history else 'None yet'}

GUIDELINES:
1. Create dramatic tension appropriate to the tension level
2. Choices should have meaningful trade-offs
3. Consider weather and mood in your narrative
4. Keep descriptions cinematic but brief (under 50 words)

OUTPUT FORMAT (strict JSON):
{{
"title": "Brief dramatic title (3-5 words)",
"description": "Cinematic description of the situation (under 50 words)",
"choices": [
{{"id": "choice_a", "text": "First option (under 15 words)", "effects": {{"mood_delta": 5}}}},
{{"id": "choice_b", "text": "Second option (under 15 words)", "effects": {{"mood_delta": -5}}}}
]
}}"""

        user_prompt = f"""The current tension is {tension_level}.
{"Create an intense, high-stakes event!" if tension_level == "high" else "Create an interesting event to raise the drama." if tension_level == "low" else "Create a moderately dramatic event."}

Generate a plot point now. Output ONLY valid JSON, no explanation."""

        try:
            # Use LLM service's internal acompletion
            kwargs = self._build_llm_kwargs(
                system_prompt, user_prompt, max_tokens=300, temperature=0.9
            )
            response = await self.llm._acompletion(**kwargs)
            content = response.choices[0].message.content.strip()

            # Parse JSON response (handles markdown code fences)
            data = json.loads(self._strip_code_fence(content))

            # Validate and construct PlotPoint
            choices = [
                PlotChoice(
                    choice_id=c.get("id", f"choice_{i}"),
                    text=c.get("text", "Unknown option"),
                    effects=c.get("effects", {}),
                )
                for i, c in enumerate(data.get("choices", []))
            ]

            if len(choices) < 2:
                logger.warning("LLM returned fewer than 2 choices, using fallback")
                return None

            return PlotPoint(
                plot_id=uuid.uuid4().hex,
                title=data.get("title", "Unexpected Event"),
                description=data.get("description", "Something happens..."),
                choices=choices[:2],  # Limit to 2 choices
                ttl_seconds=60,
            )

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse LLM JSON response: {e}")
            return None
        except Exception as e:
            logger.error(f"LLM plot generation error: {e}")
            return None

    def _generate_fallback_plot(
        self,
        weather: str,
        tension_level: str,
        mood_avg: int,
    ) -> PlotPoint:
        """Generate plot point from templates when LLM is unavailable."""

        # Filter templates based on context (skip recently used titles)
        available = [t for t in FALLBACK_PLOT_TEMPLATES if t["title"] not in self._plot_history]

        if not available:
            available = FALLBACK_PLOT_TEMPLATES

        # Weight selection based on weather and tension
        if weather.lower() in ("stormy", "rainy", "thunder"):
            # Prefer storm-related plots
            storm_plots = [t for t in available if "storm" in t["title"].lower()]
            if storm_plots:
                available = storm_plots
        elif tension_level == "low" and mood_avg > 60:
            # Prefer dramatic plots to shake things up
            drama_plots = [t for t in available if "crisis" in t["title"].lower() or "trust" in t["title"].lower()]
            if drama_plots:
                available = drama_plots

        template = self._rng.choice(available)

        return PlotPoint(
            plot_id=uuid.uuid4().hex,
            title=template["title"],
            description=template["description"],
            choices=list(template["choices"]),
            ttl_seconds=60,
        )

    # -- vote resolution ------------------------------------------------

    async def resolve_vote(
        self,
        plot_point: PlotPoint,
        winning_choice_id: str,
        world_state: dict[str, Any],
    ) -> ResolutionResult:
        """
        Resolve the vote and generate consequences.

        Args:
            plot_point: The PlotPoint that was voted on
            winning_choice_id: The ID of the winning choice
            world_state: Current world state for context

        Returns:
            ResolutionResult with message and effects to apply
        """
        # Robustness fix: a plot point without choices previously raised
        # IndexError below; degrade to a no-op resolution instead.
        if not plot_point.choices:
            logger.warning("resolve_vote called on a plot with no choices")
            return ResolutionResult(
                plot_id=plot_point.plot_id,
                choice_id=winning_choice_id,
                message="The moment passes without consequence.",
                effects={},
            )

        # Find the winning choice (unknown ids fall back to the first choice)
        winning_choice = next(
            (c for c in plot_point.choices if c.choice_id == winning_choice_id),
            plot_point.choices[0]
        )

        # Try LLM resolution first
        if not self.llm.is_mock_mode:
            try:
                result = await self._generate_llm_resolution(
                    plot_point=plot_point,
                    winning_choice=winning_choice,
                    world_state=world_state,
                )
                if result:
                    return result
            except Exception as e:
                logger.error(f"LLM resolution failed: {e}")

        # Fallback resolution
        return self._generate_fallback_resolution(plot_point, winning_choice)

    async def _generate_llm_resolution(
        self,
        plot_point: PlotPoint,
        winning_choice: PlotChoice,
        world_state: dict[str, Any],
    ) -> ResolutionResult | None:
        """Generate resolution using LLM. Returns None on any failure."""

        agents_summary = ", ".join([
            a.get("name", "Unknown")
            for a in world_state.get("alive_agents", [])[:5]
        ]) or "the survivors"

        system_prompt = f"""You are the AI Director narrating the consequences of an audience vote.

THE SITUATION:
{plot_point.description}

THE AUDIENCE VOTED FOR:
"{winning_choice.text}"

SURVIVORS INVOLVED:
{agents_summary}

GUIDELINES:
1. Describe the immediate consequence dramatically
2. Mention how the survivors react
3. Keep it brief but impactful (under 40 words)
4. The effects should feel meaningful

OUTPUT FORMAT (strict JSON):
{{
"message": "Dramatic description of what happens...",
"effects": {{
"mood_delta": -5,
"hp_delta": 0,
"energy_delta": -10,
"item_gained": null,
"item_lost": null,
"relationship_change": null
}}
}}"""

        try:
            kwargs = self._build_llm_kwargs(
                system_prompt,
                f"Narrate the consequence of choosing: {winning_choice.text}",
                max_tokens=200,
                temperature=0.8,
            )
            response = await self.llm._acompletion(**kwargs)
            content = response.choices[0].message.content.strip()

            # Handle markdown code fences, then parse
            data = json.loads(self._strip_code_fence(content))

            # Merge LLM effects with choice's predefined effects
            # (LLM values win on key collisions)
            effects = {**winning_choice.effects, **data.get("effects", {})}

            return ResolutionResult(
                plot_id=plot_point.plot_id,
                choice_id=winning_choice.choice_id,
                message=data.get("message", f"The survivors chose: {winning_choice.text}"),
                effects=effects,
            )

        except Exception as e:
            logger.error(f"LLM resolution error: {e}")
            return None

    def _generate_fallback_resolution(
        self,
        plot_point: PlotPoint,
        winning_choice: PlotChoice,
    ) -> ResolutionResult:
        """Generate fallback resolution message."""

        # Template-based resolution messages
        messages = [
            f"The decision is made! {winning_choice.text}",
            f"The survivors act: {winning_choice.text}",
            f"Following the audience's choice: {winning_choice.text}",
        ]

        return ResolutionResult(
            plot_id=plot_point.plot_id,
            choice_id=winning_choice.choice_id,
            message=self._rng.choice(messages),
            effects=dict(winning_choice.effects),
        )

    # -- tension heuristics ---------------------------------------------

    def calculate_tension_level(self, world_state: dict[str, Any]) -> str:
        """
        Calculate the current tension level based on world state.

        Args:
            world_state: Dictionary with game state information

        Returns:
            "low", "medium", or "high"
        """
        score = 0

        # Factor: Agent health
        alive_agents = world_state.get("alive_agents", [])
        if alive_agents:
            avg_hp = sum(a.get("hp", 100) for a in alive_agents) / len(alive_agents)
            if avg_hp < 30:
                score += 3
            elif avg_hp < 50:
                score += 2
            elif avg_hp < 70:
                score += 1

        # Factor: Weather severity
        weather = world_state.get("weather", "").lower()
        if weather in ("stormy", "thunder"):
            score += 2
        elif weather in ("rainy",):
            score += 1

        # Factor: Mood
        mood_avg = world_state.get("mood_avg", 50)
        if mood_avg < 30:
            score += 2
        elif mood_avg < 50:
            score += 1

        # Factor: Recent deaths (capped so one massacre doesn't dominate)
        recent_deaths = world_state.get("recent_deaths", 0)
        score += min(recent_deaths * 2, 4)

        # Factor: Low resources
        if world_state.get("resources_critical", False):
            score += 2

        # Determine level
        if score >= 6:
            return "high"
        elif score >= 3:
            return "medium"
        return "low"
|
||||
|
||||
|
||||
# Global instance
# Module-level singleton; lazily binds to the global llm_service on first use.
director_service = DirectorService()
|
||||
@@ -15,6 +15,8 @@ from .database import init_db, get_db_session
|
||||
from .models import User, Agent, WorldState, GameConfig, AgentRelationship
|
||||
from .llm import llm_service
|
||||
from .memory_service import memory_service
|
||||
from .director_service import DirectorService, GameMode, PlotPoint
|
||||
from .vote_manager import VoteManager, VoteOption, VoteSnapshot
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .server import ConnectionManager
|
||||
@@ -57,6 +59,14 @@ REVIVE_COST = 10 # Casual mode cost
|
||||
INITIAL_USER_GOLD = 100
|
||||
IDLE_CHAT_PROBABILITY = 0.15
|
||||
|
||||
# =============================================================================
|
||||
# AI Director & Narrative Voting (Phase 9)
|
||||
# =============================================================================
|
||||
DIRECTOR_TRIGGER_INTERVAL = 60 # Ticks between narrative events (5 minutes at 5s/tick)
|
||||
DIRECTOR_MIN_ALIVE_AGENTS = 2 # Minimum alive agents to trigger narrative
|
||||
VOTING_DURATION_SECONDS = 60 # Duration of voting window
|
||||
VOTE_BROADCAST_INTERVAL = 1.0 # How often to broadcast vote updates
|
||||
|
||||
# =============================================================================
|
||||
# Day/Night cycle
|
||||
# =============================================================================
|
||||
@@ -136,6 +146,20 @@ class GameEngine:
|
||||
# Key: agent_id (who needs to respond), Value: {partner_id, last_text, topic, expires_at_tick}
|
||||
self._active_conversations = {}
|
||||
|
||||
# Phase 9: AI Director & Narrative Voting
|
||||
self._director = DirectorService()
|
||||
self._vote_manager = VoteManager(
|
||||
duration_seconds=VOTING_DURATION_SECONDS,
|
||||
broadcast_interval=VOTE_BROADCAST_INTERVAL,
|
||||
)
|
||||
self._game_mode = GameMode.SIMULATION
|
||||
self._last_narrative_tick = 0
|
||||
self._current_plot: PlotPoint | None = None
|
||||
self._mode_change_tick = 0 # Tick when mode changed
|
||||
|
||||
# Set up vote broadcast callback
|
||||
self._vote_manager.set_broadcast_callback(self._on_vote_update)
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
@@ -250,6 +274,222 @@ class GameEngine:
|
||||
if world:
|
||||
await self._broadcast_event(EventType.WORLD_UPDATE, world.to_dict())
|
||||
|
||||
# =========================================================================
|
||||
# AI Director & Narrative Voting (Phase 9)
|
||||
# =========================================================================
|
||||
async def _on_vote_update(self, snapshot: VoteSnapshot) -> None:
    """Broadcast callback invoked by the vote manager with a vote snapshot."""
    payload = snapshot.to_dict()
    await self._broadcast_event(EventType.VOTE_UPDATE, payload)
|
||||
|
||||
async def _set_game_mode(self, new_mode: GameMode, message: str = "") -> None:
    """Switch game mode, record the tick, and broadcast the transition."""
    previous = self._game_mode
    self._game_mode = new_mode
    self._mode_change_tick = self._tick_count

    # Voting mode carries its deadline so clients can sync their countdowns.
    ends_at = 0.0
    if new_mode == GameMode.VOTING:
        active_session = self._vote_manager.current_session
        if active_session:
            ends_at = active_session.end_ts

    payload = {
        "mode": new_mode.value,
        "old_mode": previous.value,
        "message": message,
        "ends_at": ends_at,
    }
    await self._broadcast_event(EventType.MODE_CHANGE, payload)

    logger.info(f"Game mode changed: {previous.value} -> {new_mode.value}")
|
||||
|
||||
def _get_world_state_for_director(self) -> dict:
    """Build world state context for the Director.

    Snapshots day/weather/time, per-agent vitals, the average mood, and a
    derived tension level. All ORM attribute access happens inside the DB
    session so no detached instances are read.
    """
    with get_db_session() as db:
        world = db.query(WorldState).first()
        agents = db.query(Agent).filter(Agent.status == "Alive").all()

        alive_agents = [
            {"name": a.name, "hp": a.hp, "energy": a.energy, "mood": a.mood}
            for a in agents
        ]

        # Neutral 50 when nobody is alive avoids division by zero.
        mood_avg = sum(a.mood for a in agents) / len(agents) if agents else 50

        return {
            "day": world.day_count if world else 1,
            "weather": world.weather if world else "Sunny",
            "time_of_day": world.time_of_day if world else "day",
            "alive_agents": alive_agents,
            "mood_avg": mood_avg,
            "recent_events": [],  # Could be populated from event history
            "tension_level": self._director.calculate_tension_level({
                "alive_agents": alive_agents,
                "weather": world.weather if world else "Sunny",
                "mood_avg": mood_avg,
            }),
        }
|
||||
|
||||
async def _should_trigger_narrative(self) -> bool:
    """Check if conditions are met to trigger a narrative event.

    Cheap checks run first; the DB is only queried once the mode and
    cooldown checks have passed.
    """
    # Narrative events only interrupt normal simulation.
    if self._game_mode != GameMode.SIMULATION:
        return False

    # Respect the cooldown between Director interventions.
    elapsed_ticks = self._tick_count - self._last_narrative_tick
    if elapsed_ticks < DIRECTOR_TRIGGER_INTERVAL:
        return False

    # Need enough survivors for a meaningful plot.
    with get_db_session() as db:
        alive = db.query(Agent).filter(Agent.status == "Alive").count()
        return alive >= DIRECTOR_MIN_ALIVE_AGENTS
|
||||
|
||||
async def _trigger_narrative_event(self) -> None:
    """Trigger a narrative event from the Director.

    Sequence: enter NARRATIVE mode, generate and broadcast a plot point,
    open the voting session, then enter VOTING mode. The broadcast order
    matters — clients must see the plot before the vote opens.
    """
    logger.info("Director triggering narrative event...")

    # Switch to narrative mode
    await self._set_game_mode(GameMode.NARRATIVE, "The Director intervenes...")

    # Generate plot point
    world_state = self._get_world_state_for_director()
    plot = await self._director.generate_plot_point(world_state)
    self._current_plot = plot
    self._last_narrative_tick = self._tick_count  # resets the trigger cooldown

    # Broadcast narrative event
    await self._broadcast_event(EventType.NARRATIVE_PLOT, plot.to_dict())

    logger.info(f"Narrative event: {plot.title}")

    # Start voting session
    options = [
        VoteOption(choice_id=c.choice_id, text=c.text)
        for c in plot.choices
    ]
    self._vote_manager.start_vote(options, duration_seconds=VOTING_DURATION_SECONDS)

    # Broadcast vote started
    vote_data = self._vote_manager.get_vote_started_data()
    if vote_data:
        await self._broadcast_event(EventType.VOTE_STARTED, vote_data)

    # Switch to voting mode.
    # NOTE(review): assumes the plot always carries at least 2 choices —
    # the Director's fallback path guarantees this; confirm for LLM output.
    await self._set_game_mode(
        GameMode.VOTING,
        f"Vote now! {plot.choices[0].text} or {plot.choices[1].text}"
    )
|
||||
|
||||
async def _process_voting_tick(self) -> None:
    """Process voting phase - check if voting has ended.

    No-op outside VOTING mode. When the vote manager finalizes a result,
    broadcasts VOTE_ENDED then VOTE_RESULT, enters RESOLUTION mode, and
    applies the outcome.
    """
    if self._game_mode != GameMode.VOTING:
        return

    # maybe_finalize() yields a result only once the voting window elapsed.
    result = self._vote_manager.maybe_finalize()
    if result:
        # Voting ended
        await self._broadcast_event(EventType.VOTE_ENDED, {
            "vote_id": result.vote_id,
            "total_votes": result.total_votes,
        })
        await self._broadcast_event(EventType.VOTE_RESULT, result.to_dict())

        # Switch to resolution mode
        await self._set_game_mode(
            GameMode.RESOLUTION,
            f"The audience has spoken: {result.winning_choice_text}"
        )

        # Process resolution
        await self._process_vote_result(result)
|
||||
|
||||
async def _process_vote_result(self, result) -> None:
    """Process the voting result and apply consequences.

    Args:
        result: Finalized vote result from the vote manager (provides
            winning_choice_id).
    """
    if not self._current_plot:
        # Defensive: resolving without a plot means state got out of sync.
        logger.error("No current plot for resolution")
        await self._set_game_mode(GameMode.SIMULATION, "Returning to normal...")
        return

    # Get resolution from Director
    world_state = self._get_world_state_for_director()
    resolution = await self._director.resolve_vote(
        plot_point=self._current_plot,
        winning_choice_id=result.winning_choice_id,
        world_state=world_state,
    )

    # Apply effects
    await self._apply_resolution_effects(resolution.effects)

    # Broadcast resolution
    await self._broadcast_event(EventType.RESOLUTION_APPLIED, resolution.to_dict())

    logger.info(f"Resolution applied: {resolution.message}")

    # Clear current plot
    self._director.clear_current_plot()
    self._current_plot = None

    # Return to simulation after a brief pause.
    # NOTE(review): this sleep pauses the tick loop for 3s — presumably
    # intentional so viewers can read the resolution; confirm.
    await asyncio.sleep(3.0)  # Let players read the resolution
    await self._set_game_mode(GameMode.SIMULATION, "The story continues...")
|
||||
|
||||
async def _apply_resolution_effects(self, effects: dict) -> None:
|
||||
"""Apply resolution effects to the game world."""
|
||||
def _coerce_int(value) -> int:
|
||||
"""Safely convert LLM output (string/float/int) to int."""
|
||||
try:
|
||||
return int(value)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
mood_delta = _coerce_int(effects.get("mood_delta", 0))
|
||||
hp_delta = _coerce_int(effects.get("hp_delta", 0))
|
||||
energy_delta = _coerce_int(effects.get("energy_delta", 0))
|
||||
|
||||
if not any([mood_delta, hp_delta, energy_delta]):
|
||||
return
|
||||
|
||||
with get_db_session() as db:
|
||||
agents = db.query(Agent).filter(Agent.status == "Alive").all()
|
||||
for agent in agents:
|
||||
if mood_delta:
|
||||
agent.mood = max(0, min(100, agent.mood + mood_delta))
|
||||
if hp_delta:
|
||||
agent.hp = max(0, min(100, agent.hp + hp_delta))
|
||||
if energy_delta:
|
||||
agent.energy = max(0, min(100, agent.energy + energy_delta))
|
||||
|
||||
logger.info(
|
||||
f"Applied resolution effects: mood={mood_delta}, "
|
||||
f"hp={hp_delta}, energy={energy_delta}"
|
||||
)
|
||||
|
||||
async def process_vote(self, voter_id: str, choice_index: int, source: str = "twitch") -> bool:
    """
    Process a vote from Twitch or Unity.

    Args:
        voter_id: Unique identifier for the voter
        choice_index: 0-indexed choice number
        source: Vote source ("twitch" or "unity")

    Returns:
        True if vote was recorded
    """
    # Votes are only accepted while the voting window is open.
    in_voting_phase = self._game_mode == GameMode.VOTING
    if not in_voting_phase:
        return False
    return self._vote_manager.cast_vote(voter_id, choice_index, source)
|
||||
|
||||
def parse_vote_command(self, message: str) -> int | None:
    """Parse a chat message for vote commands (e.g. !1, !2, !A, !B).

    Delegates to the vote manager's Twitch parser; returns the 0-indexed
    choice number, or None when the message is not a vote command.
    """
    parsed = self._vote_manager.parse_twitch_message(message)
    return parsed
|
||||
|
||||
# =========================================================================
|
||||
# Day/Night cycle (Phase 2)
|
||||
# =========================================================================
|
||||
@@ -1702,6 +1942,20 @@ class GameEngine:
|
||||
while self._running:
|
||||
self._tick_count += 1
|
||||
|
||||
# Phase 9: Check voting phase (always runs)
|
||||
await self._process_voting_tick()
|
||||
|
||||
# Phase 9: Check if we should trigger a narrative event
|
||||
if await self._should_trigger_narrative():
|
||||
await self._trigger_narrative_event()
|
||||
|
||||
# Skip simulation processing during narrative/voting/resolution modes
|
||||
if self._game_mode != GameMode.SIMULATION:
|
||||
await asyncio.sleep(self._tick_interval)
|
||||
continue
|
||||
|
||||
# ========== SIMULATION MODE PROCESSING ==========
|
||||
|
||||
# 1. Advance time (Phase 2)
|
||||
phase_change = await self._advance_time()
|
||||
if phase_change:
|
||||
@@ -1774,7 +2028,8 @@ class GameEngine:
|
||||
"day": day,
|
||||
"time_of_day": time_of_day,
|
||||
"weather": weather,
|
||||
"alive_agents": alive_count
|
||||
"alive_agents": alive_count,
|
||||
"game_mode": self._game_mode.value # Phase 9: Include game mode
|
||||
})
|
||||
|
||||
await asyncio.sleep(self._tick_interval)
|
||||
|
||||
@@ -66,6 +66,15 @@ class EventType(str, Enum):
|
||||
VFX_EVENT = "vfx_event" # Visual effect trigger
|
||||
GIFT_EFFECT = "gift_effect" # Twitch bits/sub effect
|
||||
|
||||
# AI Director & Narrative Voting (Phase 9)
|
||||
MODE_CHANGE = "mode_change" # Game mode transition
|
||||
NARRATIVE_PLOT = "narrative_plot" # Director generated plot point
|
||||
VOTE_STARTED = "vote_started" # Voting session started
|
||||
VOTE_UPDATE = "vote_update" # Real-time vote count update
|
||||
VOTE_ENDED = "vote_ended" # Voting closed
|
||||
VOTE_RESULT = "vote_result" # Final voting result
|
||||
RESOLUTION_APPLIED = "resolution_applied" # Plot resolution executed
|
||||
|
||||
|
||||
class GameEvent(BaseModel):
|
||||
"""
|
||||
|
||||
@@ -74,6 +74,18 @@ class TwitchBot(commands.Bot):
|
||||
# Log the message for debugging
|
||||
logger.info(f"Twitch chat [{username}]: {content}")
|
||||
|
||||
# Phase 9: Check for vote commands first (!1, !2, !A, !B)
|
||||
vote_index = self._game_engine.parse_vote_command(content)
|
||||
if vote_index is not None:
|
||||
try:
|
||||
voted = await self._game_engine.process_vote(username, vote_index, "twitch")
|
||||
if voted:
|
||||
logger.info(f"Vote recorded: {username} -> option {vote_index + 1}")
|
||||
return # Don't process as regular command
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing vote: {e}")
|
||||
return
|
||||
|
||||
# Forward to game engine for command processing
|
||||
try:
|
||||
await self._game_engine.process_command(username, content)
|
||||
|
||||
445
backend/app/vote_manager.py
Normal file
445
backend/app/vote_manager.py
Normal file
@@ -0,0 +1,445 @@
|
||||
"""
|
||||
Vote Manager - Audience Voting System (Phase 9).
|
||||
|
||||
Manages voting sessions for narrative decisions,
|
||||
supporting both Twitch chat commands and Unity client votes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations

import asyncio
import logging
import random
import re
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class VoteOption:
    """A voting option (immutable value object)."""
    choice_id: str  # Stable identifier reported back in vote results
    text: str  # Human-readable label shown to the audience
||||
|
||||
|
||||
@dataclass
class VoteSession:
    """An active voting session (mutable: tallies update as votes arrive)."""
    vote_id: str  # Unique session identifier (uuid4 hex)
    options: list[VoteOption]  # Ballot choices, in display order
    start_ts: float  # Epoch seconds when voting opened
    end_ts: float  # Epoch seconds when voting closes
    duration_seconds: int = 60  # Store actual duration for this session
    votes_by_user: dict[str, int] = field(default_factory=dict)  # user_id -> choice_index
    tallies: list[int] = field(default_factory=list)  # vote count per option
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class VoteSnapshot:
    """Real-time voting statistics snapshot."""
    vote_id: str
    tallies: list[int]
    percentages: list[float]
    total_votes: int
    remaining_seconds: float
    ends_at: float

    def to_dict(self) -> dict[str, Any]:
        """Serialize the snapshot for broadcasting.

        remaining_seconds is clamped so clients never see a negative timer.
        """
        field_order = (
            "vote_id",
            "tallies",
            "percentages",
            "total_votes",
            "remaining_seconds",
            "ends_at",
        )
        payload = {name: getattr(self, name) for name in field_order}
        payload["remaining_seconds"] = max(0, payload["remaining_seconds"])
        return payload
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class VoteResult:
    """Final voting result after session ends."""
    vote_id: str
    winning_choice_id: str
    winning_choice_text: str
    winning_index: int
    tallies: list[int]
    percentages: list[float]
    total_votes: int
    is_tie: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Serialize the final result for broadcasting."""
        field_order = (
            "vote_id",
            "winning_choice_id",
            "winning_choice_text",
            "winning_index",
            "tallies",
            "percentages",
            "total_votes",
            "is_tie",
        )
        return {name: getattr(self, name) for name in field_order}
|
||||
|
||||
|
||||
# Twitch command patterns — anchored so the entire message must be the command.
VOTE_PATTERN_NUMERIC = re.compile(r"^!([1-9])$")  # !1, !2, etc. (1-indexed)
VOTE_PATTERN_ALPHA = re.compile(r"^!([AaBb])$")  # !A, !B, etc. (case-insensitive)
|
||||
|
||||
|
||||
class VoteManager:
    """
    Manages voting sessions with dual-channel support (Twitch + Unity).

    Features:
    - Real-time vote counting
    - Vote changing (users can change their vote)
    - Automatic session expiration
    - Periodic snapshot broadcasting
    """

    def __init__(
        self,
        duration_seconds: int = 60,
        broadcast_interval: float = 1.0,
    ) -> None:
        """
        Initialize the vote manager.

        Args:
            duration_seconds: Default voting window duration
            broadcast_interval: How often to broadcast vote updates (seconds)
        """
        self._duration_seconds = duration_seconds
        self._broadcast_interval = broadcast_interval
        self._current: VoteSession | None = None
        self._broadcast_callback: Callable[[VoteSnapshot], Awaitable[None]] | None = None
        self._broadcast_task: asyncio.Task | None = None

    @property
    def is_voting_active(self) -> bool:
        """Check if a voting session is currently active."""
        if not self._current:
            return False
        return time.time() < self._current.end_ts

    @property
    def current_session(self) -> VoteSession | None:
        """Get the current voting session."""
        return self._current

    @staticmethod
    def _compute_percentages(tallies: list[int]) -> list[float]:
        """Convert raw tallies into per-option percentages (1 decimal place).

        Returns all zeros when no votes have been cast, avoiding a
        division by zero. Shared by snapshot() and maybe_finalize().
        """
        total = sum(tallies)
        if total <= 0:
            return [0.0 for _ in tallies]
        return [round((t / total) * 100, 1) for t in tallies]

    def set_broadcast_callback(
        self,
        callback: Callable[[VoteSnapshot], Awaitable[None]],
    ) -> None:
        """
        Set the callback for broadcasting vote updates.

        Args:
            callback: Async function that receives VoteSnapshot and broadcasts it
        """
        self._broadcast_callback = callback

    def start_vote(
        self,
        options: list[VoteOption],
        duration_seconds: int | None = None,
        now: float | None = None,
    ) -> VoteSession:
        """
        Start a new voting session.

        Args:
            options: List of voting options (minimum 2)
            duration_seconds: Override default duration
            now: Override current timestamp (for testing)

        Returns:
            The created VoteSession

        Raises:
            ValueError: If fewer than 2 options are supplied
        """
        if len(options) < 2:
            raise ValueError("Voting requires at least 2 options")

        # Explicit `is None` checks: `now or time.time()` would silently
        # discard a legitimate 0.0 test timestamp, and `duration_seconds or
        # default` would discard an explicit override of 0.
        now = time.time() if now is None else now
        duration = self._duration_seconds if duration_seconds is None else duration_seconds

        session = VoteSession(
            vote_id=uuid.uuid4().hex,
            options=options,
            start_ts=now,
            end_ts=now + duration,
            duration_seconds=duration,
            tallies=[0 for _ in options],
        )
        self._current = session

        # Start the broadcast loop only when someone is listening.
        if self._broadcast_callback:
            self._start_broadcast_loop()

        logger.info(
            f"Vote started: {session.vote_id} with {len(options)} options, "
            f"duration={duration}s"
        )

        return session

    def _start_broadcast_loop(self) -> None:
        """Start the periodic broadcast task, cancelling any stale one."""
        if self._broadcast_task and not self._broadcast_task.done():
            self._broadcast_task.cancel()

        async def broadcast_loop():
            try:
                while self.is_voting_active:
                    snapshot = self.snapshot()
                    if snapshot and self._broadcast_callback:
                        try:
                            await self._broadcast_callback(snapshot)
                        except Exception as e:
                            # A failing consumer must not kill the loop.
                            logger.error(f"Broadcast callback error: {e}")
                    await asyncio.sleep(self._broadcast_interval)
            except asyncio.CancelledError:
                pass  # Normal shutdown when the vote ends or is cancelled.

        self._broadcast_task = asyncio.create_task(broadcast_loop())

    def parse_twitch_message(self, content: str) -> int | None:
        """
        Parse a Twitch chat message for vote commands.

        Supported formats:
        - !1, !2, !3, etc. (1-indexed, converted to 0-indexed)
        - !A, !B (converted to 0, 1)

        Args:
            content: The chat message content

        Returns:
            Choice index (0-indexed) or None if not a vote command
        """
        text = content.strip()

        # Try numeric pattern first
        match = VOTE_PATTERN_NUMERIC.match(text)
        if match:
            return int(match.group(1)) - 1  # Convert to 0-indexed

        # Try alphabetic pattern
        match = VOTE_PATTERN_ALPHA.match(text)
        if match:
            letter = match.group(1).upper()
            return ord(letter) - ord('A')  # A=0, B=1

        return None

    def cast_vote(
        self,
        voter_id: str,
        choice_index: int,
        source: str = "twitch",
    ) -> bool:
        """
        Record a vote from a user.

        Users can change their vote - the previous vote is subtracted
        and the new vote is added.

        Args:
            voter_id: Unique identifier for the voter
            choice_index: 0-indexed choice number
            source: Vote source ("twitch" or "unity")

        Returns:
            True if vote was recorded, False if invalid or session ended
        """
        if not self._current:
            logger.debug(f"Vote rejected: no active session (voter={voter_id})")
            return False

        if time.time() > self._current.end_ts:
            logger.debug(f"Vote rejected: session ended (voter={voter_id})")
            return False

        if choice_index < 0 or choice_index >= len(self._current.options):
            logger.debug(
                f"Vote rejected: invalid choice {choice_index} "
                f"(voter={voter_id}, max={len(self._current.options)-1})"
            )
            return False

        # Normalize voter ID (Twitch usernames are case-insensitive)
        normalized_voter_id = voter_id.strip().lower()
        if not normalized_voter_id:
            logger.debug("Vote rejected: empty voter id")
            return False

        # Handle vote change - subtract previous vote
        previous = self._current.votes_by_user.get(normalized_voter_id)
        if previous is not None:
            if previous == choice_index:
                # Same vote, no change needed
                return True
            # Subtract old vote; clamp at 0 defensively
            self._current.tallies[previous] = max(
                0, self._current.tallies[previous] - 1
            )
            logger.debug(f"Vote changed: {normalized_voter_id} from {previous} to {choice_index}")

        # Record new vote
        self._current.votes_by_user[normalized_voter_id] = choice_index
        self._current.tallies[choice_index] += 1

        logger.debug(
            f"Vote cast: {voter_id} -> {choice_index} "
            f"(source={source}, tallies={self._current.tallies})"
        )

        return True

    def snapshot(self, now: float | None = None) -> VoteSnapshot | None:
        """
        Generate a real-time snapshot of current voting status.

        Args:
            now: Override current timestamp (for testing)

        Returns:
            VoteSnapshot or None if no active session
        """
        if not self._current:
            return None

        now = time.time() if now is None else now
        tallies = list(self._current.tallies)

        return VoteSnapshot(
            vote_id=self._current.vote_id,
            tallies=tallies,
            percentages=self._compute_percentages(tallies),
            total_votes=sum(tallies),
            remaining_seconds=self._current.end_ts - now,
            ends_at=self._current.end_ts,
        )

    def maybe_finalize(self, now: float | None = None) -> VoteResult | None:
        """
        Check if voting has ended and finalize results.

        Args:
            now: Override current timestamp (for testing)

        Returns:
            VoteResult if voting ended, None if still active
        """
        if not self._current:
            return None

        now = time.time() if now is None else now
        if now < self._current.end_ts:
            return None

        # Stop broadcasting updates for the finished session
        if self._broadcast_task and not self._broadcast_task.done():
            self._broadcast_task.cancel()

        # Calculate final results
        tallies = list(self._current.tallies)
        total = sum(tallies)
        percentages = self._compute_percentages(tallies)

        # Find winner
        if tallies:
            max_votes = max(tallies)
            winners = [i for i, t in enumerate(tallies) if t == max_votes]
            is_tie = len(winners) > 1
            # In case of tie, choose randomly (or could defer to Director)
            winning_index = random.choice(winners) if is_tie else winners[0]
        else:
            winning_index = 0
            is_tie = False

        winning_option = self._current.options[winning_index]

        result = VoteResult(
            vote_id=self._current.vote_id,
            winning_choice_id=winning_option.choice_id,
            winning_choice_text=winning_option.text,
            winning_index=winning_index,
            tallies=tallies,
            percentages=percentages,
            total_votes=total,
            is_tie=is_tie,
        )

        logger.info(
            f"Vote finalized: {result.vote_id} "
            f"winner={result.winning_choice_id} ({result.winning_choice_text}) "
            f"votes={result.tallies} tie={result.is_tie}"
        )

        # Clear current session
        self._current = None

        return result

    def cancel_vote(self) -> bool:
        """
        Cancel the current voting session.

        Returns:
            True if a session was cancelled, False if no active session
        """
        if not self._current:
            return False

        if self._broadcast_task and not self._broadcast_task.done():
            self._broadcast_task.cancel()

        vote_id = self._current.vote_id
        self._current = None

        logger.info(f"Vote cancelled: {vote_id}")
        return True

    def get_vote_started_data(self) -> dict[str, Any] | None:
        """
        Get data for VOTE_STARTED event.

        Returns:
            Dictionary with vote session info, or None if no active session
        """
        if not self._current:
            return None

        return {
            "vote_id": self._current.vote_id,
            "choices": [
                {"choice_id": o.choice_id, "text": o.text}
                for o in self._current.options
            ],
            "duration_seconds": self._current.duration_seconds,
            "ends_at": self._current.end_ts,
            "source": "director",
        }
|
||||
|
||||
|
||||
# Global instance — module-level singleton shared by the game engine and services.
vote_manager = VoteManager()
|
||||
Reference in New Issue
Block a user