diff --git a/pkg/emulator/libretro/nanoarch/naemulator.go b/pkg/emulator/libretro/nanoarch/naemulator.go index ad982523..4513c8e5 100644 --- a/pkg/emulator/libretro/nanoarch/naemulator.go +++ b/pkg/emulator/libretro/nanoarch/naemulator.go @@ -65,8 +65,10 @@ var NAEmulator *naEmulator var outputImg *image.RGBA // NAEmulator implements CloudEmulator interface based on NanoArch(golang RetroArch) -func NewNAEmulator(etype string, roomID string, imageChannel chan<- *image.RGBA, audioChannel chan<- []int16, inputChannel <-chan int) *naEmulator { +func NewNAEmulator(etype string, roomID string, inputChannel <-chan int) (*naEmulator, chan *image.RGBA, chan []int16) { meta := config.EmulatorConfig[etype] + imageChannel := make(chan *image.RGBA, 30) + audioChannel := make(chan []int16, 30) return &naEmulator{ meta: meta, @@ -76,13 +78,17 @@ func NewNAEmulator(etype string, roomID string, imageChannel chan<- *image.RGBA, keys: make([]bool, joypadNumKeys), roomID: roomID, done: make(chan struct{}, 1), - } + }, imageChannel, audioChannel } // Init initialize new RetroArch cloud emulator -func Init(etype string, roomID string, imageChannel chan<- *image.RGBA, audioChannel chan<- []int16, inputChannel <-chan int) { - NAEmulator = NewNAEmulator(etype, roomID, imageChannel, audioChannel, inputChannel) +func Init(etype string, roomID string, inputChannel <-chan int) (*naEmulator, chan *image.RGBA, chan []int16) { + emulator, imageChannel, audioChannel := NewNAEmulator(etype, roomID, inputChannel) + // Set to global NAEmulator + NAEmulator = emulator + go NAEmulator.listenInput() + return emulator, imageChannel, audioChannel } func (na *naEmulator) listenInput() { @@ -130,6 +136,8 @@ func (na *naEmulator) Start() { // Slow response here case <-na.done: nanoarchShutdown() + close(na.imageChannel) + close(na.audioChannel) log.Println("Closed Director") return default: diff --git a/pkg/encoder/h264encoder/encoder.go b/pkg/encoder/h264encoder/encoder.go index b7d174e0..bf18dae7 100644 --- a/pkg/encoder/h264encoder/encoder.go +++ b/pkg/encoder/h264encoder/encoder.go @@ -20,8 +20,6 @@ type H264Encoder struct { buf *bytes.Buffer enc *x264.Encoder - IsRunning bool - Done bool // C width int height int @@ -34,9 +32,6 @@ func NewH264Encoder(width, height, fps int) (encoder.Encoder, error) { Output: make(chan []byte, 5*chanSize), Input: make(chan *image.RGBA, chanSize), - IsRunning: true, - Done: false, - buf: bytes.NewBuffer(make([]byte, 0)), width: width, height: height, @@ -51,8 +46,6 @@ func NewH264Encoder(width, height, fps int) (encoder.Encoder, error) { } func (v *H264Encoder) init() error { - v.IsRunning = true - opts := &x264.Options{ Width: v.width, Height: v.height, @@ -79,21 +72,9 @@ func (v *H264Encoder) startLooping() { log.Println("Warn: Recovered panic in encoding ", r) log.Println(debug.Stack()) } - - if v.Done == true { - // The first time we see IsRunning set to false, we release and return - v.release() - return - } }() for img := range v.Input { - if v.Done == true { - // The first time we see IsRunning set to false, we release and return - v.release() - return - } - err := v.enc.Encode(img) if err != nil { log.Println("err encoding ", img, " using h264") @@ -105,17 +86,13 @@ func (v *H264Encoder) startLooping() { // Release release memory and stop loop func (v *H264Encoder) release() { - if v.IsRunning { - v.IsRunning = false - log.Println("Releasing encoder") - // TODO: Bug here, after close it will signal - close(v.Output) - err := v.enc.Close() - if err != nil { - log.Println("Failed to close H264 encoder") - } + log.Println("Releasing encoder") + // TODO: Bug here, after close it will signal + close(v.Output) + err := v.enc.Close() + if err != nil { + log.Println("Failed to close H264 encoder") } - // TODO: Can we merge IsRunning and Done together } // GetInputChan returns input channel @@ -130,6 +107,5 @@ func (v *H264Encoder) GetOutputChan() chan []byte { // GetDoneChan returns done channel func (v *H264Encoder) Stop() { - v.Done = true - close(v.Input) + v.release() } diff --git a/pkg/encoder/vpx-encoder/encoder.go b/pkg/encoder/vpx-encoder/encoder.go index 04a492ad..9791d73f 100644 --- a/pkg/encoder/vpx-encoder/encoder.go +++ b/pkg/encoder/vpx-encoder/encoder.go @@ -49,9 +49,6 @@ type VpxEncoder struct { Output chan []byte // frame Input chan *image.RGBA // yuvI420 - IsRunning bool - Done bool - width int height int // C @@ -70,8 +67,6 @@ func NewVpxEncoder(w, h, fps, bitrate, keyframe int) (encoder.Encoder, error) { Output: make(chan []byte, 5*chanSize), Input: make(chan *image.RGBA, chanSize), - IsRunning: true, - Done: false, // C width: w, height: h, @@ -116,7 +111,6 @@ func (v *VpxEncoder) init() error { if C.call_vpx_codec_enc_init(&v.vpxCodexCtx, encoder, &cfg) != 0 { return fmt.Errorf("Failed to initialize encoder") } - v.IsRunning = true go v.startLooping() return nil } @@ -126,24 +120,12 @@ func (v *VpxEncoder) startLooping() { if r := recover(); r != nil { log.Println("Warn: Recovered panic in encoding ", r) } - - if v.Done == true { - // The first time we see IsRunning set to false, we release and return - v.release() - return - } }() size := int(float32(v.width*v.height) * 1.5) yuv := make([]byte, size, size) for img := range v.Input { - if v.Done == true { - // The first time we see IsRunning set to false, we release and return - v.release() - return - } - util.RgbaToYuvInplace(img, yuv, v.width, v.height) // Add Image @@ -175,18 +157,14 @@ func (v *VpxEncoder) startLooping() { // Release release memory and stop loop func (v *VpxEncoder) release() { - if v.IsRunning { - v.IsRunning = false - log.Println("Releasing encoder") - C.vpx_img_free(&v.vpxImage) - C.vpx_codec_destroy(&v.vpxCodexCtx) - // TODO: Bug here, after close it will signal - close(v.Output) - if v.Input != nil { - close(v.Input) - } + log.Println("Releasing encoder") + C.vpx_img_free(&v.vpxImage) + C.vpx_codec_destroy(&v.vpxCodexCtx) + // TODO: Bug here, after close it will signal + close(v.Output) + if v.Input != nil { + close(v.Input) } - // TODO: Can we merge IsRunning and Done together } // GetInputChan returns input channel @@ -201,5 +179,5 @@ func (v *VpxEncoder) GetOutputChan() chan []byte { // GetDoneChan returns done channel func (v *VpxEncoder) Stop() { - v.Done = true + v.release() } diff --git a/pkg/worker/overlord.go b/pkg/worker/overlord.go index 841a3537..e5ca3b23 100644 --- a/pkg/worker/overlord.go +++ b/pkg/worker/overlord.go @@ -215,16 +215,17 @@ func getServerIDOfRoom(oc *OverlordClient, roomID string) string { return packet.Data } -func (h *Handler) startGameHandler(gameName, roomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoEncoderType string) *room.Room { +// startGameHandler starts a game if roomID is given, if not create new room +func (h *Handler) startGameHandler(gameName, existedRoomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoEncoderType string) *room.Room { log.Println("Starting game", gameName) // If we are connecting to overlord, request corresponding serverID based on roomID - // TODO: check if roomID is in the current server - room := h.getRoom(roomID) - log.Println("Got Room from local ", room, " ID: ", roomID) + // TODO: check if existedRoomID is in the current server + room := h.getRoom(existedRoomID) // If room is not running if room == nil { + log.Println("Got Room from local ", room, " ID: ", existedRoomID) // Create new room - room = h.createNewRoom(gameName, roomID, playerIndex, videoEncoderType) + room = h.createNewRoom(gameName, existedRoomID, playerIndex, videoEncoderType) // Wait for done signal from room go func() { <-room.Done @@ -232,7 +233,7 @@ func (h *Handler) startGameHandler(gameName, roomID string, playerIndex int, pee // send signal to overlord that the room is closed, overlord will remove that room h.oClient.Send(cws.WSPacket{ ID: "closeRoom", - Data: roomID, + Data: room.ID, }, nil) }() } @@ -248,7 +249,7 @@ func (h *Handler) startGameHandler(gameName, roomID string, playerIndex int, pee if room != nil && h.oClient != nil { h.oClient.Send(cws.WSPacket{ ID: "registerRoom", - Data: roomID, + Data: room.ID, }, nil) } diff --git a/pkg/worker/room/room.go b/pkg/worker/room/room.go index 5751e9ba..1fe56b0f 100644 --- a/pkg/worker/room/room.go +++ b/pkg/worker/room/room.go @@ -70,15 +70,11 @@ func NewRoom(roomID string, gameName string, videoEncoderType string, onlineStor gameInfo := gamelist.GetGameInfoFromName(gameName) log.Println("Init new room: ", roomID, gameName, gameInfo) - imageChannel := make(chan *image.RGBA, 30) - audioChannel := make(chan []int16, 30) inputChannel := make(chan int, 100) room := &Room{ ID: roomID, - imageChannel: imageChannel, - audioChannel: audioChannel, inputChannel: inputChannel, rtcSessions: []*webrtc.WebRTC{}, sessionsLock: &sync.Mutex{}, @@ -102,7 +98,12 @@ func NewRoom(roomID string, gameName string, videoEncoderType string, onlineStor // Spawn new emulator based on gameName and plug-in all channels emuName, _ := config.FileTypeToEmulator[game.Type] - room.director = getEmulator(emuName, roomID, imageChannel, audioChannel, inputChannel) + + director, imageChannel, audioChannel := nanoarch.Init(emuName, roomID, inputChannel) + room.director = director + room.imageChannel = imageChannel + room.audioChannel = audioChannel + gameMeta := room.director.LoadMeta(game.Path) // nwidth, nheight are the webRTC output size. @@ -153,7 +154,6 @@ func resizeToAspect(ratio float64, sw int, sh int) (dw int, dh int) { // getEmulator creates new emulator and run it func getEmulator(emuName string, roomID string, imageChannel chan<- *image.RGBA, audioChannel chan<- []int16, inputChannel <-chan int) emulator.CloudEmulator { - nanoarch.Init(emuName, roomID, imageChannel, audioChannel, inputChannel) return nanoarch.NAEmulator }