From facd0fdc01b666f9f13b9437b38e380b4ec3dc8c Mon Sep 17 00:00:00 2001 From: giongto35 Date: Thu, 23 May 2019 19:23:45 +0800 Subject: [PATCH] Worker auto reconnect (#34) * Change join to start * WIP * Use RouteWork to route worker call * Add reconnect * Update main test * Remove fatal * Update test * Remove unnecessary --- .gitignore | 1 + cmd/main.go | 17 +--- cmd/main_test.go | 53 +++------- config/config.go | 6 +- cws/cws.go | 5 +- document/implementation/README.md | 22 +++++ overlord/browser.go | 3 + overlord/handlers.go | 27 +---- overlord/worker.go | 158 +++++------------------------- static/gameboy2.html | 4 +- worker/handlers.go | 46 +++++++-- worker/overlord.go | 2 - 12 files changed, 120 insertions(+), 224 deletions(-) create mode 100644 document/implementation/README.md diff --git a/.gitignore b/.gitignore index 0f1f2478..13b6f2e7 100644 --- a/.gitignore +++ b/.gitignore @@ -47,4 +47,5 @@ Temporary Items ### Production DockerfileProd +key.json prod/ diff --git a/cmd/main.go b/cmd/main.go index a30918ae..6f1179ac 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,15 +22,6 @@ const gamePath = "games" // Time allowed to write a message to the peer. var upgrader = websocket.Upgrader{} -func createOverlordConnection() (*websocket.Conn, error) { - c, _, err := websocket.DefaultDialer.Dial(*config.OverlordHost, nil) - if err != nil { - return nil, err - } - - return c, nil -} - // initilizeOverlord setup an overlord server func initilizeOverlord() { overlord := overlord.NewServer() @@ -50,13 +41,7 @@ func initilizeOverlord() { // initializeWorker setup a worker func initializeWorker() { - conn, err := createOverlordConnection() - if err != nil { - log.Println("Cannot connect to overlord") - log.Println("Run as a single server") - } - - worker := worker.NewHandler(conn, *config.IsDebug, gamePath) + worker := worker.NewHandler(*config.OverlordHost, gamePath) defer func() { log.Println("Close worker") diff --git a/cmd/main_test.go b/cmd/main_test.go index aa6c538e..a60bd25b 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -32,15 +32,8 @@ func initOverlord() (*httptest.Server, *httptest.Server) { return overlordWorker, overlordBrowser } -func initWorker(t *testing.T, oconn *websocket.Conn) *worker.Handler { +func initWorker(t *testing.T, overlordURL string) *worker.Handler { fmt.Println("Spawn new worker") - handler := worker.NewHandler(oconn, true, testGamePath) - go handler.Run() - //server := httptest.NewServer(http.HandlerFunc(handler.WS)) - return handler -} - -func connectTestOverlordServer(t *testing.T, overlordURL string) *websocket.Conn { if overlordURL == "" { return nil } else { @@ -48,20 +41,18 @@ func connectTestOverlordServer(t *testing.T, overlordURL string) *websocket.Conn fmt.Println("connecting to overlord: ", overlordURL) } - oconn, _, err := websocket.DefaultDialer.Dial(overlordURL, nil) - if err != nil { - t.Fatalf("%v", err) - } + handler := worker.NewHandler(overlordURL, testGamePath) - return oconn + go handler.Run() + time.Sleep(time.Second) + return handler } func initClient(t *testing.T, host string) (client *cws.Client) { // Convert http://127.0.0.1 to ws://127.0.0. u := "ws" + strings.TrimPrefix(host, "http") - // Connect to the overlord - fmt.Println("Connecting to ", u) + fmt.Println("Connecting to", u) ws, _, err := websocket.DefaultDialer.Dial(u, nil) if err != nil { t.Fatalf("%v", err) @@ -157,10 +148,8 @@ func TestSingleServerOneOverlord(t *testing.T) { defer obrowser.Close() defer oworker.Close() - oconn := connectTestOverlordServer(t, oworker.URL) - defer oconn.Close() // Init worker - worker := initWorker(t, oconn) + worker := initWorker(t, oworker.URL) defer worker.Close() // connect overlord @@ -203,14 +192,10 @@ func TestTwoServerOneOverlord(t *testing.T) { defer obrowser.Close() defer oworker.Close() - oconn1 := connectTestOverlordServer(t, oworker.URL) - defer oconn1.Close() - worker1 := initWorker(t, oconn1) + worker1 := initWorker(t, oworker.URL) defer worker1.Close() - oconn2 := connectTestOverlordServer(t, oworker.URL) - defer oconn2.Close() - worker2 := initWorker(t, oconn2) + worker2 := initWorker(t, oworker.URL) defer worker2.Close() client1 := initClient(t, obrowser.URL) @@ -295,9 +280,7 @@ func TestReconnectRoom(t *testing.T) { defer oworker.Close() // Init worker - oconn := connectTestOverlordServer(t, oworker.URL) - defer oconn.Close() - worker := initWorker(t, oconn) + worker := initWorker(t, oworker.URL) client := initClient(t, obrowser.URL) @@ -321,16 +304,14 @@ func TestReconnectRoom(t *testing.T) { log.Println("Closing room and server") client.Close() - oconn.Close() + worker.GetOverlordClient().Close() worker.Close() // Close server and reconnect log.Println("Server respawn") // Init slave server again - oconn = connectTestOverlordServer(t, oworker.URL) - defer oconn.Close() - worker = initWorker(t, oconn) + worker = initWorker(t, oworker.URL) defer worker.Close() client = initClient(t, obrowser.URL) @@ -383,9 +364,7 @@ func TestReconnectRoomNoLocal(t *testing.T) { return } - oconn := connectTestOverlordServer(t, oworker.URL) - defer oconn.Close() - worker := initWorker(t, oconn) + worker := initWorker(t, oworker.URL) client := initClient(t, obrowser.URL) @@ -409,7 +388,7 @@ func TestReconnectRoomNoLocal(t *testing.T) { log.Println("Closing room and server") client.Close() - oconn.Close() + worker.GetOverlordClient().Close() worker.Close() // Remove room on local path := emulator.GetSavePath(saveRoomID) @@ -420,9 +399,7 @@ func TestReconnectRoomNoLocal(t *testing.T) { log.Println("Server respawn") // Init slave server again - oconn = connectTestOverlordServer(t, oworker.URL) - defer oconn.Close() - worker = initWorker(t, oconn) + worker = initWorker(t, oworker.URL) defer worker.Close() client = initClient(t, obrowser.URL) diff --git a/config/config.go b/config/config.go index 99b9382d..8550dcd0 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,9 @@ package config -import "flag" +import ( + "flag" + "time" +) const defaultoverlord = "ws://localhost:9000/wso" @@ -10,3 +13,4 @@ var Port = flag.String("port", "8000", "Port of the game") var IsMonitor = flag.Bool("monitor", false, "Turn on monitor") var Width = 256 var Height = 240 +var WSWait = 20 * time.Second diff --git a/cws/cws.go b/cws/cws.go index 856bd3d5..d2f75342 100644 --- a/cws/cws.go +++ b/cws/cws.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/giongto35/cloud-game/config" "github.com/gorilla/websocket" uuid "github.com/satori/go.uuid" ) @@ -84,6 +85,7 @@ func (c *Client) Send(request WSPacket, callback func(response WSPacket)) { } c.sendLock.Lock() + c.conn.SetWriteDeadline(time.Now().Add(config.WSWait)) c.conn.WriteMessage(websocket.TextMessage, data) c.sendLock.Unlock() } @@ -110,8 +112,8 @@ func (c *Client) Receive(id string, f func(response WSPacket) (request WSPacket) if err != nil { log.Println("[!] json marshal error:", err) } - //c.conn.SetWriteDeadline(time.Now().Add(writeWait)) c.sendLock.Lock() + c.conn.SetWriteDeadline(time.Now().Add(config.WSWait)) c.conn.WriteMessage(websocket.TextMessage, resp) c.sendLock.Unlock() } @@ -145,6 +147,7 @@ func (c *Client) Heartbeat() { func (c *Client) Listen() { for { + c.conn.SetReadDeadline(time.Now().Add(config.WSWait)) _, rawMsg, err := c.conn.ReadMessage() if err != nil { log.Println("[!] read:", err) diff --git a/document/implementation/README.md b/document/implementation/README.md new file mode 100644 index 00000000..e1971a00 --- /dev/null +++ b/document/implementation/README.md @@ -0,0 +1,22 @@ +# Web-based Cloud Gaming Service Implementation Document + +## Code structure +. +├── cmd +│ ├── main.go +│ └── main_test.go +├── emulator: emulator internal +│ ├── director.go: coordinator of views +│ └── gameview.go: in game logic +├── overlord: coordinator of workers +├── games: roms list, no code logic +├── static: static file for front end +│ ├── js +│ │ └── ws.js: client logic +│ ├── gameboy.html: frontend with gameboy ui +│ └── index_ws.html: raw frontend without ui +├── cws +│ └── cws.go: socket multiplexer library, used for signalling +├── webrtc +└── worker: integration between emulator + webrtc (communication) + diff --git a/overlord/browser.go b/overlord/browser.go index f1327827..add428a0 100644 --- a/overlord/browser.go +++ b/overlord/browser.go @@ -40,6 +40,9 @@ func (s *Session) RouteBrowser() { resp, ) + log.Println("Overlord: Received sdp request from a worker") + log.Println("Overlord: Sending back sdp to browser") + return sdp }) diff --git a/overlord/handlers.go b/overlord/handlers.go index 112110d8..21d6c52d 100644 --- a/overlord/handlers.go +++ b/overlord/handlers.go @@ -62,7 +62,7 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) { log.Println("Overlord: A new server connected to Overlord", serverID) // Register to workersClients map the client connection - client := NewWorkerClient(c) + client := NewWorkerClient(c, serverID) o.workerClients[serverID] = client defer o.cleanConnection(client, serverID) @@ -74,26 +74,7 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) { }, nil, ) - - // registerRoom event from a server, when server created a new room. - // RoomID is global so it is managed by overlord. - client.Receive("registerRoom", func(resp cws.WSPacket) cws.WSPacket { - log.Println("Overlord: Received registerRoom ", resp.Data, serverID) - o.roomToServer[resp.Data] = serverID - return cws.WSPacket{ - ID: "registerRoom", - } - }) - - // getRoom returns the server ID based on requested roomID. - client.Receive("getRoom", func(resp cws.WSPacket) cws.WSPacket { - log.Println("Overlord: Received a getroom request") - log.Println("Result: ", o.roomToServer[resp.Data]) - return cws.WSPacket{ - ID: "getRoom", - Data: o.roomToServer[resp.Data], - } - }) + o.RouteWorker(client) client.Listen() } @@ -101,7 +82,6 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) { // WSO handles all connections from frontend to overlord func (o *Server) WS(w http.ResponseWriter, r *http.Request) { log.Println("Browser connected to overlord") - //TODO: Add it back defer func() { if r := recover(); r != nil { log.Println("Warn: Something wrong. Recovered in ", r) @@ -120,7 +100,8 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) { sessionID := uuid.Must(uuid.NewV4()).String() serverID, err := o.findBestServer() if err != nil { - log.Fatal(err) + log.Println(err) + return } client := NewBrowserClient(c) diff --git a/overlord/worker.go b/overlord/worker.go index 1458db64..fda7a1a5 100644 --- a/overlord/worker.go +++ b/overlord/worker.go @@ -9,152 +9,40 @@ import ( type WorkerClient struct { *cws.Client + ServerID string } // RouteWorker are all routes server received from worker -func (s *Session) RouteWorker() { - iceCandidates := [][]byte{} +func (o *Server) RouteWorker(workerClient *WorkerClient) { + // registerRoom event from a server, when server created a new room. + // RoomID is global so it is managed by overlord. + workerClient.Receive("registerRoom", func(resp cws.WSPacket) cws.WSPacket { + log.Println("Overlord: Received registerRoom ", resp.Data, workerClient.ServerID) + o.roomToServer[resp.Data] = workerClient.ServerID + return cws.WSPacket{ + ID: "registerRoom", + } + }) - workerClient := s.WorkerClient + // getRoom returns the server ID based on requested roomID. + workerClient.Receive("getRoom", func(resp cws.WSPacket) cws.WSPacket { + log.Println("Overlord: Received a getroom request") + log.Println("Result: ", o.roomToServer[resp.Data]) + return cws.WSPacket{ + ID: "getRoom", + Data: o.roomToServer[resp.Data], + } + }) workerClient.Receive("heartbeat", func(resp cws.WSPacket) cws.WSPacket { return resp }) - - workerClient.Receive("icecandidate", func(resp cws.WSPacket) cws.WSPacket { - log.Println("Received candidates ", resp.Data) - iceCandidates = append(iceCandidates, []byte(resp.Data)) - return cws.EmptyPacket - }) - - workerClient.Receive("initwebrtc", func(resp cws.WSPacket) cws.WSPacket { - log.Println("Overlord: Received sdp request from a worker") - log.Println("Overlord: Sending back sdp to browser") - s.BrowserClient.Send(resp, nil) - - return cws.EmptyPacket - }) - - //workerClient.Receive("quit", func(resp cws.WSPacket) (req cws.WSPacket) { - //log.Println("Overlord: Received quit request from a worker") - //log.Println("Overlord: Sending back sdp to browser") - //s.GameName = resp.Data - //s.RoomID = resp.RoomID - //s.PlayerIndex = resp.PlayerIndex - - //// TODO: - ////room := s.handler.getRoom(s.RoomID) - ////if room.IsPCInRoom(s.peerconnection) { - ////s.handler.detachPeerConn(s.peerconnection) - ////} - //log.Println("Sending to target host", resp.TargetHostID, " ", resp) - //resp = s.handler.servers[resp.TargetHostID].SyncSend( - //resp, - //) - - //return cws.EmptyPacket - //}) - - // TODO: Add save and load - //browserClient.Receive("save", func(resp cws.WSPacket) (req cws.WSPacket) { - //log.Println("Saving game state") - //req.ID = "save" - //req.Data = "ok" - //if s.RoomID != "" { - //room := s.handler.getRoom(s.RoomID) - //if room == nil { - //return - //} - //err := room.SaveGame() - //if err != nil { - //log.Println("[!] Cannot save game state: ", err) - //req.Data = "error" - //} - //} else { - //req.Data = "error" - //} - - //return req - //}) - - //browserClient.Receive("load", func(resp cws.WSPacket) (req cws.WSPacket) { - //log.Println("Loading game state") - //req.ID = "load" - //req.Data = "ok" - //if s.RoomID != "" { - //room := s.handler.getRoom(s.RoomID) - //err := room.LoadGame() - //if err != nil { - //log.Println("[!] Cannot load game state: ", err) - //req.Data = "error" - //} - //} else { - //req.Data = "error" - //} - - //return req - //}) - - //browserClient.Receive("start", func(resp cws.WSPacket) (req cws.WSPacket) { - //s.GameName = resp.Data - //s.RoomID = resp.RoomID - //s.PlayerIndex = resp.PlayerIndex - - //log.Println("Starting game") - //// If we are connecting to overlord, request corresponding serverID based on roomID - //if s.OverlordClient != nil { - //roomServerID := getServerIDOfRoom(s.OverlordClient, s.RoomID) - //log.Println("Server of RoomID ", s.RoomID, " is ", roomServerID, " while current server is ", s.ServerID) - //// If the target serverID is different from current serverID - //if roomServerID != "" && s.ServerID != roomServerID { - //// TODO: Re -register - //// Bridge Connection to the target serverID - //go s.bridgeConnection(roomServerID, s.GameName, s.RoomID, s.PlayerIndex) - //return - //} - //} - - //// Get Room in local server - //// TODO: check if roomID is in the current server - //room := s.handler.getRoom(s.RoomID) - //log.Println("Got Room from local ", room, " ID: ", s.RoomID) - //// If room is not running - //if room == nil { - //// Create new room - //room = s.handler.createNewRoom(s.GameName, s.RoomID, s.PlayerIndex) - //// Wait for done signal from room - //go func() { - //<-room.Done - //s.handler.detachRoom(room.ID) - //}() - //} - - //// Attach peerconnection to room. If PC is already in room, don't detach - //log.Println("Is PC in room", room.IsPCInRoom(s.peerconnection)) - //if !room.IsPCInRoom(s.peerconnection) { - //s.handler.detachPeerConn(s.peerconnection) - //room.AddConnectionToRoom(s.peerconnection, s.PlayerIndex) - //} - //s.RoomID = room.ID - - //// Register room to overlord if we are connecting to overlord - //if room != nil && s.OverlordClient != nil { - //s.OverlordClient.Send(cws.WSPacket{ - //ID: "registerRoom", - //Data: s.RoomID, - //}, nil) - //} - //req.ID = "start" - //req.RoomID = s.RoomID - //req.SessionID = s.ID - - //return req - //}) } // NewWorkerClient returns a client connecting to worker. This connection exchanges information between workers and server -func NewWorkerClient(c *websocket.Conn) *WorkerClient { +func NewWorkerClient(c *websocket.Conn, serverID string) *WorkerClient { return &WorkerClient{ - Client: cws.NewClient(c), + Client: cws.NewClient(c), + ServerID: serverID, } } diff --git a/static/gameboy2.html b/static/gameboy2.html index 7012a313..fc97a569 100644 --- a/static/gameboy2.html +++ b/static/gameboy2.html @@ -37,7 +37,7 @@
load
save
-
join
+
start
quit
@@ -79,4 +79,4 @@ - \ No newline at end of file + diff --git a/worker/handlers.go b/worker/handlers.go index 423a437f..4622b06f 100644 --- a/worker/handlers.go +++ b/worker/handlers.go @@ -2,6 +2,7 @@ package worker import ( "log" + "time" "github.com/giongto35/cloud-game/webrtc" storage "github.com/giongto35/cloud-game/worker/cloud-storage" @@ -20,6 +21,8 @@ var upgrader = websocket.Upgrader{} type Handler struct { // Client that connects to overlord oClient *OverlordClient + // Raw address of overlord + overlordHost string // Rooms map : RoomID -> Room rooms map[string]*room.Room // ID of the current server globalwise @@ -33,14 +36,13 @@ type Handler struct { } // NewHandler returns a new server -func NewHandler(overlordConn *websocket.Conn, isDebug bool, gamePath string) *Handler { +func NewHandler(overlordHost string, gamePath string) *Handler { onlineStorage := storage.NewInitClient() - oClient := NewOverlordClient(overlordConn) return &Handler{ - oClient: oClient, rooms: map[string]*room.Room{}, sessions: map[string]*Session{}, + overlordHost: overlordHost, gamePath: gamePath, onlineStorage: onlineStorage, } @@ -48,10 +50,42 @@ func NewHandler(overlordConn *websocket.Conn, isDebug bool, gamePath string) *Ha // Run starts a Handler running logic func (h *Handler) Run() { - go h.oClient.Heartbeat() + for { + oClient, err := setupOverlordConnection(h.overlordHost) + if err != nil { + log.Println("Cannot connect to overlord. Retrying...") + time.Sleep(time.Second) + continue + } - h.RouteOverlord() - h.oClient.Listen() + h.oClient = oClient + log.Println("Connected to overlord successfully.") + go h.oClient.Heartbeat() + h.RouteOverlord() + h.oClient.Listen() + // If cannot listen, reconnect to overlord + } +} + +func setupOverlordConnection(ohost string) (*OverlordClient, error) { + conn, err := createOverlordConnection(ohost) + if err != nil { + return nil, err + } + return NewOverlordClient(conn), nil +} + +func createOverlordConnection(ohost string) (*websocket.Conn, error) { + c, _, err := websocket.DefaultDialer.Dial(ohost, nil) + if err != nil { + return nil, err + } + + return c, nil +} + +func (h *Handler) GetOverlordClient() *OverlordClient { + return h.oClient } // detachPeerConn detach/remove a peerconnection from current room diff --git a/worker/overlord.go b/worker/overlord.go index 13c0253f..f1850de6 100644 --- a/worker/overlord.go +++ b/worker/overlord.go @@ -171,8 +171,6 @@ func (h *Handler) RouteOverlord() { return cws.EmptyPacket }, ) - - // heartbeat to keep pinging overlord. We not ping from server to browser, so we don't call heartbeat in browserClient } func getServerIDOfRoom(oc *OverlordClient, roomID string) string {