Worker auto reconnect (#34)

* Change join to start

* WIP

* Use RouteWork to route worker call

* Add reconnect

* Update main test

* Remove fatal

* Update test

* Remove unnecessary
This commit is contained in:
giongto35 2019-05-23 19:23:45 +08:00 committed by GitHub
parent 4fc4186b1c
commit facd0fdc01
12 changed files with 120 additions and 224 deletions

1
.gitignore vendored
View file

@ -47,4 +47,5 @@ Temporary Items
### Production
DockerfileProd
key.json
prod/

View file

@ -22,15 +22,6 @@ const gamePath = "games"
// Time allowed to write a message to the peer.
var upgrader = websocket.Upgrader{}
func createOverlordConnection() (*websocket.Conn, error) {
c, _, err := websocket.DefaultDialer.Dial(*config.OverlordHost, nil)
if err != nil {
return nil, err
}
return c, nil
}
// initilizeOverlord setup an overlord server
func initilizeOverlord() {
overlord := overlord.NewServer()
@ -50,13 +41,7 @@ func initilizeOverlord() {
// initializeWorker setup a worker
func initializeWorker() {
conn, err := createOverlordConnection()
if err != nil {
log.Println("Cannot connect to overlord")
log.Println("Run as a single server")
}
worker := worker.NewHandler(conn, *config.IsDebug, gamePath)
worker := worker.NewHandler(*config.OverlordHost, gamePath)
defer func() {
log.Println("Close worker")

View file

@ -32,15 +32,8 @@ func initOverlord() (*httptest.Server, *httptest.Server) {
return overlordWorker, overlordBrowser
}
func initWorker(t *testing.T, oconn *websocket.Conn) *worker.Handler {
func initWorker(t *testing.T, overlordURL string) *worker.Handler {
fmt.Println("Spawn new worker")
handler := worker.NewHandler(oconn, true, testGamePath)
go handler.Run()
//server := httptest.NewServer(http.HandlerFunc(handler.WS))
return handler
}
func connectTestOverlordServer(t *testing.T, overlordURL string) *websocket.Conn {
if overlordURL == "" {
return nil
} else {
@ -48,20 +41,18 @@ func connectTestOverlordServer(t *testing.T, overlordURL string) *websocket.Conn
fmt.Println("connecting to overlord: ", overlordURL)
}
oconn, _, err := websocket.DefaultDialer.Dial(overlordURL, nil)
if err != nil {
t.Fatalf("%v", err)
}
handler := worker.NewHandler(overlordURL, testGamePath)
return oconn
go handler.Run()
time.Sleep(time.Second)
return handler
}
func initClient(t *testing.T, host string) (client *cws.Client) {
// Convert http://127.0.0.1 to ws://127.0.0.
u := "ws" + strings.TrimPrefix(host, "http")
// Connect to the overlord
fmt.Println("Connecting to ", u)
fmt.Println("Connecting to", u)
ws, _, err := websocket.DefaultDialer.Dial(u, nil)
if err != nil {
t.Fatalf("%v", err)
@ -157,10 +148,8 @@ func TestSingleServerOneOverlord(t *testing.T) {
defer obrowser.Close()
defer oworker.Close()
oconn := connectTestOverlordServer(t, oworker.URL)
defer oconn.Close()
// Init worker
worker := initWorker(t, oconn)
worker := initWorker(t, oworker.URL)
defer worker.Close()
// connect overlord
@ -203,14 +192,10 @@ func TestTwoServerOneOverlord(t *testing.T) {
defer obrowser.Close()
defer oworker.Close()
oconn1 := connectTestOverlordServer(t, oworker.URL)
defer oconn1.Close()
worker1 := initWorker(t, oconn1)
worker1 := initWorker(t, oworker.URL)
defer worker1.Close()
oconn2 := connectTestOverlordServer(t, oworker.URL)
defer oconn2.Close()
worker2 := initWorker(t, oconn2)
worker2 := initWorker(t, oworker.URL)
defer worker2.Close()
client1 := initClient(t, obrowser.URL)
@ -295,9 +280,7 @@ func TestReconnectRoom(t *testing.T) {
defer oworker.Close()
// Init worker
oconn := connectTestOverlordServer(t, oworker.URL)
defer oconn.Close()
worker := initWorker(t, oconn)
worker := initWorker(t, oworker.URL)
client := initClient(t, obrowser.URL)
@ -321,16 +304,14 @@ func TestReconnectRoom(t *testing.T) {
log.Println("Closing room and server")
client.Close()
oconn.Close()
worker.GetOverlordClient().Close()
worker.Close()
// Close server and reconnect
log.Println("Server respawn")
// Init slave server again
oconn = connectTestOverlordServer(t, oworker.URL)
defer oconn.Close()
worker = initWorker(t, oconn)
worker = initWorker(t, oworker.URL)
defer worker.Close()
client = initClient(t, obrowser.URL)
@ -383,9 +364,7 @@ func TestReconnectRoomNoLocal(t *testing.T) {
return
}
oconn := connectTestOverlordServer(t, oworker.URL)
defer oconn.Close()
worker := initWorker(t, oconn)
worker := initWorker(t, oworker.URL)
client := initClient(t, obrowser.URL)
@ -409,7 +388,7 @@ func TestReconnectRoomNoLocal(t *testing.T) {
log.Println("Closing room and server")
client.Close()
oconn.Close()
worker.GetOverlordClient().Close()
worker.Close()
// Remove room on local
path := emulator.GetSavePath(saveRoomID)
@ -420,9 +399,7 @@ func TestReconnectRoomNoLocal(t *testing.T) {
log.Println("Server respawn")
// Init slave server again
oconn = connectTestOverlordServer(t, oworker.URL)
defer oconn.Close()
worker = initWorker(t, oconn)
worker = initWorker(t, oworker.URL)
defer worker.Close()
client = initClient(t, obrowser.URL)

View file

@ -1,6 +1,9 @@
package config
import "flag"
import (
"flag"
"time"
)
const defaultoverlord = "ws://localhost:9000/wso"
@ -10,3 +13,4 @@ var Port = flag.String("port", "8000", "Port of the game")
var IsMonitor = flag.Bool("monitor", false, "Turn on monitor")
var Width = 256
var Height = 240
var WSWait = 20 * time.Second

View file

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/giongto35/cloud-game/config"
"github.com/gorilla/websocket"
uuid "github.com/satori/go.uuid"
)
@ -84,6 +85,7 @@ func (c *Client) Send(request WSPacket, callback func(response WSPacket)) {
}
c.sendLock.Lock()
c.conn.SetWriteDeadline(time.Now().Add(config.WSWait))
c.conn.WriteMessage(websocket.TextMessage, data)
c.sendLock.Unlock()
}
@ -110,8 +112,8 @@ func (c *Client) Receive(id string, f func(response WSPacket) (request WSPacket)
if err != nil {
log.Println("[!] json marshal error:", err)
}
//c.conn.SetWriteDeadline(time.Now().Add(writeWait))
c.sendLock.Lock()
c.conn.SetWriteDeadline(time.Now().Add(config.WSWait))
c.conn.WriteMessage(websocket.TextMessage, resp)
c.sendLock.Unlock()
}
@ -145,6 +147,7 @@ func (c *Client) Heartbeat() {
func (c *Client) Listen() {
for {
c.conn.SetReadDeadline(time.Now().Add(config.WSWait))
_, rawMsg, err := c.conn.ReadMessage()
if err != nil {
log.Println("[!] read:", err)

22
document/implementation/README.md vendored Normal file
View file

@ -0,0 +1,22 @@
# Web-based Cloud Gaming Service Implementation Document
## Code structure
.
├── cmd
│ ├── main.go
│ └── main_test.go
├── emulator: emulator internal
│ ├── director.go: coordinator of views
│ └── gameview.go: in game logic
├── overlord: coordinator of workers
├── games: roms list, no code logic
├── static: static file for front end
│ ├── js
│ │ └── ws.js: client logic
│ ├── gameboy.html: frontend with gameboy ui
│ └── index_ws.html: raw frontend without ui
├── cws
│ └── cws.go: socket multiplexer library, used for signalling
├── webrtc
└── worker: integration between emulator + webrtc (communication)

View file

@ -40,6 +40,9 @@ func (s *Session) RouteBrowser() {
resp,
)
log.Println("Overlord: Received sdp request from a worker")
log.Println("Overlord: Sending back sdp to browser")
return sdp
})

View file

@ -62,7 +62,7 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) {
log.Println("Overlord: A new server connected to Overlord", serverID)
// Register to workersClients map the client connection
client := NewWorkerClient(c)
client := NewWorkerClient(c, serverID)
o.workerClients[serverID] = client
defer o.cleanConnection(client, serverID)
@ -74,26 +74,7 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) {
},
nil,
)
// registerRoom event from a server, when server created a new room.
// RoomID is global so it is managed by overlord.
client.Receive("registerRoom", func(resp cws.WSPacket) cws.WSPacket {
log.Println("Overlord: Received registerRoom ", resp.Data, serverID)
o.roomToServer[resp.Data] = serverID
return cws.WSPacket{
ID: "registerRoom",
}
})
// getRoom returns the server ID based on requested roomID.
client.Receive("getRoom", func(resp cws.WSPacket) cws.WSPacket {
log.Println("Overlord: Received a getroom request")
log.Println("Result: ", o.roomToServer[resp.Data])
return cws.WSPacket{
ID: "getRoom",
Data: o.roomToServer[resp.Data],
}
})
o.RouteWorker(client)
client.Listen()
}
@ -101,7 +82,6 @@ func (o *Server) WSO(w http.ResponseWriter, r *http.Request) {
// WSO handles all connections from frontend to overlord
func (o *Server) WS(w http.ResponseWriter, r *http.Request) {
log.Println("Browser connected to overlord")
//TODO: Add it back
defer func() {
if r := recover(); r != nil {
log.Println("Warn: Something wrong. Recovered in ", r)
@ -120,7 +100,8 @@ func (o *Server) WS(w http.ResponseWriter, r *http.Request) {
sessionID := uuid.Must(uuid.NewV4()).String()
serverID, err := o.findBestServer()
if err != nil {
log.Fatal(err)
log.Println(err)
return
}
client := NewBrowserClient(c)

View file

@ -9,152 +9,40 @@ import (
type WorkerClient struct {
*cws.Client
ServerID string
}
// RouteWorker are all routes server received from worker
func (s *Session) RouteWorker() {
iceCandidates := [][]byte{}
func (o *Server) RouteWorker(workerClient *WorkerClient) {
// registerRoom event from a server, when server created a new room.
// RoomID is global so it is managed by overlord.
workerClient.Receive("registerRoom", func(resp cws.WSPacket) cws.WSPacket {
log.Println("Overlord: Received registerRoom ", resp.Data, workerClient.ServerID)
o.roomToServer[resp.Data] = workerClient.ServerID
return cws.WSPacket{
ID: "registerRoom",
}
})
workerClient := s.WorkerClient
// getRoom returns the server ID based on requested roomID.
workerClient.Receive("getRoom", func(resp cws.WSPacket) cws.WSPacket {
log.Println("Overlord: Received a getroom request")
log.Println("Result: ", o.roomToServer[resp.Data])
return cws.WSPacket{
ID: "getRoom",
Data: o.roomToServer[resp.Data],
}
})
workerClient.Receive("heartbeat", func(resp cws.WSPacket) cws.WSPacket {
return resp
})
workerClient.Receive("icecandidate", func(resp cws.WSPacket) cws.WSPacket {
log.Println("Received candidates ", resp.Data)
iceCandidates = append(iceCandidates, []byte(resp.Data))
return cws.EmptyPacket
})
workerClient.Receive("initwebrtc", func(resp cws.WSPacket) cws.WSPacket {
log.Println("Overlord: Received sdp request from a worker")
log.Println("Overlord: Sending back sdp to browser")
s.BrowserClient.Send(resp, nil)
return cws.EmptyPacket
})
//workerClient.Receive("quit", func(resp cws.WSPacket) (req cws.WSPacket) {
//log.Println("Overlord: Received quit request from a worker")
//log.Println("Overlord: Sending back sdp to browser")
//s.GameName = resp.Data
//s.RoomID = resp.RoomID
//s.PlayerIndex = resp.PlayerIndex
//// TODO:
////room := s.handler.getRoom(s.RoomID)
////if room.IsPCInRoom(s.peerconnection) {
////s.handler.detachPeerConn(s.peerconnection)
////}
//log.Println("Sending to target host", resp.TargetHostID, " ", resp)
//resp = s.handler.servers[resp.TargetHostID].SyncSend(
//resp,
//)
//return cws.EmptyPacket
//})
// TODO: Add save and load
//browserClient.Receive("save", func(resp cws.WSPacket) (req cws.WSPacket) {
//log.Println("Saving game state")
//req.ID = "save"
//req.Data = "ok"
//if s.RoomID != "" {
//room := s.handler.getRoom(s.RoomID)
//if room == nil {
//return
//}
//err := room.SaveGame()
//if err != nil {
//log.Println("[!] Cannot save game state: ", err)
//req.Data = "error"
//}
//} else {
//req.Data = "error"
//}
//return req
//})
//browserClient.Receive("load", func(resp cws.WSPacket) (req cws.WSPacket) {
//log.Println("Loading game state")
//req.ID = "load"
//req.Data = "ok"
//if s.RoomID != "" {
//room := s.handler.getRoom(s.RoomID)
//err := room.LoadGame()
//if err != nil {
//log.Println("[!] Cannot load game state: ", err)
//req.Data = "error"
//}
//} else {
//req.Data = "error"
//}
//return req
//})
//browserClient.Receive("start", func(resp cws.WSPacket) (req cws.WSPacket) {
//s.GameName = resp.Data
//s.RoomID = resp.RoomID
//s.PlayerIndex = resp.PlayerIndex
//log.Println("Starting game")
//// If we are connecting to overlord, request corresponding serverID based on roomID
//if s.OverlordClient != nil {
//roomServerID := getServerIDOfRoom(s.OverlordClient, s.RoomID)
//log.Println("Server of RoomID ", s.RoomID, " is ", roomServerID, " while current server is ", s.ServerID)
//// If the target serverID is different from current serverID
//if roomServerID != "" && s.ServerID != roomServerID {
//// TODO: Re -register
//// Bridge Connection to the target serverID
//go s.bridgeConnection(roomServerID, s.GameName, s.RoomID, s.PlayerIndex)
//return
//}
//}
//// Get Room in local server
//// TODO: check if roomID is in the current server
//room := s.handler.getRoom(s.RoomID)
//log.Println("Got Room from local ", room, " ID: ", s.RoomID)
//// If room is not running
//if room == nil {
//// Create new room
//room = s.handler.createNewRoom(s.GameName, s.RoomID, s.PlayerIndex)
//// Wait for done signal from room
//go func() {
//<-room.Done
//s.handler.detachRoom(room.ID)
//}()
//}
//// Attach peerconnection to room. If PC is already in room, don't detach
//log.Println("Is PC in room", room.IsPCInRoom(s.peerconnection))
//if !room.IsPCInRoom(s.peerconnection) {
//s.handler.detachPeerConn(s.peerconnection)
//room.AddConnectionToRoom(s.peerconnection, s.PlayerIndex)
//}
//s.RoomID = room.ID
//// Register room to overlord if we are connecting to overlord
//if room != nil && s.OverlordClient != nil {
//s.OverlordClient.Send(cws.WSPacket{
//ID: "registerRoom",
//Data: s.RoomID,
//}, nil)
//}
//req.ID = "start"
//req.RoomID = s.RoomID
//req.SessionID = s.ID
//return req
//})
}
// NewWorkerClient returns a client connecting to worker. This connection exchanges information between workers and server
func NewWorkerClient(c *websocket.Conn) *WorkerClient {
func NewWorkerClient(c *websocket.Conn, serverID string) *WorkerClient {
return &WorkerClient{
Client: cws.NewClient(c),
Client: cws.NewClient(c),
ServerID: serverID,
}
}

View file

@ -37,7 +37,7 @@
<div id="btn-load" unselectable="on" class="btn-big unselectable" value="load">load</div>
<div id="btn-save" unselectable="on" class="btn-big unselectable" value="save">save</div>
<div id="btn-join" unselectable="on" class="btn-big unselectable" value="join">join</div>
<div id="btn-join" unselectable="on" class="btn-big unselectable" value="join">start</div>
<div id="btn-quit" unselectable="on" class="btn-big unselectable" value="quit">quit</div>
@ -79,4 +79,4 @@
</body>
</html>
</html>

View file

@ -2,6 +2,7 @@ package worker
import (
"log"
"time"
"github.com/giongto35/cloud-game/webrtc"
storage "github.com/giongto35/cloud-game/worker/cloud-storage"
@ -20,6 +21,8 @@ var upgrader = websocket.Upgrader{}
type Handler struct {
// Client that connects to overlord
oClient *OverlordClient
// Raw address of overlord
overlordHost string
// Rooms map : RoomID -> Room
rooms map[string]*room.Room
// ID of the current server globalwise
@ -33,14 +36,13 @@ type Handler struct {
}
// NewHandler returns a new server
func NewHandler(overlordConn *websocket.Conn, isDebug bool, gamePath string) *Handler {
func NewHandler(overlordHost string, gamePath string) *Handler {
onlineStorage := storage.NewInitClient()
oClient := NewOverlordClient(overlordConn)
return &Handler{
oClient: oClient,
rooms: map[string]*room.Room{},
sessions: map[string]*Session{},
overlordHost: overlordHost,
gamePath: gamePath,
onlineStorage: onlineStorage,
}
@ -48,10 +50,42 @@ func NewHandler(overlordConn *websocket.Conn, isDebug bool, gamePath string) *Ha
// Run starts a Handler running logic
func (h *Handler) Run() {
go h.oClient.Heartbeat()
for {
oClient, err := setupOverlordConnection(h.overlordHost)
if err != nil {
log.Println("Cannot connect to overlord. Retrying...")
time.Sleep(time.Second)
continue
}
h.RouteOverlord()
h.oClient.Listen()
h.oClient = oClient
log.Println("Connected to overlord successfully.")
go h.oClient.Heartbeat()
h.RouteOverlord()
h.oClient.Listen()
// If cannot listen, reconnect to overlord
}
}
func setupOverlordConnection(ohost string) (*OverlordClient, error) {
conn, err := createOverlordConnection(ohost)
if err != nil {
return nil, err
}
return NewOverlordClient(conn), nil
}
func createOverlordConnection(ohost string) (*websocket.Conn, error) {
c, _, err := websocket.DefaultDialer.Dial(ohost, nil)
if err != nil {
return nil, err
}
return c, nil
}
func (h *Handler) GetOverlordClient() *OverlordClient {
return h.oClient
}
// detachPeerConn detach/remove a peerconnection from current room

View file

@ -171,8 +171,6 @@ func (h *Handler) RouteOverlord() {
return cws.EmptyPacket
},
)
// heartbeat to keep pinging overlord. We not ping from server to browser, so we don't call heartbeat in browserClient
}
func getServerIDOfRoom(oc *OverlordClient, roomID string) string {