mirror of
https://github.com/giongto35/cloud-game.git
synced 2026-01-23 10:35:44 +00:00
191 lines
4.4 KiB
Go
191 lines
4.4 KiB
Go
package coordinator
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync/atomic"
|
|
|
|
"github.com/giongto35/cloud-game/v3/pkg/api"
|
|
"github.com/giongto35/cloud-game/v3/pkg/com"
|
|
"github.com/giongto35/cloud-game/v3/pkg/logger"
|
|
)
|
|
|
|
type Worker struct {
|
|
AppLibrary
|
|
Connection
|
|
RegionalClient
|
|
Session
|
|
slotted
|
|
|
|
Addr string
|
|
PingServer string
|
|
Port string
|
|
RoomId string // room reference
|
|
Tag string
|
|
Zone string
|
|
|
|
Lib []api.GameInfo
|
|
Sessions map[string]struct{}
|
|
|
|
log *logger.Logger
|
|
}
|
|
|
|
type RegionalClient interface {
|
|
In(region string) bool
|
|
}
|
|
|
|
type HasUserRegistry interface {
|
|
Find(id string) *User
|
|
}
|
|
|
|
type AppLibrary interface {
|
|
SetLib([]api.GameInfo)
|
|
AppNames() []api.GameInfo
|
|
}
|
|
|
|
type Session interface {
|
|
AddSession(id string)
|
|
// HadSession is true when an old session is found
|
|
HadSession(id string) bool
|
|
SetSessions(map[string]struct{})
|
|
}
|
|
|
|
type AppMeta struct {
|
|
Alias string
|
|
Base string
|
|
Name string
|
|
Path string
|
|
System string
|
|
Type string
|
|
}
|
|
|
|
func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], log *logger.Logger) *Worker {
|
|
conn := com.NewConnection[api.PT, api.In[com.Uid], api.Out, *api.Out](sock, handshake.Id, log)
|
|
return &Worker{
|
|
Connection: conn,
|
|
Addr: handshake.Addr,
|
|
PingServer: handshake.PingURL,
|
|
Port: handshake.Port,
|
|
Tag: handshake.Tag,
|
|
Zone: handshake.Zone,
|
|
log: log.Extend(log.With().
|
|
Str(logger.ClientField, logger.MarkNone).
|
|
Str(logger.DirectionField, logger.MarkNone).
|
|
Str("cid", conn.Id().Short())),
|
|
}
|
|
}
|
|
|
|
func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} {
|
|
return w.ProcessPackets(func(p api.In[com.Uid]) (err error) {
|
|
switch p.T {
|
|
case api.RegisterRoom:
|
|
err = api.Do(p, func(d api.RegisterRoomRequest) {
|
|
w.log.Info().Msgf("set room [%v] = %v", w.Id(), d)
|
|
w.HandleRegisterRoom(d)
|
|
})
|
|
case api.CloseRoom:
|
|
err = api.Do(p, w.HandleCloseRoom)
|
|
case api.IceCandidate:
|
|
err = api.DoE(p, func(d api.WebrtcIceCandidateRequest) error {
|
|
return w.HandleIceCandidate(d, users)
|
|
})
|
|
case api.LibNewGameList:
|
|
err = api.DoE(p, w.HandleLibGameList)
|
|
case api.PrevSessions:
|
|
err = api.DoE(p, w.HandlePrevSessionList)
|
|
default:
|
|
w.log.Warn().Msgf("Unknown packet: %+v", p)
|
|
}
|
|
if err != nil && !errors.Is(err, api.ErrMalformed) {
|
|
w.log.Error().Err(err).Send()
|
|
err = api.ErrMalformed
|
|
}
|
|
return
|
|
})
|
|
}
|
|
|
|
func (w *Worker) SetLib(list []api.GameInfo) { w.Lib = list }
|
|
|
|
func (w *Worker) AppNames() []api.GameInfo {
|
|
return w.Lib
|
|
}
|
|
|
|
func (w *Worker) AddSession(id string) {
|
|
// sessions can be uninitialized until the coordinator pushes them to the worker
|
|
if w.Sessions == nil {
|
|
return
|
|
}
|
|
|
|
w.Sessions[id] = struct{}{}
|
|
}
|
|
|
|
func (w *Worker) HadSession(id string) bool {
|
|
_, ok := w.Sessions[id]
|
|
return ok
|
|
}
|
|
|
|
func (w *Worker) SetSessions(sessions map[string]struct{}) {
|
|
w.Sessions = sessions
|
|
}
|
|
|
|
// In say whether some worker from this region (zone).
|
|
// Empty region always returns true.
|
|
func (w *Worker) In(region string) bool { return region == "" || region == w.Zone }
|
|
|
|
// slotted used for tracking user slots and the availability.
|
|
type slotted int32
|
|
|
|
// HasSlot checks if the current worker has a free slot to start a new game.
|
|
// Workers support only one game at a time, so it returns true in case if
|
|
// there are no players in the room (worker).
|
|
func (s *slotted) HasSlot() bool { return atomic.LoadInt32((*int32)(s)) == 0 }
|
|
|
|
// TryReserve reserves the slot only when it's free.
|
|
func (s *slotted) TryReserve() bool {
|
|
for {
|
|
current := atomic.LoadInt32((*int32)(s))
|
|
if current != 0 {
|
|
return false
|
|
}
|
|
if atomic.CompareAndSwapInt32((*int32)(s), 0, 1) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
// UnReserve decrements user counter of the worker.
|
|
func (s *slotted) UnReserve() {
|
|
for {
|
|
current := atomic.LoadInt32((*int32)(s))
|
|
if current <= 0 {
|
|
// reset to zero
|
|
if current < 0 {
|
|
if atomic.CompareAndSwapInt32((*int32)(s), current, 0) {
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Regular decrement for positive values
|
|
newVal := current - 1
|
|
if atomic.CompareAndSwapInt32((*int32)(s), current, newVal) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *slotted) FreeSlots() { atomic.StoreInt32((*int32)(s), 0) }
|
|
|
|
func (w *Worker) Disconnect() {
|
|
w.Connection.Disconnect()
|
|
w.RoomId = ""
|
|
w.FreeSlots()
|
|
}
|
|
|
|
func (w *Worker) PrintInfo() string {
|
|
return fmt.Sprintf("id: %v, addr: %v, port: %v, zone: %v, ping addr: %v, tag: %v",
|
|
w.Id(), w.Addr, w.Port, w.Zone, w.PingServer, w.Tag)
|
|
}
|