From 980a97a52674e2eb1f3c09cfece549a3c549331e Mon Sep 17 00:00:00 2001 From: sergystepanov Date: Sun, 3 Jan 2021 21:23:55 +0300 Subject: [PATCH] Handle no config situation for workers (#253) (experimental feature) Before a worker can start, it should have a configuration file. In case if such a file is not found it may request configuration from the coordinator to which it connected. Added example logic if a worker is needed to be blocked until a successful packet exchange with a coordinator is being made. * Add error return for config loader * Add config loaded flag to worker * Add zone flag * Add a custom mutex lock with timout * Refactor worker runtime * Refactor internal api * Extract monitoring server config * Extract worker HTTP(S) server * Add generic sub-server interface * Add internal coordinator API * Add internal routes and handlers to worker * Add internal worker API * Refactor worker run * Migrate serverId call to new API * Add packet handler to cws * Extract handlers for internal worker routes in coordinator * Pass worker to the worker internal heandlers * Cleanup worker handlers in coordinator * Add closeRoom packet handler to the API * Add GetRoom packet handler to the API * Add RegisterRoom packet handler to the API * Add IceCandidate packet handler to the API (internal and browser) * Add Heartbeat packet handler to the API (internal and browser) * Rename worker routes init function * Extract worker/coordinator internal ws handlers * Update timed locker * Allow sequential timed locks * Add config request from workers * Add nil check for the route registration functions --- cmd/worker/main.go | 19 +- pkg/config/coordinator/config.go | 11 +- pkg/config/loader.go | 9 +- pkg/config/monitoring/config.go | 8 + pkg/config/worker/config.go | 26 +- pkg/coordinator/browser.go | 205 +----------- pkg/coordinator/coordinator.go | 2 +- pkg/coordinator/handlers.go | 24 +- pkg/coordinator/internalhandlers.go | 69 ++++ pkg/coordinator/routes.go | 33 ++ pkg/coordinator/useragenthandlers.go | 211 ++++++++++++ pkg/coordinator/worker.go | 68 +--- .../coordinator_api.go => cws/api/api.go} | 20 +- pkg/cws/api/coordinator.go | 51 +++ pkg/cws/api/worker.go | 21 ++ pkg/cws/cws.go | 49 +-- .../libretro/nanoarch/nanoarch_test.go | 4 +- pkg/lock/lock.go | 48 +++ pkg/lock/lock_test.go | 38 +++ pkg/monitoring/monitoring.go | 62 ++-- pkg/server/server.go | 9 + pkg/worker/coordinator.go | 311 ------------------ pkg/worker/handlers.go | 24 +- pkg/worker/internalhandlers.go | 296 +++++++++++++++++ pkg/worker/room/room_test.go | 4 +- pkg/worker/routes.go | 22 ++ pkg/worker/server.go | 121 +++++++ pkg/worker/worker.go | 206 +++--------- tests/e2e/main_test.go | 4 +- web/js/network/rtcp.js | 15 +- web/js/network/socket.js | 6 +- 31 files changed, 1134 insertions(+), 862 deletions(-) create mode 100644 pkg/config/monitoring/config.go create mode 100644 pkg/coordinator/internalhandlers.go create mode 100644 pkg/coordinator/routes.go create mode 100644 pkg/coordinator/useragenthandlers.go rename pkg/{api/coordinator_api.go => cws/api/api.go} (50%) create mode 100644 pkg/cws/api/coordinator.go create mode 100644 pkg/cws/api/worker.go create mode 100644 pkg/lock/lock.go create mode 100644 pkg/lock/lock_test.go create mode 100644 pkg/server/server.go create mode 100644 pkg/worker/internalhandlers.go create mode 100644 pkg/worker/routes.go create mode 100644 pkg/worker/server.go diff --git a/cmd/worker/main.go b/cmd/worker/main.go index f5a33672..7bfb9d1e 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -16,9 +16,11 @@ import ( flag "github.com/spf13/pflag" ) -func run() { +func init() { rand.Seed(time.Now().UTC().UnixNano()) +} +func run() { conf := config.NewConfig() flag.CommandLine.AddGoFlagSet(goflag.CommandLine) conf.ParseFlags() @@ -28,20 +30,17 @@ func run() { ctx, cancelCtx := context.WithCancel(context.Background()) - glog.Infof("Initializing worker server") - glog.V(4).Infof("Worker configs %v", conf) - o := worker.New(ctx, conf) - if err := o.Run(); err != nil { - glog.Errorf("Failed to run worker, reason %v", err) - os.Exit(1) - } + glog.V(4).Info("[worker] Initialization") + glog.V(4).Infof("[worker] Local configuration %+v", conf) + wrk := worker.New(ctx, conf) + wrk.Run() stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt) select { case <-stop: - glog.Infoln("Received SIGTERM, Quiting Worker") - o.Shutdown() + glog.V(4).Info("[worker] Shutting down") + wrk.Shutdown() cancelCtx() } } diff --git a/pkg/config/coordinator/config.go b/pkg/config/coordinator/config.go index 16abaaae..47511f0c 100644 --- a/pkg/config/coordinator/config.go +++ b/pkg/config/coordinator/config.go @@ -3,10 +3,10 @@ package coordinator import ( "github.com/giongto35/cloud-game/v2/pkg/config" "github.com/giongto35/cloud-game/v2/pkg/config/emulator" + "github.com/giongto35/cloud-game/v2/pkg/config/monitoring" "github.com/giongto35/cloud-game/v2/pkg/config/shared" webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc" "github.com/giongto35/cloud-game/v2/pkg/games" - "github.com/giongto35/cloud-game/v2/pkg/monitoring" flag "github.com/spf13/pflag" ) @@ -21,16 +21,17 @@ type Config struct { } Emulator emulator.Emulator Environment shared.Environment - Webrtc struct { - IceServers []webrtcConfig.IceServer - } + Webrtc webrtcConfig.Webrtc } // allows custom config path var configPath string func NewConfig() (conf Config) { - config.LoadConfig(&conf, configPath) + err := config.LoadConfig(&conf, configPath) + if err != nil { + panic(err) + } return } diff --git a/pkg/config/loader.go b/pkg/config/loader.go index d57d2320..0d665222 100644 --- a/pkg/config/loader.go +++ b/pkg/config/loader.go @@ -10,16 +10,17 @@ import ( // The path param specifies a custom path to the configuration file. // Reads and puts environment variables with the prefix CLOUD_GAME_. // Params from the config should be in uppercase separated with _. -func LoadConfig(config interface{}, path string) interface{} { +func LoadConfig(config interface{}, path string) error { envPrefix := "CLOUD_GAME" dirs := []string{path} if path == "" { + dirs = append(dirs, ".", "configs", "../../../configs") if home, err := os.UserHomeDir(); err == nil { - dirs = append(dirs, ".", "configs", home+"/.cr", "../../../configs") + dirs = append(dirs, home+"/.cr") } } if err := fig.Load(config, fig.Dirs(dirs...), fig.UseEnv(envPrefix)); err != nil { - panic(err) + return err } - return config + return nil } diff --git a/pkg/config/monitoring/config.go b/pkg/config/monitoring/config.go new file mode 100644 index 00000000..319304f6 --- /dev/null +++ b/pkg/config/monitoring/config.go @@ -0,0 +1,8 @@ +package monitoring + +type ServerMonitoringConfig struct { + Port int + URLPrefix string + MetricEnabled bool `json:"metric_enabled"` + ProfilingEnabled bool `json:"profiling_enabled"` +} diff --git a/pkg/config/worker/config.go b/pkg/config/worker/config.go index c352954e..1006060d 100644 --- a/pkg/config/worker/config.go +++ b/pkg/config/worker/config.go @@ -1,12 +1,14 @@ package worker import ( + "encoding/json" + "github.com/giongto35/cloud-game/v2/pkg/config" "github.com/giongto35/cloud-game/v2/pkg/config/emulator" "github.com/giongto35/cloud-game/v2/pkg/config/encoder" + "github.com/giongto35/cloud-game/v2/pkg/config/monitoring" "github.com/giongto35/cloud-game/v2/pkg/config/shared" webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc" - "github.com/giongto35/cloud-game/v2/pkg/monitoring" flag "github.com/spf13/pflag" ) @@ -23,13 +25,21 @@ type Config struct { Server shared.Server } Webrtc webrtcConfig.Webrtc + Loaded bool } // allows custom config path var configPath string func NewConfig() (conf Config) { - config.LoadConfig(&conf, configPath) + if err := config.LoadConfig(&conf, configPath); err == nil { + conf.Loaded = true + } + return +} + +func EmptyConfig() (conf Config) { + conf.Loaded = false return } @@ -41,6 +51,18 @@ func (c *Config) ParseFlags() { c.Worker.Server.WithFlags() flag.IntVar(&c.Worker.Monitoring.Port, "monitoring.port", c.Worker.Monitoring.Port, "Monitoring server port") flag.StringVar(&c.Worker.Network.CoordinatorAddress, "coordinatorhost", c.Worker.Network.CoordinatorAddress, "Worker URL to connect") + flag.StringVar(&c.Worker.Network.Zone, "zone", c.Worker.Network.Zone, "Worker network zone (us, eu, etc.)") flag.StringVarP(&configPath, "conf", "c", configPath, "Set custom configuration file path") flag.Parse() } + +func (c *Config) Serialize() []byte { + res, _ := json.Marshal(c) + return res +} + +func (c *Config) Deserialize(data []byte) { + if err := json.Unmarshal(data, c); err == nil { + c.Loaded = true + } +} diff --git a/pkg/coordinator/browser.go b/pkg/coordinator/browser.go index 7eb3b11f..41ad610f 100644 --- a/pkg/coordinator/browser.go +++ b/pkg/coordinator/browser.go @@ -1,14 +1,10 @@ package coordinator import ( - "errors" "fmt" "log" - "github.com/giongto35/cloud-game/v2/pkg/api" "github.com/giongto35/cloud-game/v2/pkg/cws" - "github.com/giongto35/cloud-game/v2/pkg/games" - "github.com/giongto35/cloud-game/v2/pkg/worker/room" "github.com/gorilla/websocket" ) @@ -30,206 +26,9 @@ func NewBrowserClient(c *websocket.Conn, browserID string) *BrowserClient { // Register new log func (bc *BrowserClient) Printf(format string, args ...interface{}) { - newFmt := fmt.Sprintf("Browser %s] %s", bc.SessionID, format) - log.Printf(newFmt, args...) + log.Printf(fmt.Sprintf("Browser %s] %s", bc.SessionID, format), args...) } func (bc *BrowserClient) Println(args ...interface{}) { - msg := fmt.Sprintf("Browser %s] %s", bc.SessionID, fmt.Sprint(args...)) - log.Println(msg) -} - -// RouteBrowser -// Register callbacks for connection from browser -> coordinator -func (o *Server) RouteBrowser(client *BrowserClient) { - /* WebSocket */ - client.Receive("heartbeat", func(resp cws.WSPacket) cws.WSPacket { - return resp - }) - - /* WebRTC */ - client.Receive("initwebrtc", func(resp cws.WSPacket) cws.WSPacket { - // initwebrtc now only sends signal to worker, asks it to createOffer - client.Printf("Received initwebrtc request -> relay to worker: %s", client.WorkerID) - - // relay request to target worker - // worker creates a PeerConnection, and createOffer - // send SDP back to browser - resp.SessionID = client.SessionID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - sdp := wc.SyncSend(resp) - - client.Println("Received SDP from worker -> sending back to browser") - return sdp - }) - - client.Receive("answer", func(resp cws.WSPacket) cws.WSPacket { - // contains SDP of browser createAnswer - // forward to worker - client.Println("Received browser answered SDP -> relay to worker") - - resp.SessionID = client.SessionID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - wc.Send(resp, nil) - - // no need to response - return cws.EmptyPacket - }) - - client.Receive("candidate", func(resp cws.WSPacket) cws.WSPacket { - // contains ICE candidate of browser - // forward to worker - client.Println("Received IceCandidate from browser -> relay to worker") - - resp.SessionID = client.SessionID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - wc.Send(resp, nil) - - return cws.EmptyPacket - }) - - /* GameLogic */ - client.Receive("quit", func(resp cws.WSPacket) (req cws.WSPacket) { - client.Println("Received quit request from a browser -> relay to worker") - - // TODO: Async - resp.SessionID = client.SessionID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - // Send but, waiting - wc.SyncSend(resp) - - return cws.EmptyPacket - }) - - client.Receive("start", func(resp cws.WSPacket) cws.WSPacket { - client.Println("Received start request from a browser -> relay to worker") - - // TODO: Async - resp.SessionID = client.SessionID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - - // +injects game data into the original game request - gameStartCall, err := newGameStartCall(resp.RoomID, resp.Data, o.library) - if err != nil { - return cws.EmptyPacket - } - if packet, err := gameStartCall.To(); err != nil { - return cws.EmptyPacket - } else { - resp.Data = packet - } - workerResp := wc.SyncSend(resp) - - // Response from worker contains initialized roomID. Set roomID to the session - client.RoomID = workerResp.RoomID - client.Println("Received room response from browser: ", workerResp.RoomID) - - return workerResp - }) - - client.Receive("save", func(resp cws.WSPacket) (req cws.WSPacket) { - client.Println("Received save request from a browser -> relay to worker") - - // TODO: Async - resp.SessionID = client.SessionID - resp.RoomID = client.RoomID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - resp = wc.SyncSend(resp) - - return resp - }) - - client.Receive("load", func(resp cws.WSPacket) (req cws.WSPacket) { - client.Println("Received load request from a browser -> relay to worker") - - // TODO: Async - resp.SessionID = client.SessionID - resp.RoomID = client.RoomID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - resp = wc.SyncSend(resp) - - return resp - }) - - client.Receive("playerIdx", func(resp cws.WSPacket) (req cws.WSPacket) { - client.Println("Received update player index request from a browser -> relay to worker") - - // TODO: Async - resp.SessionID = client.SessionID - resp.RoomID = client.RoomID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - resp = wc.SyncSend(resp) - - return resp - }) - - client.Receive("multitap", func(resp cws.WSPacket) (req cws.WSPacket) { - client.Println("Received multitap request from a browser -> relay to worker") - - // TODO: Async - resp.SessionID = client.SessionID - resp.RoomID = client.RoomID - wc, ok := o.workerClients[client.WorkerID] - if !ok { - return cws.EmptyPacket - } - resp = wc.SyncSend(resp) - - return resp - }) -} - -// newGameStartCall gathers data for a new game start call of the worker -func newGameStartCall(roomId string, data string, library games.GameLibrary) (api.GameStartCall, error) { - request := api.GameStartRequest{} - if err := request.From(data); err != nil { - return api.GameStartCall{}, errors.New("invalid request") - } - - // the name of the game either in the `room id` field or - // it's in the initial request - game := request.GameName - if roomId != "" { - // ! should be moved into coordinator - name := room.GetGameNameFromRoomID(roomId) - if name == "" { - return api.GameStartCall{}, errors.New("couldn't decode game name from the room id") - } - game = name - } - - gameInfo := library.FindGameByName(game) - if gameInfo.Path == "" { - return api.GameStartCall{}, fmt.Errorf("couldn't find game info for the game %v", game) - } - - return api.GameStartCall{ - Name: gameInfo.Name, - Path: gameInfo.Path, - Type: gameInfo.Type, - }, nil + log.Println(fmt.Sprintf("Browser %s] %s", bc.SessionID, fmt.Sprint(args...))) } diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go index cc87b7ab..db9a4e91 100644 --- a/pkg/coordinator/coordinator.go +++ b/pkg/coordinator/coordinator.go @@ -33,7 +33,7 @@ func New(ctx context.Context, cfg coordinator.Config) *Coordinator { ctx: ctx, cfg: cfg, - monitoringServer: monitoring.NewServerMonitoring(cfg.Coordinator.Monitoring), + monitoringServer: monitoring.NewServerMonitoring(cfg.Coordinator.Monitoring, "cord"), } } diff --git a/pkg/coordinator/handlers.go b/pkg/coordinator/handlers.go index 636d7742..ed84fc95 100644 --- a/pkg/coordinator/handlers.go +++ b/pkg/coordinator/handlers.go @@ -12,6 +12,7 @@ import ( "github.com/giongto35/cloud-game/v2/pkg/config/coordinator" "github.com/giongto35/cloud-game/v2/pkg/cws" + "github.com/giongto35/cloud-game/v2/pkg/cws/api" "github.com/giongto35/cloud-game/v2/pkg/environment" "github.com/giongto35/cloud-game/v2/pkg/games" "github.com/giongto35/cloud-game/v2/pkg/util" @@ -63,7 +64,8 @@ func (o *Server) GetWeb(w http.ResponseWriter, r *http.Request) { tmpl.Execute(w, struct{}{}) } -// getPingServer returns the server for latency check of a zone. In latency check to find best worker step, we use this server to find the closest worker. +// getPingServer returns the server for latency check of a zone. +// In latency check to find best worker step, we use this server to find the closest worker. func (o *Server) getPingServer(zone string) string { if o.cfg.Coordinator.PingServer != "" { return fmt.Sprintf("%s/echo", o.cfg.Coordinator.PingServer) @@ -73,8 +75,6 @@ func (o *Server) getPingServer(zone string) string { if mode.AnyOf(environment.Production, environment.Staging) { return fmt.Sprintf(pingServerTemp, zone, o.cfg.Coordinator.PublicDomain) } - - // If not Prod or Staging, return dev environment return devPingServer } @@ -137,20 +137,13 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) { wc.Zone = zone wc.PingServer = pingServer - // Eveything is cool // Attach to Server instance with workerID, add defer o.workerClients[workerID] = wc defer o.cleanWorker(wc, workerID) - // Sendback the ID to worker - // TODO: do we need this packet? - wc.Send(cws.WSPacket{ - ID: "serverID", - Data: workerID, - }, nil) + wc.Send(api.ServerIdPacket(workerID), nil) - // Add receiver callbacks, and listen - o.RouteWorker(wc) + o.workerRoutes(wc) wc.Listen() } @@ -233,7 +226,7 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) { defer o.cleanBrowser(bc, sessionID) // Routing browserClient message - o.RouteBrowser(bc) + o.useragentRoutes(bc) bc.Send(cws.WSPacket{ ID: "init", @@ -244,10 +237,7 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) { <-bc.Done // Notify worker to clean session - wc.Send(cws.WSPacket{ - ID: "terminateSession", - SessionID: sessionID, - }, nil) + wc.Send(api.TerminateSessionPacket(sessionID), nil) // WorkerClient become available again wc.IsAvailable = true diff --git a/pkg/coordinator/internalhandlers.go b/pkg/coordinator/internalhandlers.go new file mode 100644 index 00000000..92f8b189 --- /dev/null +++ b/pkg/coordinator/internalhandlers.go @@ -0,0 +1,69 @@ +package coordinator + +import ( + "log" + + "github.com/giongto35/cloud-game/v2/pkg/config/worker" + "github.com/giongto35/cloud-game/v2/pkg/cws" + "github.com/giongto35/cloud-game/v2/pkg/cws/api" +) + +func (wc *WorkerClient) handleConfigRequest() cws.PacketHandler { + return func(resp cws.WSPacket) cws.WSPacket { + // try to load worker config + conf := worker.NewConfig() + return api.ConfigRequestPacket(conf.Serialize()) + } +} + +func (wc *WorkerClient) handleHeartbeat() cws.PacketHandler { + return func(resp cws.WSPacket) cws.WSPacket { + return resp + } +} + +// handleRegisterRoom event from a worker, when worker created a new room. +// RoomID is global so it is managed by coordinator. +func (wc *WorkerClient) handleRegisterRoom(s *Server) cws.PacketHandler { + return func(resp cws.WSPacket) cws.WSPacket { + log.Printf("Coordinator: Received registerRoom room %s from worker %s", resp.Data, wc.WorkerID) + s.roomToWorker[resp.Data] = wc.WorkerID + log.Printf("Coordinator: Current room list is: %+v", s.roomToWorker) + return api.RegisterRoomPacket(api.NoData) + } +} + +// handleGetRoom returns the server ID based on requested roomID. +func (wc *WorkerClient) handleGetRoom(s *Server) cws.PacketHandler { + return func(resp cws.WSPacket) cws.WSPacket { + log.Println("Coordinator: Received a get room request") + log.Println("Result: ", s.roomToWorker[resp.Data]) + return api.GetRoomPacket(s.roomToWorker[resp.Data]) + } +} + +// handleCloseRoom event from a worker, when worker close a room. +func (wc *WorkerClient) handleCloseRoom(s *Server) cws.PacketHandler { + return func(resp cws.WSPacket) cws.WSPacket { + log.Printf("Coordinator: Received closeRoom room %s from worker %s", resp.Data, wc.WorkerID) + delete(s.roomToWorker, resp.Data) + log.Printf("Coordinator: Current room list is: %+v", s.roomToWorker) + return api.CloseRoomPacket(api.NoData) + } +} + +// handleIceCandidate passes an ICE candidate (WebRTC) to the browser. +func (wc *WorkerClient) handleIceCandidate(s *Server) cws.PacketHandler { + return func(resp cws.WSPacket) cws.WSPacket { + wc.Println("Received IceCandidate from worker -> relay to browser") + bc, ok := s.browserClients[resp.SessionID] + if ok { + // Remove SessionID while sending back to browser + resp.SessionID = "" + bc.Send(resp, nil) + } else { + wc.Println("Error: unknown SessionID:", resp.SessionID) + } + return cws.EmptyPacket + } +} diff --git a/pkg/coordinator/routes.go b/pkg/coordinator/routes.go new file mode 100644 index 00000000..483d9e4e --- /dev/null +++ b/pkg/coordinator/routes.go @@ -0,0 +1,33 @@ +package coordinator + +import "github.com/giongto35/cloud-game/v2/pkg/cws/api" + +// workerRoutes adds all worker request routes. +func (o *Server) workerRoutes(wc *WorkerClient) { + if wc == nil { + return + } + wc.Receive(api.ConfigRequest, wc.handleConfigRequest()) + wc.Receive(api.Heartbeat, wc.handleHeartbeat()) + wc.Receive(api.RegisterRoom, wc.handleRegisterRoom(o)) + wc.Receive(api.GetRoom, wc.handleGetRoom(o)) + wc.Receive(api.CloseRoom, wc.handleCloseRoom(o)) + wc.Receive(api.IceCandidate, wc.handleIceCandidate(o)) +} + +// useragentRoutes adds all useragent (browser) request routes. +func (o *Server) useragentRoutes(bc *BrowserClient) { + if bc == nil { + return + } + bc.Receive(api.Heartbeat, bc.handleHeartbeat()) + bc.Receive(api.InitWebrtc, bc.handleInitWebrtc(o)) + bc.Receive(api.Answer, bc.handleAnswer(o)) + bc.Receive(api.IceCandidate, bc.handleIceCandidate(o)) + bc.Receive(api.GameStart, bc.handleGameStart(o)) + bc.Receive(api.GameQuit, bc.handleGameQuit(o)) + bc.Receive(api.GameSave, bc.handleGameSave(o)) + bc.Receive(api.GameLoad, bc.handleGameLoad(o)) + bc.Receive(api.GamePlayerSelect, bc.handleGamePlayerSelect(o)) + bc.Receive(api.GameMultitap, bc.handleGameMultitap(o)) +} diff --git a/pkg/coordinator/useragenthandlers.go b/pkg/coordinator/useragenthandlers.go new file mode 100644 index 00000000..3b8d105e --- /dev/null +++ b/pkg/coordinator/useragenthandlers.go @@ -0,0 +1,211 @@ +package coordinator + +import ( + "errors" + "fmt" + + "github.com/giongto35/cloud-game/v2/pkg/cws" + "github.com/giongto35/cloud-game/v2/pkg/cws/api" + "github.com/giongto35/cloud-game/v2/pkg/games" + "github.com/giongto35/cloud-game/v2/pkg/worker/room" +) + +func (bc *BrowserClient) handleHeartbeat() cws.PacketHandler { + return func(resp cws.WSPacket) cws.WSPacket { return resp } +} + +func (bc *BrowserClient) handleInitWebrtc(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + // initWebrtc now only sends signal to worker, asks it to createOffer + bc.Printf("Received init_webrtc request -> relay to worker: %s", bc.WorkerID) + // relay request to target worker + // worker creates a PeerConnection, and createOffer + // send SDP back to browser + resp.SessionID = bc.SessionID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + sdp := wc.SyncSend(resp) + bc.Println("Received SDP from worker -> sending back to browser") + return sdp + } +} + +func (bc *BrowserClient) handleAnswer(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + // contains SDP of browser createAnswer + // forward to worker + bc.Println("Received browser answered SDP -> relay to worker") + resp.SessionID = bc.SessionID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + wc.Send(resp, nil) + // no need to response + return cws.EmptyPacket + } +} + +func (bc *BrowserClient) handleIceCandidate(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + // contains ICE candidate of browser + // forward to worker + bc.Println("Received IceCandidate from browser -> relay to worker") + resp.SessionID = bc.SessionID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + wc.Send(resp, nil) + return cws.EmptyPacket + } +} + +func (bc *BrowserClient) handleGameStart(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + bc.Println("Received start request from a browser -> relay to worker") + + // TODO: Async + resp.SessionID = bc.SessionID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + + // +injects game data into the original game request + gameStartCall, err := newGameStartCall(resp.RoomID, resp.Data, o.library) + if err != nil { + return cws.EmptyPacket + } + if packet, err := gameStartCall.To(); err != nil { + return cws.EmptyPacket + } else { + resp.Data = packet + } + workerResp := wc.SyncSend(resp) + + // Response from worker contains initialized roomID. Set roomID to the session + bc.RoomID = workerResp.RoomID + bc.Println("Received room response from browser: ", workerResp.RoomID) + + return workerResp + } +} + +func (bc *BrowserClient) handleGameQuit(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + bc.Println("Received quit request from a browser -> relay to worker") + + // TODO: Async + resp.SessionID = bc.SessionID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + // Send but, waiting + wc.SyncSend(resp) + + return cws.EmptyPacket + } +} + +func (bc *BrowserClient) handleGameSave(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + bc.Println("Received save request from a browser -> relay to worker") + + // TODO: Async + resp.SessionID = bc.SessionID + resp.RoomID = bc.RoomID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + resp = wc.SyncSend(resp) + + return resp + } +} + +func (bc *BrowserClient) handleGameLoad(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + bc.Println("Received load request from a browser -> relay to worker") + + // TODO: Async + resp.SessionID = bc.SessionID + resp.RoomID = bc.RoomID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + resp = wc.SyncSend(resp) + + return resp + } +} + +func (bc *BrowserClient) handleGamePlayerSelect(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + bc.Println("Received update player index request from a browser -> relay to worker") + + // TODO: Async + resp.SessionID = bc.SessionID + resp.RoomID = bc.RoomID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + resp = wc.SyncSend(resp) + + return resp + } +} + +func (bc *BrowserClient) handleGameMultitap(o *Server) cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + bc.Println("Received multitap request from a browser -> relay to worker") + + // TODO: Async + resp.SessionID = bc.SessionID + resp.RoomID = bc.RoomID + wc, ok := o.workerClients[bc.WorkerID] + if !ok { + return cws.EmptyPacket + } + resp = wc.SyncSend(resp) + + return resp + } +} + +// newGameStartCall gathers data for a new game start call of the worker +func newGameStartCall(roomId string, data string, library games.GameLibrary) (api.GameStartCall, error) { + request := api.GameStartRequest{} + if err := request.From(data); err != nil { + return api.GameStartCall{}, errors.New("invalid request") + } + + // the name of the game either in the `room id` field or + // it's in the initial request + game := request.GameName + if roomId != "" { + // ! should be moved into coordinator + name := room.GetGameNameFromRoomID(roomId) + if name == "" { + return api.GameStartCall{}, errors.New("couldn't decode game name from the room id") + } + game = name + } + + gameInfo := library.FindGameByName(game) + if gameInfo.Path == "" { + return api.GameStartCall{}, fmt.Errorf("couldn't find game info for the game %v", game) + } + + return api.GameStartCall{ + Name: gameInfo.Name, + Path: gameInfo.Path, + Type: gameInfo.Type, + }, nil +} diff --git a/pkg/coordinator/worker.go b/pkg/coordinator/worker.go index 948c09a1..f074f261 100644 --- a/pkg/coordinator/worker.go +++ b/pkg/coordinator/worker.go @@ -8,10 +8,9 @@ import ( "github.com/gorilla/websocket" ) -const pingServer = "%s://%s/echo" - type WorkerClient struct { *cws.Client + WorkerID string Address string // ip address of worker // public server used for ping check (Cannot use worker address because they are not publicly exposed) @@ -21,7 +20,8 @@ type WorkerClient struct { Zone string } -// NewWorkerClient returns a client connecting to worker. This connection exchanges information between workers and server +// NewWorkerClient returns a client connecting to worker. +// This connection exchanges information between workers and server. func NewWorkerClient(c *websocket.Conn, workerID string) *WorkerClient { return &WorkerClient{ Client: cws.NewClient(c), @@ -30,68 +30,10 @@ func NewWorkerClient(c *websocket.Conn, workerID string) *WorkerClient { } } -// Register new log func (wc *WorkerClient) Printf(format string, args ...interface{}) { - newFmt := fmt.Sprintf("Worker %s] %s", wc.WorkerID, format) - log.Printf(newFmt, args...) + log.Printf(fmt.Sprintf("Worker %s] %s", wc.WorkerID, format), args...) } func (wc *WorkerClient) Println(args ...interface{}) { - msg := fmt.Sprintf("Worker %s] %s", wc.WorkerID, fmt.Sprint(args...)) - log.Println(msg) -} - -// RouteWorker are all routes server received from worker -func (o *Server) RouteWorker(wc *WorkerClient) { - // registerRoom event from a worker, when worker created a new room. - // RoomID is global so it is managed by coordinator. - wc.Receive("registerRoom", func(resp cws.WSPacket) cws.WSPacket { - log.Printf("Coordinator: Received registerRoom room %s from worker %s", resp.Data, wc.WorkerID) - o.roomToWorker[resp.Data] = wc.WorkerID - log.Printf("Coordinator: Current room list is: %+v", o.roomToWorker) - - return cws.WSPacket{ - ID: "registerRoom", - } - }) - - // closeRoom event from a worker, when worker close a room - wc.Receive("closeRoom", func(resp cws.WSPacket) cws.WSPacket { - log.Printf("Coordinator: Received closeRoom room %s from worker %s", resp.Data, wc.WorkerID) - delete(o.roomToWorker, resp.Data) - log.Printf("Coordinator: Current room list is: %+v", o.roomToWorker) - - return cws.WSPacket{ - ID: "closeRoom", - } - }) - - // getRoom returns the server ID based on requested roomID. - wc.Receive("getRoom", func(resp cws.WSPacket) cws.WSPacket { - log.Println("Coordinator: Received a getroom request") - log.Println("Result: ", o.roomToWorker[resp.Data]) - return cws.WSPacket{ - ID: "getRoom", - Data: o.roomToWorker[resp.Data], - } - }) - - wc.Receive("heartbeat", func(resp cws.WSPacket) cws.WSPacket { - return resp - }) - - /* WebRTC */ - wc.Receive("candidate", func(resp cws.WSPacket) cws.WSPacket { - wc.Println("Received IceCandidate from worker -> relay to browser") - bc, ok := o.browserClients[resp.SessionID] - if ok { - // Remove SessionID while sending back to browser - resp.SessionID = "" - bc.Send(resp, nil) - } else { - wc.Println("Error: unknown SessionID:", resp.SessionID) - } - - return cws.EmptyPacket - }) + log.Println(fmt.Sprintf("Worker %s] %s", wc.WorkerID, fmt.Sprint(args...))) } diff --git a/pkg/api/coordinator_api.go b/pkg/cws/api/api.go similarity index 50% rename from pkg/api/coordinator_api.go rename to pkg/cws/api/api.go index f0988077..d261a15a 100644 --- a/pkg/api/coordinator_api.go +++ b/pkg/cws/api/api.go @@ -1,29 +1,11 @@ package api -import ( - "encoding/json" -) +import "encoding/json" // This list of postfixes is used in the API: // - *Request postfix denotes clients calls (i.e. from a browser to the HTTP-server). // - *Call postfix denotes IPC calls (from the coordinator to a worker). -type GameStartRequest struct { - GameName string `json:"game_name"` - IsMobile bool `json:"is_mobile"` -} - -func (packet *GameStartRequest) From(data string) error { return from(packet, data) } - -type GameStartCall struct { - Name string `json:"name"` - Path string `json:"path"` - Type string `json:"type"` -} - -func (packet *GameStartCall) From(data string) error { return from(packet, data) } -func (packet *GameStartCall) To() (string, error) { return to(packet) } - func from(source interface{}, data string) error { err := json.Unmarshal([]byte(data), source) if err != nil { diff --git a/pkg/cws/api/coordinator.go b/pkg/cws/api/coordinator.go new file mode 100644 index 00000000..d2688cff --- /dev/null +++ b/pkg/cws/api/coordinator.go @@ -0,0 +1,51 @@ +package api + +import "github.com/giongto35/cloud-game/v2/pkg/cws" + +const ( + ConfigRequest = "config_request" + GetRoom = "get_room" + CloseRoom = "close_room" + RegisterRoom = "register_room" + Heartbeat = "heartbeat" + IceCandidate = "ice_candidate" + + NoData = "" + + InitWebrtc = "init_webrtc" + Answer = "answer" + + GameStart = "start" + GameQuit = "quit" + GameSave = "save" + GameLoad = "load" + GamePlayerSelect = "player_index" + GameMultitap = "multitap" +) + +type GameStartRequest struct { + GameName string `json:"game_name"` + IsMobile bool `json:"is_mobile"` +} + +func (packet *GameStartRequest) From(data string) error { return from(packet, data) } + +type GameStartCall struct { + Name string `json:"name"` + Path string `json:"path"` + Type string `json:"type"` +} + +func (packet *GameStartCall) From(data string) error { return from(packet, data) } +func (packet *GameStartCall) To() (string, error) { return to(packet) } + +// +// *** packets *** +// +func ConfigPacket() cws.WSPacket { return cws.WSPacket{ID: ConfigRequest} } +func RegisterRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: RegisterRoom, Data: data} } +func GetRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: GetRoom, Data: data} } +func CloseRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: CloseRoom, Data: data} } +func IceCandidatePacket(data string, sessionId string) cws.WSPacket { + return cws.WSPacket{ID: IceCandidate, Data: data, SessionID: sessionId} +} diff --git a/pkg/cws/api/worker.go b/pkg/cws/api/worker.go new file mode 100644 index 00000000..0bd9f707 --- /dev/null +++ b/pkg/cws/api/worker.go @@ -0,0 +1,21 @@ +package api + +import "github.com/giongto35/cloud-game/v2/pkg/cws" + +const ( + ServerId = "server_id" + TerminateSession = "terminateSession" +) + +type ConfPushCall struct { + Data []byte `json:"data"` +} + +func (packet *ConfPushCall) From(data string) error { return from(packet, data) } +func (packet *ConfPushCall) To() (string, error) { return to(packet) } + +func ServerIdPacket(id string) cws.WSPacket { return cws.WSPacket{ID: ServerId, Data: id} } +func ConfigRequestPacket(conf []byte) cws.WSPacket { return cws.WSPacket{Data: string(conf)} } +func TerminateSessionPacket(sessionId string) cws.WSPacket { + return cws.WSPacket{ID: TerminateSession, SessionID: sessionId} +} diff --git a/pkg/cws/cws.go b/pkg/cws/cws.go index 97cc29e0..8b7342fb 100644 --- a/pkg/cws/cws.go +++ b/pkg/cws/cws.go @@ -11,33 +11,37 @@ import ( "github.com/gorilla/websocket" ) -type Client struct { - id string +type ( + Client struct { + id string - conn *websocket.Conn + conn *websocket.Conn - sendLock sync.Mutex - // sendCallback is callback based on packetID - sendCallback map[string]func(req WSPacket) - sendCallbackLock sync.Mutex - // recvCallback is callback when receive based on ID of the packet - recvCallback map[string]func(req WSPacket) + sendLock sync.Mutex + // sendCallback is callback based on packetID + sendCallback map[string]func(req WSPacket) + sendCallbackLock sync.Mutex + // recvCallback is callback when receive based on ID of the packet + recvCallback map[string]func(req WSPacket) - Done chan struct{} -} + Done chan struct{} + } -type WSPacket struct { - ID string `json:"id"` - // TODO: Make Data generic: map[string]interface{} for more usecases - Data string `json:"data"` + WSPacket struct { + ID string `json:"id"` + // TODO: Make Data generic: map[string]interface{} for more usecases + Data string `json:"data"` - RoomID string `json:"room_id"` - PlayerIndex int `json:"player_index"` + RoomID string `json:"room_id"` + PlayerIndex int `json:"player_index"` - PacketID string `json:"packet_id"` - // Globally ID of a browser session - SessionID string `json:"session_id"` -} + PacketID string `json:"packet_id"` + // Globally ID of a browser session + SessionID string `json:"session_id"` + } + + PacketHandler func(resp WSPacket) (req WSPacket) +) var EmptyPacket = WSPacket{} @@ -93,7 +97,7 @@ func (c *Client) Send(request WSPacket, callback func(response WSPacket)) { } // Receive receive and response back -func (c *Client) Receive(id string, f func(response WSPacket) (request WSPacket)) { +func (c *Client) Receive(id string, f PacketHandler) { c.recvCallback[id] = func(response WSPacket) { defer func() { if err := recover(); err != nil { @@ -161,6 +165,7 @@ func (c *Client) Heartbeat() { return default: } + // !to resolve cycle deps c.Send(WSPacket{ID: "heartbeat"}, nil) } } diff --git a/pkg/emulator/libretro/nanoarch/nanoarch_test.go b/pkg/emulator/libretro/nanoarch/nanoarch_test.go index 2cc20a58..bbba57e1 100644 --- a/pkg/emulator/libretro/nanoarch/nanoarch_test.go +++ b/pkg/emulator/libretro/nanoarch/nanoarch_test.go @@ -61,7 +61,9 @@ func GetEmulatorMock(room string, system string) *EmulatorMock { configPath := rootPath + "configs/" var conf worker.Config - config.LoadConfig(&conf, configPath) + if err := config.LoadConfig(&conf, configPath); err != nil { + panic(err) + } meta := conf.Emulator.GetLibretroCoreConfig(system) diff --git a/pkg/lock/lock.go b/pkg/lock/lock.go new file mode 100644 index 00000000..482ad100 --- /dev/null +++ b/pkg/lock/lock.go @@ -0,0 +1,48 @@ +package lock + +import ( + "sync/atomic" + "time" +) + +type TimeLock struct { + l chan struct{} + locked int32 +} + +// NewLock returns new lock (mutex) with a timeout option. +func NewLock() *TimeLock { + return &TimeLock{l: make(chan struct{}, 1)} +} + +// Lock unconditionally blocks the execution. +func (tl *TimeLock) Lock() { + if tl.isLocked() { + return + } + tl.lock() + <-tl.l +} + +// LockFor blocks the execution at most for +// the given period of time. +func (tl *TimeLock) LockFor(d time.Duration) { + tl.lock() + select { + case <-tl.l: + case <-time.After(d): + } +} + +// Unlock removes the current block if any. +func (tl *TimeLock) Unlock() { + if !tl.isLocked() { + return + } + tl.unlock() + tl.l <- struct{}{} +} + +func (tl *TimeLock) isLocked() bool { return atomic.LoadInt32(&tl.locked) == 1 } +func (tl *TimeLock) lock() { atomic.StoreInt32(&tl.locked, 1) } +func (tl *TimeLock) unlock() { atomic.StoreInt32(&tl.locked, 0) } diff --git a/pkg/lock/lock_test.go b/pkg/lock/lock_test.go new file mode 100644 index 00000000..7d26170a --- /dev/null +++ b/pkg/lock/lock_test.go @@ -0,0 +1,38 @@ +package lock + +import ( + "testing" + "time" +) + +func TestLock(t *testing.T) { + a := 1 + lock := NewLock() + wait := time.Millisecond * 10 + + lock.Unlock() + lock.Unlock() + lock.Unlock() + + go func(timeLock *TimeLock) { + time.Sleep(time.Second * 1) + lock.Unlock() + }(lock) + + lock.LockFor(time.Second * 30) + lock.LockFor(wait) + lock.LockFor(wait) + lock.LockFor(wait) + lock.LockFor(wait) + lock.LockFor(time.Millisecond * 10) + go func(timeLock *TimeLock) { + time.Sleep(time.Millisecond * 1) + lock.Unlock() + }(lock) + lock.Lock() + + a -= 1 + if a != 0 { + t.Errorf("lock test failed because a != 0") + } +} diff --git a/pkg/monitoring/monitoring.go b/pkg/monitoring/monitoring.go index 210c9b0f..1df212e1 100644 --- a/pkg/monitoring/monitoring.go +++ b/pkg/monitoring/monitoring.go @@ -7,39 +7,26 @@ import ( "net/http/pprof" "strings" + "github.com/giongto35/cloud-game/v2/pkg/config/monitoring" + config "github.com/giongto35/cloud-game/v2/pkg/config/worker" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus/promhttp" ) -type ServerMonitoringConfig struct { - Port int - URLPrefix string - MetricEnabled bool `json:"metric_enabled"` - ProfilingEnabled bool `json:"profiling_enabled"` -} - type ServerMonitoring struct { - cfg ServerMonitoringConfig - + cfg monitoring.ServerMonitoringConfig + tag string server http.Server } -func NewServerMonitoring(cfg ServerMonitoringConfig) *ServerMonitoring { - if cfg.Port == 0 { - cfg.Port = 6365 - } +func NewServerMonitoring(cfg monitoring.ServerMonitoringConfig, tag string) *ServerMonitoring { + return &ServerMonitoring{cfg: validate(&cfg), tag: tag} +} - if len(cfg.URLPrefix) > 0 { - cfg.URLPrefix = strings.TrimSpace(cfg.URLPrefix) - if !strings.HasPrefix(cfg.URLPrefix, "/") { - cfg.URLPrefix = "/" + cfg.URLPrefix - } - - if strings.HasSuffix(cfg.URLPrefix, "/") { - cfg.URLPrefix = strings.TrimSuffix(cfg.URLPrefix, "/") - } - } - return &ServerMonitoring{cfg: cfg} +func (sm *ServerMonitoring) Init(conf interface{}) error { + cfg := conf.(config.Config).Worker.Monitoring + sm.cfg = validate(&cfg) + return nil } func (sm *ServerMonitoring) Run() error { @@ -50,11 +37,11 @@ func (sm *ServerMonitoring) Run() error { Addr: fmt.Sprintf(":%d", sm.cfg.Port), Handler: monitoringServerMux, } - glog.Infoln("Starting monitoring server at", srv.Addr) + glog.Infof("[%v] Starting monitoring server at %v", sm.tag, srv.Addr) if sm.cfg.ProfilingEnabled { pprofPath := fmt.Sprintf("%s/debug/pprof", sm.cfg.URLPrefix) - glog.Infoln("Profiling is enabled at", srv.Addr+pprofPath) + glog.Infof("[%v] Profiling is enabled at %v", sm.tag, srv.Addr+pprofPath) monitoringServerMux.Handle(pprofPath+"/", http.HandlerFunc(pprof.Index)) monitoringServerMux.Handle(pprofPath+"/cmdline", http.HandlerFunc(pprof.Cmdline)) monitoringServerMux.Handle(pprofPath+"/profile", http.HandlerFunc(pprof.Profile)) @@ -72,17 +59,34 @@ func (sm *ServerMonitoring) Run() error { if sm.cfg.MetricEnabled { metricPath := fmt.Sprintf("%s/metrics", sm.cfg.URLPrefix) - glog.Infoln("Prometheus metric is enabled at", srv.Addr+metricPath) + glog.Infof("[%v] Prometheus metric is enabled at %v", sm.tag, srv.Addr+metricPath) monitoringServerMux.Handle(metricPath, promhttp.Handler()) } sm.server = srv return srv.ListenAndServe() } - glog.Infoln("Monitoring server is disabled via config") return nil } func (sm *ServerMonitoring) Shutdown(ctx context.Context) error { - glog.Infoln("Shutting down monitoring server") + glog.Infof("[%v] Shutting down monitoring server", sm.tag) return sm.server.Shutdown(ctx) } + +func validate(conf *monitoring.ServerMonitoringConfig) monitoring.ServerMonitoringConfig { + if conf.Port == 0 { + conf.Port = 6365 + } + + if len(conf.URLPrefix) > 0 { + conf.URLPrefix = strings.TrimSpace(conf.URLPrefix) + if !strings.HasPrefix(conf.URLPrefix, "/") { + conf.URLPrefix = "/" + conf.URLPrefix + } + + if strings.HasSuffix(conf.URLPrefix, "/") { + conf.URLPrefix = strings.TrimSuffix(conf.URLPrefix, "/") + } + } + return *conf +} diff --git a/pkg/server/server.go b/pkg/server/server.go new file mode 100644 index 00000000..288e4b05 --- /dev/null +++ b/pkg/server/server.go @@ -0,0 +1,9 @@ +package server + +import "context" + +type Server interface { + Init(conf interface{}) error + Run() error + Shutdown(ctx context.Context) error +} diff --git a/pkg/worker/coordinator.go b/pkg/worker/coordinator.go index 48f12264..45951bd1 100644 --- a/pkg/worker/coordinator.go +++ b/pkg/worker/coordinator.go @@ -1,18 +1,7 @@ package worker import ( - "encoding/json" - "log" - "strconv" - - "github.com/giongto35/cloud-game/v2/pkg/api" - webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc" "github.com/giongto35/cloud-game/v2/pkg/cws" - "github.com/giongto35/cloud-game/v2/pkg/encoder" - "github.com/giongto35/cloud-game/v2/pkg/games" - "github.com/giongto35/cloud-game/v2/pkg/util" - "github.com/giongto35/cloud-game/v2/pkg/webrtc" - "github.com/giongto35/cloud-game/v2/pkg/worker/room" "github.com/gorilla/websocket" ) @@ -34,303 +23,3 @@ func NewCoordinatorClient(oc *websocket.Conn) *CoordinatorClient { } return oClient } - -// RouteCoordinator are all routes server received from coordinator. -func (h *Handler) RouteCoordinator() { - oClient := h.oClient - - /* Coordinator */ - - // Received from coordinator the serverID - oClient.Receive("serverID", func(response cws.WSPacket) (request cws.WSPacket) { - // Stick session with serverID got from coordinator - log.Println("Received serverID ", response.Data) - h.serverID = response.Data - - return cws.EmptyPacket - }) - - /* WebRTC Connection */ - - oClient.Receive("initwebrtc", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received a request to createOffer from browser via coordinator") - - peerconnection := webrtc.NewWebRTC().WithConfig( - webrtcConfig.Config{Encoder: h.cfg.Encoder, Webrtc: h.cfg.Webrtc}, - ) - var initPacket struct { - IsMobile bool `json:"is_mobile"` - } - err := json.Unmarshal([]byte(resp.Data), &initPacket) - if err != nil { - log.Println("Error: Cannot decode json:", err) - return cws.EmptyPacket - } - - localSession, err := peerconnection.StartClient( - initPacket.IsMobile, - func(candidate string) { - // send back candidate string to browser - oClient.Send(cws.WSPacket{ - ID: "candidate", - Data: candidate, - SessionID: resp.SessionID, - }, nil) - }, - ) - - // localSession, err := peerconnection.StartClient(initPacket.IsMobile, iceCandidates[resp.SessionID]) - // h.peerconnections[resp.SessionID] = peerconnection - - // Create new sessions when we have new peerconnection initialized - session := &Session{ - peerconnection: peerconnection, - } - h.sessions[resp.SessionID] = session - log.Println("Start peerconnection", resp.SessionID) - - if err != nil { - log.Println("Error: Cannot create new webrtc session", err) - return cws.EmptyPacket - } - - return cws.WSPacket{ - ID: "offer", - Data: localSession, - } - }) - - oClient.Receive("answer", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received answer SDP from browser") - session := h.getSession(resp.SessionID) - - if session != nil { - peerconnection := session.peerconnection - - err := peerconnection.SetRemoteSDP(resp.Data) - if err != nil { - log.Println("Error: Cannot set RemoteSDP of client: " + resp.SessionID) - } - } else { - log.Printf("Error: No session for ID: %s\n", resp.SessionID) - } - - return cws.EmptyPacket - }) - - oClient.Receive("candidate", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received remote Ice Candidate from browser") - session := h.getSession(resp.SessionID) - - if session != nil { - peerconnection := session.peerconnection - - err := peerconnection.AddCandidate(resp.Data) - if err != nil { - log.Println("Error: Cannot add IceCandidate of client: " + resp.SessionID) - } - } else { - log.Printf("Error: No session for ID: %s\n", resp.SessionID) - } - - return cws.EmptyPacket - }) - - /* Game Logic */ - - oClient.Receive("start", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received a start request from coordinator") - session := h.getSession(resp.SessionID) - if session == nil { - log.Printf("Error: No session for ID: %s\n", resp.SessionID) - return cws.EmptyPacket - } - - peerconnection := session.peerconnection - // TODO: Standardize for all types of packet. Make WSPacket generic - startPacket := api.GameStartCall{} - if err := startPacket.From(resp.Data); err != nil { - return cws.EmptyPacket - } - gameMeta := games.GameMetadata{ - Name: startPacket.Name, - Type: startPacket.Type, - Path: startPacket.Path, - } - - room := h.startGameHandler(gameMeta, resp.RoomID, resp.PlayerIndex, peerconnection, util.GetVideoEncoder(false)) - session.RoomID = room.ID - // TODO: can data race - h.rooms[room.ID] = room - - return cws.WSPacket{ - ID: "start", - RoomID: room.ID, - } - }) - - oClient.Receive("quit", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received a quit request from coordinator") - session := h.getSession(resp.SessionID) - - if session != nil { - room := h.getRoom(session.RoomID) - // Defensive coding, check if the peerconnection is in room - if room.IsPCInRoom(session.peerconnection) { - h.detachPeerConn(session.peerconnection) - } - } else { - log.Printf("Error: No session for ID: %s\n", resp.SessionID) - } - - return cws.EmptyPacket - }) - - oClient.Receive("save", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received a save game from coordinator") - log.Println("RoomID:", resp.RoomID) - req.ID = "save" - req.Data = "ok" - if resp.RoomID != "" { - room := h.getRoom(resp.RoomID) - if room == nil { - return - } - err := room.SaveGame() - if err != nil { - log.Println("[!] Cannot save game state: ", err) - req.Data = "error" - } - } else { - req.Data = "error" - } - - return req - }) - - oClient.Receive("load", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received a load game from coordinator") - log.Println("Loading game state") - req.ID = "load" - req.Data = "ok" - if resp.RoomID != "" { - room := h.getRoom(resp.RoomID) - err := room.LoadGame() - if err != nil { - log.Println("[!] Cannot load game state: ", err) - req.Data = "error" - } - } else { - req.Data = "error" - } - - return req - }) - - oClient.Receive("playerIdx", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received an update player index event from coordinator") - req.ID = "playerIdx" - - room := h.getRoom(resp.RoomID) - session := h.getSession(resp.SessionID) - idx, err := strconv.Atoi(resp.Data) - log.Printf("Got session %v and room %v", session, room) - - if room != nil && session != nil && err == nil { - room.UpdatePlayerIndex(session.peerconnection, idx) - req.Data = strconv.Itoa(idx) - } else { - req.Data = "error" - } - - return req - }) - - oClient.Receive("multitap", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received a multitap toggle from coordinator") - req.ID = "multitap" - req.Data = "ok" - if resp.RoomID != "" { - room := h.getRoom(resp.RoomID) - err := room.ToggleMultitap() - if err != nil { - log.Println("[!] Could not toggle multitap state: ", err) - req.Data = "error" - } - } else { - req.Data = "error" - } - - return req - }) - - oClient.Receive("terminateSession", func(resp cws.WSPacket) (req cws.WSPacket) { - log.Println("Received a terminate session ", resp.SessionID) - session := h.getSession(resp.SessionID) - if session != nil { - session.Close() - delete(h.sessions, resp.SessionID) - h.detachPeerConn(session.peerconnection) - } else { - log.Printf("Error: No session for ID: %s\n", resp.SessionID) - } - - return cws.EmptyPacket - }) -} - -func getServerIDOfRoom(oc *CoordinatorClient, roomID string) string { - log.Println("Request coordinator roomID ", roomID) - packet := oc.SyncSend( - cws.WSPacket{ - ID: "getRoom", - Data: roomID, - }, - ) - log.Println("Received roomID from coordinator ", packet.Data) - - return packet.Data -} - -// startGameHandler starts a game if roomID is given, if not create new room -func (h *Handler) startGameHandler(game games.GameMetadata, existedRoomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoCodec encoder.VideoCodec) *room.Room { - log.Printf("Loading game: %v\n", game.Name) - // If we are connecting to coordinator, request corresponding serverID based on roomID - // TODO: check if existedRoomID is in the current server - room := h.getRoom(existedRoomID) - // If room is not running - if room == nil { - log.Println("Got Room from local ", room, " ID: ", existedRoomID) - // Create new room and update player index - room = h.createNewRoom(game, existedRoomID, videoCodec) - room.UpdatePlayerIndex(peerconnection, playerIndex) - - // Wait for done signal from room - go func() { - <-room.Done - h.detachRoom(room.ID) - // send signal to coordinator that the room is closed, coordinator will remove that room - h.oClient.Send(cws.WSPacket{ - ID: "closeRoom", - Data: room.ID, - }, nil) - }() - } - - // Attach peerconnection to room. If PC is already in room, don't detach - log.Println("Is PC in room", room.IsPCInRoom(peerconnection)) - if !room.IsPCInRoom(peerconnection) { - h.detachPeerConn(peerconnection) - room.AddConnectionToRoom(peerconnection) - } - - // Register room to coordinator if we are connecting to coordinator - if room != nil && h.oClient != nil { - h.oClient.Send(cws.WSPacket{ - ID: "registerRoom", - Data: room.ID, - }, nil) - } - - return room -} diff --git a/pkg/worker/handlers.go b/pkg/worker/handlers.go index 77199abf..749002cd 100644 --- a/pkg/worker/handlers.go +++ b/pkg/worker/handlers.go @@ -9,6 +9,7 @@ import ( "time" "github.com/giongto35/cloud-game/v2/pkg/config/worker" + "github.com/giongto35/cloud-game/v2/pkg/cws/api" "github.com/giongto35/cloud-game/v2/pkg/emulator/libretro/manager/remotehttp" "github.com/giongto35/cloud-game/v2/pkg/encoder" "github.com/giongto35/cloud-game/v2/pkg/environment" @@ -37,10 +38,12 @@ type Handler struct { onlineStorage *storage.Client // sessions handles all sessions server is handler (key is sessionID) sessions map[string]*Session + + w *Worker } // NewHandler returns a new server -func NewHandler(cfg worker.Config) *Handler { +func NewHandler(cfg worker.Config, wrk *Worker) *Handler { // Create offline storage folder createOfflineStorage() @@ -52,29 +55,38 @@ func NewHandler(cfg worker.Config) *Handler { coordinatorHost: cfg.Worker.Network.CoordinatorAddress, cfg: cfg, onlineStorage: onlineStorage, + w: wrk, } } // Run starts a Handler running logic func (h *Handler) Run() { + conf := h.cfg.Worker.Network for { - conf := h.cfg.Worker.Network - oClient, err := setupCoordinatorConnection(conf.CoordinatorAddress, conf.Zone, h.cfg) + conn, err := setupCoordinatorConnection(conf.CoordinatorAddress, conf.Zone, h.cfg) if err != nil { log.Printf("Cannot connect to coordinator. %v Retrying...", err) time.Sleep(time.Second) continue } + log.Printf("[worker] connected to: %v", conf.CoordinatorAddress) - h.oClient = oClient - log.Println("Connected to coordinator successfully.", oClient, err) + h.oClient = conn go h.oClient.Heartbeat() - h.RouteCoordinator() + h.routes() h.oClient.Listen() // If cannot listen, reconnect to coordinator } } +func (h *Handler) RequestConfig() { + log.Printf("[worker] asking for a config...") + response := h.oClient.SyncSend(api.ConfigPacket()) + conf := worker.EmptyConfig() + conf.Deserialize([]byte(response.Data)) + log.Printf("[worker] pulled config: %+v", conf) +} + func (h *Handler) Prepare() { if !h.cfg.Emulator.Libretro.Cores.Repo.Sync { return diff --git a/pkg/worker/internalhandlers.go b/pkg/worker/internalhandlers.go new file mode 100644 index 00000000..1822994d --- /dev/null +++ b/pkg/worker/internalhandlers.go @@ -0,0 +1,296 @@ +package worker + +import ( + "encoding/json" + "log" + "strconv" + + webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc" + "github.com/giongto35/cloud-game/v2/pkg/cws" + "github.com/giongto35/cloud-game/v2/pkg/cws/api" + "github.com/giongto35/cloud-game/v2/pkg/encoder" + "github.com/giongto35/cloud-game/v2/pkg/games" + "github.com/giongto35/cloud-game/v2/pkg/util" + "github.com/giongto35/cloud-game/v2/pkg/webrtc" + "github.com/giongto35/cloud-game/v2/pkg/worker/room" +) + +func (h *Handler) handleServerId() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Printf("[worker] new id: %s", resp.Data) + h.serverID = resp.Data + // unlock worker if it's locked + h.w.lock.Unlock() + return + } +} + +func (h *Handler) handleTerminateSession() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received a terminate session ", resp.SessionID) + session := h.getSession(resp.SessionID) + if session != nil { + session.Close() + delete(h.sessions, resp.SessionID) + h.detachPeerConn(session.peerconnection) + } else { + log.Printf("Error: No session for ID: %s\n", resp.SessionID) + } + + return cws.EmptyPacket + } +} + +func (h *Handler) handleInitWebrtc() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received a request to createOffer from browser via coordinator") + + peerconnection := webrtc.NewWebRTC().WithConfig( + webrtcConfig.Config{Encoder: h.cfg.Encoder, Webrtc: h.cfg.Webrtc}, + ) + var initPacket struct { + IsMobile bool `json:"is_mobile"` + } + err := json.Unmarshal([]byte(resp.Data), &initPacket) + if err != nil { + log.Println("Error: Cannot decode json:", err) + return cws.EmptyPacket + } + + localSession, err := peerconnection.StartClient( + initPacket.IsMobile, + // send back candidate string to browser + func(cd string) { h.oClient.Send(api.IceCandidatePacket(cd, resp.SessionID), nil) }, + ) + + // localSession, err := peerconnection.StartClient(initPacket.IsMobile, iceCandidates[resp.SessionID]) + // h.peerconnections[resp.SessionID] = peerconnection + + // Create new sessions when we have new peerconnection initialized + session := &Session{ + peerconnection: peerconnection, + } + h.sessions[resp.SessionID] = session + log.Println("Start peerconnection", resp.SessionID) + + if err != nil { + log.Println("Error: Cannot create new webrtc session", err) + return cws.EmptyPacket + } + + return cws.WSPacket{ + ID: "offer", + Data: localSession, + } + } +} + +func (h *Handler) handleAnswer() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received answer SDP from browser") + session := h.getSession(resp.SessionID) + if session != nil { + peerconnection := session.peerconnection + + err := peerconnection.SetRemoteSDP(resp.Data) + if err != nil { + log.Println("Error: Cannot set RemoteSDP of client: " + resp.SessionID) + } + } else { + log.Printf("Error: No session for ID: %s\n", resp.SessionID) + } + return cws.EmptyPacket + } +} + +func (h *Handler) handleIceCandidate() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received remote Ice Candidate from browser") + session := h.getSession(resp.SessionID) + + if session != nil { + peerconnection := session.peerconnection + + err := peerconnection.AddCandidate(resp.Data) + if err != nil { + log.Println("Error: Cannot add IceCandidate of client: " + resp.SessionID) + } + } else { + log.Printf("Error: No session for ID: %s\n", resp.SessionID) + } + + return cws.EmptyPacket + } +} + +func (h *Handler) handleGameStart() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received a start request from coordinator") + session := h.getSession(resp.SessionID) + if session == nil { + log.Printf("Error: No session for ID: %s\n", resp.SessionID) + return cws.EmptyPacket + } + + peerconnection := session.peerconnection + // TODO: Standardize for all types of packet. Make WSPacket generic + startPacket := api.GameStartCall{} + if err := startPacket.From(resp.Data); err != nil { + return cws.EmptyPacket + } + gameMeta := games.GameMetadata{ + Name: startPacket.Name, + Type: startPacket.Type, + Path: startPacket.Path, + } + + room := h.startGameHandler(gameMeta, resp.RoomID, resp.PlayerIndex, peerconnection, util.GetVideoEncoder(false)) + session.RoomID = room.ID + // TODO: can data race + h.rooms[room.ID] = room + + return cws.WSPacket{ID: "start", RoomID: room.ID} + } +} + +func (h *Handler) handleGameQuit() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received a quit request from coordinator") + session := h.getSession(resp.SessionID) + + if session != nil { + room := h.getRoom(session.RoomID) + // Defensive coding, check if the peerconnection is in room + if room.IsPCInRoom(session.peerconnection) { + h.detachPeerConn(session.peerconnection) + } + } else { + log.Printf("Error: No session for ID: %s\n", resp.SessionID) + } + + return cws.EmptyPacket + } +} + +func (h *Handler) handleGameSave() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received a save game from coordinator") + log.Println("RoomID:", resp.RoomID) + req.ID = api.GameSave + req.Data = "ok" + if resp.RoomID != "" { + room := h.getRoom(resp.RoomID) + if room == nil { + return + } + err := room.SaveGame() + if err != nil { + log.Println("[!] Cannot save game state: ", err) + req.Data = "error" + } + } else { + req.Data = "error" + } + + return req + } +} + +func (h *Handler) handleGameLoad() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received a load game from coordinator") + log.Println("Loading game state") + req.ID = api.GameLoad + req.Data = "ok" + if resp.RoomID != "" { + room := h.getRoom(resp.RoomID) + err := room.LoadGame() + if err != nil { + log.Println("[!] Cannot load game state: ", err) + req.Data = "error" + } + } else { + req.Data = "error" + } + + return req + } +} + +func (h *Handler) handleGamePlayerSelect() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received an update player index event from coordinator") + req.ID = api.GamePlayerSelect + + room := h.getRoom(resp.RoomID) + session := h.getSession(resp.SessionID) + idx, err := strconv.Atoi(resp.Data) + log.Printf("Got session %v and room %v", session, room) + + if room != nil && session != nil && err == nil { + room.UpdatePlayerIndex(session.peerconnection, idx) + req.Data = strconv.Itoa(idx) + } else { + req.Data = "error" + } + + return req + } +} + +func (h *Handler) handleGameMultitap() cws.PacketHandler { + return func(resp cws.WSPacket) (req cws.WSPacket) { + log.Println("Received a multitap toggle from coordinator") + req.ID = api.GameMultitap + req.Data = "ok" + if resp.RoomID != "" { + room := h.getRoom(resp.RoomID) + err := room.ToggleMultitap() + if err != nil { + log.Println("[!] Could not toggle multitap state: ", err) + req.Data = "error" + } + } else { + req.Data = "error" + } + + return req + } +} + +// startGameHandler starts a game if roomID is given, if not create new room +func (h *Handler) startGameHandler(game games.GameMetadata, existedRoomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoCodec encoder.VideoCodec) *room.Room { + log.Printf("Loading game: %v\n", game.Name) + // If we are connecting to coordinator, request corresponding serverID based on roomID + // TODO: check if existedRoomID is in the current server + room := h.getRoom(existedRoomID) + // If room is not running + if room == nil { + log.Println("Got Room from local ", room, " ID: ", existedRoomID) + // Create new room and update player index + room = h.createNewRoom(game, existedRoomID, videoCodec) + room.UpdatePlayerIndex(peerconnection, playerIndex) + + // Wait for done signal from room + go func() { + <-room.Done + h.detachRoom(room.ID) + // send signal to coordinator that the room is closed, coordinator will remove that room + h.oClient.Send(api.CloseRoomPacket(room.ID), nil) + }() + } + + // Attach peerconnection to room. If PC is already in room, don't detach + log.Println("Is PC in room", room.IsPCInRoom(peerconnection)) + if !room.IsPCInRoom(peerconnection) { + h.detachPeerConn(peerconnection) + room.AddConnectionToRoom(peerconnection) + } + + // Register room to coordinator if we are connecting to coordinator + if room != nil && h.oClient != nil { + h.oClient.Send(api.RegisterRoomPacket(room.ID), nil) + } + + return room +} diff --git a/pkg/worker/room/room_test.go b/pkg/worker/room/room_test.go index 24dacbb4..7f2eb831 100644 --- a/pkg/worker/room/room_test.go +++ b/pkg/worker/room/room_test.go @@ -226,7 +226,9 @@ func getRoomMock(cfg roomMockConfig) roomMock { cfg.game.Path = cfg.gamesPath + cfg.game.Path var conf worker.Config - config.LoadConfig(&conf, whereIsConfigs) + if err := config.LoadConfig(&conf, whereIsConfigs); err != nil { + panic(err) + } fixEmulators(&conf, cfg.autoGlContext) // sync cores coreManager := remotehttp.NewRemoteHttpManager(conf.Emulator.Libretro) diff --git a/pkg/worker/routes.go b/pkg/worker/routes.go new file mode 100644 index 00000000..07c157a6 --- /dev/null +++ b/pkg/worker/routes.go @@ -0,0 +1,22 @@ +package worker + +import "github.com/giongto35/cloud-game/v2/pkg/cws/api" + +func (h *Handler) routes() { + if h.oClient == nil { + return + } + + h.oClient.Receive(api.ServerId, h.handleServerId()) + h.oClient.Receive(api.TerminateSession, h.handleTerminateSession()) + h.oClient.Receive(api.InitWebrtc, h.handleInitWebrtc()) + h.oClient.Receive(api.Answer, h.handleAnswer()) + h.oClient.Receive(api.IceCandidate, h.handleIceCandidate()) + + h.oClient.Receive(api.GameStart, h.handleGameStart()) + h.oClient.Receive(api.GameQuit, h.handleGameQuit()) + h.oClient.Receive(api.GameSave, h.handleGameSave()) + h.oClient.Receive(api.GameLoad, h.handleGameLoad()) + h.oClient.Receive(api.GamePlayerSelect, h.handleGamePlayerSelect()) + h.oClient.Receive(api.GameMultitap, h.handleGameMultitap()) +} diff --git a/pkg/worker/server.go b/pkg/worker/server.go new file mode 100644 index 00000000..ae746cde --- /dev/null +++ b/pkg/worker/server.go @@ -0,0 +1,121 @@ +package worker + +import ( + "crypto/tls" + "fmt" + "log" + "net/http" + "strconv" + "time" + + "github.com/giongto35/cloud-game/v2/pkg/environment" + "golang.org/x/crypto/acme" + "golang.org/x/crypto/acme/autocert" +) + +const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory" + +func makeServerFromMux(mux *http.ServeMux) *http.Server { + // set timeouts so that a slow or malicious client doesn't + // hold resources forever + return &http.Server{ + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 120 * time.Second, + Handler: mux, + } +} + +func makeHTTPServer() *http.Server { + mux := &http.ServeMux{} + mux.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + log.Println(w, "echo") + }) + + return makeServerFromMux(mux) +} + +func makeHTTPToHTTPSRedirectServer() *http.Server { + handleRedirect := func(w http.ResponseWriter, r *http.Request) { + newURI := "https://" + r.Host + r.URL.String() + http.Redirect(w, r, newURI, http.StatusFound) + } + mux := &http.ServeMux{} + mux.HandleFunc("/", handleRedirect) + + return makeServerFromMux(mux) +} + +func (wrk *Worker) spawnServer(port int) { + var certManager *autocert.Manager + var httpsSrv *http.Server + + mode := wrk.conf.Environment.Get() + if mode.AnyOf(environment.Production, environment.Staging) { + serverConfig := wrk.conf.Worker.Server + httpsSrv = makeHTTPServer() + httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort) + + if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" { + serverConfig.HttpsChain = "" + serverConfig.HttpsKey = "" + + var leurl string + if mode == environment.Staging { + leurl = stagingLEURL + } else { + leurl = acme.LetsEncryptURL + } + + certManager = &autocert.Manager{ + Prompt: autocert.AcceptTOS, + Cache: autocert.DirCache("assets/cache"), + Client: &acme.Client{DirectoryURL: leurl}, + } + + httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate} + } + + go func(chain string, key string) { + log.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr) + err := httpsSrv.ListenAndServeTLS(chain, key) + if err != nil { + log.Printf("httpsSrv.ListendAndServeTLS() failed with %s", err) + } + }(serverConfig.HttpsChain, serverConfig.HttpsKey) + } + + var httpSrv *http.Server + if mode.AnyOf(environment.Production, environment.Staging) { + httpSrv = makeHTTPToHTTPSRedirectServer() + } else { + httpSrv = makeHTTPServer() + } + + if certManager != nil { + httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler) + } + + startServer(httpSrv, port) +} + +func startServer(serv *http.Server, startPort int) { + // It's recommend to run one worker on one instance. + // This logic is to make sure more than 1 workers still work + for port, n := startPort, startPort+100; port < n; port++ { + serv.Addr = ":" + strconv.Itoa(port) + err := serv.ListenAndServe() + switch err { + case http.ErrServerClosed: + log.Printf("HTTP(S) server was closed") + return + default: + } + port++ + + if port == n { + log.Printf("error: couldn't find an open port in range %v-%v\n", startPort, port) + } + } +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 8b90e56f..2661b5d3 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -2,177 +2,73 @@ package worker import ( "context" - "crypto/tls" - "fmt" "log" - "net/http" - "strconv" "time" "github.com/giongto35/cloud-game/v2/pkg/config/worker" - "github.com/giongto35/cloud-game/v2/pkg/environment" + "github.com/giongto35/cloud-game/v2/pkg/lock" "github.com/giongto35/cloud-game/v2/pkg/monitoring" + "github.com/giongto35/cloud-game/v2/pkg/server" "github.com/golang/glog" - "golang.org/x/crypto/acme" - "golang.org/x/crypto/acme/autocert" ) type Worker struct { - ctx context.Context - cfg worker.Config - - monitoringServer *monitoring.ServerMonitoring + ctx context.Context + conf worker.Config + servers []server.Server + // to pause initialization + lock *lock.TimeLock } -const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory" +func New(ctx context.Context, conf worker.Config) *Worker { + return &Worker{ctx: ctx, conf: conf, lock: lock.NewLock()} +} -func New(ctx context.Context, cfg worker.Config) *Worker { - return &Worker{ - ctx: ctx, - cfg: cfg, - - monitoringServer: monitoring.NewServerMonitoring(cfg.Worker.Monitoring), +func (wrk *Worker) Run() { + go wrk.init() + wrk.servers = []server.Server{ + monitoring.NewServerMonitoring(wrk.conf.Worker.Monitoring, "worker"), } + wrk.startModules() } -func (o *Worker) Run() error { - go o.initializeWorker() - go o.RunMonitoringServer() - return nil -} - -func (o *Worker) RunMonitoringServer() { - glog.Infoln("Starting monitoring server for overwork") - err := o.monitoringServer.Run() - if err != nil { - glog.Errorf("Failed to start monitoring server, reason %s", err) - } -} - -func (o *Worker) Shutdown() { - // !to add a proper HTTP(S) server shutdown (cws/handler bad loop) - if err := o.monitoringServer.Shutdown(o.ctx); err != nil { - glog.Errorln("Failed to shutdown monitoring server") - } -} - -func makeServerFromMux(mux *http.ServeMux) *http.Server { - // set timeouts so that a slow or malicious client doesn't - // hold resources forever - return &http.Server{ - ReadTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - IdleTimeout: 120 * time.Second, - Handler: mux, - } -} - -func makeHTTPServer() *http.Server { - mux := &http.ServeMux{} - mux.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - log.Println(w, "echo") - }) - - return makeServerFromMux(mux) -} - -func makeHTTPToHTTPSRedirectServer() *http.Server { - handleRedirect := func(w http.ResponseWriter, r *http.Request) { - newURI := "https://" + r.Host + r.URL.String() - http.Redirect(w, r, newURI, http.StatusFound) - } - mux := &http.ServeMux{} - mux.HandleFunc("/", handleRedirect) - - return makeServerFromMux(mux) -} - -func (o *Worker) spawnServer(port int) { - var certManager *autocert.Manager - var httpsSrv *http.Server - - mode := o.cfg.Environment.Get() - if mode.AnyOf(environment.Production, environment.Staging) { - serverConfig := o.cfg.Worker.Server - httpsSrv = makeHTTPServer() - httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort) - - if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" { - serverConfig.HttpsChain = "" - serverConfig.HttpsKey = "" - - var leurl string - if mode == environment.Staging { - leurl = stagingLEURL - } else { - leurl = acme.LetsEncryptURL - } - - certManager = &autocert.Manager{ - Prompt: autocert.AcceptTOS, - Cache: autocert.DirCache("assets/cache"), - Client: &acme.Client{DirectoryURL: leurl}, - } - - httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate} - } - - go func(chain string, key string) { - fmt.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr) - err := httpsSrv.ListenAndServeTLS(chain, key) - if err != nil { - log.Printf("httpsSrv.ListendAndServeTLS() failed with %s", err) - } - }(serverConfig.HttpsChain, serverConfig.HttpsKey) - } - - var httpSrv *http.Server - if mode.AnyOf(environment.Production, environment.Staging) { - httpSrv = makeHTTPToHTTPSRedirectServer() - } else { - httpSrv = makeHTTPServer() - } - - if certManager != nil { - httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler) - } - - startServer(httpSrv, port) -} - -func startServer(serv *http.Server, startPort int) { - // It's recommend to run one worker on one instance. - // This logic is to make sure more than 1 workers still work - for port, n := startPort, startPort+100; port < n; port++ { - serv.Addr = ":" + strconv.Itoa(port) - err := serv.ListenAndServe() - switch err { - case http.ErrServerClosed: - log.Printf("HTTP(S) server was closed") - return - default: - } - port++ - - if port == n { - log.Printf("error: couldn't find an open port in range %v-%v\n", startPort, port) - } - } -} - -// initializeWorker setup a worker -func (o *Worker) initializeWorker() { - wrk := NewHandler(o.cfg) - +func (wrk *Worker) init() { + h := NewHandler(wrk.conf, wrk) defer func() { - log.Println("Close worker") - wrk.Close() + log.Printf("[worker] Closing handler") + h.Close() }() - go wrk.Run() - // will block here - wrk.Prepare() - - o.spawnServer(o.cfg.Worker.Server.Port) + go h.Run() + if !wrk.conf.Loaded { + wrk.lock.LockFor(time.Second * 10) + h.RequestConfig() + } + h.Prepare() + wrk.spawnServer(wrk.conf.Worker.Server.Port) +} + +func (wrk *Worker) startModules() { + glog.Info(wrk.servers) + for _, s := range wrk.servers { + s := s + go func() { + if err := s.Init(wrk.conf); err != nil { + glog.Errorf("failed server init") + return + } + if err := s.Run(); err != nil { + glog.Errorf("failed start server") + } + }() + } +} + +// !to add a proper HTTP(S) server shutdown (cws/handler bad loop) +func (wrk *Worker) Shutdown() { + for _, s := range wrk.servers { + if err := s.Shutdown(wrk.ctx); err != nil { + glog.Errorln("failed server shutdown") + } + } } diff --git a/tests/e2e/main_test.go b/tests/e2e/main_test.go index fb282df9..dffc7cad 100644 --- a/tests/e2e/main_test.go +++ b/tests/e2e/main_test.go @@ -85,7 +85,7 @@ package e2e // // fmt.Println("Sending offer...") // client.Send(cws.WSPacket{ -// ID: "initwebrtc", +// ID: "init_webrtc", // Data: gamertc.Encode(offer), // }, nil) // fmt.Println("Waiting sdp...") @@ -127,7 +127,7 @@ package e2e // } // log.Println("return offer") // return cws.WSPacket{ -// ID: "initwebrtc", +// ID: "init_webrtc", // Data: gamertc.Encode(offer), // } // }) diff --git a/web/js/network/rtcp.js b/web/js/network/rtcp.js index 710c9015..8546586c 100644 --- a/web/js/network/rtcp.js +++ b/web/js/network/rtcp.js @@ -36,7 +36,7 @@ const rtcp = (() => { inputChannel.onclose = () => log.debug('[rtcp] the input channel has closed'); } - addVoiceStream(connection) + // addVoiceStream(connection) connection.oniceconnectionstatechange = ice.onIceConnectionStateChange; connection.onicegatheringstatechange = ice.onIceStateChange; @@ -45,6 +45,10 @@ const rtcp = (() => { mediaStream.addTrack(event.track); } + socket.send({ + 'id': 'init_webrtc', + 'data': JSON.stringify({'is_mobile': env.isMobileDevice()}), + }); }; async function addVoiceStream(connection) { @@ -64,17 +68,12 @@ const rtcp = (() => { } finally { socket.send({ - 'id': 'initwebrtc', + 'id': 'init_webrtc', 'data': JSON.stringify({'is_mobile': env.isMobileDevice()}), }); } } - const popup = (msg) => { - popupBox.html(msg); - popupBox.fadeIn().delay(0).fadeOut(); - }; - const ice = (() => { let isGatheringDone = false; let timeForIceGathering; @@ -89,7 +88,7 @@ const rtcp = (() => { candidate = JSON.stringify(event.candidate); log.info(`[rtcp] got ice candidate: ${candidate}`); socket.send({ - 'id': 'candidate', + 'id': 'ice_candidate', 'data': btoa(candidate), }) } diff --git a/web/js/network/socket.js b/web/js/network/socket.js index 02800a87..480850a1 100644 --- a/web/js/network/socket.js +++ b/web/js/network/socket.js @@ -63,7 +63,7 @@ const socket = (() => { // this is offer from worker event.pub(MEDIA_STREAM_SDP_AVAILABLE, {sdp: data.data}); break; - case 'candidate': + case 'ice_candidate': event.pub(MEDIA_STREAM_CANDIDATE_ADD, {candidate: data.data}); break; case 'heartbeat': @@ -78,7 +78,7 @@ const socket = (() => { case 'load': event.pub(GAME_LOADED); break; - case 'playerIdx': + case 'player_index': event.pub(GAME_PLAYER_IDX, data.data); break; case 'checkLatency': @@ -103,7 +103,7 @@ const socket = (() => { }); const saveGame = () => send({"id": "save", "data": ""}); const loadGame = () => send({"id": "load", "data": ""}); - const updatePlayerIndex = (idx) => send({"id": "playerIdx", "data": idx.toString()}); + const updatePlayerIndex = (idx) => send({"id": "player_index", "data": idx.toString()}); const startGame = (gameName, isMobile, roomId, playerIndex) => send({ "id": "start", "data": JSON.stringify({