cloud-game/pkg/worker/worker.go
2024-12-05 01:10:16 +03:00

144 lines
3.4 KiB
Go

package worker
import (
"errors"
"fmt"
"time"
"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/games"
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/monitoring"
"github.com/giongto35/cloud-game/v3/pkg/network"
"github.com/giongto35/cloud-game/v3/pkg/network/httpx"
"github.com/giongto35/cloud-game/v3/pkg/worker/caged"
"github.com/giongto35/cloud-game/v3/pkg/worker/cloud"
"github.com/giongto35/cloud-game/v3/pkg/worker/room"
)
// Worker bundles the state of one worker instance: its configuration, game
// library and launcher, emulator manager, room router, coordinator connection
// and the auxiliary services that Start runs and Stop stops.
type Worker struct {
// address is taken from the HTTP server created in New and is passed to the
// coordinator connection in Start.
address string
// conf is the worker configuration supplied to New.
conf config.WorkerConfig
// cord is the most recent coordinator connection; it is assigned inside the
// goroutine spawned by Start and stays nil until a connection succeeds.
cord *coordinator
// lib is the game library built and scanned in New.
lib games.GameLibrary
// launcher is created from lib in New.
launcher games.Launcher
log *logger.Logger
// mana is the caged manager that New loads the Libretro backend into.
mana *caged.Manager
// router is the game router cleared by Reset.
router *room.GameRouter
// services holds fixed slots filled by New: [0] the HTTP server, [1] the
// monitoring service (left nil when monitoring is disabled), [2] the watcher.
// Start and Stop skip nil slots.
services [3]interface {
Run()
Stop() error
}
// storage is the cloud storage obtained from cloud.Store in New.
storage cloud.Storage
}
// New builds a Worker from conf.
//
// It loads the Libretro backend into a new caged manager, scans the game
// library, and fills the worker's service slots: an HTTP server that answers
// the configured ping endpoint, a monitoring service when monitoring is
// enabled, and a watcher constructed with a 30-minute duration. The services
// are run later by Start.
//
// An error is returned when the Libretro backend cannot be loaded or the HTTP
// server cannot be created; both errors wrap their cause. A cloud storage
// failure is only logged as a warning and leaves the worker without storage.
func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) {
	manager := caged.NewManager(log)
	if err := manager.Load(caged.Libretro, conf); err != nil {
		// Wrapped with %w, like the HTTP error below, so callers can inspect
		// the cause with errors.Is / errors.As.
		return nil, fmt.Errorf("couldn't cage libretro: %w", err)
	}

	library := games.NewLib(conf.Library, conf.Emulator, log)
	library.Scan()

	worker := &Worker{
		conf:     conf,
		lib:      library,
		launcher: games.NewGameLauncher(library),
		log:      log,
		mana:     manager,
		router:   room.NewGameRouter(),
	}

	h, err := httpx.NewServer(
		conf.Worker.GetAddr(),
		func(s *httpx.Server) httpx.Handler {
			// The ping endpoint replies with a fixed body and allows any origin.
			return s.Mux().HandleW(conf.Worker.Network.PingEndpoint, func(w httpx.ResponseWriter) {
				w.Header().Set("Access-Control-Allow-Origin", "*")
				// Best-effort reply: a failed write to a ping client is not actionable.
				_, _ = w.Write([]byte("echo"))
			})
		},
		httpx.WithServerConfig(conf.Worker.Server),
		httpx.HttpsRedirect(false),
		httpx.WithPortRoll(true),
		httpx.WithZone(conf.Worker.Network.Zone),
		httpx.WithLogger(log),
	)
	if err != nil {
		return nil, fmt.Errorf("http init fail: %w", err)
	}
	worker.address = h.Addr
	worker.services[0] = h

	// Slot 1 stays nil when monitoring is disabled; Start and Stop skip nil slots.
	if conf.Worker.Monitoring.IsEnabled() {
		worker.services[1] = monitoring.New(conf.Worker.Monitoring, h.GetHost(), log)
	}

	st, err := cloud.Store(conf.Storage, log)
	if err != nil {
		log.Warn().Err(err).Msgf("cloud storage fail, using no storage")
	} else {
		// Only keep the storage on success: a value returned alongside an error
		// is not meaningful and could be a non-nil interface wrapping a nil pointer.
		worker.storage = st
	}

	worker.services[2] = NewWatcher(30*time.Minute, worker.router, log)

	return worker, nil
}
// Reset resets the worker's game router.
func (w *Worker) Reset() {
	w.router.Reset()
}
// Start runs every non-nil service and then spawns a goroutine that keeps the
// worker connected to the coordinator.
//
// Each pass of the loop resets the router, opens a coordinator connection,
// starts request handling, sends the library and the previous sessions, and
// then blocks until the channel returned by HandleRequests yields. Connection
// errors are handed to the retry helper before the next attempt.
//
// The done channel is polled only at the top of each pass; when it is ready
// the goroutine disconnects the last coordinator connection (if any), resets
// the router and exits.
func (w *Worker) Start(done chan struct{}) {
	for _, svc := range w.services {
		if svc == nil {
			continue
		}
		svc.Run()
	}

	// The reconnect loop below restores the alive worker info after the
	// coordinator connection was lost.
	backoff := network.NewRetry()
	handleFail := func(err error) {
		w.log.Warn().Err(err).Msgf("socket fail. Retrying in %v", backoff.Time())
		backoff.Fail().Multiply(2)
	}

	// cleanup runs once, when the reconnect loop exits.
	cleanup := func() {
		if w.cord != nil {
			w.cord.Disconnect()
		}
		w.Reset()
	}

	reconnect := func() {
		addr := w.conf.Worker.Network.CoordinatorAddress
		defer cleanup()

		for {
			// Non-blocking check for the stop signal.
			select {
			case <-done:
				return
			default:
			}

			w.Reset()
			conn, err := newCoordinatorConnection(addr, w.conf.Worker, w.address, w.log)
			if err != nil {
				handleFail(err)
				continue
			}
			conn.SetErrorHandler(handleFail)
			w.cord = conn
			w.cord.log.Info().Msgf("Connected to the coordinator %v", addr)

			// Request handling is started before the initial state is pushed.
			finished := w.cord.HandleRequests(w)
			w.cord.SendLibrary(w)
			w.cord.SendPrevSessions(w)
			<-finished
			backoff.Success()
		}
	}

	go reconnect()
}
// Stop stops every non-nil service in slot order and returns their errors
// combined with errors.Join; the result is nil when no service reported an error.
func (w *Worker) Stop() error {
	var joined error
	for _, svc := range w.services {
		if svc == nil {
			continue
		}
		// Keep going after a failure so every service still gets stopped.
		joined = errors.Join(joined, svc.Stop())
	}
	return joined
}