Refactor: Fix channel init order (#141)

* Fix channel order

* Remove Done in encoder

* Update h264 encoder done code

* remove Unnecessary IsRunning
This commit is contained in:
giongto35 2019-11-27 02:38:13 +08:00 committed by GitHub
parent e8fbd28b2f
commit 0b9d628ec2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 78 deletions

View file

@ -65,8 +65,10 @@ var NAEmulator *naEmulator
var outputImg *image.RGBA
// NAEmulator implements CloudEmulator interface based on NanoArch(golang RetroArch)
func NewNAEmulator(etype string, roomID string, imageChannel chan<- *image.RGBA, audioChannel chan<- []int16, inputChannel <-chan int) *naEmulator {
func NewNAEmulator(etype string, roomID string, inputChannel <-chan int) (*naEmulator, chan *image.RGBA, chan []int16) {
meta := config.EmulatorConfig[etype]
imageChannel := make(chan *image.RGBA, 30)
audioChannel := make(chan []int16, 30)
return &naEmulator{
meta: meta,
@ -76,13 +78,17 @@ func NewNAEmulator(etype string, roomID string, imageChannel chan<- *image.RGBA,
keys: make([]bool, joypadNumKeys),
roomID: roomID,
done: make(chan struct{}, 1),
}
}, imageChannel, audioChannel
}
// Init initialize new RetroArch cloud emulator
func Init(etype string, roomID string, imageChannel chan<- *image.RGBA, audioChannel chan<- []int16, inputChannel <-chan int) {
NAEmulator = NewNAEmulator(etype, roomID, imageChannel, audioChannel, inputChannel)
func Init(etype string, roomID string, inputChannel <-chan int) (*naEmulator, chan *image.RGBA, chan []int16) {
emulator, imageChannel, audioChannel := NewNAEmulator(etype, roomID, inputChannel)
// Set to global NAEmulator
NAEmulator = emulator
go NAEmulator.listenInput()
return emulator, imageChannel, audioChannel
}
func (na *naEmulator) listenInput() {
@ -130,6 +136,8 @@ func (na *naEmulator) Start() {
// Slow response here
case <-na.done:
nanoarchShutdown()
close(na.imageChannel)
close(na.audioChannel)
log.Println("Closed Director")
return
default:

View file

@ -20,8 +20,6 @@ type H264Encoder struct {
buf *bytes.Buffer
enc *x264.Encoder
IsRunning bool
Done bool
// C
width int
height int
@ -34,9 +32,6 @@ func NewH264Encoder(width, height, fps int) (encoder.Encoder, error) {
Output: make(chan []byte, 5*chanSize),
Input: make(chan *image.RGBA, chanSize),
IsRunning: true,
Done: false,
buf: bytes.NewBuffer(make([]byte, 0)),
width: width,
height: height,
@ -51,8 +46,6 @@ func NewH264Encoder(width, height, fps int) (encoder.Encoder, error) {
}
func (v *H264Encoder) init() error {
v.IsRunning = true
opts := &x264.Options{
Width: v.width,
Height: v.height,
@ -79,21 +72,9 @@ func (v *H264Encoder) startLooping() {
log.Println("Warn: Recovered panic in encoding ", r)
log.Println(debug.Stack())
}
if v.Done == true {
// The first time we see IsRunning set to false, we release and return
v.release()
return
}
}()
for img := range v.Input {
if v.Done == true {
// The first time we see IsRunning set to false, we release and return
v.release()
return
}
err := v.enc.Encode(img)
if err != nil {
log.Println("err encoding ", img, " using h264")
@ -105,17 +86,13 @@ func (v *H264Encoder) startLooping() {
// Release release memory and stop loop
func (v *H264Encoder) release() {
if v.IsRunning {
v.IsRunning = false
log.Println("Releasing encoder")
// TODO: Bug here, after close it will signal
close(v.Output)
err := v.enc.Close()
if err != nil {
log.Println("Failed to close H264 encoder")
}
log.Println("Releasing encoder")
// TODO: Bug here, after close it will signal
close(v.Output)
err := v.enc.Close()
if err != nil {
log.Println("Failed to close H264 encoder")
}
// TODO: Can we merge IsRunning and Done together
}
// GetInputChan returns input channel
@ -130,6 +107,5 @@ func (v *H264Encoder) GetOutputChan() chan []byte {
// GetDoneChan returns done channel
func (v *H264Encoder) Stop() {
v.Done = true
close(v.Input)
v.release()
}

View file

@ -49,9 +49,6 @@ type VpxEncoder struct {
Output chan []byte // frame
Input chan *image.RGBA // yuvI420
IsRunning bool
Done bool
width int
height int
// C
@ -70,8 +67,6 @@ func NewVpxEncoder(w, h, fps, bitrate, keyframe int) (encoder.Encoder, error) {
Output: make(chan []byte, 5*chanSize),
Input: make(chan *image.RGBA, chanSize),
IsRunning: true,
Done: false,
// C
width: w,
height: h,
@ -116,7 +111,6 @@ func (v *VpxEncoder) init() error {
if C.call_vpx_codec_enc_init(&v.vpxCodexCtx, encoder, &cfg) != 0 {
return fmt.Errorf("Failed to initialize encoder")
}
v.IsRunning = true
go v.startLooping()
return nil
}
@ -126,24 +120,12 @@ func (v *VpxEncoder) startLooping() {
if r := recover(); r != nil {
log.Println("Warn: Recovered panic in encoding ", r)
}
if v.Done == true {
// The first time we see IsRunning set to false, we release and return
v.release()
return
}
}()
size := int(float32(v.width*v.height) * 1.5)
yuv := make([]byte, size, size)
for img := range v.Input {
if v.Done == true {
// The first time we see IsRunning set to false, we release and return
v.release()
return
}
util.RgbaToYuvInplace(img, yuv, v.width, v.height)
// Add Image
@ -175,18 +157,14 @@ func (v *VpxEncoder) startLooping() {
// Release release memory and stop loop
func (v *VpxEncoder) release() {
if v.IsRunning {
v.IsRunning = false
log.Println("Releasing encoder")
C.vpx_img_free(&v.vpxImage)
C.vpx_codec_destroy(&v.vpxCodexCtx)
// TODO: Bug here, after close it will signal
close(v.Output)
if v.Input != nil {
close(v.Input)
}
log.Println("Releasing encoder")
C.vpx_img_free(&v.vpxImage)
C.vpx_codec_destroy(&v.vpxCodexCtx)
// TODO: Bug here, after close it will signal
close(v.Output)
if v.Input != nil {
close(v.Input)
}
// TODO: Can we merge IsRunning and Done together
}
// GetInputChan returns input channel
@ -201,5 +179,5 @@ func (v *VpxEncoder) GetOutputChan() chan []byte {
// GetDoneChan returns done channel
func (v *VpxEncoder) Stop() {
v.Done = true
v.release()
}

View file

@ -215,16 +215,17 @@ func getServerIDOfRoom(oc *OverlordClient, roomID string) string {
return packet.Data
}
func (h *Handler) startGameHandler(gameName, roomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoEncoderType string) *room.Room {
// startGameHandler starts a game if roomID is given, if not create new room
func (h *Handler) startGameHandler(gameName, existedRoomID string, playerIndex int, peerconnection *webrtc.WebRTC, videoEncoderType string) *room.Room {
log.Println("Starting game", gameName)
// If we are connecting to overlord, request corresponding serverID based on roomID
// TODO: check if roomID is in the current server
room := h.getRoom(roomID)
log.Println("Got Room from local ", room, " ID: ", roomID)
// TODO: check if existedRoomID is in the current server
room := h.getRoom(existedRoomID)
// If room is not running
if room == nil {
log.Println("Got Room from local ", room, " ID: ", existedRoomID)
// Create new room
room = h.createNewRoom(gameName, roomID, playerIndex, videoEncoderType)
room = h.createNewRoom(gameName, existedRoomID, playerIndex, videoEncoderType)
// Wait for done signal from room
go func() {
<-room.Done
@ -232,7 +233,7 @@ func (h *Handler) startGameHandler(gameName, roomID string, playerIndex int, pee
// send signal to overlord that the room is closed, overlord will remove that room
h.oClient.Send(cws.WSPacket{
ID: "closeRoom",
Data: roomID,
Data: room.ID,
}, nil)
}()
}
@ -248,7 +249,7 @@ func (h *Handler) startGameHandler(gameName, roomID string, playerIndex int, pee
if room != nil && h.oClient != nil {
h.oClient.Send(cws.WSPacket{
ID: "registerRoom",
Data: roomID,
Data: room.ID,
}, nil)
}

View file

@ -70,15 +70,11 @@ func NewRoom(roomID string, gameName string, videoEncoderType string, onlineStor
gameInfo := gamelist.GetGameInfoFromName(gameName)
log.Println("Init new room: ", roomID, gameName, gameInfo)
imageChannel := make(chan *image.RGBA, 30)
audioChannel := make(chan []int16, 30)
inputChannel := make(chan int, 100)
room := &Room{
ID: roomID,
imageChannel: imageChannel,
audioChannel: audioChannel,
inputChannel: inputChannel,
rtcSessions: []*webrtc.WebRTC{},
sessionsLock: &sync.Mutex{},
@ -102,7 +98,12 @@ func NewRoom(roomID string, gameName string, videoEncoderType string, onlineStor
// Spawn new emulator based on gameName and plug-in all channels
emuName, _ := config.FileTypeToEmulator[game.Type]
room.director = getEmulator(emuName, roomID, imageChannel, audioChannel, inputChannel)
director, imageChannel, audioChannel := nanoarch.Init(emuName, roomID, inputChannel)
room.director = director
room.imageChannel = imageChannel
room.audioChannel = audioChannel
gameMeta := room.director.LoadMeta(game.Path)
// nwidth, nheight are the webRTC output size.
@ -153,7 +154,6 @@ func resizeToAspect(ratio float64, sw int, sh int) (dw int, dh int) {
// getEmulator creates new emulator and run it
func getEmulator(emuName string, roomID string, imageChannel chan<- *image.RGBA, audioChannel chan<- []int16, inputChannel <-chan int) emulator.CloudEmulator {
nanoarch.Init(emuName, roomID, imageChannel, audioChannel, inputChannel)
return nanoarch.NAEmulator
}