From cbabe69f304301bbe4b24b613ca508975f7aa82b Mon Sep 17 00:00:00 2001 From: sergystepanov Date: Mon, 5 Jul 2021 19:11:12 +0300 Subject: [PATCH] Track connected user count of workers. (#328) If a worker (game) is shared, then without tracking players it's impossible to tell when it becomes available for a new game which leads to emulator share and crashes. --- pkg/coordinator/handlers.go | 11 +++++------ pkg/coordinator/worker.go | 32 ++++++++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/pkg/coordinator/handlers.go b/pkg/coordinator/handlers.go index 51789f6e..f535d46c 100644 --- a/pkg/coordinator/handlers.go +++ b/pkg/coordinator/handlers.go @@ -217,7 +217,9 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) { // Assign available worker to browserClient bc.WorkerID = wc.WorkerID - wc.IsAvailable = false + + wc.ChangeUserQuantityBy(1) + defer wc.ChangeUserQuantityBy(-1) // Everything is cool // Attach to Server instance with sessionID @@ -237,9 +239,6 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) { // Notify worker to clean session wc.Send(api.TerminateSessionPacket(sessionID), nil) - - // WorkerClient become available again - wc.IsAvailable = true } func (o *Server) getBestWorkerClient(client *BrowserClient, zone string) (*WorkerClient, error) { @@ -269,7 +268,7 @@ func (o *Server) getBestWorkerClient(client *BrowserClient, zone string) (*Worke func (o *Server) getAvailableWorkers() map[string]*WorkerClient { workerClients := map[string]*WorkerClient{} for k, w := range o.workerClients { - if w.IsAvailable { + if w.HasGameSlot() { workerClients[k] = w } } @@ -280,7 +279,7 @@ func (o *Server) getAvailableWorkers() map[string]*WorkerClient { // getWorkerFromAddress returns the worker has given address func (o *Server) getWorkerFromAddress(address string) *WorkerClient { for _, w := range o.workerClients { - if w.IsAvailable && w.Address == address { + if w.HasGameSlot() && w.Address == address { return w } } diff --git a/pkg/coordinator/worker.go b/pkg/coordinator/worker.go index f074f261..ca5cef11 100644 --- a/pkg/coordinator/worker.go +++ b/pkg/coordinator/worker.go @@ -3,6 +3,7 @@ package coordinator import ( "fmt" "log" + "sync" "github.com/giongto35/cloud-game/v2/pkg/cws" "github.com/gorilla/websocket" @@ -16,20 +17,43 @@ type WorkerClient struct { // public server used for ping check (Cannot use worker address because they are not publicly exposed) PingServer string StunTurnServer string - IsAvailable bool + userCount int // may be atomic Zone string + + mu sync.Mutex } // NewWorkerClient returns a client connecting to worker. // This connection exchanges information between workers and server. func NewWorkerClient(c *websocket.Conn, workerID string) *WorkerClient { return &WorkerClient{ - Client: cws.NewClient(c), - WorkerID: workerID, - IsAvailable: true, + Client: cws.NewClient(c), + WorkerID: workerID, } } +// ChangeUserQuantityBy increases or decreases the total amount of +// users connected to the current worker. +// We count users to determine when the worker becomes new game ready. +func (wc *WorkerClient) ChangeUserQuantityBy(n int) { + wc.mu.Lock() + wc.userCount += n + // just to be on a safe side + if wc.userCount < 0 { + wc.userCount = 0 + } + wc.mu.Unlock() +} + +// HasGameSlot tells whether the current worker has a +// free slot to start a new game. +// Workers support only one game at a time. +func (wc *WorkerClient) HasGameSlot() bool { + wc.mu.Lock() + defer wc.mu.Unlock() + return wc.userCount == 0 +} + func (wc *WorkerClient) Printf(format string, args ...interface{}) { log.Printf(fmt.Sprintf("Worker %s] %s", wc.WorkerID, format), args...) }