Close Leaked goroutine

This commit is contained in:
giongto35 2019-05-08 23:59:47 +08:00
parent 322b5bffe0
commit 0eea6cfcc8
11 changed files with 144 additions and 72 deletions

View file

@ -6,6 +6,7 @@ import (
"math/rand"
"net/http"
_ "net/http/pprof"
"runtime"
"strings"
"time"
@ -65,12 +66,22 @@ func initializeServer() {
http.ListenAndServe(":"+*config.Port, nil)
}
func monitor() {
c := time.Tick(time.Second)
for range c {
log.Printf("#goroutines: %d\n", runtime.NumGoroutine())
}
}
func main() {
flag.Parse()
log.Println("Usage: ./game [-debug]")
rand.Seed(time.Now().UTC().UnixNano())
//if *config.IsMonitor {
go monitor()
//}
// There are two server mode
// Overlord is coordinator. If the OvelordHost Param is `overlord`, we spawn a new host as Overlord.
// else we spawn new server as normal server connecting to OverlordHost.

View file

@ -7,6 +7,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/giongto35/cloud-game/cws"
"github.com/giongto35/cloud-game/handler"
@ -159,6 +160,7 @@ func TestSingleServerNoOverlord(t *testing.T) {
fmt.Println("RoomID should not be empty")
t.Fail()
}
time.Sleep(time.Second)
fmt.Println("Done")
}
@ -192,6 +194,7 @@ func TestSingleServerOneOverlord(t *testing.T) {
fmt.Println("RoomID should not be empty")
t.Fail()
}
time.Sleep(time.Second)
fmt.Println("Done")
}
@ -262,6 +265,7 @@ func TestTwoServerOneOverlord(t *testing.T) {
})
// If receive roomID, the server is running correctly
time.Sleep(time.Second)
fmt.Println("Done")
}
@ -319,6 +323,7 @@ func TestReconnectRoomNoOverlord(t *testing.T) {
t.Fail()
}
time.Sleep(time.Second)
fmt.Println("Done")
}

View file

@ -21,6 +21,8 @@ type Client struct {
sendCallbackLock sync.Mutex
// recvCallback is callback when receive based on ID of the packet
recvCallback map[string]func(req WSPacket)
Done chan struct{}
}
type WSPacket struct {
@ -49,6 +51,8 @@ func NewClient(conn *websocket.Conn) *Client {
sendCallback: sendCallback,
recvCallback: recvCallback,
Done: make(chan struct{}),
}
}
@ -120,43 +124,45 @@ func (c *Client) Heartbeat() {
timer := time.Tick(time.Second)
for range timer {
select {
case <-c.Done:
log.Println("Close heartbeat")
return
default:
}
c.Send(WSPacket{ID: "heartbeat"}, nil)
}
}
func (c *Client) Listen() {
for {
//log.Println("Waiting for message ...")
_, rawMsg, err := c.conn.ReadMessage()
if err != nil {
log.Println("[!] read:", err)
// TODO: Check explicit disconnect error to break
close(c.Done)
break
}
wspacket := WSPacket{}
err = json.Unmarshal(rawMsg, &wspacket)
//log.Println( "Received: ", wspacket)
if err != nil {
continue
}
// Check if some async send is waiting for the response based on packetID
// TODO: Change to read lock
c.sendCallbackLock.Lock()
//log.Println("Listening: Callback waiting list: ", c.id, c.sendCallback)
// TODO: Change to read lock.
//c.sendCallbackLock.Lock()
callback, ok := c.sendCallback[wspacket.PacketID]
//log.Println("Has callback: ", ok, "ClientID: ", c.id, "PacketID ", wspacket.PacketID)
c.sendCallbackLock.Unlock()
//c.sendCallbackLock.Unlock()
if ok {
go callback(wspacket)
c.sendCallbackLock.Lock()
//log.Println("Deleteing Packet ", wspacket.PacketID)
//c.sendCallbackLock.Lock()
delete(c.sendCallback, wspacket.PacketID)
c.sendCallbackLock.Unlock()
//c.sendCallbackLock.Unlock()
// Skip receiveCallback to avoid duplication
continue
}
//log.Println("Listening: Callback waiting list: ", c.id, c.recvCallback)
// Check if some receiver with the ID is registered
if callback, ok := c.recvCallback[wspacket.ID]; ok {
go callback(wspacket)

View file

@ -78,12 +78,13 @@ L:
for range c {
// for {
// quit game
// TODO: Anyway not using select because it will slow down
// TODO: How to not using select because it will slow down
select {
// if there is event from close channel => the game is ended
//case input := <-d.inputChannel:
//d.UpdateInput(input)
case <-d.Done:
log.Println("Closed Director")
break L
default:
}

View file

@ -75,12 +75,11 @@ func (h *Handler) GetWeb(w http.ResponseWriter, r *http.Request) {
// WS handles normal traffic (from browser to host)
func (h *Handler) WS(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
defer c.Close()
if err != nil {
log.Print("[!] WS upgrade:", err)
return
}
defer c.Close()
client := NewBrowserClient(c)
sessionID := uuid.Must(uuid.NewV4()).String()
@ -91,6 +90,7 @@ func (h *Handler) WS(w http.ResponseWriter, r *http.Request) {
peerconnection: webrtc.NewWebRTC(),
handler: h,
}
defer wssession.Close()
if wssession.OverlordClient != nil {
wssession.RouteOverlord()

View file

@ -74,7 +74,7 @@ func NewRoom(roomID, gamepath, gameName string, onlineStorage *storage.Client) *
if !room.isGameOnLocal(savepath) {
// Fetch room from GCP to server
log.Println("Load room from online storage", savepath)
if err := room.loadRoomOnline(roomID, savepath); err != nil {
if err := room.saveOnlineRoomToLocal(roomID, savepath); err != nil {
log.Printf("Warn: Room %s is not in online storage, error %s", roomID, err)
}
}
@ -111,7 +111,7 @@ func (r *Room) startWebRTCSession(peerconnection *webrtc.WebRTC, playerIndex int
for {
select {
case <-r.Done:
log.Println("Close listening from peerconnection for room", r.ID)
log.Println("Detach peerconnection from room", r.ID)
return
case <-peerconnection.Done:
r.removeSession(peerconnection)
@ -196,7 +196,7 @@ func (r *Room) SaveGame() error {
return nil
}
func (r *Room) loadRoomOnline(roomID string, savepath string) error {
func (r *Room) saveOnlineRoomToLocal(roomID string, savepath string) error {
log.Println("Try loading game from cloud storage")
// If the game is not on local server
// Try to load from gcloud
@ -206,11 +206,7 @@ func (r *Room) loadRoomOnline(roomID string, savepath string) error {
}
// Save the data fetched from gcloud to local server
ioutil.WriteFile(savepath, data, 0644)
// Reload game again
//err = r.director.LoadGame(nil)
//if err != nil {
//return err
//}
return nil
}

View file

@ -5,7 +5,8 @@ import (
)
// Session represents a session connected from the browser to the current server
// It involves one connection to browser and one connection to the overlord
// It requires one connection to browser and one connection to the overlord
// connection to browser is 1-1. connection to overlord is n - 1
// Peerconnection can be from other server to ensure better latency
type Session struct {
ID string
@ -21,3 +22,7 @@ type Session struct {
RoomID string
PlayerIndex int
}
func (s *Session) Close() {
s.peerconnection.StopClient()
}

BIN
trace.out vendored Normal file

Binary file not shown.

View file

@ -119,7 +119,10 @@ func (v *VpxEncoder) startLooping() {
for {
beginEncoding := time.Now()
yuv := <-v.Input
yuv, ok := <-v.Input
if !ok {
return
}
// Add Image
v.vpxCodexIter = nil
C.vpx_img_read(&v.vpxImage, unsafe.Pointer(&yuv[0]))
@ -156,11 +159,11 @@ func (v *VpxEncoder) startLooping() {
// Release release memory and stop loop
func (v *VpxEncoder) Release() {
v.started = false
if v.started {
close(v.Input)
close(v.Output)
C.vpx_img_free(&v.vpxImage)
C.vpx_codec_destroy(&v.vpxCodexCtx)
}
v.started = false
}

43
webrtc/util.go Normal file
View file

@ -0,0 +1,43 @@
// credit to https://github.com/poi5305/go-yuv2webRTC/blob/master/webrtc/webrtc.go
package webrtc
import (
"bytes"
"compress/gzip"
"io/ioutil"
)
func zip(in []byte) []byte {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
_, err := gz.Write(in)
if err != nil {
panic(err)
}
err = gz.Flush()
if err != nil {
panic(err)
}
err = gz.Close()
if err != nil {
panic(err)
}
return b.Bytes()
}
func unzip(in []byte) []byte {
var b bytes.Buffer
_, err := b.Write(in)
if err != nil {
panic(err)
}
r, err := gzip.NewReader(&b)
if err != nil {
panic(err)
}
res, err := ioutil.ReadAll(r)
if err != nil {
panic(err)
}
return res
}

View file

@ -2,12 +2,9 @@
package webrtc
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"time"
@ -24,41 +21,6 @@ var webrtcconfig = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []
// Allows compressing offer/answer to bypass terminal input limits.
const compress = false
func zip(in []byte) []byte {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
_, err := gz.Write(in)
if err != nil {
panic(err)
}
err = gz.Flush()
if err != nil {
panic(err)
}
err = gz.Close()
if err != nil {
panic(err)
}
return b.Bytes()
}
func unzip(in []byte) []byte {
var b bytes.Buffer
_, err := b.Write(in)
if err != nil {
panic(err)
}
r, err := gzip.NewReader(&b)
if err != nil {
panic(err)
}
res, err := ioutil.ReadAll(r)
if err != nil {
panic(err)
}
return res
}
// Encode encodes the input in base64
// It can optionally zip the input before encoding
func Encode(obj interface{}) string {
@ -203,9 +165,10 @@ func (w *WebRTC) StartClient(remoteSession string, width, height int) (string, e
})
inputTrack.OnClose(func() {
fmt.Println("closed webrtc")
w.Done <- struct{}{}
close(w.Done)
fmt.Println("Data channel closed")
fmt.Println("Closed webrtc")
//close(w.Done)
//w.StopClient()
})
// WebRTC state callback
@ -261,6 +224,11 @@ func (w *WebRTC) AddCandidate(candidate webrtc.ICECandidateInit) {
// StopClient disconnect
func (w *WebRTC) StopClient() {
// if stopped, bypass
if w.isConnected == false {
return
}
log.Println("===StopClient===")
w.isConnected = false
if w.encoder != nil {
@ -270,6 +238,10 @@ func (w *WebRTC) StopClient() {
w.connection.Close()
}
w.connection = nil
close(w.InputChannel)
// NOTE: ImageChannel is waiting for input. Close in writer is not correct for this
close(w.ImageChannel)
close(w.AudioChannel)
}
// IsConnected comment
@ -282,8 +254,18 @@ func (w *WebRTC) startStreaming(vp8Track *webrtc.Track, audioTrack *webrtc.DataC
log.Println("Start streaming")
// send screenshot
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered when sent to close Image Channel")
}
}()
for w.isConnected {
yuv := <-w.ImageChannel
yuv, ok := <-w.ImageChannel
if !ok {
log.Println("Screenshot from emulator closed")
return
}
if len(w.encoder.Input) < cap(w.encoder.Input) {
w.encoder.Input <- yuv
}
@ -292,10 +274,21 @@ func (w *WebRTC) startStreaming(vp8Track *webrtc.Track, audioTrack *webrtc.DataC
// receive frame buffer
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered when sent to closed encoder output channel")
}
}()
for w.isConnected {
bs := <-w.encoder.Output
bs, ok := <-w.encoder.Output
if !ok {
log.Println("WebRTC Video sending Closed")
return
}
if *config.IsMonitor {
log.Println("FPS : ", w.calculateFPS())
log.Println("Encoding FPS : ", w.calculateFPS())
}
vp8Track.WriteSample(media.Sample{Data: bs, Samples: 1})
}
@ -303,10 +296,19 @@ func (w *WebRTC) startStreaming(vp8Track *webrtc.Track, audioTrack *webrtc.DataC
// send audio
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered when sent to closed Audio Channel")
}
}()
for w.isConnected {
data := <-w.AudioChannel
// time.Sleep()
// time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
data, ok := <-w.AudioChannel
if !ok {
log.Println("WebRTC Audio sending Closed")
return
}
audioTrack.Send(data)
}
}()