cloud-game/pkg/coordinator/hub.go
2025-12-15 18:42:41 +03:00

337 lines
8.6 KiB
Go

package coordinator
import (
"bytes"
"encoding/base64"
"fmt"
"net/http"
"net/url"
"github.com/giongto35/cloud-game/v3/pkg/api"
"github.com/giongto35/cloud-game/v3/pkg/com"
"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/logger"
)
type Connection interface {
Disconnect()
Id() com.Uid
ProcessPackets(func(api.In[com.Uid]) error) chan struct{}
Send(api.PT, any) ([]byte, error)
Notify(api.PT, any)
}
type Hub struct {
conf config.CoordinatorConfig
log *logger.Logger
users com.NetMap[com.Uid, *User]
workers com.NetMap[com.Uid, *Worker]
}
func NewHub(conf config.CoordinatorConfig, log *logger.Logger) *Hub {
return &Hub{
conf: conf,
users: com.NewNetMap[com.Uid, *User](),
workers: com.NewNetMap[com.Uid, *Worker](),
log: log,
}
}
// handleUserConnection handles all connections from user/frontend.
func (h *Hub) handleUserConnection() http.HandlerFunc {
var connector com.Server
connector.Origin(h.conf.Coordinator.Origin.UserWs)
log := h.log.Extend(h.log.With().
Str(logger.ClientField, "u").
Str(logger.DirectionField, logger.MarkIn),
)
return func(w http.ResponseWriter, r *http.Request) {
h.log.Debug().Msgf("Handshake %v", r.Host)
conn, err := connector.Connect(w, r)
if err != nil {
h.log.Error().Err(err).Msg("user connection fail")
return
}
user := NewUser(conn, log)
defer h.users.RemoveDisconnect(user)
done := user.HandleRequests(h, h.conf)
params := r.URL.Query()
worker := h.findWorkerFor(user, params, h.log.Extend(h.log.With().Str("cid", user.Id().Short())))
if worker == nil {
user.Notify(api.ErrNoFreeSlots, "")
h.log.Info().Msg("no free workers")
return
}
// Link the user to the selected worker. Slot reservation is handled later
// on game start; this keeps connections lightweight and lets deep-link
// joins share a worker without consuming its single game slot.
user.w = worker
h.users.Add(user)
apps := worker.AppNames()
list := make([]api.AppMeta, len(apps))
for i := range apps {
list[i] = api.AppMeta{Alias: apps[i].Alias, Title: apps[i].Name, System: apps[i].System}
}
user.InitSession(worker.Id().String(), h.conf.Webrtc.IceServers, list)
log.Info().Str(logger.DirectionField, logger.MarkPlus).Msgf("user %s", user.Id())
<-done
}
}
func RequestToHandshake(data string) (*api.ConnectionRequest[com.Uid], error) {
if data == "" {
return nil, api.ErrMalformed
}
handshake, err := api.UnwrapChecked[api.ConnectionRequest[com.Uid]](base64.URLEncoding.DecodeString(data))
if err != nil || handshake == nil {
return nil, fmt.Errorf("%w (%v)", err, handshake)
}
return handshake, nil
}
// handleWorkerConnection handles all connections from a new worker to coordinator.
func (h *Hub) handleWorkerConnection() http.HandlerFunc {
var connector com.Server
connector.Origin(h.conf.Coordinator.Origin.WorkerWs)
log := h.log.Extend(h.log.With().
Str(logger.ClientField, "w").
Str(logger.DirectionField, logger.MarkIn),
)
h.log.Debug().Msgf("WS max message size: %vb", h.conf.Coordinator.MaxWsSize)
return func(w http.ResponseWriter, r *http.Request) {
h.log.Debug().Msgf("Handshake %v", r.Host)
handshake, err := RequestToHandshake(r.URL.Query().Get(api.DataQueryParam))
if err != nil {
h.log.Error().Err(err).Msg("handshake fail")
return
}
if handshake.PingURL == "" {
h.log.Warn().Msg("Ping address is not set")
}
if h.conf.Coordinator.Server.Https && !handshake.IsHTTPS {
h.log.Warn().Msg("Unsecure worker connection. Unsecure to secure may be bad.")
}
// set connection uid from the handshake
if handshake.Id != com.NilUid {
h.log.Debug().Msgf("Worker uid will be set to %v", handshake.Id)
}
conn, err := connector.Connect(w, r)
if err != nil {
log.Error().Err(err).Msg("worker connection fail")
return
}
conn.SetMaxReadSize(h.conf.Coordinator.MaxWsSize)
worker := NewWorker(conn, *handshake, log)
defer h.workers.RemoveDisconnect(worker)
done := worker.HandleRequests(&h.users)
h.workers.Add(worker)
log.Info().
Str(logger.DirectionField, logger.MarkPlus).
Msgf("worker %s", worker.PrintInfo())
<-done
}
}
func (h *Hub) GetServerList() (r []api.Server) {
debug := h.conf.Coordinator.Debug
for w := range h.workers.Values() {
server := api.Server{
Addr: w.Addr,
Id: w.Id(),
IsBusy: !w.HasSlot(),
Machine: string(w.Id().Machine()),
PingURL: w.PingServer,
Port: w.Port,
Tag: w.Tag,
Zone: w.Zone,
}
if debug {
server.Room = w.RoomId
}
r = append(r, server)
}
return
}
// findWorkerFor searches a free worker for the user depending on
// various conditions.
func (h *Hub) findWorkerFor(usr *User, q url.Values, log *logger.Logger) *Worker {
log.Debug().Msg("Search available workers")
roomIdRaw := q.Get(api.RoomIdQueryParam)
sessionId, deepRoomId := api.ExplodeDeepLink(roomIdRaw)
roomId := roomIdRaw
if deepRoomId != "" {
roomId = deepRoomId
}
zone := q.Get(api.ZoneQueryParam)
wid := q.Get(api.WorkerIdParam)
var worker *Worker
if wid != "" {
if worker = h.findWorkerById(wid, h.conf.Coordinator.Debug); worker != nil {
log.Debug().Msgf("Worker with id: %v has been found", wid)
return worker
} else {
return nil
}
}
if worker = h.findWorkerByRoom(roomIdRaw, roomId, zone); worker != nil {
log.Debug().Str("room", roomId).Msg("An existing worker has been found")
} else if worker = h.findWorkerByPreviousRoom(sessionId); worker != nil {
log.Debug().Msgf("Worker %v with the previous room: %v is found", wid, roomId)
} else {
switch h.conf.Coordinator.Selector {
case config.SelectByPing:
log.Debug().Msgf("Searching fastest free worker...")
if worker = h.findFastestWorker(zone,
func(servers []string) (map[string]int64, error) { return usr.CheckLatency(servers) }); worker != nil {
log.Debug().Msg("The fastest worker has been found")
}
default:
log.Debug().Msgf("Searching any free worker...")
if worker = h.find1stFreeWorker(zone); worker != nil {
log.Debug().Msgf("Found next free worker")
}
}
}
return worker
}
func (h *Hub) findWorkerByPreviousRoom(id string) *Worker {
if id == "" {
return nil
}
w, _ := h.workers.FindBy(func(w *Worker) bool {
// session and room id are the same
return w.HadSession(id) && w.HasSlot()
})
return w
}
func (h *Hub) findWorkerByRoom(id string, deepId string, region string) *Worker {
if id == "" && deepId == "" {
return nil
}
// if there is zone param, we need to ensure the worker in that zone,
// if not we consider the room is missing
w, _ := h.workers.FindBy(func(w *Worker) bool {
matchId := w.RoomId == id
if !matchId && deepId != "" {
matchId = w.RoomId == deepId
}
return matchId && w.In(region)
})
return w
}
func (h *Hub) getAvailableWorkers(region string) []*Worker {
var workers []*Worker
for w := range h.workers.Values() {
if w.HasSlot() && w.In(region) {
workers = append(workers, w)
}
}
return workers
}
func (h *Hub) find1stFreeWorker(region string) *Worker {
workers := h.getAvailableWorkers(region)
if len(workers) > 0 {
return workers[0]
}
return nil
}
// findFastestWorker returns the best server for a session.
// All workers addresses are sent to user and user will ping to get latency.
// !to rewrite
func (h *Hub) findFastestWorker(region string, fn func(addresses []string) (map[string]int64, error)) *Worker {
workers := h.getAvailableWorkers(region)
if len(workers) == 0 {
return nil
}
var addresses []string
group := map[string][]struct{}{}
for _, w := range workers {
if _, ok := group[w.PingServer]; !ok {
addresses = append(addresses, w.PingServer)
}
group[w.PingServer] = append(group[w.PingServer], struct{}{})
}
latencies, err := fn(addresses)
if len(latencies) == 0 || err != nil {
return nil
}
workers = h.getAvailableWorkers(region)
if len(workers) == 0 {
return nil
}
var bestWorker *Worker
var minLatency int64 = 1<<31 - 1
// get a worker with the lowest latency
for addr, ping := range latencies {
if ping < minLatency {
for _, w := range workers {
if w.PingServer == addr {
bestWorker = w
}
}
minLatency = ping
}
}
return bestWorker
}
func (h *Hub) findWorkerById(id string, useAllWorkers bool) *Worker {
if id == "" {
return nil
}
uid, err := com.UidFromString(id)
if err != nil {
return nil
}
for _, w := range h.getAvailableWorkers("") {
if w.Id() == com.NilUid {
continue
}
if useAllWorkers {
if uid == w.Id() {
return w
}
} else {
// select any worker on the same machine when workers are grouped on the client
if bytes.Equal(uid.Machine(), w.Id().Machine()) {
return w
}
}
}
return nil
}