diff --git a/cmd/coordinator/main.go b/cmd/coordinator/main.go index ce094f6a..95609982 100644 --- a/cmd/coordinator/main.go +++ b/cmd/coordinator/main.go @@ -19,10 +19,14 @@ func main() { if log.GetLevel() < logger.InfoLevel { log.Debug().Msgf("conf: %+v", conf) } - c := coordinator.New(conf, log) + c, err := coordinator.New(conf, log) + if err != nil { + log.Error().Err(err).Msgf("init fail") + return + } c.Start() <-os.ExpectTermination() if err := c.Stop(); err != nil { - log.Error().Err(err).Msg("service shutdown errors") + log.Error().Err(err).Msg("shutdown fail") } } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index f6157d09..a20cd819 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -24,15 +24,17 @@ func run() { } done := os.ExpectTermination() - wrk := worker.New(conf, log, done) - wrk.Start() + w, err := worker.New(conf, log) + if err != nil { + log.Error().Err(err).Msgf("init fail") + return + } + w.Start(done) <-done - time.Sleep(100 * time.Millisecond) - if err := wrk.Stop(); err != nil { - log.Error().Err(err).Msg("service shutdown errors") + time.Sleep(100 * time.Millisecond) // hack + if err := w.Stop(); err != nil { + log.Error().Err(err).Msg("shutdown fail") } } -func main() { - thread.Wrap(run) -} +func main() { thread.Wrap(run) } diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go index c3601666..b2614ac4 100644 --- a/pkg/coordinator/coordinator.go +++ b/pkg/coordinator/coordinator.go @@ -1,6 +1,8 @@ package coordinator import ( + "errors" + "fmt" "html/template" "net/http" "strings" @@ -10,27 +12,55 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/logger" "github.com/giongto35/cloud-game/v3/pkg/monitoring" "github.com/giongto35/cloud-game/v3/pkg/network/httpx" - "github.com/giongto35/cloud-game/v3/pkg/service" ) -func New(conf config.CoordinatorConfig, log *logger.Logger) (services service.Group) { +type Coordinator struct { + hub *Hub + services [2]runnable +} + +type runnable interface { + Run() + Stop() error +} + +func New(conf config.CoordinatorConfig, log *logger.Logger) (*Coordinator, error) { + coordinator := &Coordinator{} lib := games.NewLib(conf.Coordinator.Library, conf.Emulator, log) lib.Scan() - hub := NewHub(conf, lib, log) + coordinator.hub = NewHub(conf, lib, log) h, err := NewHTTPServer(conf, log, func(mux *httpx.Mux) *httpx.Mux { - mux.HandleFunc("/ws", hub.handleUserConnection()) - mux.HandleFunc("/wso", hub.handleWorkerConnection()) + mux.HandleFunc("/ws", coordinator.hub.handleUserConnection()) + mux.HandleFunc("/wso", coordinator.hub.handleWorkerConnection()) return mux }) if err != nil { - log.Error().Err(err).Msg("http server init fail") - return + return nil, fmt.Errorf("http init fail: %w", err) } - services.Add(hub, h) + coordinator.services[0] = h if conf.Coordinator.Monitoring.IsEnabled() { - services.Add(monitoring.New(conf.Coordinator.Monitoring, h.GetHost(), log)) + coordinator.services[1] = monitoring.New(conf.Coordinator.Monitoring, h.GetHost(), log) } - return + return coordinator, nil +} + +func (c *Coordinator) Start() { + for _, s := range c.services { + if s != nil { + s.Run() + } + } +} + +func (c *Coordinator) Stop() error { + var err error + for _, s := range c.services { + if s != nil { + err0 := s.Stop() + err = errors.Join(err, err0) + } + } + return err } func NewHTTPServer(conf config.CoordinatorConfig, log *logger.Logger, fnMux func(*httpx.Mux) *httpx.Mux) (*httpx.Server, error) { diff --git a/pkg/service/service.go b/pkg/service/service.go deleted file mode 100644 index 48c3f063..00000000 --- a/pkg/service/service.go +++ /dev/null @@ -1,46 +0,0 @@ -package service - -import "fmt" - -// Service defines a generic service. -type Service any - -// RunnableService defines a service that can be run. -type RunnableService interface { - Service - - Run() - Stop() error -} - -// Group is a container for managing a bunch of services. -type Group struct { - list []Service -} - -func (g *Group) Add(services ...Service) { g.list = append(g.list, services...) } - -// Start starts each service in the group. -func (g *Group) Start() { - for _, s := range g.list { - if v, ok := s.(RunnableService); ok { - v.Run() - } - } -} - -// Stop terminates a group of services. -func (g *Group) Stop() (err error) { - var errs []error - for _, s := range g.list { - if v, ok := s.(RunnableService); ok { - if err := v.Stop(); err != nil { - errs = append(errs, fmt.Errorf("error: failed to stop [%s] because of %v", s, err)) - } - } - } - if len(errs) > 0 { - err = fmt.Errorf("%s", errs) - } - return -} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 0af214cb..99dea997 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -1,32 +1,41 @@ package worker import ( + "errors" + "fmt" "time" "github.com/giongto35/cloud-game/v3/pkg/config" "github.com/giongto35/cloud-game/v3/pkg/logger" "github.com/giongto35/cloud-game/v3/pkg/monitoring" "github.com/giongto35/cloud-game/v3/pkg/network/httpx" - "github.com/giongto35/cloud-game/v3/pkg/service" "github.com/giongto35/cloud-game/v3/pkg/worker/emulator/libretro/manager/remotehttp" ) type Worker struct { - address string - conf config.WorkerConfig - cord *coordinator - log *logger.Logger - router Router - storage CloudStorage - done chan struct{} + address string + conf config.WorkerConfig + cord *coordinator + log *logger.Logger + router Router + services [2]runnable + storage CloudStorage +} + +type runnable interface { + Run() + Stop() error } const retry = 10 * time.Second -func New(conf config.WorkerConfig, log *logger.Logger, done chan struct{}) (services service.Group) { +func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) { if err := remotehttp.CheckCores(conf.Emulator, log); err != nil { - log.Error().Err(err).Msg("cores sync error") + log.Warn().Err(err).Msgf("a Libretro cores sync fail") } + + worker := &Worker{conf: conf, log: log, router: NewRouter()} + h, err := httpx.NewServer( conf.Worker.GetAddr(), func(s *httpx.Server) httpx.Handler { @@ -36,30 +45,34 @@ func New(conf config.WorkerConfig, log *logger.Logger, done chan struct{}) (serv }) }, httpx.WithServerConfig(conf.Worker.Server), - // no need just for one route httpx.HttpsRedirect(false), httpx.WithPortRoll(true), httpx.WithZone(conf.Worker.Network.Zone), httpx.WithLogger(log), ) if err != nil { - log.Error().Err(err).Msg("http init fail") - return + return nil, fmt.Errorf("http init fail: %w", err) } - services.Add(h) + worker.address = h.Addr + worker.services[0] = h if conf.Worker.Monitoring.IsEnabled() { - services.Add(monitoring.New(conf.Worker.Monitoring, h.GetHost(), log)) + worker.services[1] = monitoring.New(conf.Worker.Monitoring, h.GetHost(), log) } st, err := GetCloudStorage(conf.Storage.Provider, conf.Storage.Key) if err != nil { - log.Error().Err(err).Msgf("cloud storage fail, using dummy cloud storage instead") + log.Warn().Err(err).Msgf("cloud storage fail, using dummy cloud storage instead") } - services.Add(&Worker{address: h.Addr, conf: conf, done: done, log: log, storage: st, router: NewRouter()}) + worker.storage = st - return + return worker, nil } -func (w *Worker) Run() { +func (w *Worker) Start(done chan struct{}) { + for _, s := range w.services { + if s != nil { + s.Run() + } + } go func() { remoteAddr := w.conf.Worker.Network.CoordinatorAddress defer func() { @@ -67,17 +80,16 @@ func (w *Worker) Run() { w.cord.Disconnect() } w.router.Close() - w.log.Debug().Msgf("Service loop end") }() for { select { - case <-w.done: + case <-done: return default: cord, err := newCoordinatorConnection(remoteAddr, w.conf.Worker, w.address, w.log) if err != nil { - w.log.Error().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry) + w.log.Warn().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry) time.Sleep(retry) continue } @@ -89,4 +101,14 @@ func (w *Worker) Run() { } }() } -func (w *Worker) Stop() error { return nil } + +func (w *Worker) Stop() error { + var err error + for _, s := range w.services { + if s != nil { + err0 := s.Stop() + err = errors.Join(err, err0) + } + } + return err +}