feat: initialize Annual Gala Interactive System monorepo

- Set up pnpm workspace with 4 packages: shared, server, client-mobile, client-screen
- Implement Redis atomic voting with Lua scripts (HINCRBY + distributed lock)
- Add optimistic UI with IndexedDB queue for offline resilience
- Configure Socket.io with auto-reconnection (infinite retries)
- Separate mobile (Vant) and big screen (Pixi.js) dependencies

Tech stack:
- Frontend Mobile: Vue 3 + Vant + Socket.io-client
- Frontend Screen: Vue 3 + Pixi.js + GSAP
- Backend: Express + Socket.io + Redis + Prisma/MySQL

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
empty
2026-01-15 01:19:36 +08:00
commit e7397d22a9
74 changed files with 14088 additions and 0 deletions

View File

@@ -0,0 +1,21 @@
# Server Environment Variables
# Server
PORT=3000
NODE_ENV=development
# CORS (comma-separated origins)
CORS_ORIGINS=http://localhost:5173,http://localhost:5174
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# MySQL (Prisma)
DATABASE_URL="mysql://root:password@localhost:3306/gala"
# JWT
JWT_SECRET=your-super-secret-jwt-key-change-in-production
JWT_EXPIRES_IN=24h

View File

@@ -0,0 +1,44 @@
{
"name": "@gala/server",
"version": "1.0.0",
"private": true,
"type": "module",
"scripts": {
"dev": "tsx watch src/index.ts",
"build": "tsup src/index.ts --format esm --target node20 --clean",
"start": "node dist/index.js",
"typecheck": "tsc --noEmit",
"db:generate": "prisma generate",
"db:migrate": "prisma migrate dev",
"db:push": "prisma db push",
"db:seed": "tsx src/scripts/seed.ts",
"test": "vitest",
"test:load": "echo 'Load tests not configured yet'"
},
"dependencies": {
"@gala/shared": "workspace:*",
"express": "^4.21.2",
"socket.io": "^4.8.1",
"@socket.io/redis-adapter": "^8.3.0",
"ioredis": "^5.4.2",
"@prisma/client": "^6.2.1",
"zod": "^3.24.1",
"cors": "^2.8.5",
"helmet": "^8.0.0",
"compression": "^1.7.5",
"dotenv": "^16.4.7",
"pino": "^9.6.0",
"pino-pretty": "^13.0.0",
"nanoid": "^5.0.9"
},
"devDependencies": {
"@types/express": "^5.0.0",
"@types/cors": "^2.8.17",
"@types/compression": "^1.7.5",
"prisma": "^6.2.1",
"tsx": "^4.19.2",
"tsup": "^8.3.5",
"typescript": "^5.7.3",
"vitest": "^2.1.8"
}
}

View File

@@ -0,0 +1,115 @@
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "mysql"
url = env("DATABASE_URL")
}
// Users table
model User {
id String @id @default(cuid())
name String @db.VarChar(100)
department String @db.VarChar(100)
avatar String? @db.VarChar(512)
birthYear Int? @map("birth_year")
zodiac String? @db.VarChar(20)
isActive Boolean @default(true) @map("is_active")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
votes Vote[]
drawResults DrawResult[]
@@map("users")
}
// Candidates for voting
model Candidate {
id String @id @default(cuid())
name String @db.VarChar(100)
department String @db.VarChar(100)
avatar String? @db.VarChar(512)
description String? @db.Text
category String @db.VarChar(50)
isActive Boolean @default(true) @map("is_active")
createdAt DateTime @default(now()) @map("created_at")
votes Vote[]
voteCounts VoteCount[]
@@index([category])
@@map("candidates")
}
// Individual votes
model Vote {
id String @id @default(cuid())
userId String @map("user_id")
candidateId String @map("candidate_id")
category String @db.VarChar(50)
localId String? @map("local_id") @db.VarChar(64)
createdAt DateTime @default(now()) @map("created_at")
user User @relation(fields: [userId], references: [id])
candidate Candidate @relation(fields: [candidateId], references: [id])
@@unique([userId, category])
@@index([category, candidateId])
@@index([createdAt])
@@map("votes")
}
// Aggregated vote counts (denormalized for performance)
model VoteCount {
id String @id @default(cuid())
candidateId String @map("candidate_id")
category String @db.VarChar(50)
count Int @default(0)
updatedAt DateTime @updatedAt @map("updated_at")
candidate Candidate @relation(fields: [candidateId], references: [id])
@@unique([candidateId, category])
@@index([category, count(sort: Desc)])
@@map("vote_counts")
}
// Lucky draw results
model DrawResult {
id String @id @default(cuid())
drawId String @map("draw_id")
prizeLevel String @map("prize_level") @db.VarChar(20)
prizeName String @map("prize_name") @db.VarChar(100)
winnerId String @map("winner_id")
winnerName String @map("winner_name") @db.VarChar(100)
winnerDepartment String @map("winner_department") @db.VarChar(100)
drawnAt DateTime @default(now()) @map("drawn_at")
drawnBy String @map("drawn_by") @db.VarChar(100)
winner User @relation(fields: [winnerId], references: [id])
@@index([drawId])
@@index([prizeLevel])
@@index([winnerId])
@@map("draw_results")
}
// Draw sessions
model DrawSession {
id String @id @default(cuid())
prizeLevel String @map("prize_level") @db.VarChar(20)
prizeName String @map("prize_name") @db.VarChar(100)
totalPrizes Int @map("total_prizes")
drawnCount Int @default(0) @map("drawn_count")
isActive Boolean @default(false) @map("is_active")
filters Json?
startedAt DateTime? @map("started_at")
endedAt DateTime? @map("ended_at")
createdAt DateTime @default(now()) @map("created_at")
@@index([prizeLevel])
@@index([isActive])
@@map("draw_sessions")
}

View File

@@ -0,0 +1,52 @@
import express, { Application } from 'express';
import cors from 'cors';
import helmet from 'helmet';
import compression from 'compression';
import { config } from './config';
import { logger } from './utils/logger';
import { errorHandler } from './middleware/errorHandler';
import { requestLogger } from './middleware/requestLogger';
import voteRoutes from './routes/vote.routes';
import adminRoutes from './routes/admin.routes';
export const app: Application = express();
// Security middleware
app.use(helmet());
// CORS
app.use(
cors({
origin: config.corsOrigins,
credentials: true,
})
);
// Compression
app.use(compression());
// Body parsing
app.use(express.json({ limit: '1mb' }));
app.use(express.urlencoded({ extended: true }));
// Request logging
app.use(requestLogger);
// Health check
app.get('/health', (_req, res) => {
res.json({ status: 'ok', timestamp: new Date().toISOString() });
});
// API routes
app.use('/api/vote', voteRoutes);
app.use('/api/admin', adminRoutes);
// 404 handler
app.use((_req, res) => {
res.status(404).json({ error: 'Not Found' });
});
// Error handler
app.use(errorHandler);
export { logger };

View File

@@ -0,0 +1,47 @@
import 'dotenv/config';
export const config = {
// Server
port: parseInt(process.env.PORT || '3000', 10),
nodeEnv: process.env.NODE_ENV || 'development',
isDev: process.env.NODE_ENV !== 'production',
// CORS
corsOrigins: process.env.CORS_ORIGINS?.split(',') || ['http://localhost:5173', 'http://localhost:5174'],
// Redis
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
password: process.env.REDIS_PASSWORD || undefined,
db: parseInt(process.env.REDIS_DB || '0', 10),
},
// MySQL (via Prisma)
databaseUrl: process.env.DATABASE_URL || 'mysql://root:password@localhost:3306/gala',
// JWT (for session tokens)
jwtSecret: process.env.JWT_SECRET || 'dev-secret-change-in-production',
jwtExpiresIn: process.env.JWT_EXPIRES_IN || '24h',
// Socket.io
socket: {
pingTimeout: 10000,
pingInterval: 5000,
maxHttpBufferSize: 1e6, // 1MB
},
// Voting
voting: {
maxVotesPerUser: 7,
lockTtlMs: 5000,
},
// Sync
sync: {
batchSize: 100,
intervalMs: 1000,
},
} as const;
export type Config = typeof config;

View File

@@ -0,0 +1,63 @@
import Redis from 'ioredis';
import { config } from './index';
import { logger } from '../utils/logger';
export const redis = new Redis({
host: config.redis.host,
port: config.redis.port,
password: config.redis.password,
db: config.redis.db,
maxRetriesPerRequest: 3,
retryStrategy(times) {
const delay = Math.min(times * 100, 3000);
logger.warn({ times, delay }, 'Redis connection retry');
return delay;
},
lazyConnect: true,
});
// Connection event handlers
redis.on('connect', () => {
logger.info('Redis connected');
});
redis.on('ready', () => {
logger.info('Redis ready');
});
redis.on('error', (err) => {
logger.error({ err }, 'Redis error');
});
redis.on('close', () => {
logger.warn('Redis connection closed');
});
redis.on('reconnecting', () => {
logger.info('Redis reconnecting...');
});
/**
* Connect to Redis
*/
export async function connectRedis(): Promise<void> {
try {
await redis.connect();
// Test connection
await redis.ping();
logger.info('Redis connection established');
} catch (error) {
logger.error({ error }, 'Failed to connect to Redis');
throw error;
}
}
/**
* Disconnect from Redis
*/
export async function disconnectRedis(): Promise<void> {
await redis.quit();
logger.info('Redis disconnected');
}
export { Redis };

View File

@@ -0,0 +1,55 @@
import { createServer } from 'http';
import { app, logger } from './app';
import { config } from './config';
import { connectRedis } from './config/redis';
import { initializeSocket } from './socket';
import { loadLuaScripts } from './services/vote.service';
async function main(): Promise<void> {
try {
// Connect to Redis
logger.info('Connecting to Redis...');
await connectRedis();
// Load Lua scripts
logger.info('Loading Lua scripts...');
await loadLuaScripts();
// Create HTTP server
const httpServer = createServer(app);
// Initialize Socket.io
logger.info('Initializing Socket.io...');
await initializeSocket(httpServer);
// Start server
httpServer.listen(config.port, () => {
logger.info({ port: config.port, env: config.nodeEnv }, 'Server started');
logger.info(`Health check: http://localhost:${config.port}/health`);
});
// Graceful shutdown
const shutdown = async (signal: string) => {
logger.info({ signal }, 'Shutdown signal received');
httpServer.close(() => {
logger.info('HTTP server closed');
process.exit(0);
});
// Force exit after 10 seconds
setTimeout(() => {
logger.error('Forced shutdown after timeout');
process.exit(1);
}, 10000);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
} catch (error) {
logger.error({ error }, 'Failed to start server');
process.exit(1);
}
}
main();

View File

@@ -0,0 +1,11 @@
-- check_user_votes.lua
-- Check which categories a user has voted in
--
-- KEYS[1] = vote:user:{user_id}:categories
-- Returns: Set of category IDs the user has voted in
local user_categories_key = KEYS[1]
local categories = redis.call('SMEMBERS', user_categories_key)
return cjson.encode(categories)

View File

@@ -0,0 +1,21 @@
-- get_category_results.lua
-- Get top candidates for a category from leaderboard
--
-- KEYS[1] = leaderboard:{category}
-- ARGV[1] = limit (top N results)
local leaderboard_key = KEYS[1]
local limit = tonumber(ARGV[1]) or 10
-- Get top candidates with scores (descending order)
local results = redis.call('ZREVRANGE', leaderboard_key, 0, limit - 1, 'WITHSCORES')
local formatted = {}
for i = 1, #results, 2 do
table.insert(formatted, {
candidate_id = results[i],
vote_count = tonumber(results[i + 1])
})
end
return cjson.encode(formatted)

View File

@@ -0,0 +1,98 @@
-- vote_submit.lua
-- Atomic vote submission with distributed locking
--
-- KEYS[1] = vote:count:{category}
-- KEYS[2] = vote:user:{user_id}:categories
-- KEYS[3] = vote:category:{category}:voters
-- KEYS[4] = leaderboard:{category}
-- KEYS[5] = sync:queue:votes
-- KEYS[6] = lock:vote:{user_id}:{category}
--
-- ARGV[1] = candidate_id
-- ARGV[2] = user_id
-- ARGV[3] = category
-- ARGV[4] = timestamp
-- ARGV[5] = local_id (client UUID)
-- ARGV[6] = lock_ttl_ms
-- ARGV[7] = max_categories
local vote_count_key = KEYS[1]
local user_categories_key = KEYS[2]
local category_voters_key = KEYS[3]
local leaderboard_key = KEYS[4]
local sync_queue_key = KEYS[5]
local lock_key = KEYS[6]
local candidate_id = ARGV[1]
local user_id = ARGV[2]
local category = ARGV[3]
local timestamp = ARGV[4]
local local_id = ARGV[5]
local lock_ttl = tonumber(ARGV[6])
local max_categories = tonumber(ARGV[7])
-- Step 1: Acquire distributed lock (prevent concurrent double-vote attempts)
local lock_acquired = redis.call('SET', lock_key, timestamp, 'NX', 'PX', lock_ttl)
if not lock_acquired then
return cjson.encode({
success = false,
error = 'LOCK_FAILED',
message = 'Another vote operation in progress'
})
end
-- Step 2: Check if user already voted in this category
local already_voted = redis.call('SISMEMBER', category_voters_key, user_id)
if already_voted == 1 then
redis.call('DEL', lock_key)
return cjson.encode({
success = false,
error = 'ALREADY_VOTED',
message = 'User already voted in this category'
})
end
-- Step 3: Check if user has exceeded max votes (7 categories)
local user_vote_count = redis.call('SCARD', user_categories_key)
if user_vote_count >= max_categories then
redis.call('DEL', lock_key)
return cjson.encode({
success = false,
error = 'MAX_VOTES_REACHED',
message = 'User has voted in all categories'
})
end
-- Step 4: Perform atomic vote operations
-- 4a: Increment vote count for candidate
local new_count = redis.call('HINCRBY', vote_count_key, candidate_id, 1)
-- 4b: Add category to user's voted categories
redis.call('SADD', user_categories_key, category)
-- 4c: Add user to category's voters
redis.call('SADD', category_voters_key, user_id)
-- 4d: Update leaderboard (sorted set)
redis.call('ZINCRBY', leaderboard_key, 1, candidate_id)
-- Step 5: Queue for MySQL sync
local vote_record = cjson.encode({
user_id = user_id,
category = category,
candidate_id = candidate_id,
timestamp = timestamp,
local_id = local_id
})
redis.call('RPUSH', sync_queue_key, vote_record)
-- Step 6: Release lock
redis.call('DEL', lock_key)
-- Return success with new count
return cjson.encode({
success = true,
candidate_id = candidate_id,
new_count = new_count,
user_total_votes = user_vote_count + 1
})

View File

@@ -0,0 +1,32 @@
import type { Request, Response, NextFunction } from 'express';
import { logger } from '../utils/logger';
export interface AppError extends Error {
statusCode?: number;
code?: string;
}
export function errorHandler(
err: AppError,
_req: Request,
res: Response,
_next: NextFunction
): void {
const statusCode = err.statusCode || 500;
const message = err.message || 'Internal Server Error';
logger.error(
{
err,
statusCode,
code: err.code,
},
'Request error'
);
res.status(statusCode).json({
success: false,
error: err.code || 'INTERNAL_ERROR',
message: statusCode === 500 ? 'Internal Server Error' : message,
});
}

View File

@@ -0,0 +1,40 @@
import type { Request, Response, NextFunction } from 'express';
import { logger } from '../utils/logger';
import { nanoid } from 'nanoid';
export function requestLogger(req: Request, res: Response, next: NextFunction): void {
const requestId = nanoid(10);
const startTime = Date.now();
// Attach request ID
req.headers['x-request-id'] = requestId;
// Log request
logger.info(
{
requestId,
method: req.method,
url: req.url,
ip: req.ip,
userAgent: req.get('user-agent'),
},
'Incoming request'
);
// Log response on finish
res.on('finish', () => {
const duration = Date.now() - startTime;
logger.info(
{
requestId,
method: req.method,
url: req.url,
statusCode: res.statusCode,
duration,
},
'Request completed'
);
});
next();
}

View File

@@ -0,0 +1,57 @@
import { Router, IRouter } from 'express';
const router: IRouter = Router();
/**
* GET /api/admin/stats
* Get system statistics
*/
router.get('/stats', async (_req, res, next) => {
try {
// TODO: Implement admin stats
return res.json({
success: true,
data: {
totalUsers: 0,
totalVotes: 0,
activeConnections: 0,
},
});
} catch (error) {
next(error);
}
});
/**
* POST /api/admin/draw/start
* Start a lucky draw
*/
router.post('/draw/start', async (_req, res, next) => {
try {
// TODO: Implement draw start
return res.json({
success: true,
message: 'Draw started',
});
} catch (error) {
next(error);
}
});
/**
* POST /api/admin/draw/stop
* Stop the current draw
*/
router.post('/draw/stop', async (_req, res, next) => {
try {
// TODO: Implement draw stop
return res.json({
success: true,
message: 'Draw stopped',
});
} catch (error) {
next(error);
}
});
export default router;

View File

@@ -0,0 +1,150 @@
import { Router, IRouter } from 'express';
import { voteService } from '../services/vote.service';
import { voteSubmitSchema } from '@gala/shared/utils';
import { VOTE_CATEGORIES } from '@gala/shared/types';
import type { VoteCategory } from '@gala/shared/types';
const router: IRouter = Router();
/**
* POST /api/vote/submit
* Submit a vote (HTTP fallback for WebSocket)
*/
router.post('/submit', async (req, res, next) => {
try {
// TODO: Get userId from auth middleware
const userId = req.headers['x-user-id'] as string;
if (!userId) {
return res.status(401).json({
success: false,
error: 'UNAUTHORIZED',
message: 'User ID required',
});
}
// Validate input
const parseResult = voteSubmitSchema.safeParse(req.body);
if (!parseResult.success) {
return res.status(400).json({
success: false,
error: 'INVALID_INPUT',
message: parseResult.error.message,
});
}
const { candidateId, category, localId } = parseResult.data;
const result = await voteService.submitVote(
userId,
category as VoteCategory,
candidateId,
localId
);
if (!result.success) {
const statusCodes: Record<string, number> = {
ALREADY_VOTED: 409,
MAX_VOTES_REACHED: 403,
LOCK_FAILED: 503,
INTERNAL_ERROR: 500,
};
return res.status(statusCodes[result.error!] || 500).json({
success: false,
error: result.error,
message: result.message,
});
}
return res.json({
success: true,
data: {
candidateId: result.candidate_id,
newCount: result.new_count,
userTotalVotes: result.user_total_votes,
},
});
} catch (error) {
next(error);
}
});
/**
* GET /api/vote/results/:category
* Get results for a specific category
*/
router.get('/results/:category', async (req, res, next) => {
try {
const { category } = req.params;
const limit = parseInt(req.query.limit as string, 10) || 10;
if (!VOTE_CATEGORIES.includes(category as VoteCategory)) {
return res.status(400).json({
success: false,
error: 'INVALID_CATEGORY',
message: 'Invalid vote category',
});
}
const results = await voteService.getCategoryResults(category as VoteCategory, limit);
return res.json({
success: true,
data: {
category,
results,
},
});
} catch (error) {
next(error);
}
});
/**
* GET /api/vote/results
* Get results for all categories
*/
router.get('/results', async (_req, res, next) => {
try {
const results = await voteService.getAllResults(VOTE_CATEGORIES as unknown as VoteCategory[]);
return res.json({
success: true,
data: results,
});
} catch (error) {
next(error);
}
});
/**
* GET /api/vote/status
* Get user's vote status
*/
router.get('/status', async (req, res, next) => {
try {
const userId = req.headers['x-user-id'] as string;
if (!userId) {
return res.status(401).json({
success: false,
error: 'UNAUTHORIZED',
message: 'User ID required',
});
}
const votedCategories = await voteService.getUserVotedCategories(userId);
return res.json({
success: true,
data: {
userId,
votedCategories,
remainingVotes: 7 - votedCategories.length,
},
});
} catch (error) {
next(error);
}
});
export default router;

View File

@@ -0,0 +1,222 @@
import { redis } from '../config/redis';
import { config } from '../config';
import { logger } from '../utils/logger';
import { REDIS_KEYS } from '@gala/shared/constants';
import type { VoteCategory } from '@gala/shared/types';
import { readFileSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
const __dirname = dirname(fileURLToPath(import.meta.url));
// Load Lua scripts
const luaScripts = {
voteSubmit: readFileSync(join(__dirname, '../lua/vote_submit.lua'), 'utf-8'),
getCategoryResults: readFileSync(join(__dirname, '../lua/get_category_results.lua'), 'utf-8'),
checkUserVotes: readFileSync(join(__dirname, '../lua/check_user_votes.lua'), 'utf-8'),
};
// Script SHA cache
let scriptShas: Record<string, string> = {};
/**
* Load Lua scripts into Redis
*/
export async function loadLuaScripts(): Promise<void> {
try {
const [voteSubmitSha, getCategoryResultsSha, checkUserVotesSha] = await Promise.all([
redis.script('LOAD', luaScripts.voteSubmit),
redis.script('LOAD', luaScripts.getCategoryResults),
redis.script('LOAD', luaScripts.checkUserVotes),
]);
scriptShas = {
voteSubmit: voteSubmitSha as string,
getCategoryResults: getCategoryResultsSha as string,
checkUserVotes: checkUserVotesSha as string,
};
logger.info('Lua scripts loaded successfully');
} catch (error) {
logger.error({ error }, 'Failed to load Lua scripts');
throw error;
}
}
// ============================================================================
// Vote Result Types
// ============================================================================
interface VoteSubmitResult {
success: boolean;
error?: string;
message?: string;
candidate_id?: string;
new_count?: number;
user_total_votes?: number;
}
interface CategoryResult {
candidate_id: string;
vote_count: number;
}
// ============================================================================
// Vote Service
// ============================================================================
export class VoteService {
/**
* Submit a vote atomically
*/
async submitVote(
userId: string,
category: VoteCategory,
candidateId: string,
localId: string
): Promise<VoteSubmitResult> {
const timestamp = Date.now().toString();
// Build Redis keys
const keys = [
`${REDIS_KEYS.VOTE_COUNT}:${category}`,
`${REDIS_KEYS.USER_CATEGORIES}:${userId}:categories`,
`${REDIS_KEYS.CATEGORY_VOTERS}:${category}:voters`,
`${REDIS_KEYS.LEADERBOARD}:${category}`,
REDIS_KEYS.SYNC_QUEUE,
`${REDIS_KEYS.VOTE_LOCK}:${userId}:${category}`,
];
const args = [
candidateId,
userId,
category,
timestamp,
localId,
config.voting.lockTtlMs.toString(),
config.voting.maxVotesPerUser.toString(),
];
try {
const resultJson = await redis.evalsha(
scriptShas.voteSubmit,
keys.length,
...keys,
...args
);
const result = JSON.parse(resultJson as string) as VoteSubmitResult;
if (result.success) {
logger.info(
{ userId, category, candidateId, newCount: result.new_count },
'Vote submitted successfully'
);
} else {
logger.warn(
{ userId, category, candidateId, error: result.error },
'Vote submission rejected'
);
}
return result;
} catch (error) {
logger.error({ error, userId, category, candidateId }, 'Vote submission error');
return {
success: false,
error: 'INTERNAL_ERROR',
message: 'Failed to submit vote',
};
}
}
/**
* Get real-time results for a category
*/
async getCategoryResults(category: VoteCategory, limit = 10): Promise<CategoryResult[]> {
const key = `${REDIS_KEYS.LEADERBOARD}:${category}`;
try {
const resultJson = await redis.evalsha(
scriptShas.getCategoryResults,
1,
key,
limit.toString()
);
return JSON.parse(resultJson as string) as CategoryResult[];
} catch (error) {
logger.error({ error, category }, 'Failed to get category results');
return [];
}
}
/**
* Get all results for all categories
*/
async getAllResults(categories: VoteCategory[]): Promise<Record<VoteCategory, CategoryResult[]>> {
const pipeline = redis.pipeline();
for (const category of categories) {
const key = `${REDIS_KEYS.LEADERBOARD}:${category}`;
pipeline.zrevrange(key, 0, -1, 'WITHSCORES');
}
const results = await pipeline.exec();
const formatted: Record<string, CategoryResult[]> = {};
categories.forEach((category, index) => {
const [err, data] = results![index];
if (!err && data) {
const pairs = data as string[];
const categoryResults: CategoryResult[] = [];
for (let i = 0; i < pairs.length; i += 2) {
categoryResults.push({
candidate_id: pairs[i],
vote_count: parseInt(pairs[i + 1], 10),
});
}
formatted[category] = categoryResults;
} else {
formatted[category] = [];
}
});
return formatted as Record<VoteCategory, CategoryResult[]>;
}
/**
* Check which categories a user has voted in
*/
async getUserVotedCategories(userId: string): Promise<VoteCategory[]> {
const key = `${REDIS_KEYS.USER_CATEGORIES}:${userId}:categories`;
try {
const resultJson = await redis.evalsha(scriptShas.checkUserVotes, 1, key);
return JSON.parse(resultJson as string) as VoteCategory[];
} catch (error) {
logger.error({ error, userId }, 'Failed to get user voted categories');
return [];
}
}
/**
* Get vote count for a specific candidate
*/
async getCandidateVoteCount(category: VoteCategory, candidateId: string): Promise<number> {
const key = `${REDIS_KEYS.VOTE_COUNT}:${category}`;
const count = await redis.hget(key, candidateId);
return count ? parseInt(count, 10) : 0;
}
/**
* Check if user has voted in a specific category
*/
async hasUserVotedInCategory(userId: string, category: VoteCategory): Promise<boolean> {
const key = `${REDIS_KEYS.CATEGORY_VOTERS}:${category}:voters`;
const result = await redis.sismember(key, userId);
return result === 1;
}
}
export const voteService = new VoteService();

View File

@@ -0,0 +1,266 @@
import { Server as HttpServer } from 'http';
import { Server, Socket } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { redis } from '../config/redis';
import { config } from '../config';
import { logger } from '../utils/logger';
import { voteService } from '../services/vote.service';
import { SOCKET_EVENTS, SOCKET_ROOMS } from '@gala/shared/constants';
import type {
ServerToClientEvents,
ClientToServerEvents,
InterServerEvents,
SocketData,
VoteSubmitPayload,
JoinPayload,
AckCallback,
VoteCategory,
ConnectionAckPayload,
} from '@gala/shared/types';
export type GalaSocket = Socket<ClientToServerEvents, ServerToClientEvents, InterServerEvents, SocketData>;
export type GalaServer = Server<ClientToServerEvents, ServerToClientEvents, InterServerEvents, SocketData>;
let io: GalaServer;
/**
* Initialize Socket.io server
*/
export async function initializeSocket(httpServer: HttpServer): Promise<GalaServer> {
io = new Server(httpServer, {
cors: {
origin: config.corsOrigins,
credentials: true,
},
pingTimeout: config.socket.pingTimeout,
pingInterval: config.socket.pingInterval,
maxHttpBufferSize: config.socket.maxHttpBufferSize,
});
// Set up Redis adapter for horizontal scaling
const pubClient = redis.duplicate();
const subClient = redis.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
io.adapter(createAdapter(pubClient, subClient));
// Connection handler
io.on('connection', handleConnection);
logger.info('Socket.io server initialized');
return io;
}
/**
* Handle new socket connection
*/
function handleConnection(socket: GalaSocket): void {
logger.info({ socketId: socket.id }, 'New socket connection');
// Join event
socket.on(SOCKET_EVENTS.CONNECTION_JOIN, (data: JoinPayload, callback: AckCallback<ConnectionAckPayload>) => {
handleJoin(socket, data, callback);
});
// Vote submit event
socket.on(SOCKET_EVENTS.VOTE_SUBMIT, (data: VoteSubmitPayload, callback: AckCallback<{ newCount: number }>) => {
handleVoteSubmit(socket, data, callback);
});
// Ping event (custom heartbeat)
socket.on(SOCKET_EVENTS.CONNECTION_PING, () => {
socket.emit(SOCKET_EVENTS.CONNECTION_PONG as any);
});
// Sync request
socket.on(SOCKET_EVENTS.SYNC_REQUEST, () => {
handleSyncRequest(socket);
});
// Disconnect handler
socket.on('disconnect', (reason) => {
handleDisconnect(socket, reason);
});
// Error handler
socket.on('error', (error) => {
logger.error({ socketId: socket.id, error }, 'Socket error');
});
}
/**
* Handle user join
*/
async function handleJoin(
socket: GalaSocket,
data: JoinPayload,
callback: AckCallback<ConnectionAckPayload>
): Promise<void> {
try {
const { userId, userName, role } = data;
// Store user data in socket
socket.data.userId = userId;
socket.data.userName = userName;
socket.data.role = role;
socket.data.connectedAt = new Date();
socket.data.sessionId = socket.id;
// Join appropriate rooms
await socket.join(SOCKET_ROOMS.ALL);
if (role === 'user') {
await socket.join(SOCKET_ROOMS.MOBILE_USERS);
} else if (role === 'screen') {
await socket.join(SOCKET_ROOMS.SCREEN_DISPLAY);
} else if (role === 'admin') {
await socket.join(SOCKET_ROOMS.ADMIN);
}
// Get user's voted categories
const votedCategories = await voteService.getUserVotedCategories(userId);
logger.info({ socketId: socket.id, userId, userName, role }, 'User joined');
// Broadcast user count update
const userCount = await getUserCount();
io.to(SOCKET_ROOMS.ALL).emit(SOCKET_EVENTS.CONNECTION_USERS_COUNT as any, userCount);
callback({
success: true,
data: {
sessionId: socket.id,
serverTime: Date.now(),
reconnected: false,
},
});
} catch (error) {
logger.error({ socketId: socket.id, error }, 'Join error');
callback({
success: false,
error: 'INTERNAL_ERROR',
message: 'Failed to join',
});
}
}
/**
* Handle vote submission via WebSocket
*/
async function handleVoteSubmit(
socket: GalaSocket,
data: VoteSubmitPayload,
callback: AckCallback<{ newCount: number }>
): Promise<void> {
const userId = socket.data.userId;
if (!userId) {
callback({
success: false,
error: 'UNAUTHORIZED',
message: 'Not authenticated',
});
return;
}
try {
const result = await voteService.submitVote(
userId,
data.category as VoteCategory,
data.candidateId,
data.localId
);
if (!result.success) {
callback({
success: false,
error: result.error as any,
message: result.message,
});
return;
}
// Broadcast vote update to all clients
io.to(SOCKET_ROOMS.ALL).emit(SOCKET_EVENTS.VOTE_UPDATED as any, {
candidateId: data.candidateId,
category: data.category,
totalVotes: result.new_count!,
delta: 1,
});
callback({
success: true,
data: {
newCount: result.new_count!,
},
});
} catch (error) {
logger.error({ socketId: socket.id, userId, error }, 'Vote submit error');
callback({
success: false,
error: 'INTERNAL_ERROR',
message: 'Failed to submit vote',
});
}
}
/**
* Handle sync request
*/
async function handleSyncRequest(socket: GalaSocket): Promise<void> {
const userId = socket.data.userId;
if (!userId) {
return;
}
try {
const votedCategories = await voteService.getUserVotedCategories(userId);
socket.emit(SOCKET_EVENTS.SYNC_STATE as any, {
votes: {}, // TODO: Include current vote counts
userVotedCategories: votedCategories,
});
} catch (error) {
logger.error({ socketId: socket.id, userId, error }, 'Sync request error');
}
}
/**
* Handle disconnect
*/
function handleDisconnect(socket: GalaSocket, reason: string): void {
logger.info(
{
socketId: socket.id,
userId: socket.data.userId,
reason,
},
'Socket disconnected'
);
// Broadcast updated user count
getUserCount().then((count) => {
io.to(SOCKET_ROOMS.ALL).emit(SOCKET_EVENTS.CONNECTION_USERS_COUNT as any, count);
});
}
/**
* Get current connected user count
*/
async function getUserCount(): Promise<number> {
const sockets = await io.in(SOCKET_ROOMS.MOBILE_USERS).fetchSockets();
return sockets.length;
}
/**
* Get Socket.io server instance
*/
export function getIO(): GalaServer {
if (!io) {
throw new Error('Socket.io not initialized');
}
return io;
}

View File

@@ -0,0 +1,18 @@
import pino from 'pino';
import { config } from '../config';
export const logger = pino({
level: config.isDev ? 'debug' : 'info',
transport: config.isDev
? {
target: 'pino-pretty',
options: {
colorize: true,
translateTime: 'SYS:standard',
ignore: 'pid,hostname',
},
}
: undefined,
});
export type Logger = typeof logger;

View File

@@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "./dist",
"rootDir": "./src",
"module": "ESNext",
"moduleResolution": "bundler",
"noEmit": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}