diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a331f9..52c15b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -102,6 +102,7 @@ set(NETWORK_SOURCES src/network/NetworkQualityMonitor.cpp src/network/PredictionSystem.cpp src/network/WebSocketProtocol.cpp + src/network/WebSocketSession.cpp ) # Loot system source files diff --git a/build.sh b/build.sh index 9bb4d20..6795121 100755 --- a/build.sh +++ b/build.sh @@ -79,6 +79,32 @@ cmake .. -B . \ # Build make -j$(nproc) +# ========== SSL Certificate Generation ========== +# Generate self-signed SSL certificates if missing +if command -v openssl &> /dev/null; then + # Create certs directory if needed + mkdir -p certs + # Generate server certificate and key if not present + if [ ! -f "certs/server.crt" ] || [ ! -f "certs/server.key" ]; then + echo "Generating self-signed SSL certificate..." + openssl req -x509 -newkey rsa:4096 \ + -keyout certs/server.key \ + -out certs/server.crt \ + -days 365 -nodes \ + -subj "/CN=localhost" + echo "SSL certificate and key created in certs/" + fi + # Generate DH parameters if not present (optional but may be used) + if [ ! -f "certs/dhparam.pem" ]; then + echo "Generating DH parameters (this may take a moment)..." + openssl dhparam -out certs/dhparam.pem 2048 + echo "DH parameters generated." + fi +else + echo "openssl not found, skipping SSL certificate generation" +fi +# ================================================ + if [ -f "gameserver" ]; then echo "Build successful! Executable: $(pwd)/gameserver" else diff --git a/config/core.json b/config/core.json index 76790cc..699abc3 100644 --- a/config/core.json +++ b/config/core.json @@ -1,14 +1,39 @@ { "process": { - "max_message_size": 1048576, - "receive_timeout_ms": 1000 - }, - "server": { - "host": "127.0.0.1", - "port": 8080, - "maxConnections": 1000, - "processCount": 4, - "workerThreads": 8 + "workers": [ + { + "protocol": "binary", + "host": "127.0.0.1", + "port": 9999, + "max_connections": 1000, + "reuse": true, + "threads": 4, + "count": 2, + "cpu_affinity": [0, 1], + "tcp_nodelay": true, + "send_buffer_size": 65536, + "receive_buffer_size": 65536 + }, + { + "protocol": "websocket", + "host": "127.0.0.1", + "port": 8080, + "max_connections": 1000, + "reuse": true, + "threads": 2, + "count": 2, + "cpu_affinity": [2, 3], + "tcp_nodelay": true, + "path": "/game", + "subprotocols": ["binary", "json"], + "max_frame_size": 16384, + "ssl": { + "certificate": "certs/server.crt", + "private_key": "certs/server.key", + "dh_params": "certs/dhparam.pem" + } + } + ] }, "world": { @@ -85,10 +110,10 @@ }, "database": { - "backend": "postgresql", + "backend": "sqlite", "host": "127.0.0.1", "port": 5432, - "name": "gamedb", + "name": "data/game.db", "user": "gameuser", "password": "password", "workerNodes": [], diff --git a/include/config/ConfigManager.hpp b/include/config/ConfigManager.hpp index 37553c0..5633abc 100644 --- a/include/config/ConfigManager.hpp +++ b/include/config/ConfigManager.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -14,23 +15,40 @@ #include -#ifdef USE_SPDLOG #include "logging/Logger.hpp" -#else -class Logger { -public: - template - static void Info(Args&&...) {} - template - static void Warn(Args&&...) {} - template - static void Debug(Args&&...) {} - template - static void Error(Args&&...) {} - template - static void Critical(Args&&...) {} + +// SSL configuration (optional) +struct SSLConfig { + std::string certificate; + std::string private_key; + std::string dh_params; // optional DH parameters + bool verify_peer = false; + std::vector ciphers; // allowed cipher suites + std::string ca_cert; // CA certificate for client validation +}; + +// Worker group configuration (one per listener) +struct WorkerGroupConfig { + std::string protocol; // "binary" or "websocket" + std::string host; // "0.0.0.0" or specific IP + uint16_t port; // listening port + int max_connections; // asio::ip::tcp::acceptor.listen(max_connections) + bool reuse; // asio::ip::tcp::acceptor::reuse_address + int threads; // number of io_context threads for this worker + int count; // number of worker processes for this group + std::vector cpu_affinity; // CPU cores to bind to (optional) + bool tcp_nodelay; // TCP_NODELAY option + int send_buffer_size; // SO_SNDBUF (0 = system default) + int receive_buffer_size; // SO_RCVBUF (0 = system default) + + // WebSocket-specific (only used when protocol == "websocket") + std::string path; // WebSocket endpoint path (e.g., "/game") + std::vector subprotocols; + int max_frame_size; // maximum WebSocket frame size in bytes + + // SSL (optional) + std::optional ssl; }; -#endif class ConfigManager { public: @@ -40,20 +58,19 @@ class ConfigManager { bool ReloadConfig(); const std::string& GetConfigPath() const { return configPath_; } - // Setters + // Setters (generic) void SetBool(const std::string& key, bool value); void SetInt(const std::string& key, int value); void SetFloat(const std::string& key, float value); void SetString(const std::string& key, const std::string& value); void SetJson(const std::string& key, const nlohmann::json& value); - // Server configuration - std::string GetServerHost() const; - uint16_t GetServerPort() const; - int GetMaxConnections() const; - int GetIoThreads() const; - bool GetReusePort() const; - int GetProcessCount() const; + // Worker groups API + std::vector GetWorkerGroups() const; + + // Total workers and threads (derived from groups) + int GetTotalWorkerCount() const; + int GetTotalThreadCount() const; // Database configuration std::string GetDatabaseHost() const; @@ -94,8 +111,6 @@ class ConfigManager { float GetFloat(const std::string& key, float defaultValue = 0.0f) const; bool GetBool(const std::string& key, bool defaultValue = false) const; std::string GetString(const std::string& key, const std::string& defaultValue = "") const; - // nlohmann::json j = nlohmann::json::parse(R"(["root", "home", "var"])"); - // std::vector colors = {"root", "home", "var"}; std::vector GetStringArray(const std::string& key) const; nlohmann::json GetJson(const std::string& key) const; bool HasKey(const std::string& key) const; @@ -105,9 +120,10 @@ class ConfigManager { ConfigManager(const ConfigManager&) = delete; ConfigManager& operator=(const ConfigManager&) = delete; - bool ValidateConfig() const; - - mutable std::mutex configMutex_; // For thread safety + bool HasProcessConfig() const; + bool ValidateConfig(const nlohmann::json& config) const; + + mutable std::mutex configMutex_; nlohmann::json config_; std::string configPath_; }; diff --git a/include/database/Backend.hpp b/include/database/Backend.hpp index d659e7a..fa0f4e5 100644 --- a/include/database/Backend.hpp +++ b/include/database/Backend.hpp @@ -4,6 +4,20 @@ #ifdef USE_SPDLOG #include "logging/Logger.hpp" +#else +class Logger { +public: + template + static void Info(Args&&...) {} + template + static void Warn(Args&&...) {} + template + static void Debug(Args&&...) {} + template + static void Error(Args&&...) {} + template + static void Critical(Args&&...) {} +}; #endif #include "database/SQLProvider.hpp" diff --git a/include/network/ConnectionManager.hpp b/include/network/ConnectionManager.hpp index 3539a5d..310efdd 100644 --- a/include/network/ConnectionManager.hpp +++ b/include/network/ConnectionManager.hpp @@ -3,20 +3,21 @@ #include #include #include -#include -#include -#include +#include #include +#include +#include #include -#include -#include #include +#include +#include #include -#include "logging/Logger.hpp" -#include "network/GameSession.hpp" #include "nlohmann/json.hpp" +#include "logging/Logger.hpp" +#include "network/IConnection.hpp" + class ConnectionManager { public: // Delete copy constructor and assignment operator @@ -29,13 +30,13 @@ class ConnectionManager { // Get shared_ptr to singleton (useful for passing to other components) static std::shared_ptr GetInstancePtr(); - void Start(std::shared_ptr session); - void Stop(std::shared_ptr session); + void Start(std::shared_ptr session); + void Stop(std::shared_ptr session); void StopAll(); size_t GetConnectionCount() const; - std::shared_ptr GetSession(uint64_t sessionId) const; - std::vector> GetAllSessions() const; + std::shared_ptr GetSession(uint64_t sessionId) const; + std::vector> GetAllSessions() const; // Broadcast methods void Broadcast(const nlohmann::json& message); @@ -43,7 +44,7 @@ class ConnectionManager { // New broadcast methods void BroadcastWithFilter(const nlohmann::json& message, - std::function)> filter); + std::function)> filter); void BroadcastExcept(uint64_t excludeSessionId, const nlohmann::json& message); void BroadcastToAuthenticated(const nlohmann::json& message); void BroadcastToUnauthenticated(const nlohmann::json& message); @@ -54,9 +55,9 @@ class ConnectionManager { void RemoveFromAllGroups(uint64_t sessionId); // Session query methods - std::vector> GetSessionsByPlayerId(int64_t playerId) const; + std::vector> GetSessionsByPlayerId(int64_t playerId) const; std::vector GetSessionIdsInGroup(const std::string& groupId) const; - std::vector> GetSessionsInGroup(const std::string& groupId) const; + std::vector> GetSessionsInGroup(const std::string& groupId) const; std::set GetGroupsForSession(uint64_t sessionId) const; bool IsSessionInGroup(uint64_t sessionId, const std::string& groupId) const; @@ -106,7 +107,7 @@ class ConnectionManager { void DisconnectAllInGroup(const std::string& groupId); // Load balancing - std::vector> GetSessionsByWorkerId(int workerId) const; + std::vector> GetSessionsByWorkerId(int workerId) const; void RedistributeSessions(const std::vector& workerIds); // Event system @@ -118,7 +119,7 @@ class ConnectionManager { void EnforceGlobalRateLimit(int maxMessagesPerSecond); // Session migration - bool MigrateSession(uint64_t sessionId, std::shared_ptr newSession); + bool MigrateSession(uint64_t sessionId, std::shared_ptr newSession); // Monitoring void MonitorConnections(); @@ -138,7 +139,7 @@ class ConnectionManager { // Session storage mutable std::shared_mutex sessionsMutex_; - std::unordered_map> sessions_; + std::unordered_map> sessions_; // Group management mutable std::shared_mutex groupsMutex_; diff --git a/include/network/GameServer.hpp b/include/network/GameServer.hpp index 2ed5a6a..7e7e917 100644 --- a/include/network/GameServer.hpp +++ b/include/network/GameServer.hpp @@ -2,28 +2,37 @@ #include #include -#include -#include #include -#include +#include +#include +#include #include -#include +#include #include "logging/Logger.hpp" #include "config/ConfigManager.hpp" #include "network/ConnectionManager.hpp" +#include "network/GameSession.hpp" +#include "network/WebSocketProtocol.hpp" +#include "network/WebSocketSession.hpp" class GameServer { public: - GameServer(const ConfigManager& config); + GameServer(const WorkerGroupConfig& groupConfig, const ConfigManager& globalConfig); ~GameServer(); bool Initialize(); void Run(); void Shutdown(); - void SetSessionFactory(std::function(asio::ip::tcp::socket)> factory); + // For binary protocol + using SessionFactory = std::function(asio::ip::tcp::socket, std::shared_ptr)>; + void SetSessionFactory(SessionFactory factory); + + // For WebSocket protocol + using WebSocketFactory = std::function)>; + void SetWebSocketConnectionFactory(WebSocketFactory factory); private: void DoAccept(); @@ -34,15 +43,19 @@ class GameServer { asio::ip::tcp::acceptor acceptor_; asio::signal_set signals_; + WorkerGroupConfig groupConfig_; + const ConfigManager& globalConfig_; + std::string host_; uint16_t port_; - bool reusePort_; + bool reuse_; int ioThreads_; std::vector workerThreads_; std::atomic running_{false}; - std::function(asio::ip::tcp::socket)> sessionFactory_; + std::shared_ptr sslContext_; // optional, set if SSL is configured - const ConfigManager& config_; + SessionFactory sessionFactory_; + WebSocketFactory webSocketFactory_; }; diff --git a/include/network/GameSession.hpp b/include/network/GameSession.hpp index c4d727d..12e6f6e 100644 --- a/include/network/GameSession.hpp +++ b/include/network/GameSession.hpp @@ -24,6 +24,7 @@ #include "network/BinaryProtocol.hpp" #include "network/NetworkQualityMonitor.hpp" #include "network/PredictionSystem.hpp" +#include "network/IConnection.hpp" struct SessionStats { // Message statistics @@ -152,35 +153,43 @@ struct RateLimitConfig { bool exempt_authenticated_users{false}; // Whether authenticated users are exempt }; -class GameSession : public std::enable_shared_from_this { +class GameSession : public IConnection, public std::enable_shared_from_this { public: using Pointer = std::shared_ptr; // Constructor with SSL context option - explicit GameSession(asio::ip::tcp::socket socket, + explicit GameSession(asio::ip::tcp::socket socket, std::shared_ptr ssl_context = nullptr); ~GameSession(); // Core session management - void Start(); - void Stop(); - void Disconnect(); + void Start() override; + void Stop() override; + void Disconnect() override; + bool IsConnected() const override; + uint64_t GetSessionId() const override { return sessionId_; } + - bool IsConnected() const; - uint64_t GetSessionId() const { return sessionId_; } asio::ip::tcp::endpoint GetRemoteEndpoint() const; bool IsEncrypted() const { return ssl_stream_ != nullptr; } - + std::string GetRemoteAddress() const override { + try { + return GetRemoteEndpoint().address().to_string(); + } catch (const std::exception& e) { + Logger::Error("GetRemoteAddress: {}", e.what()); + return "unknown"; + } + } // Binary protocol methods void SendPing(); void SendPong(); - void SendBinary(uint16_t message_type, const std::vector& data); + void SendBinary(uint16_t message_type, const std::vector& data) override; void SendBinary(uint16_t message_type, const void* data, size_t length); void SendBinaryWithAck(uint16_t message_type, const std::vector& data); void SendBinaryError(uint16_t message_type, const std::string& error_message, int code); // JSON compatibility (for backward compatibility) - void Send(const nlohmann::json& message); + void Send(const nlohmann::json& message) override; void SendRaw(const std::string& data); void SendError(const std::string& message, int code); void SendSuccess(const std::string& message, const nlohmann::json& data = {}); @@ -200,110 +209,112 @@ class GameSession : public std::enable_shared_from_this { // Protocol negotiation void StartProtocolNegotiation(); - + // Network quality adaptation void AdaptToNetworkConditions(); NetworkQualityMonitor& GetNetworkMonitor() { return network_monitor_; } - + // Prediction system PredictionSystem& GetPredictionSystem(); - + // Message handling with binary support using BinaryMessageHandler = std::function&)>; void SetBinaryMessageHandler(uint16_t message_type, BinaryMessageHandler handler); void SetDefaultBinaryMessageHandler(BinaryMessageHandler handler); - + // Callback setters - void SetMessageHandler(std::function handler); - void SetCloseHandler(std::function handler); - + //void SetMessageHandler(std::function handler); + //void SetCloseHandler(std::function handler); + void SetMessageHandler(MessageHandler handler) override; + void SetCloseHandler(CloseHandler handler) override; + // Authentication and security - void Authenticate(const std::string& authToken); + void Authenticate(const std::string& authToken) override; void Deauthenticate(); - bool IsAuthenticated() const; + bool IsAuthenticated() const override; std::string GetAuthToken() const; - void SetPlayerId(int64_t playerId); - int64_t GetPlayerId() const; - + void SetPlayerId(int64_t playerId) override; + int64_t GetPlayerId() const override; + // Session data storage (unchanged) - void SetData(const std::string& key, const nlohmann::json& value); - nlohmann::json GetData(const std::string& key, const nlohmann::json& defaultValue = {}) const; - bool HasData(const std::string& key) const; - void RemoveData(const std::string& key); - void ClearData(); - nlohmann::json GetAllData() const; - + void SetData(const std::string& key, const nlohmann::json& value) override; + nlohmann::json GetData(const std::string& key, const nlohmann::json& defaultValue = {}) const override; + bool HasData(const std::string& key) const override; + void RemoveData(const std::string& key) override; + void ClearData() override; + nlohmann::json GetAllData() const override; + // Session properties (unchanged) void SetProperty(const std::string& key, const std::string& value); std::string GetProperty(const std::string& key, const std::string& defaultValue = "") const; std::map GetAllProperties() const; - + // Session groups (unchanged) - void JoinGroup(const std::string& groupId); - void LeaveGroup(const std::string& groupId); - void LeaveAllGroups(); - std::set GetJoinedGroups() const; - bool IsInGroup(const std::string& groupId) const; - + void JoinGroup(const std::string& groupId) override; + void LeaveGroup(const std::string& groupId) override; + void LeaveAllGroups() override; + std::set GetJoinedGroups() const override; + bool IsInGroup(const std::string& groupId) const override; + // Statistics and metrics SessionStats GetStats() const; void ResetStats(); void RecordMessageReceived(size_t size); void RecordMessageSent(size_t size); - + SessionMetrics GetMetrics() const; void PrintMetrics() const; - + // Compression void SetCompressionEnabled(bool enabled); bool IsCompressionEnabled() const; - + // Rate limiting void SetRateLimit(int messagesPerSecond, int burstSize); void SetRateLimitEnabled(bool enabled); bool CheckRateLimit(); - + // Connection quality monitoring void RecordLatency(uint64_t latencyMs); uint64_t GetAverageLatency() const; uint64_t GetMinLatency() const; uint64_t GetMaxLatency() const; std::vector GetLatencySamples() const; - + // Custom event handlers void SetCustomEventHandler(const std::string& eventName, std::function handler); void RemoveCustomEventHandler(const std::string& eventName); void HandleCustomEvent(const std::string& eventName, const nlohmann::json& data); - + // Message queue management size_t GetPendingMessageCount() const; void ClearPendingMessages(); bool IsWriteQueueFull() const; void SetMaxWriteQueueSize(size_t maxSize); - + // Heartbeat management void UpdateHeartbeat(); - + // Utility methods std::string ToString() const; uint64_t GetUptimeSeconds() const; - + // Graceful shutdown void BeginGracefulShutdown(); void CancelGracefulShutdown(); - + // World and entity methods (binary versions) void SendWorldChunkBinary(int chunkX, int chunkZ, const std::vector& chunkData); void SendEntityUpdateBinary(uint64_t entityId, const std::vector& entityData); void SendEntitySpawnBinary(uint64_t entityId, const std::vector& spawnData); void SendEntityDespawnBinary(uint64_t entityId); - + // Player state synchronization (with prediction) - void SyncPlayerStateBinary(const glm::vec3& position, const glm::vec3& rotation, + void SyncPlayerStateBinary(const glm::vec3& position, const glm::vec3& rotation, const glm::vec3& velocity, uint32_t last_input_id); void SendPositionCorrection(const glm::vec3& position, const glm::vec3& velocity); - + private: // Core networking asio::ip::tcp::socket socket_; @@ -313,109 +324,110 @@ class GameSession : public std::enable_shared_from_this { asio::streambuf read_buffer_; std::queue> write_queue_; mutable std::mutex write_mutex_; - + // Binary protocol state std::unordered_map binary_handlers_; BinaryMessageHandler default_binary_handler_; std::mutex binary_handlers_mutex_; - + std::atomic next_sequence_{1}; std::unordered_map pending_acks_; std::mutex ack_mutex_; - + // Session identification uint64_t sessionId_; static std::atomic nextSessionId_; - + // Callbacks std::function message_handler_; std::function close_handler_; - + // State management std::atomic connected_{false}; std::atomic closing_{false}; std::atomic graceful_shutdown_{false}; std::atomic protocol_negotiated_{false}; - + // Heartbeat asio::steady_timer heartbeat_timer_; asio::steady_timer shutdown_timer_; asio::steady_timer network_adaptation_timer_; std::chrono::steady_clock::time_point last_heartbeat_; std::chrono::steady_clock::time_point connected_time_; - + // Network quality monitoring NetworkQualityMonitor network_monitor_; - + // Prediction system PredictionSystem prediction_system_; - + // Statistics mutable std::mutex stats_mutex_; SessionStats stats_; - + // Compression std::atomic compression_enabled_{false}; - + // Rate limiting mutable std::mutex rate_limit_mutex_; RateLimitConfig rate_limit_; std::atomic rate_limit_enabled_{false}; - + // Groups mutable std::mutex groups_mutex_; std::set joined_groups_; - + // Authentication mutable std::mutex auth_mutex_; std::string auth_token_; std::atomic authenticated_{false}; std::atomic player_id_{0}; std::chrono::steady_clock::time_point authentication_time_; - + // Session data mutable std::mutex data_mutex_; std::map session_data_; - + // Properties mutable std::mutex properties_mutex_; std::map properties_; - + // Custom event handlers mutable std::mutex event_handlers_mutex_; std::map> custom_event_handlers_; - + // Queue management size_t max_write_queue_size_{1000}; - + // Private methods void StartHeartbeat(); void CheckHeartbeat(); void DoRead(); void DoWrite(); void HandleMessage(const std::string& message); - + // Binary protocol methods void DoBinaryRead(); void HandleBinaryMessage(const BinaryProtocol::BinaryMessage& message); void DoBinaryWrite(); void ProcessAcknowledgment(uint32_t sequence); void SendAcknowledgment(uint32_t sequence); - + // SSL/TLS methods void StartTLSHandshake(); void HandleTLSHandshake(std::error_code ec); - + // Network adaptation void StartNetworkAdaptation(); void CheckNetworkConditions(); - + // Protocol negotiation void SendProtocolCapabilities(); void HandleProtocolNegotiation(const std::vector& data); - + // Helper methods asio::ip::tcp::socket& GetSocket(); const asio::ip::tcp::socket& GetSocket() const; void HandleNetworkError(std::error_code ec); }; + diff --git a/include/network/IConnection.hpp b/include/network/IConnection.hpp new file mode 100644 index 0000000..e948623 --- /dev/null +++ b/include/network/IConnection.hpp @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +class IConnection { +public: + virtual ~IConnection() = default; + + // Core methods + virtual void Start() = 0; + virtual void Stop() = 0; + virtual void Disconnect() = 0; + virtual bool IsConnected() const = 0; + virtual uint64_t GetSessionId() const = 0; + + // Send methods + virtual void Send(const nlohmann::json& message) = 0; + virtual void SendRaw(const std::string& data) = 0; + virtual void SendBinary(uint16_t message_type, const std::vector& data) = 0; + + // Callback setters + using MessageHandler = std::function; + virtual void SetMessageHandler(MessageHandler handler) = 0; + + using CloseHandler = std::function; + virtual void SetCloseHandler(CloseHandler handler) = 0; + + // Authentication and properties + virtual void Authenticate(const std::string& authToken) = 0; + virtual bool IsAuthenticated() const = 0; + virtual void SetPlayerId(int64_t playerId) = 0; + virtual int64_t GetPlayerId() const = 0; + virtual std::string GetAuthToken() const = 0; + + // Properties (key‑value pairs for metadata) + virtual void SetProperty(const std::string& key, const std::string& value) = 0; + virtual std::string GetProperty(const std::string& key, const std::string& defaultValue = "") const = 0; + virtual std::map GetAllProperties() const = 0; + + // Groups + virtual void JoinGroup(const std::string& groupId) = 0; + virtual void LeaveGroup(const std::string& groupId) = 0; + virtual void LeaveAllGroups() = 0; + virtual std::set GetJoinedGroups() const = 0; + virtual bool IsInGroup(const std::string& groupId) const = 0; + + // Data storage (JSON) + virtual void SetData(const std::string& key, const nlohmann::json& value) = 0; + virtual nlohmann::json GetData(const std::string& key, const nlohmann::json& defaultValue = {}) const = 0; + virtual bool HasData(const std::string& key) const = 0; + virtual void RemoveData(const std::string& key) = 0; + virtual void ClearData() = 0; + virtual nlohmann::json GetAllData() const = 0; + + // Remote info + virtual std::string GetRemoteAddress() const = 0; + + // Statistics (optional, can be empty implementations) + virtual void RecordMessageReceived(size_t size) {(void)size;} + virtual void RecordMessageSent(size_t size) {(void)size;} +}; diff --git a/include/network/WebSocketSession.hpp b/include/network/WebSocketSession.hpp new file mode 100644 index 0000000..4bdf0e5 --- /dev/null +++ b/include/network/WebSocketSession.hpp @@ -0,0 +1,83 @@ +#pragma once + +#include +#include +#include + +#include "logging/Logger.hpp" + +#include "network/IConnection.hpp" +#include "network/WebSocketProtocol.hpp" + +class WebSocketSession : public IConnection, public std::enable_shared_from_this { +public: + WebSocketSession(WebSocketProtocol::WebSocketConnection::Pointer wsConn); + ~WebSocketSession(); + + // IConnection implementation + void Start() override; + void Stop() override; + void Disconnect() override; + bool IsConnected() const override; + uint64_t GetSessionId() const override; + + void Send(const nlohmann::json& message) override; + void SendRaw(const std::string& data) override; + void SendBinary(uint16_t message_type, const std::vector& data) override; + + void SetMessageHandler(MessageHandler handler) override; + void SetCloseHandler(CloseHandler handler) override; + + void Authenticate(const std::string& authToken) override; + bool IsAuthenticated() const override; + void SetPlayerId(int64_t playerId) override; + int64_t GetPlayerId() const override; + std::string GetAuthToken() const override; + + void SetProperty(const std::string& key, const std::string& value) override; + std::string GetProperty(const std::string& key, const std::string& defaultValue = "") const override; + std::map GetAllProperties() const override; + + void JoinGroup(const std::string& groupId) override; + void LeaveGroup(const std::string& groupId) override; + void LeaveAllGroups() override; + std::set GetJoinedGroups() const override; + bool IsInGroup(const std::string& groupId) const override; + + void SetData(const std::string& key, const nlohmann::json& value) override; + nlohmann::json GetData(const std::string& key, const nlohmann::json& defaultValue = {}) const override; + bool HasData(const std::string& key) const override; + void RemoveData(const std::string& key) override; + void ClearData() override; + nlohmann::json GetAllData() const override; + + std::string GetRemoteAddress() const override; + +private: + WebSocketProtocol::WebSocketConnection::Pointer wsConn_; + uint64_t sessionId_; + static std::atomic nextSessionId_; + + MessageHandler messageHandler_; + CloseHandler closeHandler_; + + std::atomic authenticated_{false}; + int64_t playerId_{0}; + std::string authToken_; + + // Groups + mutable std::mutex groupsMutex_; + std::set groups_; + + // Data + mutable std::mutex dataMutex_; + std::map data_; + + // Properties + mutable std::mutex propertiesMutex_; + std::map properties_; + + // Internal helpers + void OnMessage(const WebSocketProtocol::WebSocketMessage& msg); + void OnClose(uint16_t code, const std::string& reason); +}; \ No newline at end of file diff --git a/include/process/ProcessPool.hpp b/include/process/ProcessPool.hpp index 9bfa8ee..32c81d6 100644 --- a/include/process/ProcessPool.hpp +++ b/include/process/ProcessPool.hpp @@ -24,6 +24,7 @@ #include #include "logging/Logger.hpp" +#include "config/ConfigManager.hpp" class ProcessPool { public: @@ -32,7 +33,8 @@ class ProcessPool { WORKER }; - ProcessPool(int numProcesses); + // New constructor: takes a list of worker groups + ProcessPool(const std::vector& groups); ~ProcessPool(); ProcessPool(const ProcessPool&) = delete; @@ -45,10 +47,13 @@ class ProcessPool { ProcessRole GetRole() const { return role_; } int GetWorkerId() const { return workerId_; } + const WorkerGroupConfig& GetWorkerGroupConfig() const { return groupConfig_; } pid_t GetMasterPid() const { return masterPid_; } + int GetTotalWorkerCount() const { return totalWorkers_; } - // Callback for worker process - void SetWorkerMain(std::function workerMainFunc); + // Callback for worker process – now receives both global ID and group config + using WorkerMainFunc = std::function; + void SetWorkerMain(WorkerMainFunc workerMainFunc); // Inter-process communication with message length prefix bool SendToWorker(int workerId, const std::string& message); @@ -68,37 +73,47 @@ class ProcessPool { void UnblockSignals(const sigset_t* oldset); private: + struct WorkerInfo { + pid_t pid; + int groupIdx; // index in groups_ vector + int localWorkerId; // index within the group (0..group.count-1) + WorkerGroupConfig config; + }; + void MasterProcess(); - void WorkerProcess(int workerId); + void WorkerProcess(int globalWorkerId, const WorkerGroupConfig& config); void SetupSignalHandlers(); void CleanupDeadWorkers(); void CloseAllPipes(); - void CreateWorkerPipe(int workerId); - + void CreateWorkerPipe(int globalWorkerId); + // Helper functions for message protocol bool WriteAll(int fd, const void* buffer, size_t count); bool ReadAll(int fd, void* buffer, size_t count, bool nonBlocking = false); bool DrainPipe(int fd, size_t bytesToDrain); - int numProcesses_; + // Group configuration + std::vector groups_; + int totalWorkers_; + ProcessRole role_{ProcessRole::MASTER}; - int workerId_{-1}; + int workerId_{-1}; // global ID (for worker) + WorkerGroupConfig groupConfig_; // config for this worker (filled after fork) pid_t masterPid_{-1}; - + std::thread masterThread_; std::atomic running_{false}; std::atomic shutdownRequested_{false}; - std::vector workerPids_; - std::function workerMainFunc_; + std::vector workers_; // indexed by global worker ID + std::vector workerPipes_; // [read_fd, write_fd] for each global worker - // IPC mechanisms - std::vector workerPipes_; // [read_fd, write_fd] for each worker + WorkerMainFunc workerMainFunc_; - // Health monitoring + // Health monitoring (keyed by global worker ID) mutable std::mutex healthMutex_; std::unordered_map> workerHealth_; - + // Message protocol configuration uint32_t maxMessageSize_{1024 * 1024}; // 1MB default uint32_t receiveTimeoutMs_{1000}; // 1 second default diff --git a/src/config/ConfigManager.cpp b/src/config/ConfigManager.cpp index fee21af..4c307cb 100644 --- a/src/config/ConfigManager.cpp +++ b/src/config/ConfigManager.cpp @@ -6,9 +6,7 @@ ConfigManager& ConfigManager::GetInstance() { } bool ConfigManager::LoadConfig(const std::string& configPath) { - std::lock_guard lock(configMutex_); - configPath_ = configPath; - + nlohmann::json loaded; try { std::ifstream configFile(configPath); if (!configFile.is_open()) { @@ -18,12 +16,7 @@ bool ConfigManager::LoadConfig(const std::string& configPath) { std::stringstream buffer; buffer << configFile.rdbuf(); - config_ = nlohmann::json::parse(buffer.str()); - - Logger::Info("Configuration loaded successfully from: {}", configPath); - - // Validate configuration - return ValidateConfig(); + loaded = nlohmann::json::parse(buffer.str()); } catch (const nlohmann::json::parse_error& e) { Logger::Critical("JSON parse error in config file: {}", e.what()); @@ -32,6 +25,20 @@ bool ConfigManager::LoadConfig(const std::string& configPath) { Logger::Critical("Failed to load config: {}", e.what()); return false; } + + if (!ValidateConfig(loaded)) { + Logger::Critical("Configuration validation failed."); + return false; + } + + { + std::lock_guard lock(configMutex_); + config_ = std::move(loaded); + configPath_ = configPath; + } + + Logger::Info("Configuration loaded successfully from: {}", configPath); + return true; } bool ConfigManager::ReloadConfig() { @@ -39,71 +46,144 @@ bool ConfigManager::ReloadConfig() { Logger::Error("No config file path set for reload"); return false; } - Logger::Info("Reloading configuration from: {}", configPath_); return LoadConfig(configPath_); } -bool ConfigManager::ValidateConfig() const { +bool ConfigManager::HasProcessConfig() const { + //ATTENTION: RECURSIVELY CALL MUTEX LOCK, DO NOT USE LINE BELOW + //std::lock_guard lock(configMutex_); + return config_.contains("process") && config_["process"].contains("workers") && + config_["process"]["workers"].is_array() && !config_["process"]["workers"].empty(); +} + +std::vector ConfigManager::GetWorkerGroups() const { + std::lock_guard lock(configMutex_); + std::vector groups; + + if (!HasProcessConfig())//ATTENTION: RECURSIVELY CALL MUTEX LOCK + return groups; + + for (const auto& w : config_["process"]["workers"]) { + WorkerGroupConfig g; + g.protocol = w.value("protocol", "binary"); + g.host = w.value("host", "0.0.0.0"); + g.port = static_cast(w.value("port", 8080)); + g.max_connections = w.value("max_connections", 1000); + g.reuse = w.value("reuse", true); + g.threads = w.value("threads", 1); + g.count = w.value("count", 1); + g.cpu_affinity = w.value("cpu_affinity", std::vector()); + g.tcp_nodelay = w.value("tcp_nodelay", true); + g.send_buffer_size = w.value("send_buffer_size", 0); + g.receive_buffer_size = w.value("receive_buffer_size", 0); + g.path = w.value("path", "/"); + g.subprotocols = w.value("subprotocols", std::vector()); + g.max_frame_size = w.value("max_frame_size", 16384); + if (w.contains("ssl")) { + SSLConfig ssl; + ssl.certificate = w["ssl"].value("certificate", ""); + ssl.private_key = w["ssl"].value("private_key", ""); + ssl.dh_params = w["ssl"].value("dh_params", ""); + ssl.verify_peer = w["ssl"].value("verify_peer", false); + ssl.ciphers = w["ssl"].value("ciphers", std::vector()); + ssl.ca_cert = w["ssl"].value("ca_cert", ""); + g.ssl = ssl; + } + groups.push_back(g); + } + return groups; +} + +int ConfigManager::GetTotalWorkerCount() const { + int total = 0; + for (const auto& g : GetWorkerGroups()) + total += g.count; + return total; +} + +int ConfigManager::GetTotalThreadCount() const { + int total = 0; + for (const auto& g : GetWorkerGroups()) + total += g.threads * g.count; // each worker in group has its own threads + return total; +} + +bool ConfigManager::ValidateConfig(const nlohmann::json& config) const { Logger::Info("Validate config started..."); try { - if (config_.contains("server")) - Logger::Info("Validate config 'server' section..."); - else - throw std::runtime_error("Missing 'server' section"); - const auto& server = config_["server"]; - if (!server.contains("host") || !server["host"].is_string()) { - throw std::runtime_error("Invalid or missing 'server.host'"); - } - if (!server.contains("port") || !server["port"].is_number_unsigned()) { - throw std::runtime_error("Invalid or missing 'server.port'"); + if (!config.contains("process") || !config["process"].contains("workers") || + !config["process"]["workers"].is_array() || config["process"]["workers"].empty()) { + throw std::runtime_error("Missing 'process.workers' array section"); } - if (server["port"].get() == 0) { - throw std::runtime_error("Invalid server port"); + + const auto& workers = config["process"]["workers"]; + for (size_t i = 0; i < workers.size(); ++i) { + const auto& w = workers[i]; + std::string proto = w.value("protocol", "binary"); + if (proto != "binary" && proto != "websocket") { + throw std::runtime_error("Worker group " + std::to_string(i) + + ": invalid protocol '" + proto + "' (must be 'binary' or 'websocket')"); + } + if (w.value("port", 0) == 0) { + throw std::runtime_error("Worker group " + std::to_string(i) + ": missing or zero port"); + } + int count = w.value("count", 1); + if (count <= 0) { + throw std::runtime_error("Worker group " + std::to_string(i) + ": count must be positive"); + } + int threads = w.value("threads", 1); + if (threads <= 0) { + throw std::runtime_error("Worker group " + std::to_string(i) + ": threads must be positive"); + } + if (proto == "websocket" && w.contains("ssl")) { + const auto& ssl = w["ssl"]; + if (!ssl.contains("certificate") || !ssl["certificate"].is_string() || + !ssl.contains("private_key") || !ssl["private_key"].is_string()) { + throw std::runtime_error("Worker group " + std::to_string(i) + + ": WebSocket SSL requires 'certificate' and 'private_key'"); + } + } } - if (config_.contains("database")) - Logger::Info("Validate config 'database' section..."); - else + // Validate database section + if (config.contains("database")) { + const auto& database = config["database"]; + if (!database.contains("name") || !database["name"].is_string()) { + throw std::runtime_error("Invalid or missing 'database.name'"); + } + } else { throw std::runtime_error("Missing 'database' section"); - const auto& database = config_["database"]; - if (!database.contains("host") || !database["host"].is_string()) { - Logger::Warn("database.host not set, will use default 127.0.0.1"); - } - if (!database.contains("port") || !database["port"].is_number_unsigned()) { - Logger::Warn("database.port not set, will use default 5432"); - } - if (!database.contains("name") || !database["name"].is_string()) { - throw std::runtime_error("Invalid or missing 'database.name'"); } - if (config_.contains("game")) - Logger::Info("Validate config 'game' section..."); - else - throw std::runtime_error("Missing 'game' section"); - const auto& game = config_["game"]; - if (!game.contains("max_players_per_session") || - !game["max_players_per_session"].is_number_unsigned()) { - throw std::runtime_error("Invalid or missing 'game.max_players_per_session'"); + // Validate game section + if (config.contains("game")) { + const auto& game = config["game"]; + if (!game.contains("max_players_per_session") || + !game["max_players_per_session"].is_number_unsigned()) { + throw std::runtime_error("Invalid or missing 'game.max_players_per_session'"); } + } else { + throw std::runtime_error("Missing 'game' section"); + } - if (config_.contains("logging")) - Logger::Info("Validate config 'logging' section..."); - else + // Validate logging section + if (config.contains("logging")) { + const auto& logging = config["logging"]; + if (!logging.contains("level") || !logging["level"].is_string()) { + throw std::runtime_error("Invalid or missing 'logging.level'"); + } + const std::string logLevel = logging["level"]; + const std::vector validLevels = { + "trace", "debug", "info", "warn", "error", "critical", "off" + }; + std::string lowerLevel = logLevel; + std::transform(lowerLevel.begin(), lowerLevel.end(), lowerLevel.begin(), ::tolower); + if (std::find(validLevels.begin(), validLevels.end(), lowerLevel) == validLevels.end()) { + throw std::runtime_error("Invalid log level: " + logLevel); + } + } else { throw std::runtime_error("Missing 'logging' section"); - const auto& logging = config_["logging"]; - if (!logging.contains("level") || !logging["level"].is_string()) { - throw std::runtime_error("Invalid or missing 'logging.level'"); - } - // Validate log levels - const std::string logLevel = logging["level"]; - const std::vector validLevels = { - "trace", "debug", "info", "warn", "error", "critical", "off" - }; - std::string lowerLevel = logLevel; - std::transform(lowerLevel.begin(), lowerLevel.end(), lowerLevel.begin(), ::tolower); - if (std::find(validLevels.begin(), validLevels.end(), lowerLevel) == validLevels.end()) { - throw std::runtime_error("Invalid log level: " + logLevel); } Logger::Info("Configuration validation passed"); @@ -115,6 +195,9 @@ bool ConfigManager::ValidateConfig() const { } } +// -------------------------------------------------------------------------- +// Setters (unchanged) +// -------------------------------------------------------------------------- void ConfigManager::SetBool(const std::string& key, bool value) { std::lock_guard lock(configMutex_); std::string keyPath = key; @@ -155,74 +238,14 @@ void ConfigManager::SetJson(const std::string& key, const nlohmann::json& value) config_[ptr] = value; } -// Server configuration getters -std::string ConfigManager::GetServerHost() const { - std::lock_guard lock(configMutex_); - try { - return config_.at("server").at("host").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get server host, using default: 0.0.0.0"); - return "0.0.0.0"; - } -} - -uint16_t ConfigManager::GetServerPort() const { - std::lock_guard lock(configMutex_); - try { - return config_.at("server").at("port").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get server port, using default: 8080"); - return 8080; - } -} - -int ConfigManager::GetMaxConnections() const { - std::lock_guard lock(configMutex_); - try { - return config_.at("server").at("max_connections").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get max connections, using default: 10000"); - return 10000; - } -} - -int ConfigManager::GetIoThreads() const { - std::lock_guard lock(configMutex_); - try { - return config_.at("server").at("io_threads").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get IO threads, using default: 4"); - return 4; - } -} - -bool ConfigManager::GetReusePort() const { - std::lock_guard lock(configMutex_); - try { - return config_.at("server").at("reuse_port").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get reuse_port, using default: true"); - return true; - } -} - -int ConfigManager::GetProcessCount() const { - std::lock_guard lock(configMutex_); - try { - return config_.at("server").at("process_count").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get process count, using default: 4"); - return 4; - } -} - -// Database configuration getters +// -------------------------------------------------------------------------- +// Database configuration getters (unchanged) +// -------------------------------------------------------------------------- std::string ConfigManager::GetDatabaseHost() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("host").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get database host, using default: 127.0.0.1"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return "127.0.0.1"; } } @@ -231,8 +254,7 @@ uint16_t ConfigManager::GetDatabasePort() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("port").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get database port, using default: 5432"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 5432; } } @@ -241,8 +263,7 @@ std::string ConfigManager::GetDatabaseName() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("name").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get database name, using default: game_db"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return "game_db"; } } @@ -251,8 +272,7 @@ std::string ConfigManager::GetDatabaseUser() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("user").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get database user, using default: game_user"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return "game_user"; } } @@ -261,8 +281,7 @@ std::string ConfigManager::GetDatabasePassword() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("password").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get database password, using empty default"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return ""; } } @@ -271,8 +290,7 @@ std::string ConfigManager::GetDatabaseBackend() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("backend").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get database backend, using default: postgresql"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return "postgresql"; } } @@ -281,8 +299,7 @@ int ConfigManager::GetDatabasePoolSize() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("pool_size").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get database pool size, using default: 10"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 10; } } @@ -294,21 +311,11 @@ std::vector ConfigManager::GetCitusWorkerNodes() const { auto& db = config_.at("database"); if (db.contains("citus_worker_nodes") && db["citus_worker_nodes"].is_array()) { for (const auto& node : db["citus_worker_nodes"]) { - if (node.is_string()) { + if (node.is_string()) nodes.push_back(node.get()); - } } } - } catch (const std::exception& e) { - Logger::Warn("Failed to get Citus worker nodes, using empty list"); - } - // Add default coordinator if present - if (nodes.empty() && config_.contains("database") && config_["database"].contains("citus_coordinator")) { - try { - std::string coordinator = config_["database"]["citus_coordinator"].get(); - nodes.push_back(coordinator + ":5432"); - } catch (...) {} - } + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what());} return nodes; } @@ -316,19 +323,19 @@ int ConfigManager::GetShardCount() const { std::lock_guard lock(configMutex_); try { return config_.at("database").at("shard_count").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get shard count, using default: 32"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 32; } } -// Game configuration getters +// -------------------------------------------------------------------------- +// Game configuration getters (unchanged) +// -------------------------------------------------------------------------- int ConfigManager::GetMaxPlayersPerSession() const { std::lock_guard lock(configMutex_); try { return config_.at("game").at("max_players_per_session").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get max players per session, using default: 100"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 100; } } @@ -337,8 +344,7 @@ int ConfigManager::GetHeartbeatInterval() const { std::lock_guard lock(configMutex_); try { return config_.at("game").at("heartbeat_interval_seconds").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get heartbeat interval, using default: 30"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 30; } } @@ -347,19 +353,19 @@ int ConfigManager::GetSessionTimeout() const { std::lock_guard lock(configMutex_); try { return config_.at("game").at("session_timeout_seconds").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get session timeout, using default: 300"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 300; } } -// 3D World configuration getters +// -------------------------------------------------------------------------- +// World configuration getters (unchanged) +// -------------------------------------------------------------------------- int ConfigManager::GetWorldSeed() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("seed").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get world seed, using default: 12345"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 12345; } } @@ -368,8 +374,7 @@ int ConfigManager::GetViewDistance() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("view_distance").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get view distance, using default: 1000"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 1000; } } @@ -378,8 +383,7 @@ int ConfigManager::GetChunkSize() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("chunk_size").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get chunk size, using default: 32"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 32; } } @@ -388,8 +392,7 @@ int ConfigManager::GetMaxActiveChunks() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("max_active_chunks").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get max active chunks, using default: 1000"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 1000; } } @@ -398,8 +401,7 @@ float ConfigManager::GetTerrainScale() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("terrain_scale").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get terrain scale, using default: 1.0"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 1.0f; } } @@ -408,8 +410,7 @@ float ConfigManager::GetMaxTerrainHeight() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("max_terrain_height").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get max terrain height, using default: 100.0"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 100.0f; } } @@ -418,8 +419,7 @@ float ConfigManager::GetWaterLevel() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("water_level").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get water level, using default: 10.0"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 10.0f; } } @@ -428,8 +428,7 @@ bool ConfigManager::ShouldPreloadWorld() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("preload_world").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get preload world setting, using default: false"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return false; } } @@ -438,21 +437,21 @@ int ConfigManager::GetWorldPreloadRadius() const { std::lock_guard lock(configMutex_); try { return config_.at("world").at("preload_radius").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get world preload radius, using default: 500"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 500; } } -// Logging configuration getters +// -------------------------------------------------------------------------- +// Logging configuration getters (unchanged) +// -------------------------------------------------------------------------- std::string ConfigManager::GetLogLevel() const { std::lock_guard lock(configMutex_); try { std::string level = config_.at("logging").at("level").get(); std::transform(level.begin(), level.end(), level.begin(), ::tolower); return level; - } catch (const std::exception& e) { - Logger::Warn("Failed to get log level, using default: info"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return "info"; } } @@ -461,8 +460,7 @@ std::string ConfigManager::GetLogFilePath() const { std::lock_guard lock(configMutex_); try { return config_.at("logging").at("file").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get log file path, using default: gameserver.log"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return "gameserver.log"; } } @@ -471,8 +469,7 @@ int ConfigManager::GetMaxLogFileSize() const { std::lock_guard lock(configMutex_); try { return config_.at("logging").at("max_file_size_mb").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get max log file size, using default: 100"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 100; } } @@ -481,8 +478,7 @@ int ConfigManager::GetMaxLogFiles() const { std::lock_guard lock(configMutex_); try { return config_.at("logging").at("max_files").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get max log files, using default: 10"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return 10; } } @@ -491,21 +487,21 @@ bool ConfigManager::GetConsoleOutput() const { std::lock_guard lock(configMutex_); try { return config_.at("logging").at("console_output").get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get console output setting, using default: true"); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return true; } } -// Generic config accessors +// -------------------------------------------------------------------------- +// Generic config accessors (unchanged) +// -------------------------------------------------------------------------- int ConfigManager::GetInt(const std::string& key, int defaultValue) const { std::lock_guard lock(configMutex_); try { std::string keyPath = key; std::replace(keyPath.begin(), keyPath.end(), '.', '/'); return config_.at(nlohmann::json::json_pointer("/" + keyPath)).get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get int for key '{}': {}", key, e.what()); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return defaultValue; } } @@ -516,8 +512,7 @@ float ConfigManager::GetFloat(const std::string& key, float defaultValue) const std::string keyPath = key; std::replace(keyPath.begin(), keyPath.end(), '.', '/'); return config_.at(nlohmann::json::json_pointer("/" + keyPath)).get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get float for key '{}': {}", key, e.what()); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return defaultValue; } } @@ -528,8 +523,7 @@ bool ConfigManager::GetBool(const std::string& key, bool defaultValue) const { std::string keyPath = key; std::replace(keyPath.begin(), keyPath.end(), '.', '/'); return config_.at(nlohmann::json::json_pointer("/" + keyPath)).get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get bool for key '{}': {}", key, e.what()); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return defaultValue; } } @@ -540,8 +534,7 @@ std::string ConfigManager::GetString(const std::string& key, const std::string& std::string keyPath = key; std::replace(keyPath.begin(), keyPath.end(), '.', '/'); return config_.at(nlohmann::json::json_pointer("/" + keyPath)).get(); - } catch (const std::exception& e) { - Logger::Warn("Failed to get string for key '{}': {}", key, e.what()); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return defaultValue; } } @@ -555,16 +548,13 @@ std::vector ConfigManager::GetStringArray(const std::string& key) c auto& arr = config_.at(nlohmann::json::json_pointer("/" + keyPath)); if (arr.is_array()) { for (const auto& item : arr) { - if (item.is_string()) { + if (item.is_string()) result.push_back(item.get()); - } else { + else result.push_back(item.dump()); - } } } - } catch (const std::exception& e) { - Logger::Warn("Failed to get string array for key '{}': {}", key, e.what()); - } + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what());} return result; } @@ -574,8 +564,7 @@ nlohmann::json ConfigManager::GetJson(const std::string& key) const { std::string keyPath = key; std::replace(keyPath.begin(), keyPath.end(), '.', '/'); return config_.at(nlohmann::json::json_pointer("/" + keyPath)); - } catch (const std::exception& e) { - Logger::Warn("Failed to get json for key '{}': {}", key, e.what()); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return nlohmann::json(); } } @@ -586,8 +575,7 @@ bool ConfigManager::HasKey(const std::string& key) const { std::string keyPath = key; std::replace(keyPath.begin(), keyPath.end(), '.', '/'); return config_.contains(nlohmann::json::json_pointer("/" + keyPath)); - } catch (const std::exception& e) { - Logger::Warn("Failed HasKey for key '{}': {}", key, e.what()); + } catch (const std::exception& err) {Logger::Warn("failed: {}", err.what()); return false; } } diff --git a/src/main.cpp b/src/main.cpp index 3a4f5f0..1efc947 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -4,7 +4,6 @@ #include #include -//#define USE_SPDLOG 1 #include "logging/Logger.hpp" #include "config/ConfigManager.hpp" @@ -12,7 +11,6 @@ #include "network/GameServer.hpp" #include "process/ProcessPool.hpp" -//#define USE_POSTGRESQL 1 #include "database/DbManager.hpp" #include "game/GameLogic.hpp" @@ -22,18 +20,19 @@ std::atomic g_shutdown(false); void SignalHandler(int signal) { (void)signal; - //Logger::Info("Received signal {}, initiating shutdown...", signal); g_shutdown.store(true); } -void WorkerMain(int workerId, ProcessPool* processPool = nullptr, const std::string& path_config = "config.json") { +// New worker main signature: receives group config +void WorkerMain(int workerId, const WorkerGroupConfig& groupConfig, ProcessPool* processPool = nullptr, const std::string& path_config = "config.json") { try { // Initialize logging with worker-specific configuration auto& config = ConfigManager::GetInstance(); // Use worker-specific logger initialization Logger::InitializeWithWorkerId(workerId); - Logger::Info("Worker {} starting game world system", workerId); + Logger::Info("Worker {} starting game world system for group: {} ({}:{})", + workerId, groupConfig.protocol, groupConfig.host, groupConfig.port); // Initialize configuration if (!config.LoadConfig(path_config)) { @@ -101,60 +100,69 @@ void WorkerMain(int workerId, ProcessPool* processPool = nullptr, const std::str gameLogic.SetWorldConfig(worldConfig); - // Game logic will now use DbManager singleton directly instead of being passed a backend - - // Initialize and run server - GameServer server(config); + // Initialize and run server with group configuration + // Note: GameServer constructor must be updated to accept WorkerGroupConfig + GameServer server(groupConfig, config); // Pass group config and global config // Set session factory - using lambda with necessary captures - server.SetSessionFactory([workerId, processPool](asio::ip::tcp::socket socket) { - auto session = std::make_shared(std::move(socket)); - - Logger::Debug("Worker {} created new game session {}", - workerId, session->GetSessionId()); - - // Message handler - simplified for demonstration - session->SetMessageHandler([session, workerId, processPool](const nlohmann::json& msg) { - try { - std::string msgType = msg.value("type", ""); - Logger::Debug("Worker {} processing message type: {}", workerId, msgType); - - // Check if message is for inter-process communication - if (msgType == "ipc_message" && processPool) { - // Extract IPC message details - if (msg.contains("target_worker") && msg.contains("payload")) { - int targetWorker = msg["target_worker"]; - std::string payload = msg["payload"].dump(); - - // Send to another worker via master using new protocol - if (processPool->SendToWorker(targetWorker, payload)) { - Logger::Debug("Worker {} sent IPC message to worker {}", - workerId, targetWorker); - } else { - Logger::Error("Worker {} failed to send IPC message to worker {}", - workerId, targetWorker); + if (groupConfig.protocol == "binary") { + server.SetSessionFactory([workerId, processPool, &groupConfig] + (asio::ip::tcp::socket socket, + std::shared_ptr sslCtx) + { + auto session = std::make_shared(std::move(socket), sslCtx); + Logger::Debug("Worker {} created new game session {}", + workerId, session->GetSessionId()); + // Message handler - simplified for demonstration + session->SetMessageHandler([session, workerId, processPool](const nlohmann::json& msg) { + try { + std::string msgType = msg.value("type", ""); + Logger::Debug("Worker {} processing message type: {}", workerId, msgType); + // Check if message is for inter-process communication + if (msgType == "ipc_message" && processPool) { // Extract IPC message details + if (msg.contains("target_worker") && msg.contains("payload")) { + int targetWorker = msg["target_worker"]; + std::string payload = msg["payload"].dump(); + // Send to another worker via master using new protocol + if (processPool->SendToWorker(targetWorker, payload)) { + Logger::Debug("Worker {} sent IPC message to worker {}", + workerId, targetWorker); + } else { + Logger::Error("Worker {} failed to send IPC message to worker {}", + workerId, targetWorker); + } } + } else { // Regular game message - delegate to game logic + GameLogic::GetInstance().HandleMessage(session->GetSessionId(), msg); } - } else { - // Regular game message - delegate to game logic - GameLogic::GetInstance().HandleMessage(session->GetSessionId(), msg); + } catch (const std::exception& e) { + Logger::Error("Worker {} error processing message: {}", workerId, e.what()); + session->SendError("Internal server error", 500); } - } catch (const std::exception& e) { - Logger::Error("Worker {} error processing message: {}", workerId, e.what()); - session->SendError("Internal server error", 500); - } + }); + session->SetCloseHandler([session, workerId]() { // Close handler + Logger::Info("Worker {} session {} closing", workerId, session->GetSessionId()); + GameLogic::GetInstance().OnPlayerDisconnected(session->GetSessionId()); + Logger::Debug("Worker {} session {} cleanup complete", workerId, session->GetSessionId()); + }); + return session; }); - - // Close handler - session->SetCloseHandler([session, workerId]() { - Logger::Info("Worker {} session {} closing", workerId, session->GetSessionId()); - - GameLogic::GetInstance().OnPlayerDisconnected(session->GetSessionId()); - Logger::Debug("Worker {} session {} cleanup complete", workerId, session->GetSessionId()); + } else if (groupConfig.protocol == "websocket") { + server.SetWebSocketConnectionFactory([workerId, processPool, &groupConfig](asio::ip::tcp::socket socket, std::shared_ptr sslCtx) { + // Create a WebSocketConnection (which handles the upgrade) + auto wsConn = std::make_shared(std::move(socket)); + if (sslCtx) { + // For wss, we need to do SSL handshake before WebSocket upgrade. + // This is more complex; we might need a separate SSL stream. + // For now, assume SSL is handled by the acceptor (e.g., using asio::ssl::stream). + // We'll need to extend WebSocketConnection to accept an SSL stream. + // This is a more advanced integration; for simplicity, we can require SSL to be handled by the listener (i.e., wss://) which will already have an SSL stream. + // The current WebSocketConnection doesn't support SSL; we may need to create an SSL version. + // As a simplification, we can defer SSL WebSocket support. + } + return wsConn; }); - - return session; - }); + } // Pass connection manager to game logic before initialization gameLogic.SetConnectionManager(ConnectionManager::GetInstancePtr()); @@ -168,7 +176,8 @@ void WorkerMain(int workerId, ProcessPool* processPool = nullptr, const std::str // Initialize and run server if (server.Initialize()) { - Logger::Info("Worker {} game server initialized on port {}", workerId, config.GetServerPort()); + Logger::Info("Worker {} game server initialized on {}:{} (protocol: {})", + workerId, groupConfig.host, groupConfig.port, groupConfig.protocol); // Start background world maintenance thread std::atomic worldMaintenanceRunning{true}; @@ -267,11 +276,13 @@ int main(int argc, char* argv[]) { std::string conf_path = "config/core.json"; auto& config = ConfigManager::GetInstance(); if (!config.LoadConfig(conf_path)) { - std::cerr << "Failed to load configuration" << std::endl; + //std::cerr << "Failed to load configuration" << std::endl; + Logger::Critical("Failed to load configuration."); return 1; } else { - std::cout << "Success to load configuration" << std::endl; + //std::cout << "Success to load configuration" << std::endl; + Logger::Info("Success to load configuration."); } // Initialize logger with the actual config settings @@ -299,9 +310,15 @@ int main(int argc, char* argv[]) { } Logger::Info("{} commands ({})", argc, cmdline); - // Create process pool - int processCount = config.GetProcessCount(); - ProcessPool processPool(processCount); + // Get worker groups from config + auto groups = config.GetWorkerGroups(); + if (groups.empty()) { + Logger::Critical("No worker groups configured"); + return 1; + } + + // Create process pool with groups + ProcessPool processPool(groups); // Configure process pool message protocol from config uint32_t maxMessageSize = config.GetInt("process.max_message_size", 1048576); // 1MB default @@ -313,26 +330,28 @@ int main(int argc, char* argv[]) { Logger::Info("Process pool configured: max message size = {} bytes, timeout = {}ms", maxMessageSize, receiveTimeout); - // Create a lambda to capture processPool pointer for WorkerMain - auto workerMainWithPool = [&processPool, &conf_path](int workerId) { - WorkerMain(workerId, &processPool, conf_path); + // Create a lambda that captures processPool pointer and group configs (though group config will be passed by worker) + // But we need to pass the global config path. The worker will load it itself. + auto workerMainWithPool = [&processPool, &conf_path](int workerId, const WorkerGroupConfig& groupConfig) { + WorkerMain(workerId, groupConfig, &processPool, conf_path); }; - // Set worker main function with process pool context + // Set worker main function with the new signature processPool.SetWorkerMain(workerMainWithPool); - // Initialize as master process - Logger::Info("Starting {} worker processes for world", processCount); + // Initialize and run process pool (will fork workers) + Logger::Info("Starting {} worker processes", processPool.GetTotalWorkerCount()); processPool.Run(); // Send test messages to workers using new protocol - std::thread masterMessagingThread([&processPool, processCount]() { + std::thread masterMessagingThread([&processPool]() { std::this_thread::sleep_for(std::chrono::seconds(3)); // Wait for workers to start Logger::Info("Master process starting IPC message test"); // Send a test message to each worker - for (int i = 0; i < processCount; i++) { + int totalWorkers = processPool.GetTotalWorkerCount(); + for (int i = 0; i < totalWorkers; i++) { // Skip dead workers if (!processPool.IsWorkerAlive(i)) { Logger::Warn("Master skipping welcome message to dead worker {}", i); @@ -364,7 +383,8 @@ int main(int argc, char* argv[]) { Logger::Info("Master process waiting for shutdown signal..."); while (!g_shutdown.load()) { // Periodically check worker health - for (int i = 0; i < processCount; i++) { + int totalWorkers = processPool.GetTotalWorkerCount(); + for (int i = 0; i < totalWorkers; i++) { if (!processPool.IsWorkerAlive(i)) { Logger::Warn("Master detected worker {} is not alive", i); } @@ -380,7 +400,7 @@ int main(int argc, char* argv[]) { static int heartbeatCount = 0; heartbeatCount++; - for (int i = 0; i < processCount; i++) { + for (int i = 0; i < totalWorkers; i++) { // Stop sending if shutdown requested if (g_shutdown.load()) break; @@ -405,7 +425,7 @@ int main(int argc, char* argv[]) { if (statusUpdateCount % 10 == 0) { // Every 10 seconds (since sleep is 1 sec) nlohmann::json serverStatus; serverStatus["type"] = "server_status"; - serverStatus["online_workers"] = processCount; + serverStatus["online_workers"] = totalWorkers; serverStatus["timestamp"] = std::chrono::system_clock::now().time_since_epoch().count(); // Send broadcast command to all workers @@ -414,7 +434,7 @@ int main(int argc, char* argv[]) { broadcastMsg["data"] = serverStatus; std::string broadcastSerialized = broadcastMsg.dump(); - for (int i = 0; i < processCount; i++) { + for (int i = 0; i < totalWorkers; i++) { if (g_shutdown.load()) break; if (!processPool.IsWorkerAlive(i)) continue; processPool.SendToWorker(i, broadcastSerialized); diff --git a/src/network/ConnectionManager.cpp b/src/network/ConnectionManager.cpp index 557846e..333e2a6 100644 --- a/src/network/ConnectionManager.cpp +++ b/src/network/ConnectionManager.cpp @@ -36,7 +36,7 @@ ConnectionManager::~ConnectionManager() { Logger::Info("ConnectionManager destroyed"); } -void ConnectionManager::Start(std::shared_ptr session) { +void ConnectionManager::Start(std::shared_ptr session) { if (!session) { Logger::Error("Cannot start null session"); return; @@ -63,25 +63,18 @@ void ConnectionManager::Start(std::shared_ptr session) { // Record session start time for statistics std::lock_guard statsLock(statsMutex_); sessionStats_[sessionId] = SessionStatsInfo{std::chrono::system_clock::now(),std::chrono::system_clock::now(),0,0,0,0}; - // .start_time = std::chrono::system_clock::now(), - // .last_activity = std::chrono::system_clock::now(), - // .messages_sent = 0, - // .messages_received = 0, - // .bytes_sent = 0, - // .bytes_received = 0 - // }; Logger::Info("Session {} started. Total connections: {}", sessionId, totalConnections_.load()); // Emit connection event - EmitEvent("connection_started", { + EmitEvent("connection_started", nlohmann::json{ {"session_id", sessionId}, - {"remote_endpoint", session->GetRemoteEndpoint().address().to_string()} + {"remote_endpoint", session->GetRemoteAddress()} }); } -void ConnectionManager::Stop(std::shared_ptr session) { +void ConnectionManager::Stop(std::shared_ptr session) { if (!session) { return; } @@ -132,7 +125,7 @@ void ConnectionManager::Stop(std::shared_ptr session) { void ConnectionManager::StopAll() { Logger::Info("Stopping all connections..."); - std::vector> allSessions; + std::vector> allSessions; { std::unique_lock lock(sessionsMutex_); @@ -167,9 +160,9 @@ size_t ConnectionManager::GetConnectionCount() const { return totalConnections_.load(); } -std::vector> ConnectionManager::GetAllSessions() const { +std::vector> ConnectionManager::GetAllSessions() const { std::shared_lock lock(sessionsMutex_); - std::vector> result; + std::vector> result; result.reserve(sessions_.size()); for (const auto& [id, session] : sessions_) { @@ -179,7 +172,7 @@ std::vector> ConnectionManager::GetAllSessions() co return result; } -std::shared_ptr ConnectionManager::GetSession(uint64_t sessionId) const { +std::shared_ptr ConnectionManager::GetSession(uint64_t sessionId) const { std::shared_lock lock(sessionsMutex_); auto it = sessions_.find(sessionId); if (it != sessions_.end()) { @@ -389,8 +382,8 @@ std::set ConnectionManager::GetDefaultGroups() const { // =============== Session Query Methods =============== -std::vector> ConnectionManager::GetSessionsByPlayerId(int64_t playerId) const { - std::vector> result; +std::vector> ConnectionManager::GetSessionsByPlayerId(int64_t playerId) const { + std::vector> result; std::shared_lock lock(sessionsMutex_); for (const auto& [sessionId, session] : sessions_) { @@ -413,8 +406,8 @@ std::vector ConnectionManager::GetSessionIdsInGroup(const std::string& return {}; } -std::vector> ConnectionManager::GetSessionsInGroup(const std::string& groupId) const { - std::vector> result; +std::vector> ConnectionManager::GetSessionsInGroup(const std::string& groupId) const { + std::vector> result; std::shared_lock groupsLock(groupsMutex_); auto groupIt = groups_.find(groupId); @@ -579,7 +572,7 @@ void ConnectionManager::CleanupInactiveSessions(int timeoutSeconds) { lastCleanup_ = now; - std::vector> sessionsToRemove; + std::vector> sessionsToRemove; { std::shared_lock sessionsLock(sessionsMutex_); @@ -633,8 +626,8 @@ void ConnectionManager::DisconnectAllInGroup(const std::string& groupId) { // =============== Load Balancing and Distribution =============== -std::vector> ConnectionManager::GetSessionsByWorkerId(int workerId) const { - std::vector> result; +std::vector> ConnectionManager::GetSessionsByWorkerId(int workerId) const { + std::vector> result; std::shared_lock lock(sessionsMutex_); for (const auto& [sessionId, session] : sessions_) { @@ -704,7 +697,7 @@ void ConnectionManager::EmitEvent(const std::string& eventType, const nlohmann:: // =============== Broadcast with Filters =============== void ConnectionManager::BroadcastWithFilter(const nlohmann::json& message, - std::function)> filter) { + std::function)> filter) { std::shared_lock lock(sessionsMutex_); if (sessions_.empty()) { @@ -747,19 +740,19 @@ void ConnectionManager::BroadcastWithFilter(const nlohmann::json& message, void ConnectionManager::BroadcastExcept(uint64_t excludeSessionId, const nlohmann::json& message) { -BroadcastWithFilter(message, [excludeSessionId](std::shared_ptr session) { +BroadcastWithFilter(message, [excludeSessionId](std::shared_ptr session) { return session->GetSessionId() != excludeSessionId; }); } void ConnectionManager::BroadcastToAuthenticated(const nlohmann::json& message) { -BroadcastWithFilter(message, [](std::shared_ptr session) { +BroadcastWithFilter(message, [](std::shared_ptr session) { return session->IsAuthenticated(); }); } void ConnectionManager::BroadcastToUnauthenticated(const nlohmann::json& message) { -BroadcastWithFilter(message, [](std::shared_ptr session) { +BroadcastWithFilter(message, [](std::shared_ptr session) { return !session->IsAuthenticated(); }); } @@ -823,7 +816,7 @@ void ConnectionManager::EnforceGlobalRateLimit(int maxMessagesPerSecond) { // =============== Session Migration =============== bool ConnectionManager::MigrateSession(uint64_t sessionId, -std::shared_ptr newSession) { +std::shared_ptr newSession) { std::unique_lock lock(sessionsMutex_); auto it = sessions_.find(sessionId); diff --git a/src/network/GameServer.cpp b/src/network/GameServer.cpp index 154a8af..9844e9e 100644 --- a/src/network/GameServer.cpp +++ b/src/network/GameServer.cpp @@ -1,15 +1,26 @@ #include "network/GameServer.hpp" -GameServer::GameServer(const ConfigManager& config) -: ioContext_(config.GetIoThreads()), -acceptor_(ioContext_), -signals_(ioContext_), -config_(config) +GameServer::GameServer(const WorkerGroupConfig& groupConfig, const ConfigManager& globalConfig) + : ioContext_(groupConfig.threads), + acceptor_(ioContext_), + signals_(ioContext_), + groupConfig_(groupConfig), + globalConfig_(globalConfig), + host_(groupConfig.host), + port_(groupConfig.port), + reuse_(groupConfig.reuse), + ioThreads_(groupConfig.threads) { - host_ = config.GetServerHost(); - port_ = config.GetServerPort(); - reusePort_ = config.GetReusePort(); - ioThreads_ = config.GetIoThreads(); + // Set up SSL context if requested + if (groupConfig.ssl.has_value()) { + sslContext_ = std::make_shared(asio::ssl::context::tls_server); + sslContext_->use_certificate_chain_file(groupConfig.ssl->certificate); + sslContext_->use_private_key_file(groupConfig.ssl->private_key, asio::ssl::context::pem); + if (!groupConfig.ssl->dh_params.empty()) { + sslContext_->use_tmp_dh_file(groupConfig.ssl->dh_params); + } + // Additional SSL options can be set here + } } GameServer::~GameServer() = default; @@ -18,10 +29,10 @@ bool GameServer::Initialize() { try { asio::ip::tcp::endpoint endpoint( asio::ip::make_address(host_), - port_ + port_ ); acceptor_.open(endpoint.protocol()); - if (reusePort_) { + if (reuse_) { acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true)); int optval = 1; if (setsockopt(acceptor_.native_handle(), @@ -30,15 +41,18 @@ bool GameServer::Initialize() { &optval, sizeof(optval)) < 0) { Logger::Error("Failed to set SO_REUSEPORT: {}", strerror(errno)); - } + } } acceptor_.bind(endpoint); - acceptor_.listen(config_.GetMaxConnections()); + acceptor_.listen(groupConfig_.max_connections); + SetupSignalHandlers(); - Logger::Info("GameServer initialized on {}:{}", host_, port_); + Logger::Info("GameServer initialized for protocol '{}' on {}:{}", + groupConfig_.protocol, host_, port_); return true; } catch (const std::exception& e) { - Logger::Critical("Failed to initialize server: {}", e.what()); + Logger::Critical("Failed to initialize server for protocol '{}': {}", + groupConfig_.protocol, e.what()); return false; } } @@ -47,27 +61,56 @@ void GameServer::Run() { running_ = true; DoAccept(); StartWorkerThreads(); - Logger::Info("GameServer started with {} IO threads", ioThreads_); + Logger::Info("GameServer started with {} IO threads for protocol '{}'", + ioThreads_, groupConfig_.protocol); ioContext_.run(); for (auto& thread : workerThreads_) { if (thread.joinable()) { thread.join(); } } - Logger::Info("GameServer run finished"); + Logger::Info("GameServer run finished for protocol '{}'", groupConfig_.protocol); } void GameServer::DoAccept() { acceptor_.async_accept( [this](std::error_code ec, asio::ip::tcp::socket socket) { if (!ec) { - if (sessionFactory_) { - auto session = sessionFactory_(std::move(socket)); - ConnectionManager::GetInstance().Start(session); - session->Start(); + // Apply socket options + if (groupConfig_.tcp_nodelay) { + asio::ip::tcp::no_delay option(true); + socket.set_option(option); + } + if (groupConfig_.send_buffer_size > 0) { + asio::socket_base::send_buffer_size option(groupConfig_.send_buffer_size); + socket.set_option(option); + } + if (groupConfig_.receive_buffer_size > 0) { + asio::socket_base::receive_buffer_size option(groupConfig_.receive_buffer_size); + socket.set_option(option); + } + + if (groupConfig_.protocol == "binary") { + if (sessionFactory_) { + auto session = sessionFactory_(std::move(socket), sslContext_); + ConnectionManager::GetInstance().Start(session); + session->Start(); + } else { + Logger::Error("No session factory set for binary protocol"); + } + } else if (groupConfig_.protocol == "websocket") { + if (webSocketFactory_) { + auto wsConn = webSocketFactory_(std::move(socket), sslContext_); + auto session = std::make_shared(wsConn); + ConnectionManager::GetInstance().Start(session); + session->Start(); + } else { + Logger::Error("No WebSocket factory set"); + } + } else { + Logger::Error("Unknown protocol: {}", groupConfig_.protocol); } } else { - // During shutdown, operation_aborted is expected, don't log as error if (ec != asio::error::operation_aborted) { Logger::Error("Accept error: {}", ec.message()); } else { @@ -107,9 +150,13 @@ void GameServer::Shutdown() { signals_.cancel(); acceptor_.close(); ioContext_.stop(); - Logger::Info("GameServer shutdown initiated"); + Logger::Info("GameServer shutdown initiated for protocol '{}'", groupConfig_.protocol); } -void GameServer::SetSessionFactory(std::function(asio::ip::tcp::socket)> factory) { +void GameServer::SetSessionFactory(SessionFactory factory) { sessionFactory_ = std::move(factory); } + +void GameServer::SetWebSocketConnectionFactory(WebSocketFactory factory) { + webSocketFactory_ = std::move(factory); +} diff --git a/src/network/WebSocketSession.cpp b/src/network/WebSocketSession.cpp new file mode 100644 index 0000000..94947f3 --- /dev/null +++ b/src/network/WebSocketSession.cpp @@ -0,0 +1,158 @@ +#include "network/WebSocketSession.hpp" + +std::atomic WebSocketSession::nextSessionId_{1}; + +WebSocketSession::WebSocketSession(WebSocketProtocol::WebSocketConnection::Pointer wsConn) + : wsConn_(std::move(wsConn)), sessionId_(nextSessionId_++) { + wsConn_->SetMessageHandler([this](const WebSocketProtocol::WebSocketMessage& msg) { + OnMessage(msg); + }); + wsConn_->SetCloseHandler([this](uint16_t code, const std::string& reason) { + OnClose(code, reason); + }); +} + +WebSocketSession::~WebSocketSession() = default; + +void WebSocketSession::Start() { wsConn_->Start(); } +void WebSocketSession::Stop() { wsConn_->Close(); } +void WebSocketSession::Disconnect() { wsConn_->Close(1000, "Disconnect"); } +bool WebSocketSession::IsConnected() const { return wsConn_->IsOpen(); } +uint64_t WebSocketSession::GetSessionId() const { return sessionId_; } + +void WebSocketSession::Send(const nlohmann::json& message) { + wsConn_->SendJson(message); +} + +void WebSocketSession::SendRaw(const std::string& data) { + wsConn_->SendText(data); +} + +void WebSocketSession::SendBinary(uint16_t /*message_type*/, const std::vector& data) { + wsConn_->SendBinary(data); +} + +void WebSocketSession::SetMessageHandler(MessageHandler handler) { + messageHandler_ = std::move(handler); +} + +void WebSocketSession::SetCloseHandler(CloseHandler handler) { + closeHandler_ = std::move(handler); +} + +void WebSocketSession::Authenticate(const std::string& authToken) { + authToken_ = authToken; + authenticated_ = true; + // Optionally validate token +} + +bool WebSocketSession::IsAuthenticated() const { return authenticated_; } +void WebSocketSession::SetPlayerId(int64_t playerId) { playerId_ = playerId; } +int64_t WebSocketSession::GetPlayerId() const { return playerId_; } +std::string WebSocketSession::GetAuthToken() const { return authToken_; } + +// Properties +void WebSocketSession::SetProperty(const std::string& key, const std::string& value) { + std::lock_guard lock(propertiesMutex_); + properties_[key] = value; +} + +std::string WebSocketSession::GetProperty(const std::string& key, const std::string& defaultValue) const { + std::lock_guard lock(propertiesMutex_); + auto it = properties_.find(key); + return it != properties_.end() ? it->second : defaultValue; +} + +std::map WebSocketSession::GetAllProperties() const { + std::lock_guard lock(propertiesMutex_); + return properties_; +} + +// Groups +void WebSocketSession::JoinGroup(const std::string& groupId) { + std::lock_guard lock(groupsMutex_); + groups_.insert(groupId); +} + +void WebSocketSession::LeaveGroup(const std::string& groupId) { + std::lock_guard lock(groupsMutex_); + groups_.erase(groupId); +} + +void WebSocketSession::LeaveAllGroups() { + std::lock_guard lock(groupsMutex_); + groups_.clear(); +} + +std::set WebSocketSession::GetJoinedGroups() const { + std::lock_guard lock(groupsMutex_); + return groups_; +} + +bool WebSocketSession::IsInGroup(const std::string& groupId) const { + std::lock_guard lock(groupsMutex_); + return groups_.find(groupId) != groups_.end(); +} + +// Data +void WebSocketSession::SetData(const std::string& key, const nlohmann::json& value) { + std::lock_guard lock(dataMutex_); + data_[key] = value; +} + +nlohmann::json WebSocketSession::GetData(const std::string& key, const nlohmann::json& defaultValue) const { + std::lock_guard lock(dataMutex_); + auto it = data_.find(key); + return it != data_.end() ? it->second : defaultValue; +} + +bool WebSocketSession::HasData(const std::string& key) const { + std::lock_guard lock(dataMutex_); + return data_.find(key) != data_.end(); +} + +void WebSocketSession::RemoveData(const std::string& key) { + std::lock_guard lock(dataMutex_); + data_.erase(key); +} + +void WebSocketSession::ClearData() { + std::lock_guard lock(dataMutex_); + data_.clear(); +} + +nlohmann::json WebSocketSession::GetAllData() const { + std::lock_guard lock(dataMutex_); + return data_; +} + +std::string WebSocketSession::GetRemoteAddress() const { + try { + return wsConn_->GetRemoteEndpoint().address().to_string(); + } catch (...) { + return "unknown"; + } +} + +void WebSocketSession::OnMessage(const WebSocketProtocol::WebSocketMessage& msg) { + if (messageHandler_) { + if (msg.opcode == WebSocketProtocol::OP_TEXT) { + try { + auto json = nlohmann::json::parse(msg.GetText()); + messageHandler_(json); + } catch (const std::exception& e) { + Logger::Error("WebSocketSession: invalid JSON: {}", e.what()); + } + } else if (msg.opcode == WebSocketProtocol::OP_BINARY) { + // Binary message handling – can be extended to parse binary protocol + Logger::Debug("WebSocket binary message received, size: {}", msg.data.size()); + } + } +} + +void WebSocketSession::OnClose(uint16_t code, const std::string& reason) { + Logger::Info("WebSocketSession closed: code={}, reason={}", code, reason); + if (closeHandler_) { + closeHandler_(); + } +} \ No newline at end of file diff --git a/src/process/ProcessPool.cpp b/src/process/ProcessPool.cpp index 5120d17..51de7fb 100644 --- a/src/process/ProcessPool.cpp +++ b/src/process/ProcessPool.cpp @@ -1,12 +1,19 @@ #include "process/ProcessPool.hpp" -// Declare the global shutdown flag from main.cpp extern std::atomic g_shutdown; -ProcessPool::ProcessPool(int numProcesses) - : numProcesses_(numProcesses > 0 ? numProcesses : 1) { - workerPids_.resize(numProcesses_, -1); - workerPipes_.resize(numProcesses_ * 2, -1); +ProcessPool::ProcessPool(const std::vector& groups) + : groups_(groups) +{ + // Compute total number of workers + totalWorkers_ = 0; + for (const auto& g : groups_) { + totalWorkers_ += g.count; + } + + // Pre-allocate vectors + workers_.resize(totalWorkers_); + workerPipes_.resize(totalWorkers_ * 2, -1); } ProcessPool::~ProcessPool() { @@ -14,8 +21,8 @@ ProcessPool::~ProcessPool() { } bool ProcessPool::Initialize() { - if (numProcesses_ <= 0) { - Logger::Error("Invalid number of processes: {}", numProcesses_); + if (totalWorkers_ <= 0) { + Logger::Error("No workers configured"); return false; } @@ -24,22 +31,23 @@ bool ProcessPool::Initialize() { running_.store(true); shutdownRequested_.store(false); - for (int i = 0; i < numProcesses_; ++i) { + // Create pipes for each worker + for (int i = 0; i < totalWorkers_; ++i) { CreateWorkerPipe(i); } return true; } -void ProcessPool::CreateWorkerPipe(int workerId) { +void ProcessPool::CreateWorkerPipe(int globalWorkerId) { int pipefd[2]; if (pipe(pipefd) == -1) { - Logger::Error("Failed to create pipe for worker {}: {}", workerId, strerror(errno)); + Logger::Error("Failed to create pipe for worker {}: {}", globalWorkerId, strerror(errno)); return; } - int read_idx = workerId * 2; - int write_idx = workerId * 2 + 1; + int read_idx = globalWorkerId * 2; + int write_idx = globalWorkerId * 2 + 1; if (workerPipes_[read_idx] != -1) close(workerPipes_[read_idx]); if (workerPipes_[write_idx] != -1) close(workerPipes_[write_idx]); @@ -60,7 +68,14 @@ void ProcessPool::Run() { if (getpid() == masterPid_) { masterThread_ = std::thread(&ProcessPool::MasterProcess, this); } else { - WorkerProcess(workerId_); + // Worker process: we need to know which global worker ID and group config we have. + // This is set in the child before calling Run(), but here we need to retrieve it. + // We'll set a member variable in the child branch before calling Run. + // However, the constructor only runs in the master; for workers, we need to + // set these after fork. The child branch of MasterProcess will set them. + // So this branch should never be executed directly because we call WorkerProcess + // explicitly from the child after fork. + Logger::Error("Worker process started without proper initialization"); } } @@ -70,64 +85,79 @@ void ProcessPool::MasterProcess() { sigset_t oldset; BlockSignals(&oldset); - for (int i = 0; i < numProcesses_; ++i) { - pid_t pid = fork(); - - if (pid == 0) { - // Child - signal(SIGINT, SIG_IGN); - // SIGTERM handler: sets the global shutdown flag (async‑safe) - signal(SIGTERM, [](int) { g_shutdown.store(true, std::memory_order_relaxed); }); - - UnblockSignals(&oldset); + // Spawn workers in order of groups + int globalWorkerId = 0; + for (size_t gidx = 0; gidx < groups_.size(); ++gidx) { + const auto& group = groups_[gidx]; + for (int w = 0; w < group.count; ++w, ++globalWorkerId) { + pid_t pid = fork(); + + if (pid == 0) { + // Child process + signal(SIGINT, SIG_IGN); + // SIGTERM handler: sets the global shutdown flag (async‑safe) + signal(SIGTERM, [](int) { g_shutdown.store(true, std::memory_order_relaxed); }); + + UnblockSignals(&oldset); + + // Permanently block SIGINT only (SIGTERM remains unblocked) + sigset_t block_int; + sigemptyset(&block_int); + sigaddset(&block_int, SIGINT); + pthread_sigmask(SIG_BLOCK, &block_int, nullptr); + + // Close all pipe ends except the one for this worker + for (int i = 0; i < totalWorkers_ * 2; ++i) { + if (i != globalWorkerId * 2) { + if (workerPipes_[i] != -1) close(workerPipes_[i]); + } + } - // Permanently block SIGINT only (SIGTERM remains unblocked) - sigset_t block_int; - sigemptyset(&block_int); - sigaddset(&block_int, SIGINT); - pthread_sigmask(SIG_BLOCK, &block_int, nullptr); + // Set worker process name + std::string processName = "game_worker_" + std::to_string(globalWorkerId); + prctl(PR_SET_NAME, processName.c_str(), 0, 0, 0); - workerId_ = i; - role_ = ProcessRole::WORKER; + // Store config for this worker + groupConfig_ = group; - for (int j = 0; j < numProcesses_ * 2; ++j) { - if (j != workerId_ * 2) { - if (workerPipes_[j] != -1) close(workerPipes_[j]); - } - } + role_ = ProcessRole::WORKER; - std::string processName = "game_worker_" + std::to_string(i); - prctl(PR_SET_NAME, processName.c_str(), 0, 0, 0); + Logger::Info("Worker {} started (PID: {}) for group {} ({}:{})", + globalWorkerId, getpid(), gidx, group.protocol, group.port); - Logger::Info("Worker process {} started (PID: {})", i, getpid()); + WorkerProcess(globalWorkerId, group); - WorkerProcess(i); + // Cleanup: close read end of pipe + if (workerPipes_[globalWorkerId * 2] != -1) + close(workerPipes_[globalWorkerId * 2]); + _exit(0); - if (workerPipes_[workerId_ * 2] != -1) - close(workerPipes_[workerId_ * 2]); - _exit(0); + } else if (pid > 0) { + // Master: record worker info + workers_[globalWorkerId] = {pid, static_cast(gidx), w, group}; - } else if (pid > 0) { - workerPids_[i] = pid; + // Close the read end in master + if (workerPipes_[globalWorkerId * 2] != -1) { + close(workerPipes_[globalWorkerId * 2]); + workerPipes_[globalWorkerId * 2] = -1; + } - if (workerPipes_[i * 2] != -1) { - close(workerPipes_[i * 2]); - workerPipes_[i * 2] = -1; - } + // Health tracking + { + std::lock_guard lock(healthMutex_); + workerHealth_[globalWorkerId] = {pid, time(nullptr)}; + } - { - std::lock_guard lock(healthMutex_); - workerHealth_[i] = {pid, time(nullptr)}; + Logger::Info("Worker {} started with PID: {}", globalWorkerId, pid); + } else { + Logger::Error("Failed to fork worker {}: {}", globalWorkerId, strerror(errno)); } - - Logger::Info("Worker {} started with PID: {}", i, pid); - } else { - Logger::Error("Failed to fork worker {}: {}", i, strerror(errno)); } } UnblockSignals(&oldset); + // Main loop: monitor workers and check for shutdown while (running_.load() && !shutdownRequested_.load()) { CleanupDeadWorkers(); for (int i = 0; i < 10 && running_.load() && !shutdownRequested_.load(); ++i) { @@ -135,17 +165,19 @@ void ProcessPool::MasterProcess() { } } - for (int i = 0; i < numProcesses_; ++i) { - if (workerPids_[i] > 0) { - Logger::Info("Terminating worker {} (PID: {})", i, workerPids_[i]); - kill(workerPids_[i], SIGTERM); + // Send SIGTERM to all workers + for (int i = 0; i < totalWorkers_; ++i) { + if (workers_[i].pid > 0) { + Logger::Info("Terminating worker {} (PID: {})", i, workers_[i].pid); + kill(workers_[i].pid, SIGTERM); } } + // Wait for all workers to exit int status; - for (int i = 0; i < numProcesses_; ++i) { - if (workerPids_[i] > 0) { - waitpid(workerPids_[i], &status, 0); + for (int i = 0; i < totalWorkers_; ++i) { + if (workers_[i].pid > 0) { + waitpid(workers_[i].pid, &status, 0); Logger::Info("Worker {} exited with status: {}", i, status); } } @@ -154,23 +186,12 @@ void ProcessPool::MasterProcess() { Logger::Info("Master process shutdown complete"); } -void ProcessPool::CloseAllPipes() { - for (size_t i = 0; i < workerPipes_.size(); ++i) { - if (workerPipes_[i] != -1) { - close(workerPipes_[i]); - workerPipes_[i] = -1; - } - } -} - -void ProcessPool::WorkerProcess(int workerId) { - this->workerId_ = workerId; - - // No need to set signal handlers here – they are already set in the child branch. - // The SIGTERM handler set earlier will remain active. +void ProcessPool::WorkerProcess(int globalWorkerId, const WorkerGroupConfig& config) { + this->workerId_ = globalWorkerId; + this->groupConfig_ = config; if (workerMainFunc_) { - workerMainFunc_(workerId); + workerMainFunc_(globalWorkerId, config); } } @@ -179,8 +200,8 @@ void ProcessPool::CleanupDeadWorkers() { pid_t pid; while ((pid = waitpid(-1, &status, WNOHANG)) > 0) { - for (int i = 0; i < numProcesses_; ++i) { - if (workerPids_[i] == pid) { + for (int i = 0; i < totalWorkers_; ++i) { + if (workers_[i].pid == pid) { Logger::Warn("Worker {} (PID: {}) died with status {}, {}...", i, pid, WEXITSTATUS(status), shutdownRequested_.load() ? "not restarting (shutdown)" : "restarting"); @@ -203,17 +224,31 @@ void ProcessPool::CleanupDeadWorkers() { } } -void ProcessPool::RestartWorker(int workerId) { +void ProcessPool::CloseAllPipes() { + for (size_t i = 0; i < workerPipes_.size(); ++i) { + if (workerPipes_[i] != -1) { + close(workerPipes_[i]); + workerPipes_[i] = -1; + } + } +} + +void ProcessPool::RestartWorker(int globalWorkerId) { + // Find which group this worker belongs to + const auto& oldInfo = workers_[globalWorkerId]; + const auto& group = oldInfo.config; + + // Create new pipe for this worker int pipefd[2]; if (pipe(pipefd) == -1) { - Logger::Error("Failed to create pipe for worker {}: {}", workerId, strerror(errno)); + Logger::Error("Failed to create pipe for restarting worker {}: {}", globalWorkerId, strerror(errno)); return; } - int old_read = workerPipes_[workerId * 2]; - int old_write = workerPipes_[workerId * 2 + 1]; - workerPipes_[workerId * 2] = pipefd[0]; - workerPipes_[workerId * 2 + 1] = pipefd[1]; + int old_read = workerPipes_[globalWorkerId * 2]; + int old_write = workerPipes_[globalWorkerId * 2 + 1]; + workerPipes_[globalWorkerId * 2] = pipefd[0]; + workerPipes_[globalWorkerId * 2 + 1] = pipefd[1]; sigset_t oldset; BlockSignals(&oldset); @@ -221,59 +256,63 @@ void ProcessPool::RestartWorker(int workerId) { pid_t pid = fork(); if (pid == 0) { + // Child signal(SIGINT, SIG_IGN); signal(SIGTERM, [](int) { g_shutdown.store(true, std::memory_order_relaxed); }); UnblockSignals(&oldset); - // Permanently block SIGINT only + // Block SIGINT permanently sigset_t block_int; sigemptyset(&block_int); sigaddset(&block_int, SIGINT); pthread_sigmask(SIG_BLOCK, &block_int, nullptr); - workerId_ = workerId; - role_ = ProcessRole::WORKER; - - for (int j = 0; j < numProcesses_ * 2; ++j) { - if (j != workerId * 2) { - if (workerPipes_[j] != -1) close(workerPipes_[j]); + // Close all pipe ends except this worker's read end + for (int i = 0; i < totalWorkers_ * 2; ++i) { + if (i != globalWorkerId * 2) { + if (workerPipes_[i] != -1) close(workerPipes_[i]); } } - fcntl(workerPipes_[workerId * 2], F_SETFL, O_NONBLOCK); - - std::string processName = "game_worker_" + std::to_string(workerId); + std::string processName = "game_worker_" + std::to_string(globalWorkerId); prctl(PR_SET_NAME, processName.c_str(), 0, 0, 0); - Logger::Info("Restarted worker {} (PID: {})", workerId, getpid()); + role_ = ProcessRole::WORKER; - WorkerProcess(workerId); + Logger::Info("Restarted worker {} (PID: {})", globalWorkerId, getpid()); - if (workerPipes_[workerId * 2] != -1) - close(workerPipes_[workerId * 2]); + WorkerProcess(globalWorkerId, group); + + if (workerPipes_[globalWorkerId * 2] != -1) + close(workerPipes_[globalWorkerId * 2]); _exit(0); } else if (pid > 0) { - workerPids_[workerId] = pid; + // Master + workers_[globalWorkerId].pid = pid; - close(workerPipes_[workerId * 2]); - workerPipes_[workerId * 2] = -1; + // Close read end in master + if (workerPipes_[globalWorkerId * 2] != -1) { + close(workerPipes_[globalWorkerId * 2]); + workerPipes_[globalWorkerId * 2] = -1; + } - fcntl(workerPipes_[workerId * 2 + 1], F_SETFL, O_NONBLOCK); + // Set non‑blocking for write end + fcntl(workerPipes_[globalWorkerId * 2 + 1], F_SETFL, O_NONBLOCK); { std::lock_guard lock(healthMutex_); - workerHealth_[workerId] = {pid, time(nullptr)}; + workerHealth_[globalWorkerId] = {pid, time(nullptr)}; } - Logger::Info("Worker {} restarted with PID: {}", workerId, pid); + Logger::Info("Worker {} restarted with PID: {}", globalWorkerId, pid); } else { - Logger::Error("Failed to restart worker {}: {}", workerId, strerror(errno)); + Logger::Error("Failed to restart worker {}: {}", globalWorkerId, strerror(errno)); close(pipefd[0]); close(pipefd[1]); - workerPipes_[workerId * 2] = old_read; - workerPipes_[workerId * 2 + 1] = old_write; + workerPipes_[globalWorkerId * 2] = old_read; + workerPipes_[globalWorkerId * 2 + 1] = old_write; } UnblockSignals(&oldset); @@ -404,7 +443,7 @@ bool ProcessPool::DrainPipe(int fd, size_t bytesToDrain) { } bool ProcessPool::SendToWorker(int workerId, const std::string& message) { - if (workerId < 0 || workerId >= numProcesses_) { + if (workerId < 0 || workerId >= totalWorkers_) { Logger::Error("Invalid worker ID: {}", workerId); return false; } @@ -466,7 +505,7 @@ std::string ProcessPool::ReceiveFromMaster() { } bool ProcessPool::IsWorkerAlive(int workerId) const { - if (workerId < 0 || workerId >= numProcesses_) { + if (workerId < 0 || workerId >= totalWorkers_) { return false; } std::lock_guard lock(healthMutex_); @@ -484,7 +523,7 @@ void ProcessPool::SetupSignalHandlers() { signal(SIGPIPE, SIG_IGN); } -void ProcessPool::SetWorkerMain(std::function workerMainFunc) { +void ProcessPool::SetWorkerMain(WorkerMainFunc workerMainFunc) { workerMainFunc_ = std::move(workerMainFunc); }