cloud-game/pkg/coordinator/worker.go
2025-12-24 21:23:19 +03:00

191 lines
4.4 KiB
Go

package coordinator
import (
"errors"
"fmt"
"sync/atomic"
"github.com/giongto35/cloud-game/v3/pkg/api"
"github.com/giongto35/cloud-game/v3/pkg/com"
"github.com/giongto35/cloud-game/v3/pkg/logger"
)
type Worker struct {
AppLibrary
Connection
RegionalClient
Session
slotted
Addr string
PingServer string
Port string
RoomId string // room reference
Tag string
Zone string
Lib []api.GameInfo
Sessions map[string]struct{}
log *logger.Logger
}
type RegionalClient interface {
In(region string) bool
}
type HasUserRegistry interface {
Find(id string) *User
}
type AppLibrary interface {
SetLib([]api.GameInfo)
AppNames() []api.GameInfo
}
type Session interface {
AddSession(id string)
// HadSession is true when an old session is found
HadSession(id string) bool
SetSessions(map[string]struct{})
}
type AppMeta struct {
Alias string
Base string
Name string
Path string
System string
Type string
}
func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], log *logger.Logger) *Worker {
conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out, *api.Out](sock, handshake.Id, log)
return &Worker{
Connection: conn,
Addr: handshake.Addr,
PingServer: handshake.PingURL,
Port: handshake.Port,
Tag: handshake.Tag,
Zone: handshake.Zone,
log: log.Extend(log.With().
Str(logger.ClientField, logger.MarkNone).
Str(logger.DirectionField, logger.MarkNone).
Str("cid", conn.Id().Short())),
}
}
func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} {
return w.ProcessPackets(func(p api.In[com.Uid]) (err error) {
switch p.T {
case api.RegisterRoom:
err = api.Do(p, func(d api.RegisterRoomRequest) {
w.log.Info().Msgf("set room [%v] = %v", w.Id(), d)
w.HandleRegisterRoom(d)
})
case api.CloseRoom:
err = api.Do(p, w.HandleCloseRoom)
case api.IceCandidate:
err = api.DoE(p, func(d api.WebrtcIceCandidateRequest) error {
return w.HandleIceCandidate(d, users)
})
case api.LibNewGameList:
err = api.DoE(p, w.HandleLibGameList)
case api.PrevSessions:
err = api.DoE(p, w.HandlePrevSessionList)
default:
w.log.Warn().Msgf("Unknown packet: %+v", p)
}
if err != nil && !errors.Is(err, api.ErrMalformed) {
w.log.Error().Err(err).Send()
err = api.ErrMalformed
}
return
})
}
func (w *Worker) SetLib(list []api.GameInfo) { w.Lib = list }
func (w *Worker) AppNames() []api.GameInfo {
return w.Lib
}
func (w *Worker) AddSession(id string) {
// sessions can be uninitialized until the coordinator pushes them to the worker
if w.Sessions == nil {
return
}
w.Sessions[id] = struct{}{}
}
func (w *Worker) HadSession(id string) bool {
_, ok := w.Sessions[id]
return ok
}
func (w *Worker) SetSessions(sessions map[string]struct{}) {
w.Sessions = sessions
}
// In say whether some worker from this region (zone).
// Empty region always returns true.
func (w *Worker) In(region string) bool { return region == "" || region == w.Zone }
// slotted used for tracking user slots and the availability.
type slotted int32
// HasSlot checks if the current worker has a free slot to start a new game.
// Workers support only one game at a time, so it returns true in case if
// there are no players in the room (worker).
func (s *slotted) HasSlot() bool { return atomic.LoadInt32((*int32)(s)) == 0 }
// TryReserve reserves the slot only when it's free.
func (s *slotted) TryReserve() bool {
for {
current := atomic.LoadInt32((*int32)(s))
if current != 0 {
return false
}
if atomic.CompareAndSwapInt32((*int32)(s), 0, 1) {
return true
}
}
}
// UnReserve decrements user counter of the worker.
func (s *slotted) UnReserve() {
for {
current := atomic.LoadInt32((*int32)(s))
if current <= 0 {
// reset to zero
if current < 0 {
if atomic.CompareAndSwapInt32((*int32)(s), current, 0) {
return
}
continue
}
return
}
// Regular decrement for positive values
newVal := current - 1
if atomic.CompareAndSwapInt32((*int32)(s), current, newVal) {
return
}
}
}
func (s *slotted) FreeSlots() { atomic.StoreInt32((*int32)(s), 0) }
func (w *Worker) Disconnect() {
w.Connection.Disconnect()
w.RoomId = ""
w.FreeSlots()
}
func (w *Worker) PrintInfo() string {
return fmt.Sprintf("id: %v, addr: %v, port: %v, zone: %v, ping addr: %v, tag: %v",
w.Id(), w.Addr, w.Port, w.Zone, w.PingServer, w.Tag)
}