diff --git a/pkg/api/api.go b/pkg/api/api.go index 2deeb44a..93fedb17 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -39,6 +39,14 @@ type ( PT uint8 ) +func State[T Id](id T) Stateful[T] { + return Stateful[T]{Id: id} +} + +func StateRoom[T Id](id T, rid string) StatefulRoom[T] { + return StatefulRoom[T]{Stateful: State(id), Room: Room{Rid: rid}} +} + type In[I Id] struct { Id I `json:"id,omitempty"` T PT `json:"t"` @@ -157,6 +165,21 @@ var ( OkPacket = Out{Payload: "ok"} ) +func Do[I Id, T any](in In[I], fn func(T)) error { + if dat := Unwrap[T](in.Payload); dat != nil { + fn(*dat) + return nil + } + return ErrMalformed +} + +func DoE[I Id, T any](in In[I], fn func(T) error) error { + if dat := Unwrap[T](in.Payload); dat != nil { + return fn(*dat) + } + return ErrMalformed +} + func Unwrap[T any](data []byte) *T { out := new(T) if err := json.Unmarshal(data, out); err != nil { diff --git a/pkg/coordinator/user.go b/pkg/coordinator/user.go index b9d87e7e..2a4f1e98 100644 --- a/pkg/coordinator/user.go +++ b/pkg/coordinator/user.go @@ -44,67 +44,38 @@ func (u *User) Disconnect() { } func (u *User) HandleRequests(info HasServerInfo, conf config.CoordinatorConfig) chan struct{} { - return u.ProcessPackets(func(x api.In[com.Uid]) error { - payload := x.GetPayload() - switch x.GetType() { + return u.ProcessPackets(func(x api.In[com.Uid]) (err error) { + switch x.T { case api.WebrtcInit: if u.w != nil { u.HandleWebrtcInit() } case api.WebrtcAnswer: - rq := api.Unwrap[api.WebrtcAnswerUserRequest](payload) - if rq == nil { - return api.ErrMalformed - } - u.HandleWebrtcAnswer(*rq) + err = api.Do(x, u.HandleWebrtcAnswer) case api.WebrtcIce: - rq := api.Unwrap[api.WebrtcUserIceCandidate](payload) - if rq == nil { - return api.ErrMalformed - } - u.HandleWebrtcIceCandidate(*rq) + err = api.Do(x, u.HandleWebrtcIceCandidate) case api.StartGame: - rq := api.Unwrap[api.GameStartUserRequest](payload) - if rq == nil { - return api.ErrMalformed - } - u.HandleStartGame(*rq, conf) + err = api.Do(x, func(d api.GameStartUserRequest) { u.HandleStartGame(d, conf) }) case api.QuitGame: - rq := api.Unwrap[api.GameQuitRequest[com.Uid]](payload) - if rq == nil { - return api.ErrMalformed - } - u.HandleQuitGame(*rq) + err = api.Do(x, u.HandleQuitGame) case api.SaveGame: - return u.HandleSaveGame() + err = u.HandleSaveGame() case api.LoadGame: - return u.HandleLoadGame() + err = u.HandleLoadGame() case api.ChangePlayer: - rq := api.Unwrap[api.ChangePlayerUserRequest](payload) - if rq == nil { - return api.ErrMalformed - } - u.HandleChangePlayer(*rq) + err = api.Do(x, u.HandleChangePlayer) case api.ResetGame: - rq := api.Unwrap[api.ResetGameRequest[com.Uid]](payload) - if rq == nil { - return api.ErrMalformed - } - u.HandleResetGame(*rq) + err = api.Do(x, u.HandleResetGame) case api.RecordGame: if !conf.Recording.Enabled { return api.ErrForbidden } - rq := api.Unwrap[api.RecordGameRequest[com.Uid]](payload) - if rq == nil { - return api.ErrMalformed - } - u.HandleRecordGame(*rq) + err = api.Do(x, u.HandleRecordGame) case api.GetWorkerList: u.handleGetWorkerList(conf.Coordinator.Debug, info) default: u.log.Warn().Msgf("Unknown packet: %+v", x) } - return nil + return }) } diff --git a/pkg/coordinator/userapi.go b/pkg/coordinator/userapi.go index 047fe1d1..fd8b7235 100644 --- a/pkg/coordinator/userapi.go +++ b/pkg/coordinator/userapi.go @@ -10,15 +10,11 @@ import ( // CheckLatency sends a list of server addresses to the user // and waits get back this list with tested ping times for each server. func (u *User) CheckLatency(req api.CheckLatencyUserResponse) (api.CheckLatencyUserRequest, error) { - data, err := u.Send(api.CheckLatency, req) - if err != nil || data == nil { - return nil, err - } - dat := api.Unwrap[api.CheckLatencyUserRequest](data) + dat, err := api.UnwrapChecked[api.CheckLatencyUserRequest](u.Send(api.CheckLatency, req)) if dat == nil { return api.CheckLatencyUserRequest{}, err } - return *dat, err + return *dat, nil } // InitSession signals the user that the app is ready to go. diff --git a/pkg/coordinator/worker.go b/pkg/coordinator/worker.go index f4b2b2d0..e89d3dad 100644 --- a/pkg/coordinator/worker.go +++ b/pkg/coordinator/worker.go @@ -1,6 +1,7 @@ package coordinator import ( + "errors" "fmt" "sync/atomic" @@ -75,60 +76,35 @@ func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], l } func (w *Worker) HandleRequests(users HasUserRegistry) chan struct{} { - return w.ProcessPackets(func(p api.In[com.Uid]) error { - payload := p.GetPayload() - switch p.GetType() { + return w.ProcessPackets(func(p api.In[com.Uid]) (err error) { + switch p.T { case api.RegisterRoom: - rq := api.Unwrap[api.RegisterRoomRequest](payload) - if rq == nil { - return api.ErrMalformed - } - w.log.Info().Msgf("set room [%v] = %v", w.Id(), *rq) - w.HandleRegisterRoom(*rq) + err = api.Do(p, func(d api.RegisterRoomRequest) { + w.log.Info().Msgf("set room [%v] = %v", w.Id(), d) + w.HandleRegisterRoom(d) + }) case api.CloseRoom: - rq := api.Unwrap[api.CloseRoomRequest](payload) - if rq == nil { - return api.ErrMalformed - } - w.HandleCloseRoom(*rq) + err = api.Do(p, w.HandleCloseRoom) case api.IceCandidate: - rq := api.Unwrap[api.WebrtcIceCandidateRequest[com.Uid]](payload) - if rq == nil { - return api.ErrMalformed - } - err := w.HandleIceCandidate(*rq, users) - if err != nil { - w.log.Error().Err(err).Send() - return api.ErrMalformed - } + err = api.DoE(p, func(d api.WebrtcIceCandidateRequest[com.Uid]) error { + return w.HandleIceCandidate(d, users) + }) case api.LibNewGameList: - inf := api.Unwrap[api.LibGameListInfo](payload) - if inf == nil { - return api.ErrMalformed - } - if err := w.HandleLibGameList(*inf); err != nil { - w.log.Error().Err(err).Send() - return api.ErrMalformed - } + err = api.DoE(p, w.HandleLibGameList) case api.PrevSessions: - sess := api.Unwrap[api.PrevSessionInfo](payload) - if sess == nil { - return api.ErrMalformed - } - if err := w.HandlePrevSessionList(*sess); err != nil { - w.log.Error().Err(err).Send() - return api.ErrMalformed - } + err = api.DoE(p, w.HandlePrevSessionList) default: w.log.Warn().Msgf("Unknown packet: %+v", p) } - return nil + if err != nil && !errors.Is(err, api.ErrMalformed) { + w.log.Error().Err(err).Send() + err = api.ErrMalformed + } + return }) } -func (w *Worker) SetLib(list []api.GameInfo) { - w.Lib = list -} +func (w *Worker) SetLib(list []api.GameInfo) { w.Lib = list } func (w *Worker) AppNames() []api.GameInfo { return w.Lib diff --git a/pkg/coordinator/workerapi.go b/pkg/coordinator/workerapi.go index 7a1ccf51..43205871 100644 --- a/pkg/coordinator/workerapi.go +++ b/pkg/coordinator/workerapi.go @@ -5,23 +5,27 @@ import ( "github.com/giongto35/cloud-game/v3/pkg/com" ) +func (w *Worker) room(id com.Uid) api.StatefulRoom[com.Uid] { + return api.StateRoom(id, w.RoomId) +} + func (w *Worker) WebrtcInit(id com.Uid) (*api.WebrtcInitResponse, error) { return api.UnwrapChecked[api.WebrtcInitResponse]( - w.Send(api.WebrtcInit, api.WebrtcInitRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: id}})) + w.Send(api.WebrtcInit, api.WebrtcInitRequest[com.Uid]{Stateful: api.State(id)})) } func (w *Worker) WebrtcAnswer(id com.Uid, sdp string) { - w.Notify(api.WebrtcAnswer, api.WebrtcAnswerRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: id}, Sdp: sdp}) + w.Notify(api.WebrtcAnswer, api.WebrtcAnswerRequest[com.Uid]{Stateful: api.State(id), Sdp: sdp}) } func (w *Worker) WebrtcIceCandidate(id com.Uid, can string) { - w.Notify(api.WebrtcIce, api.WebrtcIceCandidateRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: id}, Candidate: can}) + w.Notify(api.WebrtcIce, api.WebrtcIceCandidateRequest[com.Uid]{Stateful: api.State(id), Candidate: can}) } func (w *Worker) StartGame(id com.Uid, req api.GameStartUserRequest) (*api.StartGameResponse, error) { return api.UnwrapChecked[api.StartGameResponse]( w.Send(api.StartGame, api.StartGameRequest[com.Uid]{ - StatefulRoom: StateRoom(id, req.RoomId), + StatefulRoom: api.StateRoom(id, req.RoomId), Game: req.GameName, PlayerIndex: req.PlayerIndex, Record: req.Record, @@ -30,37 +34,33 @@ func (w *Worker) StartGame(id com.Uid, req api.GameStartUserRequest) (*api.Start } func (w *Worker) QuitGame(id com.Uid) { - w.Notify(api.QuitGame, api.GameQuitRequest[com.Uid]{StatefulRoom: StateRoom(id, w.RoomId)}) + w.Notify(api.QuitGame, api.GameQuitRequest[com.Uid]{StatefulRoom: w.room(id)}) } func (w *Worker) SaveGame(id com.Uid) (*api.SaveGameResponse, error) { return api.UnwrapChecked[api.SaveGameResponse]( - w.Send(api.SaveGame, api.SaveGameRequest[com.Uid]{StatefulRoom: StateRoom(id, w.RoomId)})) + w.Send(api.SaveGame, api.SaveGameRequest[com.Uid]{StatefulRoom: w.room(id)})) } func (w *Worker) LoadGame(id com.Uid) (*api.LoadGameResponse, error) { return api.UnwrapChecked[api.LoadGameResponse]( - w.Send(api.LoadGame, api.LoadGameRequest[com.Uid]{StatefulRoom: StateRoom(id, w.RoomId)})) + w.Send(api.LoadGame, api.LoadGameRequest[com.Uid]{StatefulRoom: w.room(id)})) } func (w *Worker) ChangePlayer(id com.Uid, index int) (*api.ChangePlayerResponse, error) { return api.UnwrapChecked[api.ChangePlayerResponse]( - w.Send(api.ChangePlayer, api.ChangePlayerRequest[com.Uid]{StatefulRoom: StateRoom(id, w.RoomId), Index: index})) + w.Send(api.ChangePlayer, api.ChangePlayerRequest[com.Uid]{StatefulRoom: w.room(id), Index: index})) } func (w *Worker) ResetGame(id com.Uid) { - w.Notify(api.ResetGame, api.ResetGameRequest[com.Uid]{StatefulRoom: StateRoom(id, w.RoomId)}) + w.Notify(api.ResetGame, api.ResetGameRequest[com.Uid]{StatefulRoom: w.room(id)}) } func (w *Worker) RecordGame(id com.Uid, rec bool, recUser string) (*api.RecordGameResponse, error) { return api.UnwrapChecked[api.RecordGameResponse]( - w.Send(api.RecordGame, api.RecordGameRequest[com.Uid]{StatefulRoom: StateRoom(id, w.RoomId), Active: rec, User: recUser})) + w.Send(api.RecordGame, api.RecordGameRequest[com.Uid]{StatefulRoom: w.room(id), Active: rec, User: recUser})) } func (w *Worker) TerminateSession(id com.Uid) { - _, _ = w.Send(api.TerminateSession, api.TerminateSessionRequest[com.Uid]{Stateful: api.Stateful[com.Uid]{Id: id}}) -} - -func StateRoom[T api.Id](id T, rid string) api.StatefulRoom[T] { - return api.StatefulRoom[T]{Stateful: api.Stateful[T]{Id: id}, Room: api.Room{Rid: rid}} + _, _ = w.Send(api.TerminateSession, api.TerminateSessionRequest[com.Uid]{Stateful: api.State(id)}) } diff --git a/pkg/worker/coordinator.go b/pkg/worker/coordinator.go index 07963577..f56f84b4 100644 --- a/pkg/worker/coordinator.go +++ b/pkg/worker/coordinator.go @@ -67,84 +67,41 @@ func (c *coordinator) HandleRequests(w *Worker) chan struct{} { if err != nil { c.log.Panic().Err(err).Msg("WebRTC API creation has been failed") } - skipped := api.Out{} return c.ProcessPackets(func(x api.In[com.Uid]) (err error) { var out api.Out + switch x.T { case api.WebrtcInit: - if dat := api.Unwrap[api.WebrtcInitRequest[com.Uid]](x.Payload); dat == nil { - err, out = api.ErrMalformed, api.EmptyPacket - } else { - out = c.HandleWebrtcInit(*dat, w, ap) - } - case api.WebrtcAnswer: - dat := api.Unwrap[api.WebrtcAnswerRequest[com.Uid]](x.Payload) - if dat == nil { - return api.ErrMalformed - } - c.HandleWebrtcAnswer(*dat, w) - case api.WebrtcIce: - dat := api.Unwrap[api.WebrtcIceCandidateRequest[com.Uid]](x.Payload) - if dat == nil { - return api.ErrMalformed - } - c.HandleWebrtcIceCandidate(*dat, w) + err = api.Do(x, func(d api.WebrtcInitRequest[com.Uid]) { out = c.HandleWebrtcInit(d, w, ap) }) case api.StartGame: - if dat := api.Unwrap[api.StartGameRequest[com.Uid]](x.Payload); dat == nil { - err, out = api.ErrMalformed, api.EmptyPacket - } else { - out = c.HandleGameStart(*dat, w) - } - case api.TerminateSession: - dat := api.Unwrap[api.TerminateSessionRequest[com.Uid]](x.Payload) - if dat == nil { - return api.ErrMalformed - } - c.HandleTerminateSession(*dat, w) - case api.QuitGame: - dat := api.Unwrap[api.GameQuitRequest[com.Uid]](x.Payload) - if dat == nil { - return api.ErrMalformed - } - c.HandleQuitGame(*dat, w) + err = api.Do(x, func(d api.StartGameRequest[com.Uid]) { out = c.HandleGameStart(d, w) }) case api.SaveGame: - if dat := api.Unwrap[api.SaveGameRequest[com.Uid]](x.Payload); dat == nil { - err, out = api.ErrMalformed, api.EmptyPacket - } else { - out = c.HandleSaveGame(*dat, w) - } + err = api.Do(x, func(d api.SaveGameRequest[com.Uid]) { out = c.HandleSaveGame(d, w) }) case api.LoadGame: - if dat := api.Unwrap[api.LoadGameRequest[com.Uid]](x.Payload); dat == nil { - err, out = api.ErrMalformed, api.EmptyPacket - } else { - out = c.HandleLoadGame(*dat, w) - } + err = api.Do(x, func(d api.LoadGameRequest[com.Uid]) { out = c.HandleLoadGame(d, w) }) case api.ChangePlayer: - if dat := api.Unwrap[api.ChangePlayerRequest[com.Uid]](x.Payload); dat == nil { - err, out = api.ErrMalformed, api.EmptyPacket - } else { - out = c.HandleChangePlayer(*dat, w) - } - case api.ResetGame: - dat := api.Unwrap[api.ResetGameRequest[com.Uid]](x.Payload) - if dat == nil { - return api.ErrMalformed - } - c.HandleResetGame(*dat, w) + err = api.Do(x, func(d api.ChangePlayerRequest[com.Uid]) { out = c.HandleChangePlayer(d, w) }) case api.RecordGame: - if dat := api.Unwrap[api.RecordGameRequest[com.Uid]](x.Payload); dat == nil { - err, out = api.ErrMalformed, api.EmptyPacket - } else { - out = c.HandleRecordGame(*dat, w) - } + err = api.Do(x, func(d api.RecordGameRequest[com.Uid]) { out = c.HandleRecordGame(d, w) }) + case api.WebrtcAnswer: + err = api.Do(x, func(d api.WebrtcAnswerRequest[com.Uid]) { c.HandleWebrtcAnswer(d, w) }) + case api.WebrtcIce: + err = api.Do(x, func(d api.WebrtcIceCandidateRequest[com.Uid]) { c.HandleWebrtcIceCandidate(d, w) }) + case api.TerminateSession: + err = api.Do(x, func(d api.TerminateSessionRequest[com.Uid]) { c.HandleTerminateSession(d, w) }) + case api.QuitGame: + err = api.Do(x, func(d api.GameQuitRequest[com.Uid]) { c.HandleQuitGame(d, w) }) + case api.ResetGame: + err = api.Do(x, func(d api.ResetGameRequest[com.Uid]) { c.HandleResetGame(d, w) }) default: c.log.Warn().Msgf("unhandled packet type %v", x.T) } - if out != skipped { + + if out != (api.Out{}) { w.cord.Route(x, &out) } - return err + return }) }