diff --git a/pkg/coordinator/hub.go b/pkg/coordinator/hub.go index 490747df..9e646ced 100644 --- a/pkg/coordinator/hub.go +++ b/pkg/coordinator/hub.go @@ -69,12 +69,10 @@ func (h *Hub) handleUserConnection() http.HandlerFunc { return } - bound := user.Bind(worker) - if !bound { - user.Notify(api.ErrNoFreeSlots, "") - h.log.Info().Msg("no free slots") - return - } + // Link the user to the selected worker. Slot reservation is handled later + // on game start; this keeps connections lightweight and lets deep-link + // joins share a worker without consuming its single game slot. + user.w = worker h.users.Add(user) @@ -178,12 +176,15 @@ func (h *Hub) GetServerList() (r []api.Server) { // various conditions. func (h *Hub) findWorkerFor(usr *User, q url.Values, log *logger.Logger) *Worker { log.Debug().Msg("Search available workers") - roomId := q.Get(api.RoomIdQueryParam) + roomIdRaw := q.Get(api.RoomIdQueryParam) + sessionId, deepRoomId := api.ExplodeDeepLink(roomIdRaw) + roomId := roomIdRaw + if deepRoomId != "" { + roomId = deepRoomId + } zone := q.Get(api.ZoneQueryParam) wid := q.Get(api.WorkerIdParam) - sessionId, _ := api.ExplodeDeepLink(roomId) - var worker *Worker if wid != "" { @@ -195,7 +196,7 @@ func (h *Hub) findWorkerFor(usr *User, q url.Values, log *logger.Logger) *Worker } } - if worker = h.findWorkerByRoom(roomId, zone); worker != nil { + if worker = h.findWorkerByRoom(roomIdRaw, roomId, zone); worker != nil { log.Debug().Str("room", roomId).Msg("An existing worker has been found") } else if worker = h.findWorkerByPreviousRoom(sessionId); worker != nil { log.Debug().Msgf("Worker %v with the previous room: %v is found", wid, roomId) @@ -228,13 +229,19 @@ func (h *Hub) findWorkerByPreviousRoom(id string) *Worker { return w } -func (h *Hub) findWorkerByRoom(id string, region string) *Worker { - if id == "" { +func (h *Hub) findWorkerByRoom(id string, deepId string, region string) *Worker { + if id == "" && deepId == "" { return nil } // if there is zone param, we need to ensure the worker in that zone, // if not we consider the room is missing - w, _ := h.workers.FindBy(func(w *Worker) bool { return w.RoomId == id && w.In(region) }) + w, _ := h.workers.FindBy(func(w *Worker) bool { + matchId := w.RoomId == id + if !matchId && deepId != "" { + matchId = w.RoomId == deepId + } + return matchId && w.In(region) + }) return w } diff --git a/pkg/coordinator/user.go b/pkg/coordinator/user.go index 5d1e7ed5..b9d87e7e 100644 --- a/pkg/coordinator/user.go +++ b/pkg/coordinator/user.go @@ -30,14 +30,15 @@ func NewUser(sock *com.Connection, log *logger.Logger) *User { func (u *User) Bind(w *Worker) bool { u.w = w - - return u.w.TryReserve() + // Binding only links the worker; slot reservation is handled lazily on + // game start to avoid blocking deep-link joins or parallel connections + // that haven't started a game yet. + return true } func (u *User) Disconnect() { u.Connection.Disconnect() if u.w != nil { - u.w.UnReserve() u.w.TerminateSession(u.Id()) } } diff --git a/pkg/coordinator/userhandlers.go b/pkg/coordinator/userhandlers.go index 811ce332..80d0dc6e 100644 --- a/pkg/coordinator/userhandlers.go +++ b/pkg/coordinator/userhandlers.go @@ -2,6 +2,7 @@ package coordinator import ( "sort" + "time" "github.com/giongto35/cloud-game/v3/pkg/api" "github.com/giongto35/cloud-game/v3/pkg/com" @@ -26,6 +27,55 @@ func (u *User) HandleWebrtcIceCandidate(rq api.WebrtcUserIceCandidate) { } func (u *User) HandleStartGame(rq api.GameStartUserRequest, conf config.CoordinatorConfig) { + // Worker slot / room gating: + // - If the worker is BUSY (no free slot), we must not create another room. + // * If the worker has already reported a room id, only allow requests + // for that same room (deep-link joins / reloads). + // * If the worker hasn't reported a room yet, deny any new StartGame to + // avoid racing concurrent room creation on the worker. + // * When the user is starting a NEW game (empty room id), we give the + // worker a short grace period to close the previous room and free the + // slot before rejecting with "no slots". + // - If the worker is FREE, reserve the slot lazily before starting the + // game; the room id (if any) comes from the request / worker. + + // Grace period: when there's no room id in the request (new game) but the + // worker still appears busy, wait a bit for the previous room to close. + if rq.RoomId == "" && !u.w.HasSlot() { + const waitTotal = 3 * time.Second + const step = 100 * time.Millisecond + waited := time.Duration(0) + for waited < waitTotal { + if u.w.HasSlot() { + break + } + time.Sleep(step) + waited += step + } + } + + busy := !u.w.HasSlot() + if busy { + if u.w.RoomId == "" { + u.Notify(api.ErrNoFreeSlots, "") + return + } + if rq.RoomId == "" { + // No room id but worker is busy -> assume user wants to continue + // the existing room instead of starting a parallel game. + rq.RoomId = u.w.RoomId + } else if rq.RoomId != u.w.RoomId { + u.Notify(api.ErrNoFreeSlots, "") + return + } + } else { + // Worker is free: try to reserve the single slot for this new room. + if !u.w.TryReserve() { + u.Notify(api.ErrNoFreeSlots, "") + return + } + } + startGameResp, err := u.w.StartGame(u.Id(), rq) if err != nil || startGameResp == nil { u.log.Error().Err(err).Msg("malformed game start response") diff --git a/pkg/coordinator/workerhandlers.go b/pkg/coordinator/workerhandlers.go index 5716b621..edd7e210 100644 --- a/pkg/coordinator/workerhandlers.go +++ b/pkg/coordinator/workerhandlers.go @@ -10,6 +10,7 @@ func (w *Worker) HandleRegisterRoom(rq api.RegisterRoomRequest) { w.RoomId = str func (w *Worker) HandleCloseRoom(rq api.CloseRoomRequest) { if string(rq) == w.RoomId { w.RoomId = "" + w.FreeSlots() } }