import express from 'express'; import cors from 'cors'; import helmet from 'helmet'; import morgan from 'morgan'; import compression from 'compression'; import { createServer } from 'http'; import { createConnection, createServer as createNetServer } from 'net'; import { exec } from 'child_process'; import { promisify } from 'util'; import { config } from './config/index.js'; import { errorHandler } from './middleware/error.js'; import { setupRoutes } from './routes/index.js'; import { setupWebSocket } from './websocket/index.js'; import { initDatabase } from './models/index.js'; import { initRedis } from './config/redis.js'; import { logger } from './utils/logger.js'; import { taskScheduler } from './scheduler/index.js'; import { registerTaskExecutors } from './services/taskExecutors.js'; import { taskQueueService } from './services/TaskQueueService.js'; import { browserLoginService } from './services/BrowserLoginService.js'; import { wsManager } from './websocket/index.js'; const execAsync = promisify(exec); const app = express(); const httpServer = createServer(app); // 中间件 app.use(helmet({ crossOriginResourcePolicy: { policy: 'cross-origin' }, })); app.use(cors({ origin: config.cors.origin, credentials: true, })); app.use(compression()); app.use(morgan('combined', { stream: { write: (message) => logger.info(message.trim()) }, })); app.use(express.json({ limit: '50mb' })); app.use(express.urlencoded({ extended: true, limit: '50mb' })); // 静态文件 app.use('/uploads', express.static(config.upload.path)); // 健康检查 app.get('/api/health', (_req, res) => { res.json({ status: 'ok', version: config.version, uptime: process.uptime(), timestamp: new Date().toISOString(), }); }); // API 路由 setupRoutes(app); // 错误处理 app.use(errorHandler); // WebSocket setupWebSocket(httpServer); // 检查端口是否被占用(改进版:使用 net.Server 来测试) async function checkPortInUse(port: number): Promise { return new Promise((resolve) => { const testServer = createNetServer(); testServer.once('error', (err: NodeJS.ErrnoException) => { if (err.code === 'EADDRINUSE') { resolve(true); // 端口被占用 } else { resolve(false); } }); testServer.once('listening', () => { testServer.close(() => { resolve(false); // 端口未被占用 }); }); testServer.listen(port); }); } // 获取占用端口的进程ID(跨平台) async function getProcessOnPort(port: number): Promise { const isWindows = process.platform === 'win32'; try { if (isWindows) { // Windows: 使用 netstat 命令,改进解析逻辑 const { stdout } = await execAsync(`netstat -ano | findstr :${port}`); const lines = stdout.trim().split('\n'); for (const line of lines) { // 匹配 LISTENING 状态的连接 if (line.includes('LISTENING')) { const parts = line.trim().split(/\s+/); const pid = parseInt(parts[parts.length - 1], 10); if (!isNaN(pid) && pid > 0) { logger.info(`[Port Check] Found PID ${pid} listening on port ${port}`); return pid; } } } } else { const { stdout } = await execAsync(`lsof -i :${port} -t`); const pid = parseInt(stdout.trim().split('\n')[0], 10); if (!isNaN(pid)) return pid; } } catch (error) { // 命令执行失败,可能端口没有被占用 logger.debug(`[Port Check] Command failed (port might not be in use):`, error); } return null; } // 终止进程(跨平台) async function killProcess(pid: number): Promise { const isWindows = process.platform === 'win32'; const currentPid = process.pid; // 不要杀死自己的进程 if (pid === currentPid) { logger.warn(`[Port Check] Cannot kill own process (PID: ${pid})`); return false; } try { if (isWindows) { await execAsync(`taskkill /PID ${pid} /F`); } else { await execAsync(`kill -9 ${pid}`); } // 等待进程完全终止 await new Promise(resolve => setTimeout(resolve, 2000)); return true; } catch (error) { logger.error(`Failed to kill process ${pid}:`, error); return false; } } // 检查并释放端口(增加重试机制) async function ensurePortAvailable(port: number, maxRetries: number = 3): Promise { for (let attempt = 1; attempt <= maxRetries; attempt++) { const inUse = await checkPortInUse(port); if (!inUse) { logger.info(`[Port Check] Port ${port} is available`); return; } logger.warn(`[Port Check] Attempt ${attempt}/${maxRetries}: Port ${port} is already in use`); const pid = await getProcessOnPort(port); if (pid) { logger.info(`[Port Check] Found process ${pid} using port ${port}, attempting to terminate...`); const killed = await killProcess(pid); if (killed) { logger.info(`[Port Check] Successfully terminated process ${pid}`); // 等待更长时间让端口释放 await new Promise(resolve => setTimeout(resolve, 2000)); // 再次检查端口 const stillInUse = await checkPortInUse(port); if (!stillInUse) { logger.info(`[Port Check] Port ${port} is now available`); return; } } } else { logger.warn(`[Port Check] Port ${port} is in use but could not identify the process`); } // 如果还有重试机会,等待一会儿 if (attempt < maxRetries) { logger.info(`[Port Check] Waiting before retry...`); await new Promise(resolve => setTimeout(resolve, 3000)); } } throw new Error(`Port ${port} is still in use after ${maxRetries} attempts. Please manually kill the process or use a different port.`); } // 启动服务 async function bootstrap() { // 确保端口可用 try { await ensurePortAvailable(config.port); } catch (error) { logger.error('Port availability check failed:', error); process.exit(1); } let dbConnected = false; let redisConnected = false; // 尝试初始化数据库 try { await initDatabase(); logger.info('Database connected'); dbConnected = true; } catch (error) { logger.warn('Database connection failed - running in limited mode'); logger.warn('Please install MySQL and create the database, or use Docker'); logger.error('Database error:', error instanceof Error ? error.message : error); } // 尝试初始化 Redis try { await initRedis(); logger.info('Redis connected'); redisConnected = true; } catch (error) { logger.warn('Redis connection failed - some features may not work'); } // 只有在数据库连接成功时才启动调度器和注册任务执行器 if (dbConnected) { registerTaskExecutors(); taskScheduler.start(); // 启动任务队列 Worker taskQueueService.startWorker(); } // 注册浏览器登录服务的事件监听(用于 AI 分析结果的 WebSocket 推送) setupBrowserLoginEvents(); // 启动 HTTP 服务 httpServer.listen(config.port, config.host, () => { logger.info(`Server running on http://${config.host}:${config.port}`); logger.info(`Environment: ${config.env}`); if (!dbConnected) { logger.warn('⚠️ Running without database - API endpoints will not work'); logger.warn('⚠️ Please configure MySQL in .env file'); } }); } /** * 设置浏览器登录服务的事件监听 * 用于通过 WebSocket 推送 AI 分析结果给前端 */ function setupBrowserLoginEvents(): void { // AI 分析结果事件 browserLoginService.on('aiAnalysis', (data: { sessionId: string; userId?: number; status: string; analysis: { isLoggedIn: boolean; hasVerification: boolean; verificationType?: string; verificationDescription?: string; pageDescription: string; suggestedAction?: string; }; }) => { if (data.userId) { wsManager.sendToUser(data.userId, 'login:aiAnalysis', { sessionId: data.sessionId, status: data.status, analysis: data.analysis, }); } }); // 验证码检测事件 browserLoginService.on('verificationNeeded', (data: { sessionId: string; userId?: number; verificationType?: string; description?: string; suggestedAction?: string; }) => { if (data.userId) { wsManager.sendToUser(data.userId, 'login:verificationNeeded', { sessionId: data.sessionId, verificationType: data.verificationType, description: data.description, suggestedAction: data.suggestedAction, }); } }); // 导航建议事件 browserLoginService.on('navigationSuggestion', (data: { sessionId: string; userId?: number; guide: unknown; }) => { if (data.userId) { wsManager.sendToUser(data.userId, 'login:navigationSuggestion', { sessionId: data.sessionId, guide: data.guide, }); } }); // 登录结果事件(也通过 WebSocket 推送) browserLoginService.on('loginResult', (data: { sessionId: string; userId?: number; status: string; cookies?: string; accountInfo?: unknown; error?: string; message?: string; }) => { logger.info(`[BrowserLogin] Login result event: ${data.sessionId}, status: ${data.status}`); if (data.userId) { wsManager.sendToUser(data.userId, 'login:result', { sessionId: data.sessionId, status: data.status, accountInfo: data.accountInfo, error: data.error, message: data.message, }); } }); logger.info('Browser login events registered'); } // 优雅关闭 process.on('SIGTERM', async () => { logger.info('SIGTERM received, shutting down gracefully'); taskScheduler.stop(); await taskQueueService.close(); httpServer.close(() => { logger.info('Server closed'); process.exit(0); }); }); bootstrap(); export { app, httpServer };