Remove service package

This commit is contained in:
Sergey Stepanov 2023-07-10 11:56:33 +03:00
parent 7c0a2051d4
commit 7fe3a893f6
No known key found for this signature in database
GPG key ID: A56B4929BAA8556B
5 changed files with 101 additions and 89 deletions

View file

@ -19,10 +19,14 @@ func main() {
if log.GetLevel() < logger.InfoLevel {
log.Debug().Msgf("conf: %+v", conf)
}
c := coordinator.New(conf, log)
c, err := coordinator.New(conf, log)
if err != nil {
log.Error().Err(err).Msgf("init fail")
return
}
c.Start()
<-os.ExpectTermination()
if err := c.Stop(); err != nil {
log.Error().Err(err).Msg("service shutdown errors")
log.Error().Err(err).Msg("shutdown fail")
}
}

View file

@ -24,15 +24,17 @@ func run() {
}
done := os.ExpectTermination()
wrk := worker.New(conf, log, done)
wrk.Start()
w, err := worker.New(conf, log)
if err != nil {
log.Error().Err(err).Msgf("init fail")
return
}
w.Start(done)
<-done
time.Sleep(100 * time.Millisecond)
if err := wrk.Stop(); err != nil {
log.Error().Err(err).Msg("service shutdown errors")
time.Sleep(100 * time.Millisecond) // hack
if err := w.Stop(); err != nil {
log.Error().Err(err).Msg("shutdown fail")
}
}
func main() {
thread.Wrap(run)
}
func main() { thread.Wrap(run) }

View file

@ -1,6 +1,8 @@
package coordinator
import (
"errors"
"fmt"
"html/template"
"net/http"
"strings"
@ -10,27 +12,55 @@ import (
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/monitoring"
"github.com/giongto35/cloud-game/v3/pkg/network/httpx"
"github.com/giongto35/cloud-game/v3/pkg/service"
)
func New(conf config.CoordinatorConfig, log *logger.Logger) (services service.Group) {
type Coordinator struct {
hub *Hub
services [2]runnable
}
type runnable interface {
Run()
Stop() error
}
func New(conf config.CoordinatorConfig, log *logger.Logger) (*Coordinator, error) {
coordinator := &Coordinator{}
lib := games.NewLib(conf.Coordinator.Library, conf.Emulator, log)
lib.Scan()
hub := NewHub(conf, lib, log)
coordinator.hub = NewHub(conf, lib, log)
h, err := NewHTTPServer(conf, log, func(mux *httpx.Mux) *httpx.Mux {
mux.HandleFunc("/ws", hub.handleUserConnection())
mux.HandleFunc("/wso", hub.handleWorkerConnection())
mux.HandleFunc("/ws", coordinator.hub.handleUserConnection())
mux.HandleFunc("/wso", coordinator.hub.handleWorkerConnection())
return mux
})
if err != nil {
log.Error().Err(err).Msg("http server init fail")
return
return nil, fmt.Errorf("http init fail: %w", err)
}
services.Add(hub, h)
coordinator.services[0] = h
if conf.Coordinator.Monitoring.IsEnabled() {
services.Add(monitoring.New(conf.Coordinator.Monitoring, h.GetHost(), log))
coordinator.services[1] = monitoring.New(conf.Coordinator.Monitoring, h.GetHost(), log)
}
return
return coordinator, nil
}
func (c *Coordinator) Start() {
for _, s := range c.services {
if s != nil {
s.Run()
}
}
}
func (c *Coordinator) Stop() error {
var err error
for _, s := range c.services {
if s != nil {
err0 := s.Stop()
err = errors.Join(err, err0)
}
}
return err
}
func NewHTTPServer(conf config.CoordinatorConfig, log *logger.Logger, fnMux func(*httpx.Mux) *httpx.Mux) (*httpx.Server, error) {

View file

@ -1,46 +0,0 @@
package service
import "fmt"
// Service defines a generic service.
type Service any
// RunnableService defines a service that can be run.
type RunnableService interface {
Service
Run()
Stop() error
}
// Group is a container for managing a bunch of services.
type Group struct {
list []Service
}
func (g *Group) Add(services ...Service) { g.list = append(g.list, services...) }
// Start starts each service in the group.
func (g *Group) Start() {
for _, s := range g.list {
if v, ok := s.(RunnableService); ok {
v.Run()
}
}
}
// Stop terminates a group of services.
func (g *Group) Stop() (err error) {
var errs []error
for _, s := range g.list {
if v, ok := s.(RunnableService); ok {
if err := v.Stop(); err != nil {
errs = append(errs, fmt.Errorf("error: failed to stop [%s] because of %v", s, err))
}
}
}
if len(errs) > 0 {
err = fmt.Errorf("%s", errs)
}
return
}

View file

@ -1,32 +1,41 @@
package worker
import (
"errors"
"fmt"
"time"
"github.com/giongto35/cloud-game/v3/pkg/config"
"github.com/giongto35/cloud-game/v3/pkg/logger"
"github.com/giongto35/cloud-game/v3/pkg/monitoring"
"github.com/giongto35/cloud-game/v3/pkg/network/httpx"
"github.com/giongto35/cloud-game/v3/pkg/service"
"github.com/giongto35/cloud-game/v3/pkg/worker/emulator/libretro/manager/remotehttp"
)
type Worker struct {
address string
conf config.WorkerConfig
cord *coordinator
log *logger.Logger
router Router
storage CloudStorage
done chan struct{}
address string
conf config.WorkerConfig
cord *coordinator
log *logger.Logger
router Router
services [2]runnable
storage CloudStorage
}
type runnable interface {
Run()
Stop() error
}
const retry = 10 * time.Second
func New(conf config.WorkerConfig, log *logger.Logger, done chan struct{}) (services service.Group) {
func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) {
if err := remotehttp.CheckCores(conf.Emulator, log); err != nil {
log.Error().Err(err).Msg("cores sync error")
log.Warn().Err(err).Msgf("a Libretro cores sync fail")
}
worker := &Worker{conf: conf, log: log, router: NewRouter()}
h, err := httpx.NewServer(
conf.Worker.GetAddr(),
func(s *httpx.Server) httpx.Handler {
@ -36,30 +45,34 @@ func New(conf config.WorkerConfig, log *logger.Logger, done chan struct{}) (serv
})
},
httpx.WithServerConfig(conf.Worker.Server),
// no need just for one route
httpx.HttpsRedirect(false),
httpx.WithPortRoll(true),
httpx.WithZone(conf.Worker.Network.Zone),
httpx.WithLogger(log),
)
if err != nil {
log.Error().Err(err).Msg("http init fail")
return
return nil, fmt.Errorf("http init fail: %w", err)
}
services.Add(h)
worker.address = h.Addr
worker.services[0] = h
if conf.Worker.Monitoring.IsEnabled() {
services.Add(monitoring.New(conf.Worker.Monitoring, h.GetHost(), log))
worker.services[1] = monitoring.New(conf.Worker.Monitoring, h.GetHost(), log)
}
st, err := GetCloudStorage(conf.Storage.Provider, conf.Storage.Key)
if err != nil {
log.Error().Err(err).Msgf("cloud storage fail, using dummy cloud storage instead")
log.Warn().Err(err).Msgf("cloud storage fail, using dummy cloud storage instead")
}
services.Add(&Worker{address: h.Addr, conf: conf, done: done, log: log, storage: st, router: NewRouter()})
worker.storage = st
return
return worker, nil
}
func (w *Worker) Run() {
func (w *Worker) Start(done chan struct{}) {
for _, s := range w.services {
if s != nil {
s.Run()
}
}
go func() {
remoteAddr := w.conf.Worker.Network.CoordinatorAddress
defer func() {
@ -67,17 +80,16 @@ func (w *Worker) Run() {
w.cord.Disconnect()
}
w.router.Close()
w.log.Debug().Msgf("Service loop end")
}()
for {
select {
case <-w.done:
case <-done:
return
default:
cord, err := newCoordinatorConnection(remoteAddr, w.conf.Worker, w.address, w.log)
if err != nil {
w.log.Error().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry)
w.log.Warn().Err(err).Msgf("no connection: %v. Retrying in %v", remoteAddr, retry)
time.Sleep(retry)
continue
}
@ -89,4 +101,14 @@ func (w *Worker) Run() {
}
}()
}
func (w *Worker) Stop() error { return nil }
func (w *Worker) Stop() error {
var err error
for _, s := range w.services {
if s != nil {
err0 := s.Stop()
err = errors.Join(err, err0)
}
}
return err
}