Handle no config situation for workers (#253)

(experimental feature)

Before a worker can start, it should have a configuration file. In case if such a file is not found it may request configuration from the coordinator to which it connected.

Added example logic if a worker is needed to be blocked until a successful packet exchange with a coordinator is being made.

* Add error return for config loader

* Add config loaded flag to worker

* Add zone flag

* Add a custom mutex lock with timout

* Refactor worker runtime

* Refactor internal api

* Extract monitoring server config

* Extract worker HTTP(S) server

* Add generic sub-server interface

* Add internal coordinator API

* Add internal routes and handlers to worker

* Add internal worker API

* Refactor worker run

* Migrate serverId call to new API

* Add packet handler to cws

* Extract handlers for internal worker routes in coordinator

* Pass worker to the worker internal heandlers

* Cleanup worker handlers in coordinator

* Add closeRoom packet handler to the API

* Add GetRoom packet handler to the API

* Add RegisterRoom packet handler to the API

* Add IceCandidate packet handler to the API (internal and browser)

* Add Heartbeat packet handler to the API (internal and browser)

* Rename worker routes init function

* Extract worker/coordinator internal ws handlers

* Update timed locker

* Allow sequential timed locks

* Add config request from workers

* Add nil check for the route registration functions
This commit is contained in:
sergystepanov 2021-01-03 21:23:55 +03:00 committed by Sergey Stepanov
parent 6cee6061b6
commit 980a97a526
No known key found for this signature in database
GPG key ID: A56B4929BAA8556B
31 changed files with 1134 additions and 862 deletions

View file

@ -16,9 +16,11 @@ import (
flag "github.com/spf13/pflag"
)
func run() {
func init() {
rand.Seed(time.Now().UTC().UnixNano())
}
func run() {
conf := config.NewConfig()
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
conf.ParseFlags()
@ -28,20 +30,17 @@ func run() {
ctx, cancelCtx := context.WithCancel(context.Background())
glog.Infof("Initializing worker server")
glog.V(4).Infof("Worker configs %v", conf)
o := worker.New(ctx, conf)
if err := o.Run(); err != nil {
glog.Errorf("Failed to run worker, reason %v", err)
os.Exit(1)
}
glog.V(4).Info("[worker] Initialization")
glog.V(4).Infof("[worker] Local configuration %+v", conf)
wrk := worker.New(ctx, conf)
wrk.Run()
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
select {
case <-stop:
glog.Infoln("Received SIGTERM, Quiting Worker")
o.Shutdown()
glog.V(4).Info("[worker] Shutting down")
wrk.Shutdown()
cancelCtx()
}
}

View file

@ -3,10 +3,10 @@ package coordinator
import (
"github.com/giongto35/cloud-game/v2/pkg/config"
"github.com/giongto35/cloud-game/v2/pkg/config/emulator"
"github.com/giongto35/cloud-game/v2/pkg/config/monitoring"
"github.com/giongto35/cloud-game/v2/pkg/config/shared"
webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/monitoring"
flag "github.com/spf13/pflag"
)
@ -21,16 +21,17 @@ type Config struct {
}
Emulator emulator.Emulator
Environment shared.Environment
Webrtc struct {
IceServers []webrtcConfig.IceServer
}
Webrtc webrtcConfig.Webrtc
}
// allows custom config path
var configPath string
func NewConfig() (conf Config) {
config.LoadConfig(&conf, configPath)
err := config.LoadConfig(&conf, configPath)
if err != nil {
panic(err)
}
return
}

View file

@ -10,16 +10,17 @@ import (
// The path param specifies a custom path to the configuration file.
// Reads and puts environment variables with the prefix CLOUD_GAME_.
// Params from the config should be in uppercase separated with _.
func LoadConfig(config interface{}, path string) interface{} {
func LoadConfig(config interface{}, path string) error {
envPrefix := "CLOUD_GAME"
dirs := []string{path}
if path == "" {
dirs = append(dirs, ".", "configs", "../../../configs")
if home, err := os.UserHomeDir(); err == nil {
dirs = append(dirs, ".", "configs", home+"/.cr", "../../../configs")
dirs = append(dirs, home+"/.cr")
}
}
if err := fig.Load(config, fig.Dirs(dirs...), fig.UseEnv(envPrefix)); err != nil {
panic(err)
return err
}
return config
return nil
}

View file

@ -0,0 +1,8 @@
package monitoring
type ServerMonitoringConfig struct {
Port int
URLPrefix string
MetricEnabled bool `json:"metric_enabled"`
ProfilingEnabled bool `json:"profiling_enabled"`
}

View file

@ -1,12 +1,14 @@
package worker
import (
"encoding/json"
"github.com/giongto35/cloud-game/v2/pkg/config"
"github.com/giongto35/cloud-game/v2/pkg/config/emulator"
"github.com/giongto35/cloud-game/v2/pkg/config/encoder"
"github.com/giongto35/cloud-game/v2/pkg/config/monitoring"
"github.com/giongto35/cloud-game/v2/pkg/config/shared"
webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc"
"github.com/giongto35/cloud-game/v2/pkg/monitoring"
flag "github.com/spf13/pflag"
)
@ -23,13 +25,21 @@ type Config struct {
Server shared.Server
}
Webrtc webrtcConfig.Webrtc
Loaded bool
}
// allows custom config path
var configPath string
func NewConfig() (conf Config) {
config.LoadConfig(&conf, configPath)
if err := config.LoadConfig(&conf, configPath); err == nil {
conf.Loaded = true
}
return
}
func EmptyConfig() (conf Config) {
conf.Loaded = false
return
}
@ -41,6 +51,18 @@ func (c *Config) ParseFlags() {
c.Worker.Server.WithFlags()
flag.IntVar(&c.Worker.Monitoring.Port, "monitoring.port", c.Worker.Monitoring.Port, "Monitoring server port")
flag.StringVar(&c.Worker.Network.CoordinatorAddress, "coordinatorhost", c.Worker.Network.CoordinatorAddress, "Worker URL to connect")
flag.StringVar(&c.Worker.Network.Zone, "zone", c.Worker.Network.Zone, "Worker network zone (us, eu, etc.)")
flag.StringVarP(&configPath, "conf", "c", configPath, "Set custom configuration file path")
flag.Parse()
}
func (c *Config) Serialize() []byte {
res, _ := json.Marshal(c)
return res
}
func (c *Config) Deserialize(data []byte) {
if err := json.Unmarshal(data, c); err == nil {
c.Loaded = true
}
}

View file

@ -1,14 +1,10 @@
package coordinator
import (
"errors"
"fmt"
"log"
"github.com/giongto35/cloud-game/v2/pkg/api"
"github.com/giongto35/cloud-game/v2/pkg/cws"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/worker/room"
"github.com/gorilla/websocket"
)
@ -30,206 +26,9 @@ func NewBrowserClient(c *websocket.Conn, browserID string) *BrowserClient {
// Register new log
func (bc *BrowserClient) Printf(format string, args ...interface{}) {
newFmt := fmt.Sprintf("Browser %s] %s", bc.SessionID, format)
log.Printf(newFmt, args...)
log.Printf(fmt.Sprintf("Browser %s] %s", bc.SessionID, format), args...)
}
func (bc *BrowserClient) Println(args ...interface{}) {
msg := fmt.Sprintf("Browser %s] %s", bc.SessionID, fmt.Sprint(args...))
log.Println(msg)
}
// RouteBrowser
// Register callbacks for connection from browser -> coordinator
func (o *Server) RouteBrowser(client *BrowserClient) {
/* WebSocket */
client.Receive("heartbeat", func(resp cws.WSPacket) cws.WSPacket {
return resp
})
/* WebRTC */
client.Receive("initwebrtc", func(resp cws.WSPacket) cws.WSPacket {
// initwebrtc now only sends signal to worker, asks it to createOffer
client.Printf("Received initwebrtc request -> relay to worker: %s", client.WorkerID)
// relay request to target worker
// worker creates a PeerConnection, and createOffer
// send SDP back to browser
resp.SessionID = client.SessionID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
sdp := wc.SyncSend(resp)
client.Println("Received SDP from worker -> sending back to browser")
return sdp
})
client.Receive("answer", func(resp cws.WSPacket) cws.WSPacket {
// contains SDP of browser createAnswer
// forward to worker
client.Println("Received browser answered SDP -> relay to worker")
resp.SessionID = client.SessionID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
wc.Send(resp, nil)
// no need to response
return cws.EmptyPacket
})
client.Receive("candidate", func(resp cws.WSPacket) cws.WSPacket {
// contains ICE candidate of browser
// forward to worker
client.Println("Received IceCandidate from browser -> relay to worker")
resp.SessionID = client.SessionID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
wc.Send(resp, nil)
return cws.EmptyPacket
})
/* GameLogic */
client.Receive("quit", func(resp cws.WSPacket) (req cws.WSPacket) {
client.Println("Received quit request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = client.SessionID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
// Send but, waiting
wc.SyncSend(resp)
return cws.EmptyPacket
})
client.Receive("start", func(resp cws.WSPacket) cws.WSPacket {
client.Println("Received start request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = client.SessionID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
// +injects game data into the original game request
gameStartCall, err := newGameStartCall(resp.RoomID, resp.Data, o.library)
if err != nil {
return cws.EmptyPacket
}
if packet, err := gameStartCall.To(); err != nil {
return cws.EmptyPacket
} else {
resp.Data = packet
}
workerResp := wc.SyncSend(resp)
// Response from worker contains initialized roomID. Set roomID to the session
client.RoomID = workerResp.RoomID
client.Println("Received room response from browser: ", workerResp.RoomID)
return workerResp
})
client.Receive("save", func(resp cws.WSPacket) (req cws.WSPacket) {
client.Println("Received save request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = client.SessionID
resp.RoomID = client.RoomID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
})
client.Receive("load", func(resp cws.WSPacket) (req cws.WSPacket) {
client.Println("Received load request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = client.SessionID
resp.RoomID = client.RoomID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
})
client.Receive("playerIdx", func(resp cws.WSPacket) (req cws.WSPacket) {
client.Println("Received update player index request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = client.SessionID
resp.RoomID = client.RoomID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
})
client.Receive("multitap", func(resp cws.WSPacket) (req cws.WSPacket) {
client.Println("Received multitap request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = client.SessionID
resp.RoomID = client.RoomID
wc, ok := o.workerClients[client.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
})
}
// newGameStartCall gathers data for a new game start call of the worker
func newGameStartCall(roomId string, data string, library games.GameLibrary) (api.GameStartCall, error) {
request := api.GameStartRequest{}
if err := request.From(data); err != nil {
return api.GameStartCall{}, errors.New("invalid request")
}
// the name of the game either in the `room id` field or
// it's in the initial request
game := request.GameName
if roomId != "" {
// ! should be moved into coordinator
name := room.GetGameNameFromRoomID(roomId)
if name == "" {
return api.GameStartCall{}, errors.New("couldn't decode game name from the room id")
}
game = name
}
gameInfo := library.FindGameByName(game)
if gameInfo.Path == "" {
return api.GameStartCall{}, fmt.Errorf("couldn't find game info for the game %v", game)
}
return api.GameStartCall{
Name: gameInfo.Name,
Path: gameInfo.Path,
Type: gameInfo.Type,
}, nil
log.Println(fmt.Sprintf("Browser %s] %s", bc.SessionID, fmt.Sprint(args...)))
}

View file

@ -33,7 +33,7 @@ func New(ctx context.Context, cfg coordinator.Config) *Coordinator {
ctx: ctx,
cfg: cfg,
monitoringServer: monitoring.NewServerMonitoring(cfg.Coordinator.Monitoring),
monitoringServer: monitoring.NewServerMonitoring(cfg.Coordinator.Monitoring, "cord"),
}
}

View file

@ -12,6 +12,7 @@ import (
"github.com/giongto35/cloud-game/v2/pkg/config/coordinator"
"github.com/giongto35/cloud-game/v2/pkg/cws"
"github.com/giongto35/cloud-game/v2/pkg/cws/api"
"github.com/giongto35/cloud-game/v2/pkg/environment"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/util"
@ -63,7 +64,8 @@ func (o *Server) GetWeb(w http.ResponseWriter, r *http.Request) {
tmpl.Execute(w, struct{}{})
}
// getPingServer returns the server for latency check of a zone. In latency check to find best worker step, we use this server to find the closest worker.
// getPingServer returns the server for latency check of a zone.
// In latency check to find best worker step, we use this server to find the closest worker.
func (o *Server) getPingServer(zone string) string {
if o.cfg.Coordinator.PingServer != "" {
return fmt.Sprintf("%s/echo", o.cfg.Coordinator.PingServer)
@ -73,8 +75,6 @@ func (o *Server) getPingServer(zone string) string {
if mode.AnyOf(environment.Production, environment.Staging) {
return fmt.Sprintf(pingServerTemp, zone, o.cfg.Coordinator.PublicDomain)
}
// If not Prod or Staging, return dev environment
return devPingServer
}
@ -137,20 +137,13 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) {
wc.Zone = zone
wc.PingServer = pingServer
// Eveything is cool
// Attach to Server instance with workerID, add defer
o.workerClients[workerID] = wc
defer o.cleanWorker(wc, workerID)
// Sendback the ID to worker
// TODO: do we need this packet?
wc.Send(cws.WSPacket{
ID: "serverID",
Data: workerID,
}, nil)
wc.Send(api.ServerIdPacket(workerID), nil)
// Add receiver callbacks, and listen
o.RouteWorker(wc)
o.workerRoutes(wc)
wc.Listen()
}
@ -233,7 +226,7 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) {
defer o.cleanBrowser(bc, sessionID)
// Routing browserClient message
o.RouteBrowser(bc)
o.useragentRoutes(bc)
bc.Send(cws.WSPacket{
ID: "init",
@ -244,10 +237,7 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) {
<-bc.Done
// Notify worker to clean session
wc.Send(cws.WSPacket{
ID: "terminateSession",
SessionID: sessionID,
}, nil)
wc.Send(api.TerminateSessionPacket(sessionID), nil)
// WorkerClient become available again
wc.IsAvailable = true

View file

@ -0,0 +1,69 @@
package coordinator
import (
"log"
"github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/cws"
"github.com/giongto35/cloud-game/v2/pkg/cws/api"
)
func (wc *WorkerClient) handleConfigRequest() cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
// try to load worker config
conf := worker.NewConfig()
return api.ConfigRequestPacket(conf.Serialize())
}
}
func (wc *WorkerClient) handleHeartbeat() cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
return resp
}
}
// handleRegisterRoom event from a worker, when worker created a new room.
// RoomID is global so it is managed by coordinator.
func (wc *WorkerClient) handleRegisterRoom(s *Server) cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
log.Printf("Coordinator: Received registerRoom room %s from worker %s", resp.Data, wc.WorkerID)
s.roomToWorker[resp.Data] = wc.WorkerID
log.Printf("Coordinator: Current room list is: %+v", s.roomToWorker)
return api.RegisterRoomPacket(api.NoData)
}
}
// handleGetRoom returns the server ID based on requested roomID.
func (wc *WorkerClient) handleGetRoom(s *Server) cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
log.Println("Coordinator: Received a get room request")
log.Println("Result: ", s.roomToWorker[resp.Data])
return api.GetRoomPacket(s.roomToWorker[resp.Data])
}
}
// handleCloseRoom event from a worker, when worker close a room.
func (wc *WorkerClient) handleCloseRoom(s *Server) cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
log.Printf("Coordinator: Received closeRoom room %s from worker %s", resp.Data, wc.WorkerID)
delete(s.roomToWorker, resp.Data)
log.Printf("Coordinator: Current room list is: %+v", s.roomToWorker)
return api.CloseRoomPacket(api.NoData)
}
}
// handleIceCandidate passes an ICE candidate (WebRTC) to the browser.
func (wc *WorkerClient) handleIceCandidate(s *Server) cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket {
wc.Println("Received IceCandidate from worker -> relay to browser")
bc, ok := s.browserClients[resp.SessionID]
if ok {
// Remove SessionID while sending back to browser
resp.SessionID = ""
bc.Send(resp, nil)
} else {
wc.Println("Error: unknown SessionID:", resp.SessionID)
}
return cws.EmptyPacket
}
}

33
pkg/coordinator/routes.go Normal file
View file

@ -0,0 +1,33 @@
package coordinator
import "github.com/giongto35/cloud-game/v2/pkg/cws/api"
// workerRoutes adds all worker request routes.
func (o *Server) workerRoutes(wc *WorkerClient) {
if wc == nil {
return
}
wc.Receive(api.ConfigRequest, wc.handleConfigRequest())
wc.Receive(api.Heartbeat, wc.handleHeartbeat())
wc.Receive(api.RegisterRoom, wc.handleRegisterRoom(o))
wc.Receive(api.GetRoom, wc.handleGetRoom(o))
wc.Receive(api.CloseRoom, wc.handleCloseRoom(o))
wc.Receive(api.IceCandidate, wc.handleIceCandidate(o))
}
// useragentRoutes adds all useragent (browser) request routes.
func (o *Server) useragentRoutes(bc *BrowserClient) {
if bc == nil {
return
}
bc.Receive(api.Heartbeat, bc.handleHeartbeat())
bc.Receive(api.InitWebrtc, bc.handleInitWebrtc(o))
bc.Receive(api.Answer, bc.handleAnswer(o))
bc.Receive(api.IceCandidate, bc.handleIceCandidate(o))
bc.Receive(api.GameStart, bc.handleGameStart(o))
bc.Receive(api.GameQuit, bc.handleGameQuit(o))
bc.Receive(api.GameSave, bc.handleGameSave(o))
bc.Receive(api.GameLoad, bc.handleGameLoad(o))
bc.Receive(api.GamePlayerSelect, bc.handleGamePlayerSelect(o))
bc.Receive(api.GameMultitap, bc.handleGameMultitap(o))
}

View file

@ -0,0 +1,211 @@
package coordinator
import (
"errors"
"fmt"
"github.com/giongto35/cloud-game/v2/pkg/cws"
"github.com/giongto35/cloud-game/v2/pkg/cws/api"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/worker/room"
)
func (bc *BrowserClient) handleHeartbeat() cws.PacketHandler {
return func(resp cws.WSPacket) cws.WSPacket { return resp }
}
func (bc *BrowserClient) handleInitWebrtc(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
// initWebrtc now only sends signal to worker, asks it to createOffer
bc.Printf("Received init_webrtc request -> relay to worker: %s", bc.WorkerID)
// relay request to target worker
// worker creates a PeerConnection, and createOffer
// send SDP back to browser
resp.SessionID = bc.SessionID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
sdp := wc.SyncSend(resp)
bc.Println("Received SDP from worker -> sending back to browser")
return sdp
}
}
func (bc *BrowserClient) handleAnswer(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
// contains SDP of browser createAnswer
// forward to worker
bc.Println("Received browser answered SDP -> relay to worker")
resp.SessionID = bc.SessionID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
wc.Send(resp, nil)
// no need to response
return cws.EmptyPacket
}
}
func (bc *BrowserClient) handleIceCandidate(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
// contains ICE candidate of browser
// forward to worker
bc.Println("Received IceCandidate from browser -> relay to worker")
resp.SessionID = bc.SessionID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
wc.Send(resp, nil)
return cws.EmptyPacket
}
}
func (bc *BrowserClient) handleGameStart(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
bc.Println("Received start request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = bc.SessionID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
// +injects game data into the original game request
gameStartCall, err := newGameStartCall(resp.RoomID, resp.Data, o.library)
if err != nil {
return cws.EmptyPacket
}
if packet, err := gameStartCall.To(); err != nil {
return cws.EmptyPacket
} else {
resp.Data = packet
}
workerResp := wc.SyncSend(resp)
// Response from worker contains initialized roomID. Set roomID to the session
bc.RoomID = workerResp.RoomID
bc.Println("Received room response from browser: ", workerResp.RoomID)
return workerResp
}
}
func (bc *BrowserClient) handleGameQuit(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
bc.Println("Received quit request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = bc.SessionID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
// Send but, waiting
wc.SyncSend(resp)
return cws.EmptyPacket
}
}
func (bc *BrowserClient) handleGameSave(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
bc.Println("Received save request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = bc.SessionID
resp.RoomID = bc.RoomID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
}
}
func (bc *BrowserClient) handleGameLoad(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
bc.Println("Received load request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = bc.SessionID
resp.RoomID = bc.RoomID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
}
}
func (bc *BrowserClient) handleGamePlayerSelect(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
bc.Println("Received update player index request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = bc.SessionID
resp.RoomID = bc.RoomID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
}
}
func (bc *BrowserClient) handleGameMultitap(o *Server) cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
bc.Println("Received multitap request from a browser -> relay to worker")
// TODO: Async
resp.SessionID = bc.SessionID
resp.RoomID = bc.RoomID
wc, ok := o.workerClients[bc.WorkerID]
if !ok {
return cws.EmptyPacket
}
resp = wc.SyncSend(resp)
return resp
}
}
// newGameStartCall gathers data for a new game start call of the worker
func newGameStartCall(roomId string, data string, library games.GameLibrary) (api.GameStartCall, error) {
request := api.GameStartRequest{}
if err := request.From(data); err != nil {
return api.GameStartCall{}, errors.New("invalid request")
}
// the name of the game either in the `room id` field or
// it's in the initial request
game := request.GameName
if roomId != "" {
// ! should be moved into coordinator
name := room.GetGameNameFromRoomID(roomId)
if name == "" {
return api.GameStartCall{}, errors.New("couldn't decode game name from the room id")
}
game = name
}
gameInfo := library.FindGameByName(game)
if gameInfo.Path == "" {
return api.GameStartCall{}, fmt.Errorf("couldn't find game info for the game %v", game)
}
return api.GameStartCall{
Name: gameInfo.Name,
Path: gameInfo.Path,
Type: gameInfo.Type,
}, nil
}

View file

@ -8,10 +8,9 @@ import (
"github.com/gorilla/websocket"
)
const pingServer = "%s://%s/echo"
type WorkerClient struct {
*cws.Client
WorkerID string
Address string // ip address of worker
// public server used for ping check (Cannot use worker address because they are not publicly exposed)
@ -21,7 +20,8 @@ type WorkerClient struct {
Zone string
}
// NewWorkerClient returns a client connecting to worker. This connection exchanges information between workers and server
// NewWorkerClient returns a client connecting to worker.
// This connection exchanges information between workers and server.
func NewWorkerClient(c *websocket.Conn, workerID string) *WorkerClient {
return &WorkerClient{
Client: cws.NewClient(c),
@ -30,68 +30,10 @@ func NewWorkerClient(c *websocket.Conn, workerID string) *WorkerClient {
}
}
// Register new log
func (wc *WorkerClient) Printf(format string, args ...interface{}) {
newFmt := fmt.Sprintf("Worker %s] %s", wc.WorkerID, format)
log.Printf(newFmt, args...)
log.Printf(fmt.Sprintf("Worker %s] %s", wc.WorkerID, format), args...)
}
func (wc *WorkerClient) Println(args ...interface{}) {
msg := fmt.Sprintf("Worker %s] %s", wc.WorkerID, fmt.Sprint(args...))
log.Println(msg)
}
// RouteWorker are all routes server received from worker
func (o *Server) RouteWorker(wc *WorkerClient) {
// registerRoom event from a worker, when worker created a new room.
// RoomID is global so it is managed by coordinator.
wc.Receive("registerRoom", func(resp cws.WSPacket) cws.WSPacket {
log.Printf("Coordinator: Received registerRoom room %s from worker %s", resp.Data, wc.WorkerID)
o.roomToWorker[resp.Data] = wc.WorkerID
log.Printf("Coordinator: Current room list is: %+v", o.roomToWorker)
return cws.WSPacket{
ID: "registerRoom",
}
})
// closeRoom event from a worker, when worker close a room
wc.Receive("closeRoom", func(resp cws.WSPacket) cws.WSPacket {
log.Printf("Coordinator: Received closeRoom room %s from worker %s", resp.Data, wc.WorkerID)
delete(o.roomToWorker, resp.Data)
log.Printf("Coordinator: Current room list is: %+v", o.roomToWorker)
return cws.WSPacket{
ID: "closeRoom",
}
})
// getRoom returns the server ID based on requested roomID.
wc.Receive("getRoom", func(resp cws.WSPacket) cws.WSPacket {
log.Println("Coordinator: Received a getroom request")
log.Println("Result: ", o.roomToWorker[resp.Data])
return cws.WSPacket{
ID: "getRoom",
Data: o.roomToWorker[resp.Data],
}
})
wc.Receive("heartbeat", func(resp cws.WSPacket) cws.WSPacket {
return resp
})
/* WebRTC */
wc.Receive("candidate", func(resp cws.WSPacket) cws.WSPacket {
wc.Println("Received IceCandidate from worker -> relay to browser")
bc, ok := o.browserClients[resp.SessionID]
if ok {
// Remove SessionID while sending back to browser
resp.SessionID = ""
bc.Send(resp, nil)
} else {
wc.Println("Error: unknown SessionID:", resp.SessionID)
}
return cws.EmptyPacket
})
log.Println(fmt.Sprintf("Worker %s] %s", wc.WorkerID, fmt.Sprint(args...)))
}

View file

@ -1,29 +1,11 @@
package api
import (
"encoding/json"
)
import "encoding/json"
// This list of postfixes is used in the API:
// - *Request postfix denotes clients calls (i.e. from a browser to the HTTP-server).
// - *Call postfix denotes IPC calls (from the coordinator to a worker).
type GameStartRequest struct {
GameName string `json:"game_name"`
IsMobile bool `json:"is_mobile"`
}
func (packet *GameStartRequest) From(data string) error { return from(packet, data) }
type GameStartCall struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
func (packet *GameStartCall) From(data string) error { return from(packet, data) }
func (packet *GameStartCall) To() (string, error) { return to(packet) }
func from(source interface{}, data string) error {
err := json.Unmarshal([]byte(data), source)
if err != nil {

View file

@ -0,0 +1,51 @@
package api
import "github.com/giongto35/cloud-game/v2/pkg/cws"
const (
ConfigRequest = "config_request"
GetRoom = "get_room"
CloseRoom = "close_room"
RegisterRoom = "register_room"
Heartbeat = "heartbeat"
IceCandidate = "ice_candidate"
NoData = ""
InitWebrtc = "init_webrtc"
Answer = "answer"
GameStart = "start"
GameQuit = "quit"
GameSave = "save"
GameLoad = "load"
GamePlayerSelect = "player_index"
GameMultitap = "multitap"
)
type GameStartRequest struct {
GameName string `json:"game_name"`
IsMobile bool `json:"is_mobile"`
}
func (packet *GameStartRequest) From(data string) error { return from(packet, data) }
type GameStartCall struct {
Name string `json:"name"`
Path string `json:"path"`
Type string `json:"type"`
}
func (packet *GameStartCall) From(data string) error { return from(packet, data) }
func (packet *GameStartCall) To() (string, error) { return to(packet) }
//
// *** packets ***
//
func ConfigPacket() cws.WSPacket { return cws.WSPacket{ID: ConfigRequest} }
func RegisterRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: RegisterRoom, Data: data} }
func GetRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: GetRoom, Data: data} }
func CloseRoomPacket(data string) cws.WSPacket { return cws.WSPacket{ID: CloseRoom, Data: data} }
func IceCandidatePacket(data string, sessionId string) cws.WSPacket {
return cws.WSPacket{ID: IceCandidate, Data: data, SessionID: sessionId}
}

21
pkg/cws/api/worker.go Normal file
View file

@ -0,0 +1,21 @@
package api
import "github.com/giongto35/cloud-game/v2/pkg/cws"
const (
ServerId = "server_id"
TerminateSession = "terminateSession"
)
type ConfPushCall struct {
Data []byte `json:"data"`
}
func (packet *ConfPushCall) From(data string) error { return from(packet, data) }
func (packet *ConfPushCall) To() (string, error) { return to(packet) }
func ServerIdPacket(id string) cws.WSPacket { return cws.WSPacket{ID: ServerId, Data: id} }
func ConfigRequestPacket(conf []byte) cws.WSPacket { return cws.WSPacket{Data: string(conf)} }
func TerminateSessionPacket(sessionId string) cws.WSPacket {
return cws.WSPacket{ID: TerminateSession, SessionID: sessionId}
}

View file

@ -11,33 +11,37 @@ import (
"github.com/gorilla/websocket"
)
type Client struct {
id string
type (
Client struct {
id string
conn *websocket.Conn
conn *websocket.Conn
sendLock sync.Mutex
// sendCallback is callback based on packetID
sendCallback map[string]func(req WSPacket)
sendCallbackLock sync.Mutex
// recvCallback is callback when receive based on ID of the packet
recvCallback map[string]func(req WSPacket)
sendLock sync.Mutex
// sendCallback is callback based on packetID
sendCallback map[string]func(req WSPacket)
sendCallbackLock sync.Mutex
// recvCallback is callback when receive based on ID of the packet
recvCallback map[string]func(req WSPacket)
Done chan struct{}
}
Done chan struct{}
}
type WSPacket struct {
ID string `json:"id"`
// TODO: Make Data generic: map[string]interface{} for more usecases
Data string `json:"data"`
WSPacket struct {
ID string `json:"id"`
// TODO: Make Data generic: map[string]interface{} for more usecases
Data string `json:"data"`
RoomID string `json:"room_id"`
PlayerIndex int `json:"player_index"`
RoomID string `json:"room_id"`
PlayerIndex int `json:"player_index"`
PacketID string `json:"packet_id"`
// Globally ID of a browser session
SessionID string `json:"session_id"`
}
PacketID string `json:"packet_id"`
// Globally ID of a browser session
SessionID string `json:"session_id"`
}
PacketHandler func(resp WSPacket) (req WSPacket)
)
var EmptyPacket = WSPacket{}
@ -93,7 +97,7 @@ func (c *Client) Send(request WSPacket, callback func(response WSPacket)) {
}
// Receive receive and response back
func (c *Client) Receive(id string, f func(response WSPacket) (request WSPacket)) {
func (c *Client) Receive(id string, f PacketHandler) {
c.recvCallback[id] = func(response WSPacket) {
defer func() {
if err := recover(); err != nil {
@ -161,6 +165,7 @@ func (c *Client) Heartbeat() {
return
default:
}
// !to resolve cycle deps
c.Send(WSPacket{ID: "heartbeat"}, nil)
}
}

View file

@ -61,7 +61,9 @@ func GetEmulatorMock(room string, system string) *EmulatorMock {
configPath := rootPath + "configs/"
var conf worker.Config
config.LoadConfig(&conf, configPath)
if err := config.LoadConfig(&conf, configPath); err != nil {
panic(err)
}
meta := conf.Emulator.GetLibretroCoreConfig(system)

48
pkg/lock/lock.go Normal file
View file

@ -0,0 +1,48 @@
package lock
import (
"sync/atomic"
"time"
)
type TimeLock struct {
l chan struct{}
locked int32
}
// NewLock returns new lock (mutex) with a timeout option.
func NewLock() *TimeLock {
return &TimeLock{l: make(chan struct{}, 1)}
}
// Lock unconditionally blocks the execution.
func (tl *TimeLock) Lock() {
if tl.isLocked() {
return
}
tl.lock()
<-tl.l
}
// LockFor blocks the execution at most for
// the given period of time.
func (tl *TimeLock) LockFor(d time.Duration) {
tl.lock()
select {
case <-tl.l:
case <-time.After(d):
}
}
// Unlock removes the current block if any.
func (tl *TimeLock) Unlock() {
if !tl.isLocked() {
return
}
tl.unlock()
tl.l <- struct{}{}
}
func (tl *TimeLock) isLocked() bool { return atomic.LoadInt32(&tl.locked) == 1 }
func (tl *TimeLock) lock() { atomic.StoreInt32(&tl.locked, 1) }
func (tl *TimeLock) unlock() { atomic.StoreInt32(&tl.locked, 0) }

38
pkg/lock/lock_test.go Normal file
View file

@ -0,0 +1,38 @@
package lock
import (
"testing"
"time"
)
func TestLock(t *testing.T) {
a := 1
lock := NewLock()
wait := time.Millisecond * 10
lock.Unlock()
lock.Unlock()
lock.Unlock()
go func(timeLock *TimeLock) {
time.Sleep(time.Second * 1)
lock.Unlock()
}(lock)
lock.LockFor(time.Second * 30)
lock.LockFor(wait)
lock.LockFor(wait)
lock.LockFor(wait)
lock.LockFor(wait)
lock.LockFor(time.Millisecond * 10)
go func(timeLock *TimeLock) {
time.Sleep(time.Millisecond * 1)
lock.Unlock()
}(lock)
lock.Lock()
a -= 1
if a != 0 {
t.Errorf("lock test failed because a != 0")
}
}

View file

@ -7,39 +7,26 @@ import (
"net/http/pprof"
"strings"
"github.com/giongto35/cloud-game/v2/pkg/config/monitoring"
config "github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type ServerMonitoringConfig struct {
Port int
URLPrefix string
MetricEnabled bool `json:"metric_enabled"`
ProfilingEnabled bool `json:"profiling_enabled"`
}
type ServerMonitoring struct {
cfg ServerMonitoringConfig
cfg monitoring.ServerMonitoringConfig
tag string
server http.Server
}
func NewServerMonitoring(cfg ServerMonitoringConfig) *ServerMonitoring {
if cfg.Port == 0 {
cfg.Port = 6365
}
func NewServerMonitoring(cfg monitoring.ServerMonitoringConfig, tag string) *ServerMonitoring {
return &ServerMonitoring{cfg: validate(&cfg), tag: tag}
}
if len(cfg.URLPrefix) > 0 {
cfg.URLPrefix = strings.TrimSpace(cfg.URLPrefix)
if !strings.HasPrefix(cfg.URLPrefix, "/") {
cfg.URLPrefix = "/" + cfg.URLPrefix
}
if strings.HasSuffix(cfg.URLPrefix, "/") {
cfg.URLPrefix = strings.TrimSuffix(cfg.URLPrefix, "/")
}
}
return &ServerMonitoring{cfg: cfg}
func (sm *ServerMonitoring) Init(conf interface{}) error {
cfg := conf.(config.Config).Worker.Monitoring
sm.cfg = validate(&cfg)
return nil
}
func (sm *ServerMonitoring) Run() error {
@ -50,11 +37,11 @@ func (sm *ServerMonitoring) Run() error {
Addr: fmt.Sprintf(":%d", sm.cfg.Port),
Handler: monitoringServerMux,
}
glog.Infoln("Starting monitoring server at", srv.Addr)
glog.Infof("[%v] Starting monitoring server at %v", sm.tag, srv.Addr)
if sm.cfg.ProfilingEnabled {
pprofPath := fmt.Sprintf("%s/debug/pprof", sm.cfg.URLPrefix)
glog.Infoln("Profiling is enabled at", srv.Addr+pprofPath)
glog.Infof("[%v] Profiling is enabled at %v", sm.tag, srv.Addr+pprofPath)
monitoringServerMux.Handle(pprofPath+"/", http.HandlerFunc(pprof.Index))
monitoringServerMux.Handle(pprofPath+"/cmdline", http.HandlerFunc(pprof.Cmdline))
monitoringServerMux.Handle(pprofPath+"/profile", http.HandlerFunc(pprof.Profile))
@ -72,17 +59,34 @@ func (sm *ServerMonitoring) Run() error {
if sm.cfg.MetricEnabled {
metricPath := fmt.Sprintf("%s/metrics", sm.cfg.URLPrefix)
glog.Infoln("Prometheus metric is enabled at", srv.Addr+metricPath)
glog.Infof("[%v] Prometheus metric is enabled at %v", sm.tag, srv.Addr+metricPath)
monitoringServerMux.Handle(metricPath, promhttp.Handler())
}
sm.server = srv
return srv.ListenAndServe()
}
glog.Infoln("Monitoring server is disabled via config")
return nil
}
func (sm *ServerMonitoring) Shutdown(ctx context.Context) error {
glog.Infoln("Shutting down monitoring server")
glog.Infof("[%v] Shutting down monitoring server", sm.tag)
return sm.server.Shutdown(ctx)
}
func validate(conf *monitoring.ServerMonitoringConfig) monitoring.ServerMonitoringConfig {
if conf.Port == 0 {
conf.Port = 6365
}
if len(conf.URLPrefix) > 0 {
conf.URLPrefix = strings.TrimSpace(conf.URLPrefix)
if !strings.HasPrefix(conf.URLPrefix, "/") {
conf.URLPrefix = "/" + conf.URLPrefix
}
if strings.HasSuffix(conf.URLPrefix, "/") {
conf.URLPrefix = strings.TrimSuffix(conf.URLPrefix, "/")
}
}
return *conf
}

9
pkg/server/server.go Normal file
View file

@ -0,0 +1,9 @@
package server
import "context"
type Server interface {
Init(conf interface{}) error
Run() error
Shutdown(ctx context.Context) error
}

View file

@ -1,18 +1,7 @@
package worker
import (
"encoding/json"
"log"
"strconv"
"github.com/giongto35/cloud-game/v2/pkg/api"
webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc"
"github.com/giongto35/cloud-game/v2/pkg/cws"
"github.com/giongto35/cloud-game/v2/pkg/encoder"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/util"
"github.com/giongto35/cloud-game/v2/pkg/webrtc"
"github.com/giongto35/cloud-game/v2/pkg/worker/room"
"github.com/gorilla/websocket"
)
@ -34,303 +23,3 @@ func NewCoordinatorClient(oc *websocket.Conn) *CoordinatorClient {
}
return oClient
}
// RouteCoordinator are all routes server received from coordinator.
func (h *Handler) RouteCoordinator() {
oClient := h.oClient
/* Coordinator */
// Received from coordinator the serverID
oClient.Receive("serverID", func(response cws.WSPacket) (request cws.WSPacket) {
// Stick session with serverID got from coordinator
log.Println("Received serverID ", response.Data)
h.serverID = response.Data
return cws.EmptyPacket
})
/* WebRTC Connection */
oClient.Receive("initwebrtc", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a request to createOffer from browser via coordinator")
peerconnection := webrtc.NewWebRTC().WithConfig(
webrtcConfig.Config{Encoder: h.cfg.Encoder, Webrtc: h.cfg.Webrtc},
)
var initPacket struct {
IsMobile bool `json:"is_mobile"`
}
err := json.Unmarshal([]byte(resp.Data), &initPacket)
if err != nil {
log.Println("Error: Cannot decode json:", err)
return cws.EmptyPacket
}
localSession, err := peerconnection.StartClient(
initPacket.IsMobile,
func(candidate string) {
// send back candidate string to browser
oClient.Send(cws.WSPacket{
ID: "candidate",
Data: candidate,
SessionID: resp.SessionID,
}, nil)
},
)
// localSession, err := peerconnection.StartClient(initPacket.IsMobile, iceCandidates[resp.SessionID])
// h.peerconnections[resp.SessionID] = peerconnection
// Create new sessions when we have new peerconnection initialized
session := &Session{
peerconnection: peerconnection,
}
h.sessions[resp.SessionID] = session
log.Println("Start peerconnection", resp.SessionID)
if err != nil {
log.Println("Error: Cannot create new webrtc session", err)
return cws.EmptyPacket
}
return cws.WSPacket{
ID: "offer",
Data: localSession,
}
})
oClient.Receive("answer", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received answer SDP from browser")
session := h.getSession(resp.SessionID)
if session != nil {
peerconnection := session.peerconnection
err := peerconnection.SetRemoteSDP(resp.Data)
if err != nil {
log.Println("Error: Cannot set RemoteSDP of client: " + resp.SessionID)
}
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
})
oClient.Receive("candidate", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received remote Ice Candidate from browser")
session := h.getSession(resp.SessionID)
if session != nil {
peerconnection := session.peerconnection
err := peerconnection.AddCandidate(resp.Data)
if err != nil {
log.Println("Error: Cannot add IceCandidate of client: " + resp.SessionID)
}
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
})
/* Game Logic */
oClient.Receive("start", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a start request from coordinator")
session := h.getSession(resp.SessionID)
if session == nil {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
return cws.EmptyPacket
}
peerconnection := session.peerconnection
// TODO: Standardize for all types of packet. Make WSPacket generic
startPacket := api.GameStartCall{}
if err := startPacket.From(resp.Data); err != nil {
return cws.EmptyPacket
}
gameMeta := games.GameMetadata{
Name: startPacket.Name,
Type: startPacket.Type,
Path: startPacket.Path,
}
room := h.startGameHandler(gameMeta, resp.RoomID, resp.PlayerIndex, peerconnection, util.GetVideoEncoder(false))
session.RoomID = room.ID
// TODO: can data race
h.rooms[room.ID] = room
return cws.WSPacket{
ID: "start",
RoomID: room.ID,
}
})
oClient.Receive("quit", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a quit request from coordinator")
session := h.getSession(resp.SessionID)
if session != nil {
room := h.getRoom(session.RoomID)
// Defensive coding, check if the peerconnection is in room
if room.IsPCInRoom(session.peerconnection) {
h.detachPeerConn(session.peerconnection)
}
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
})
oClient.Receive("save", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a save game from coordinator")
log.Println("RoomID:", resp.RoomID)
req.ID = "save"
req.Data = "ok"
if resp.RoomID != "" {
room := h.getRoom(resp.RoomID)
if room == nil {
return
}
err := room.SaveGame()
if err != nil {
log.Println("[!] Cannot save game state: ", err)
req.Data = "error"
}
} else {
req.Data = "error"
}
return req
})
oClient.Receive("load", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a load game from coordinator")
log.Println("Loading game state")
req.ID = "load"
req.Data = "ok"
if resp.RoomID != "" {
room := h.getRoom(resp.RoomID)
err := room.LoadGame()
if err != nil {
log.Println("[!] Cannot load game state: ", err)
req.Data = "error"
}
} else {
req.Data = "error"
}
return req
})
oClient.Receive("playerIdx", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received an update player index event from coordinator")
req.ID = "playerIdx"
room := h.getRoom(resp.RoomID)
session := h.getSession(resp.SessionID)
idx, err := strconv.Atoi(resp.Data)
log.Printf("Got session %v and room %v", session, room)
if room != nil && session != nil && err == nil {
room.UpdatePlayerIndex(session.peerconnection, idx)
req.Data = strconv.Itoa(idx)
} else {
req.Data = "error"
}
return req
})
oClient.Receive("multitap", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a multitap toggle from coordinator")
req.ID = "multitap"
req.Data = "ok"
if resp.RoomID != "" {
room := h.getRoom(resp.RoomID)
err := room.ToggleMultitap()
if err != nil {
log.Println("[!] Could not toggle multitap state: ", err)
req.Data = "error"
}
} else {
req.Data = "error"
}
return req
})
oClient.Receive("terminateSession", func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a terminate session ", resp.SessionID)
session := h.getSession(resp.SessionID)
if session != nil {
session.Close()
delete(h.sessions, resp.SessionID)
h.detachPeerConn(session.peerconnection)
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
})
}
func getServerIDOfRoom(oc *CoordinatorClient, roomID string) string {
log.Println("Request coordinator roomID ", roomID)
packet := oc.SyncSend(
cws.WSPacket{
ID: "getRoom",
Data: roomID,
},
)
log.Println("Received roomID from coordinator ", packet.Data)
return packet.Data
}
// startGameHandler starts a game if roomID is given, if not create new room
func (h *Handler) startGameHandler(game games.GameMetadata, existedRoomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoCodec encoder.VideoCodec) *room.Room {
log.Printf("Loading game: %v\n", game.Name)
// If we are connecting to coordinator, request corresponding serverID based on roomID
// TODO: check if existedRoomID is in the current server
room := h.getRoom(existedRoomID)
// If room is not running
if room == nil {
log.Println("Got Room from local ", room, " ID: ", existedRoomID)
// Create new room and update player index
room = h.createNewRoom(game, existedRoomID, videoCodec)
room.UpdatePlayerIndex(peerconnection, playerIndex)
// Wait for done signal from room
go func() {
<-room.Done
h.detachRoom(room.ID)
// send signal to coordinator that the room is closed, coordinator will remove that room
h.oClient.Send(cws.WSPacket{
ID: "closeRoom",
Data: room.ID,
}, nil)
}()
}
// Attach peerconnection to room. If PC is already in room, don't detach
log.Println("Is PC in room", room.IsPCInRoom(peerconnection))
if !room.IsPCInRoom(peerconnection) {
h.detachPeerConn(peerconnection)
room.AddConnectionToRoom(peerconnection)
}
// Register room to coordinator if we are connecting to coordinator
if room != nil && h.oClient != nil {
h.oClient.Send(cws.WSPacket{
ID: "registerRoom",
Data: room.ID,
}, nil)
}
return room
}

View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/cws/api"
"github.com/giongto35/cloud-game/v2/pkg/emulator/libretro/manager/remotehttp"
"github.com/giongto35/cloud-game/v2/pkg/encoder"
"github.com/giongto35/cloud-game/v2/pkg/environment"
@ -37,10 +38,12 @@ type Handler struct {
onlineStorage *storage.Client
// sessions handles all sessions server is handler (key is sessionID)
sessions map[string]*Session
w *Worker
}
// NewHandler returns a new server
func NewHandler(cfg worker.Config) *Handler {
func NewHandler(cfg worker.Config, wrk *Worker) *Handler {
// Create offline storage folder
createOfflineStorage()
@ -52,29 +55,38 @@ func NewHandler(cfg worker.Config) *Handler {
coordinatorHost: cfg.Worker.Network.CoordinatorAddress,
cfg: cfg,
onlineStorage: onlineStorage,
w: wrk,
}
}
// Run starts a Handler running logic
func (h *Handler) Run() {
conf := h.cfg.Worker.Network
for {
conf := h.cfg.Worker.Network
oClient, err := setupCoordinatorConnection(conf.CoordinatorAddress, conf.Zone, h.cfg)
conn, err := setupCoordinatorConnection(conf.CoordinatorAddress, conf.Zone, h.cfg)
if err != nil {
log.Printf("Cannot connect to coordinator. %v Retrying...", err)
time.Sleep(time.Second)
continue
}
log.Printf("[worker] connected to: %v", conf.CoordinatorAddress)
h.oClient = oClient
log.Println("Connected to coordinator successfully.", oClient, err)
h.oClient = conn
go h.oClient.Heartbeat()
h.RouteCoordinator()
h.routes()
h.oClient.Listen()
// If cannot listen, reconnect to coordinator
}
}
func (h *Handler) RequestConfig() {
log.Printf("[worker] asking for a config...")
response := h.oClient.SyncSend(api.ConfigPacket())
conf := worker.EmptyConfig()
conf.Deserialize([]byte(response.Data))
log.Printf("[worker] pulled config: %+v", conf)
}
func (h *Handler) Prepare() {
if !h.cfg.Emulator.Libretro.Cores.Repo.Sync {
return

View file

@ -0,0 +1,296 @@
package worker
import (
"encoding/json"
"log"
"strconv"
webrtcConfig "github.com/giongto35/cloud-game/v2/pkg/config/webrtc"
"github.com/giongto35/cloud-game/v2/pkg/cws"
"github.com/giongto35/cloud-game/v2/pkg/cws/api"
"github.com/giongto35/cloud-game/v2/pkg/encoder"
"github.com/giongto35/cloud-game/v2/pkg/games"
"github.com/giongto35/cloud-game/v2/pkg/util"
"github.com/giongto35/cloud-game/v2/pkg/webrtc"
"github.com/giongto35/cloud-game/v2/pkg/worker/room"
)
func (h *Handler) handleServerId() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Printf("[worker] new id: %s", resp.Data)
h.serverID = resp.Data
// unlock worker if it's locked
h.w.lock.Unlock()
return
}
}
func (h *Handler) handleTerminateSession() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a terminate session ", resp.SessionID)
session := h.getSession(resp.SessionID)
if session != nil {
session.Close()
delete(h.sessions, resp.SessionID)
h.detachPeerConn(session.peerconnection)
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
}
}
func (h *Handler) handleInitWebrtc() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a request to createOffer from browser via coordinator")
peerconnection := webrtc.NewWebRTC().WithConfig(
webrtcConfig.Config{Encoder: h.cfg.Encoder, Webrtc: h.cfg.Webrtc},
)
var initPacket struct {
IsMobile bool `json:"is_mobile"`
}
err := json.Unmarshal([]byte(resp.Data), &initPacket)
if err != nil {
log.Println("Error: Cannot decode json:", err)
return cws.EmptyPacket
}
localSession, err := peerconnection.StartClient(
initPacket.IsMobile,
// send back candidate string to browser
func(cd string) { h.oClient.Send(api.IceCandidatePacket(cd, resp.SessionID), nil) },
)
// localSession, err := peerconnection.StartClient(initPacket.IsMobile, iceCandidates[resp.SessionID])
// h.peerconnections[resp.SessionID] = peerconnection
// Create new sessions when we have new peerconnection initialized
session := &Session{
peerconnection: peerconnection,
}
h.sessions[resp.SessionID] = session
log.Println("Start peerconnection", resp.SessionID)
if err != nil {
log.Println("Error: Cannot create new webrtc session", err)
return cws.EmptyPacket
}
return cws.WSPacket{
ID: "offer",
Data: localSession,
}
}
}
func (h *Handler) handleAnswer() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received answer SDP from browser")
session := h.getSession(resp.SessionID)
if session != nil {
peerconnection := session.peerconnection
err := peerconnection.SetRemoteSDP(resp.Data)
if err != nil {
log.Println("Error: Cannot set RemoteSDP of client: " + resp.SessionID)
}
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
}
}
func (h *Handler) handleIceCandidate() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received remote Ice Candidate from browser")
session := h.getSession(resp.SessionID)
if session != nil {
peerconnection := session.peerconnection
err := peerconnection.AddCandidate(resp.Data)
if err != nil {
log.Println("Error: Cannot add IceCandidate of client: " + resp.SessionID)
}
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
}
}
func (h *Handler) handleGameStart() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a start request from coordinator")
session := h.getSession(resp.SessionID)
if session == nil {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
return cws.EmptyPacket
}
peerconnection := session.peerconnection
// TODO: Standardize for all types of packet. Make WSPacket generic
startPacket := api.GameStartCall{}
if err := startPacket.From(resp.Data); err != nil {
return cws.EmptyPacket
}
gameMeta := games.GameMetadata{
Name: startPacket.Name,
Type: startPacket.Type,
Path: startPacket.Path,
}
room := h.startGameHandler(gameMeta, resp.RoomID, resp.PlayerIndex, peerconnection, util.GetVideoEncoder(false))
session.RoomID = room.ID
// TODO: can data race
h.rooms[room.ID] = room
return cws.WSPacket{ID: "start", RoomID: room.ID}
}
}
func (h *Handler) handleGameQuit() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a quit request from coordinator")
session := h.getSession(resp.SessionID)
if session != nil {
room := h.getRoom(session.RoomID)
// Defensive coding, check if the peerconnection is in room
if room.IsPCInRoom(session.peerconnection) {
h.detachPeerConn(session.peerconnection)
}
} else {
log.Printf("Error: No session for ID: %s\n", resp.SessionID)
}
return cws.EmptyPacket
}
}
func (h *Handler) handleGameSave() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a save game from coordinator")
log.Println("RoomID:", resp.RoomID)
req.ID = api.GameSave
req.Data = "ok"
if resp.RoomID != "" {
room := h.getRoom(resp.RoomID)
if room == nil {
return
}
err := room.SaveGame()
if err != nil {
log.Println("[!] Cannot save game state: ", err)
req.Data = "error"
}
} else {
req.Data = "error"
}
return req
}
}
func (h *Handler) handleGameLoad() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a load game from coordinator")
log.Println("Loading game state")
req.ID = api.GameLoad
req.Data = "ok"
if resp.RoomID != "" {
room := h.getRoom(resp.RoomID)
err := room.LoadGame()
if err != nil {
log.Println("[!] Cannot load game state: ", err)
req.Data = "error"
}
} else {
req.Data = "error"
}
return req
}
}
func (h *Handler) handleGamePlayerSelect() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received an update player index event from coordinator")
req.ID = api.GamePlayerSelect
room := h.getRoom(resp.RoomID)
session := h.getSession(resp.SessionID)
idx, err := strconv.Atoi(resp.Data)
log.Printf("Got session %v and room %v", session, room)
if room != nil && session != nil && err == nil {
room.UpdatePlayerIndex(session.peerconnection, idx)
req.Data = strconv.Itoa(idx)
} else {
req.Data = "error"
}
return req
}
}
func (h *Handler) handleGameMultitap() cws.PacketHandler {
return func(resp cws.WSPacket) (req cws.WSPacket) {
log.Println("Received a multitap toggle from coordinator")
req.ID = api.GameMultitap
req.Data = "ok"
if resp.RoomID != "" {
room := h.getRoom(resp.RoomID)
err := room.ToggleMultitap()
if err != nil {
log.Println("[!] Could not toggle multitap state: ", err)
req.Data = "error"
}
} else {
req.Data = "error"
}
return req
}
}
// startGameHandler starts a game if roomID is given, if not create new room
func (h *Handler) startGameHandler(game games.GameMetadata, existedRoomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoCodec encoder.VideoCodec) *room.Room {
log.Printf("Loading game: %v\n", game.Name)
// If we are connecting to coordinator, request corresponding serverID based on roomID
// TODO: check if existedRoomID is in the current server
room := h.getRoom(existedRoomID)
// If room is not running
if room == nil {
log.Println("Got Room from local ", room, " ID: ", existedRoomID)
// Create new room and update player index
room = h.createNewRoom(game, existedRoomID, videoCodec)
room.UpdatePlayerIndex(peerconnection, playerIndex)
// Wait for done signal from room
go func() {
<-room.Done
h.detachRoom(room.ID)
// send signal to coordinator that the room is closed, coordinator will remove that room
h.oClient.Send(api.CloseRoomPacket(room.ID), nil)
}()
}
// Attach peerconnection to room. If PC is already in room, don't detach
log.Println("Is PC in room", room.IsPCInRoom(peerconnection))
if !room.IsPCInRoom(peerconnection) {
h.detachPeerConn(peerconnection)
room.AddConnectionToRoom(peerconnection)
}
// Register room to coordinator if we are connecting to coordinator
if room != nil && h.oClient != nil {
h.oClient.Send(api.RegisterRoomPacket(room.ID), nil)
}
return room
}

View file

@ -226,7 +226,9 @@ func getRoomMock(cfg roomMockConfig) roomMock {
cfg.game.Path = cfg.gamesPath + cfg.game.Path
var conf worker.Config
config.LoadConfig(&conf, whereIsConfigs)
if err := config.LoadConfig(&conf, whereIsConfigs); err != nil {
panic(err)
}
fixEmulators(&conf, cfg.autoGlContext)
// sync cores
coreManager := remotehttp.NewRemoteHttpManager(conf.Emulator.Libretro)

22
pkg/worker/routes.go Normal file
View file

@ -0,0 +1,22 @@
package worker
import "github.com/giongto35/cloud-game/v2/pkg/cws/api"
func (h *Handler) routes() {
if h.oClient == nil {
return
}
h.oClient.Receive(api.ServerId, h.handleServerId())
h.oClient.Receive(api.TerminateSession, h.handleTerminateSession())
h.oClient.Receive(api.InitWebrtc, h.handleInitWebrtc())
h.oClient.Receive(api.Answer, h.handleAnswer())
h.oClient.Receive(api.IceCandidate, h.handleIceCandidate())
h.oClient.Receive(api.GameStart, h.handleGameStart())
h.oClient.Receive(api.GameQuit, h.handleGameQuit())
h.oClient.Receive(api.GameSave, h.handleGameSave())
h.oClient.Receive(api.GameLoad, h.handleGameLoad())
h.oClient.Receive(api.GamePlayerSelect, h.handleGamePlayerSelect())
h.oClient.Receive(api.GameMultitap, h.handleGameMultitap())
}

121
pkg/worker/server.go Normal file
View file

@ -0,0 +1,121 @@
package worker
import (
"crypto/tls"
"fmt"
"log"
"net/http"
"strconv"
"time"
"github.com/giongto35/cloud-game/v2/pkg/environment"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
)
const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory"
func makeServerFromMux(mux *http.ServeMux) *http.Server {
// set timeouts so that a slow or malicious client doesn't
// hold resources forever
return &http.Server{
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 120 * time.Second,
Handler: mux,
}
}
func makeHTTPServer() *http.Server {
mux := &http.ServeMux{}
mux.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
log.Println(w, "echo")
})
return makeServerFromMux(mux)
}
func makeHTTPToHTTPSRedirectServer() *http.Server {
handleRedirect := func(w http.ResponseWriter, r *http.Request) {
newURI := "https://" + r.Host + r.URL.String()
http.Redirect(w, r, newURI, http.StatusFound)
}
mux := &http.ServeMux{}
mux.HandleFunc("/", handleRedirect)
return makeServerFromMux(mux)
}
func (wrk *Worker) spawnServer(port int) {
var certManager *autocert.Manager
var httpsSrv *http.Server
mode := wrk.conf.Environment.Get()
if mode.AnyOf(environment.Production, environment.Staging) {
serverConfig := wrk.conf.Worker.Server
httpsSrv = makeHTTPServer()
httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort)
if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" {
serverConfig.HttpsChain = ""
serverConfig.HttpsKey = ""
var leurl string
if mode == environment.Staging {
leurl = stagingLEURL
} else {
leurl = acme.LetsEncryptURL
}
certManager = &autocert.Manager{
Prompt: autocert.AcceptTOS,
Cache: autocert.DirCache("assets/cache"),
Client: &acme.Client{DirectoryURL: leurl},
}
httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
}
go func(chain string, key string) {
log.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr)
err := httpsSrv.ListenAndServeTLS(chain, key)
if err != nil {
log.Printf("httpsSrv.ListendAndServeTLS() failed with %s", err)
}
}(serverConfig.HttpsChain, serverConfig.HttpsKey)
}
var httpSrv *http.Server
if mode.AnyOf(environment.Production, environment.Staging) {
httpSrv = makeHTTPToHTTPSRedirectServer()
} else {
httpSrv = makeHTTPServer()
}
if certManager != nil {
httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler)
}
startServer(httpSrv, port)
}
func startServer(serv *http.Server, startPort int) {
// It's recommend to run one worker on one instance.
// This logic is to make sure more than 1 workers still work
for port, n := startPort, startPort+100; port < n; port++ {
serv.Addr = ":" + strconv.Itoa(port)
err := serv.ListenAndServe()
switch err {
case http.ErrServerClosed:
log.Printf("HTTP(S) server was closed")
return
default:
}
port++
if port == n {
log.Printf("error: couldn't find an open port in range %v-%v\n", startPort, port)
}
}
}

View file

@ -2,177 +2,73 @@ package worker
import (
"context"
"crypto/tls"
"fmt"
"log"
"net/http"
"strconv"
"time"
"github.com/giongto35/cloud-game/v2/pkg/config/worker"
"github.com/giongto35/cloud-game/v2/pkg/environment"
"github.com/giongto35/cloud-game/v2/pkg/lock"
"github.com/giongto35/cloud-game/v2/pkg/monitoring"
"github.com/giongto35/cloud-game/v2/pkg/server"
"github.com/golang/glog"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
)
type Worker struct {
ctx context.Context
cfg worker.Config
monitoringServer *monitoring.ServerMonitoring
ctx context.Context
conf worker.Config
servers []server.Server
// to pause initialization
lock *lock.TimeLock
}
const stagingLEURL = "https://acme-staging-v02.api.letsencrypt.org/directory"
func New(ctx context.Context, conf worker.Config) *Worker {
return &Worker{ctx: ctx, conf: conf, lock: lock.NewLock()}
}
func New(ctx context.Context, cfg worker.Config) *Worker {
return &Worker{
ctx: ctx,
cfg: cfg,
monitoringServer: monitoring.NewServerMonitoring(cfg.Worker.Monitoring),
func (wrk *Worker) Run() {
go wrk.init()
wrk.servers = []server.Server{
monitoring.NewServerMonitoring(wrk.conf.Worker.Monitoring, "worker"),
}
wrk.startModules()
}
func (o *Worker) Run() error {
go o.initializeWorker()
go o.RunMonitoringServer()
return nil
}
func (o *Worker) RunMonitoringServer() {
glog.Infoln("Starting monitoring server for overwork")
err := o.monitoringServer.Run()
if err != nil {
glog.Errorf("Failed to start monitoring server, reason %s", err)
}
}
func (o *Worker) Shutdown() {
// !to add a proper HTTP(S) server shutdown (cws/handler bad loop)
if err := o.monitoringServer.Shutdown(o.ctx); err != nil {
glog.Errorln("Failed to shutdown monitoring server")
}
}
func makeServerFromMux(mux *http.ServeMux) *http.Server {
// set timeouts so that a slow or malicious client doesn't
// hold resources forever
return &http.Server{
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: 120 * time.Second,
Handler: mux,
}
}
func makeHTTPServer() *http.Server {
mux := &http.ServeMux{}
mux.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
log.Println(w, "echo")
})
return makeServerFromMux(mux)
}
func makeHTTPToHTTPSRedirectServer() *http.Server {
handleRedirect := func(w http.ResponseWriter, r *http.Request) {
newURI := "https://" + r.Host + r.URL.String()
http.Redirect(w, r, newURI, http.StatusFound)
}
mux := &http.ServeMux{}
mux.HandleFunc("/", handleRedirect)
return makeServerFromMux(mux)
}
func (o *Worker) spawnServer(port int) {
var certManager *autocert.Manager
var httpsSrv *http.Server
mode := o.cfg.Environment.Get()
if mode.AnyOf(environment.Production, environment.Staging) {
serverConfig := o.cfg.Worker.Server
httpsSrv = makeHTTPServer()
httpsSrv.Addr = fmt.Sprintf(":%d", serverConfig.HttpsPort)
if serverConfig.HttpsChain == "" || serverConfig.HttpsKey == "" {
serverConfig.HttpsChain = ""
serverConfig.HttpsKey = ""
var leurl string
if mode == environment.Staging {
leurl = stagingLEURL
} else {
leurl = acme.LetsEncryptURL
}
certManager = &autocert.Manager{
Prompt: autocert.AcceptTOS,
Cache: autocert.DirCache("assets/cache"),
Client: &acme.Client{DirectoryURL: leurl},
}
httpsSrv.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
}
go func(chain string, key string) {
fmt.Printf("Starting HTTPS server on %s\n", httpsSrv.Addr)
err := httpsSrv.ListenAndServeTLS(chain, key)
if err != nil {
log.Printf("httpsSrv.ListendAndServeTLS() failed with %s", err)
}
}(serverConfig.HttpsChain, serverConfig.HttpsKey)
}
var httpSrv *http.Server
if mode.AnyOf(environment.Production, environment.Staging) {
httpSrv = makeHTTPToHTTPSRedirectServer()
} else {
httpSrv = makeHTTPServer()
}
if certManager != nil {
httpSrv.Handler = certManager.HTTPHandler(httpSrv.Handler)
}
startServer(httpSrv, port)
}
func startServer(serv *http.Server, startPort int) {
// It's recommend to run one worker on one instance.
// This logic is to make sure more than 1 workers still work
for port, n := startPort, startPort+100; port < n; port++ {
serv.Addr = ":" + strconv.Itoa(port)
err := serv.ListenAndServe()
switch err {
case http.ErrServerClosed:
log.Printf("HTTP(S) server was closed")
return
default:
}
port++
if port == n {
log.Printf("error: couldn't find an open port in range %v-%v\n", startPort, port)
}
}
}
// initializeWorker setup a worker
func (o *Worker) initializeWorker() {
wrk := NewHandler(o.cfg)
func (wrk *Worker) init() {
h := NewHandler(wrk.conf, wrk)
defer func() {
log.Println("Close worker")
wrk.Close()
log.Printf("[worker] Closing handler")
h.Close()
}()
go wrk.Run()
// will block here
wrk.Prepare()
o.spawnServer(o.cfg.Worker.Server.Port)
go h.Run()
if !wrk.conf.Loaded {
wrk.lock.LockFor(time.Second * 10)
h.RequestConfig()
}
h.Prepare()
wrk.spawnServer(wrk.conf.Worker.Server.Port)
}
func (wrk *Worker) startModules() {
glog.Info(wrk.servers)
for _, s := range wrk.servers {
s := s
go func() {
if err := s.Init(wrk.conf); err != nil {
glog.Errorf("failed server init")
return
}
if err := s.Run(); err != nil {
glog.Errorf("failed start server")
}
}()
}
}
// !to add a proper HTTP(S) server shutdown (cws/handler bad loop)
func (wrk *Worker) Shutdown() {
for _, s := range wrk.servers {
if err := s.Shutdown(wrk.ctx); err != nil {
glog.Errorln("failed server shutdown")
}
}
}

View file

@ -85,7 +85,7 @@ package e2e
//
// fmt.Println("Sending offer...")
// client.Send(cws.WSPacket{
// ID: "initwebrtc",
// ID: "init_webrtc",
// Data: gamertc.Encode(offer),
// }, nil)
// fmt.Println("Waiting sdp...")
@ -127,7 +127,7 @@ package e2e
// }
// log.Println("return offer")
// return cws.WSPacket{
// ID: "initwebrtc",
// ID: "init_webrtc",
// Data: gamertc.Encode(offer),
// }
// })

View file

@ -36,7 +36,7 @@ const rtcp = (() => {
inputChannel.onclose = () => log.debug('[rtcp] the input channel has closed');
}
addVoiceStream(connection)
// addVoiceStream(connection)
connection.oniceconnectionstatechange = ice.onIceConnectionStateChange;
connection.onicegatheringstatechange = ice.onIceStateChange;
@ -45,6 +45,10 @@ const rtcp = (() => {
mediaStream.addTrack(event.track);
}
socket.send({
'id': 'init_webrtc',
'data': JSON.stringify({'is_mobile': env.isMobileDevice()}),
});
};
async function addVoiceStream(connection) {
@ -64,17 +68,12 @@ const rtcp = (() => {
} finally {
socket.send({
'id': 'initwebrtc',
'id': 'init_webrtc',
'data': JSON.stringify({'is_mobile': env.isMobileDevice()}),
});
}
}
const popup = (msg) => {
popupBox.html(msg);
popupBox.fadeIn().delay(0).fadeOut();
};
const ice = (() => {
let isGatheringDone = false;
let timeForIceGathering;
@ -89,7 +88,7 @@ const rtcp = (() => {
candidate = JSON.stringify(event.candidate);
log.info(`[rtcp] got ice candidate: ${candidate}`);
socket.send({
'id': 'candidate',
'id': 'ice_candidate',
'data': btoa(candidate),
})
}

View file

@ -63,7 +63,7 @@ const socket = (() => {
// this is offer from worker
event.pub(MEDIA_STREAM_SDP_AVAILABLE, {sdp: data.data});
break;
case 'candidate':
case 'ice_candidate':
event.pub(MEDIA_STREAM_CANDIDATE_ADD, {candidate: data.data});
break;
case 'heartbeat':
@ -78,7 +78,7 @@ const socket = (() => {
case 'load':
event.pub(GAME_LOADED);
break;
case 'playerIdx':
case 'player_index':
event.pub(GAME_PLAYER_IDX, data.data);
break;
case 'checkLatency':
@ -103,7 +103,7 @@ const socket = (() => {
});
const saveGame = () => send({"id": "save", "data": ""});
const loadGame = () => send({"id": "load", "data": ""});
const updatePlayerIndex = (idx) => send({"id": "playerIdx", "data": idx.toString()});
const updatePlayerIndex = (idx) => send({"id": "player_index", "data": idx.toString()});
const startGame = (gameName, isMobile, roomId, playerIndex) => send({
"id": "start",
"data": JSON.stringify({