diff --git a/pkg/worker/coordinatorhandlers.go b/pkg/worker/coordinatorhandlers.go index aa32e8a1..ebce6062 100644 --- a/pkg/worker/coordinatorhandlers.go +++ b/pkg/worker/coordinatorhandlers.go @@ -107,29 +107,34 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke app.EnableCloudStorage(uid, w.storage) app.EnableRecording(rq.Record, rq.RecordUser, rq.Game.Name) + r.SetApp(app) + w.log.Info().Msgf("Starting the game: %v", rq.Game.Name) if err := app.Load(game, w.conf.Worker.Library.BasePath); err != nil { c.log.Error().Err(err).Msgf("couldn't load the game %v", game) - app.Close() + r.Close() w.router.SetRoom(nil) return api.EmptyPacket } - r.SetApp(app) m := media.NewWebRtcMediaPipe(w.conf.Encoder.Audio, w.conf.Encoder.Video, w.log) m.AudioSrcHz = app.AudioSampleRate() m.AudioFrame = w.conf.Encoder.Audio.Frame m.VideoW, m.VideoH = app.ViewportSize() + + r.SetMedia(m) + if err := m.Init(); err != nil { c.log.Error().Err(err).Msgf("couldn't init the media") - app.Close() + r.Close() w.router.SetRoom(nil) return api.EmptyPacket } if app.Flipped() { m.SetVideoFlip(true) } - r.SetMedia(m) + m.SetPixFmt(app.PixFormat()) + m.SetRot(app.Rotation()) r.BindAppMedia() r.StartApp() @@ -148,6 +153,7 @@ func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com. if user := w.router.FindUser(rq.Id); user != nil { w.router.Remove(user) user.Disconnect() + w.router.SetRoom(nil) } } @@ -155,6 +161,7 @@ func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com. func (c *coordinator) HandleQuitGame(rq api.GameQuitRequest[com.Uid], w *Worker) { if user := w.router.FindUser(rq.Id); user != nil { w.router.Remove(user) + w.router.SetRoom(nil) } } diff --git a/pkg/worker/room/room.go b/pkg/worker/room/room.go index 7c2aef78..b2341b4e 100644 --- a/pkg/worker/room/room.go +++ b/pkg/worker/room/room.go @@ -80,7 +80,7 @@ func (r *Room[T]) SetMedia(m MediaPipe) { r.media = m } func (r *Room[T]) StartApp() { r.app.Start() } func (r *Room[T]) Close() { - if r.closed { + if r == nil || r.closed { return } r.closed = true @@ -104,17 +104,6 @@ type Router[T Session] struct { mu sync.Mutex } -func (r *Router[T]) AddUser(user T) { r.users.Add(user) } - -func (r *Router[T]) Close() { - r.mu.Lock() - if r.room != nil { - r.room.Close() - r.room = nil - } - r.mu.Unlock() -} - func (r *Router[T]) FindRoom(id string) *Room[T] { r.mu.Lock() defer r.mu.Unlock() @@ -129,8 +118,11 @@ func (r *Router[T]) Remove(user T) { r.Close() } } -func (r *Router[T]) Room() *Room[T] { r.mu.Lock(); defer r.mu.Unlock(); return r.room } + +func (r *Router[T]) AddUser(user T) { r.users.Add(user) } +func (r *Router[T]) Close() { r.mu.Lock(); r.room.Close(); r.room = nil; r.mu.Unlock() } func (r *Router[T]) FindUser(uid Uid) T { return r.users.Find(uid.Id()) } +func (r *Router[T]) Room() *Room[T] { r.mu.Lock(); defer r.mu.Unlock(); return r.room } func (r *Router[T]) SetRoom(room *Room[T]) { r.mu.Lock(); r.room = room; r.mu.Unlock() } func (r *Router[T]) Users() SessionManager[T] { return r.users } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index c74375db..0c0f1d5a 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -28,12 +28,6 @@ type Worker struct { storage cloud.Storage } -func (w *Worker) Reset() { - w.log.Debug().Msgf("Users before close: %v", w.router.Users()) - w.router.Close() - w.log.Debug().Msgf("Users after close: %v", w.router.Users()) -} - const retry = 10 * time.Second func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) { @@ -74,6 +68,8 @@ func New(conf config.WorkerConfig, log *logger.Logger) (*Worker, error) { return worker, nil } +func (w *Worker) Reset() { w.router.Close() } + func (w *Worker) Start(done chan struct{}) { for _, s := range w.services { if s != nil {