From 85bb99f8f82b75ff04298fa7f54f1982cff2f8d9 Mon Sep 17 00:00:00 2001 From: 88hcsif <62969634+88hcsif@users.noreply.github.com> Date: Sun, 28 Jun 2020 10:25:23 +0100 Subject: [PATCH] Explicitly set RTP timestamp to cope with variable frame rate. (#211) Set the timestamp as early as possible and propagate it through the pipeline. --- pkg/emulator/libretro/nanoarch/naemulator.go | 13 +++++++++---- pkg/emulator/libretro/nanoarch/nanoarch.go | 10 ++++++++-- pkg/encoder/h264encoder/encoder.go | 19 +++++++++---------- pkg/encoder/type.go | 14 ++++++++++++-- pkg/encoder/vpx-encoder/encoder.go | 17 ++++++++--------- pkg/webrtc/webrtc.go | 20 +++++++++++++++----- pkg/worker/room/media.go | 17 +++++++++-------- pkg/worker/room/room.go | 5 ++--- 8 files changed, 72 insertions(+), 43 deletions(-) diff --git a/pkg/emulator/libretro/nanoarch/naemulator.go b/pkg/emulator/libretro/nanoarch/naemulator.go index 2ca3eaae..7fa8e163 100644 --- a/pkg/emulator/libretro/nanoarch/naemulator.go +++ b/pkg/emulator/libretro/nanoarch/naemulator.go @@ -55,7 +55,7 @@ type constrollerState struct { // naEmulator implements CloudEmulator type naEmulator struct { - imageChannel chan<- *image.RGBA + imageChannel chan<- GameFrame audioChannel chan<- []int16 inputChannel <-chan InputEvent @@ -78,15 +78,20 @@ type InputEvent struct { ConnID string } +type GameFrame struct { + Image *image.RGBA + Timestamp uint32 +} + var NAEmulator *naEmulator var outputImg *image.RGBA const maxPort = 8 // NAEmulator implements CloudEmulator interface based on NanoArch(golang RetroArch) -func NewNAEmulator(etype string, roomID string, inputChannel <-chan InputEvent) (*naEmulator, chan *image.RGBA, chan []int16) { +func NewNAEmulator(etype string, roomID string, inputChannel <-chan InputEvent) (*naEmulator, chan GameFrame, chan []int16) { meta := config.EmulatorConfig[etype] - imageChannel := make(chan *image.RGBA, 30) + imageChannel := make(chan GameFrame, 30) audioChannel := make(chan []int16, 30) return &naEmulator{ @@ -102,7 +107,7 @@ func NewNAEmulator(etype string, roomID string, inputChannel <-chan InputEvent) } // Init initialize new RetroArch cloud emulator -func Init(etype string, roomID string, inputChannel <-chan InputEvent) (*naEmulator, chan *image.RGBA, chan []int16) { +func Init(etype string, roomID string, inputChannel <-chan InputEvent) (*naEmulator, chan GameFrame, chan []int16) { emulator, imageChannel, audioChannel := NewNAEmulator(etype, roomID, inputChannel) // Set to global NAEmulator NAEmulator = emulator diff --git a/pkg/emulator/libretro/nanoarch/nanoarch.go b/pkg/emulator/libretro/nanoarch/nanoarch.go index 57f7e9b6..1cb5f408 100644 --- a/pkg/emulator/libretro/nanoarch/nanoarch.go +++ b/pkg/emulator/libretro/nanoarch/nanoarch.go @@ -6,10 +6,12 @@ import ( "fmt" stdimage "image" "log" + "math/rand" "os" "os/user" "runtime" "sync" + "time" "unsafe" "github.com/disintegration/imaging" @@ -102,6 +104,8 @@ var systemDirectory = C.CString("./pkg/emulator/libretro/system") var saveDirectory = C.CString(".") var currentUser *C.char +var seed = rand.New(rand.NewSource(time.Now().UnixNano())).Uint32() + var bindKeysMap = map[int]int{ C.RETRO_DEVICE_ID_JOYPAD_A: 0, C.RETRO_DEVICE_ID_JOYPAD_B: 1, @@ -135,6 +139,8 @@ func coreVideoRefresh(data unsafe.Pointer, width C.unsigned, height C.unsigned, if data == nil { return } + // divide by 8333 to give us the equivalent of a 120fps resolution + timestamp := uint32(time.Now().UnixNano() / 8333) + seed if (data == C.RETRO_HW_FRAME_BUFFER_VALID) { im := stdimage.NewNRGBA(stdimage.Rect(0, 0, int(width), int(height))) @@ -147,7 +153,7 @@ func coreVideoRefresh(data unsafe.Pointer, width C.unsigned, height C.unsigned, Stride: im.Stride, Rect: im.Rect, } - NAEmulator.imageChannel <- rgba + NAEmulator.imageChannel <- GameFrame{ Image: rgba, Timestamp: timestamp } return } @@ -170,7 +176,7 @@ func coreVideoRefresh(data unsafe.Pointer, width C.unsigned, height C.unsigned, // the image is pushed into a channel // where it will be distributed with fan-out - NAEmulator.imageChannel <- outputImg + NAEmulator.imageChannel <- GameFrame{ Image: outputImg, Timestamp: timestamp } } //export coreInputPoll diff --git a/pkg/encoder/h264encoder/encoder.go b/pkg/encoder/h264encoder/encoder.go index bf18dae7..dd60993f 100644 --- a/pkg/encoder/h264encoder/encoder.go +++ b/pkg/encoder/h264encoder/encoder.go @@ -2,7 +2,6 @@ package h264encoder import ( "bytes" - "image" "log" "runtime/debug" @@ -14,8 +13,8 @@ const chanSize = 2 // H264Encoder yuvI420 image to vp8 video type H264Encoder struct { - Output chan []byte // frame - Input chan *image.RGBA // yuvI420 + Output chan encoder.OutFrame + Input chan encoder.InFrame buf *bytes.Buffer enc *x264.Encoder @@ -29,8 +28,8 @@ type H264Encoder struct { // NewH264Encoder create h264 encoder func NewH264Encoder(width, height, fps int) (encoder.Encoder, error) { v := &H264Encoder{ - Output: make(chan []byte, 5*chanSize), - Input: make(chan *image.RGBA, chanSize), + Output: make(chan encoder.OutFrame, 5*chanSize), + Input: make(chan encoder.InFrame, chanSize), buf: bytes.NewBuffer(make([]byte, 0)), width: width, @@ -75,11 +74,11 @@ func (v *H264Encoder) startLooping() { }() for img := range v.Input { - err := v.enc.Encode(img) + err := v.enc.Encode(img.Image) if err != nil { - log.Println("err encoding ", img, " using h264") + log.Println("err encoding ", img.Image, " using h264") } - v.Output <- v.buf.Bytes() + v.Output <- encoder.OutFrame{ Data: v.buf.Bytes(), Timestamp: img.Timestamp } v.buf.Reset() } } @@ -96,12 +95,12 @@ func (v *H264Encoder) release() { } // GetInputChan returns input channel -func (v *H264Encoder) GetInputChan() chan *image.RGBA { +func (v *H264Encoder) GetInputChan() chan encoder.InFrame { return v.Input } // GetInputChan returns output channel -func (v *H264Encoder) GetOutputChan() chan []byte { +func (v *H264Encoder) GetOutputChan() chan encoder.OutFrame { return v.Output } diff --git a/pkg/encoder/type.go b/pkg/encoder/type.go index ec93a31a..4bfd8f05 100644 --- a/pkg/encoder/type.go +++ b/pkg/encoder/type.go @@ -2,8 +2,18 @@ package encoder import "image" +type InFrame struct { + Image *image.RGBA + Timestamp uint32 +} + +type OutFrame struct { + Data []byte + Timestamp uint32 +} + type Encoder interface { - GetInputChan() chan *image.RGBA - GetOutputChan() chan []byte + GetInputChan() chan InFrame + GetOutputChan() chan OutFrame Stop() } diff --git a/pkg/encoder/vpx-encoder/encoder.go b/pkg/encoder/vpx-encoder/encoder.go index 9791d73f..5b3fb250 100644 --- a/pkg/encoder/vpx-encoder/encoder.go +++ b/pkg/encoder/vpx-encoder/encoder.go @@ -2,7 +2,6 @@ package vpxencoder import ( "fmt" - "image" "log" "unsafe" @@ -46,8 +45,8 @@ const chanSize = 2 // VpxEncoder yuvI420 image to vp8 video type VpxEncoder struct { - Output chan []byte // frame - Input chan *image.RGBA // yuvI420 + Output chan encoder.OutFrame + Input chan encoder.InFrame width int height int @@ -64,8 +63,8 @@ type VpxEncoder struct { // NewVpxEncoder create vp8 encoder func NewVpxEncoder(w, h, fps, bitrate, keyframe int) (encoder.Encoder, error) { v := &VpxEncoder{ - Output: make(chan []byte, 5*chanSize), - Input: make(chan *image.RGBA, chanSize), + Output: make(chan encoder.OutFrame, 5*chanSize), + Input: make(chan encoder.InFrame, chanSize), // C width: w, @@ -126,7 +125,7 @@ func (v *VpxEncoder) startLooping() { yuv := make([]byte, size, size) for img := range v.Input { - util.RgbaToYuvInplace(img, yuv, v.width, v.height) + util.RgbaToYuvInplace(img.Image, yuv, v.width, v.height) // Add Image v.vpxCodexIter = nil @@ -151,7 +150,7 @@ func (v *VpxEncoder) startLooping() { if len(v.Output) >= cap(v.Output) { continue } - v.Output <- bs + v.Output <- encoder.OutFrame{ Data: bs, Timestamp: img.Timestamp } } } @@ -168,12 +167,12 @@ func (v *VpxEncoder) release() { } // GetInputChan returns input channel -func (v *VpxEncoder) GetInputChan() chan *image.RGBA { +func (v *VpxEncoder) GetInputChan() chan encoder.InFrame { return v.Input } // GetInputChan returns output channel -func (v *VpxEncoder) GetOutputChan() chan []byte { +func (v *VpxEncoder) GetOutputChan() chan encoder.OutFrame { return v.Output } diff --git a/pkg/webrtc/webrtc.go b/pkg/webrtc/webrtc.go index 87e90e56..f20d1de5 100644 --- a/pkg/webrtc/webrtc.go +++ b/pkg/webrtc/webrtc.go @@ -25,6 +25,11 @@ type InputDataPair struct { time time.Time } +type WebFrame struct { + Data []byte + Timestamp uint32 +} + // WebRTC connection type WebRTC struct { ID string @@ -33,7 +38,7 @@ type WebRTC struct { isConnected bool isClosed bool // for yuvI420 image - ImageChannel chan []byte + ImageChannel chan WebFrame AudioChannel chan []byte VoiceInChannel chan []byte VoiceOutChannel chan []byte @@ -86,7 +91,7 @@ func NewWebRTC() *WebRTC { w := &WebRTC{ ID: uuid.Must(uuid.NewV4()).String(), - ImageChannel: make(chan []byte, 30), + ImageChannel: make(chan WebFrame, 30), AudioChannel: make(chan []byte, 1), VoiceInChannel: make(chan []byte, 1), VoiceOutChannel: make(chan []byte, 1), @@ -317,9 +322,14 @@ func (w *WebRTC) startStreaming(vp8Track *webrtc.Track, opusTrack *webrtc.Track) }() for data := range w.ImageChannel { - err := vp8Track.WriteSample(media.Sample{Data: data, Samples: 1}) - if err != nil { - log.Println("Warn: Err write sample: ", err) + packets := vp8Track.Packetizer().Packetize(data.Data, 1) + for _, p := range packets { + p.Header.Timestamp = data.Timestamp + err := vp8Track.WriteRTP(p) + if err != nil { + log.Println("Warn: Err write sample: ", err) + break + } } } }() diff --git a/pkg/worker/room/media.go b/pkg/worker/room/media.go index 5718ed3c..33f22ee3 100644 --- a/pkg/worker/room/media.go +++ b/pkg/worker/room/media.go @@ -9,6 +9,7 @@ import ( "github.com/giongto35/cloud-game/pkg/encoder/h264encoder" vpxencoder "github.com/giongto35/cloud-game/pkg/encoder/vpx-encoder" "github.com/giongto35/cloud-game/pkg/util" + "github.com/giongto35/cloud-game/pkg/webrtc" "gopkg.in/hraban/opus.v2" ) @@ -132,26 +133,26 @@ func (r *Room) startAudio(sampleRate int) { // startVideo listen from imageChannel and push to Encoder. The output of encoder will be pushed to webRTC func (r *Room) startVideo(width, height int, videoEncoderType string) { - var encoder encoder.Encoder + var enc encoder.Encoder var err error log.Println("Video Encoder: ", videoEncoderType) if videoEncoderType == config.CODEC_H264 { - encoder, err = h264encoder.NewH264Encoder(width, height, 1) + enc, err = h264encoder.NewH264Encoder(width, height, 1) } else { - encoder, err = vpxencoder.NewVpxEncoder(width, height, 20, 1200, 5) + enc, err = vpxencoder.NewVpxEncoder(width, height, 20, 1200, 5) } defer func() { - encoder.Stop() + enc.Stop() }() if err != nil { fmt.Println("error create new encoder", err) return } - einput := encoder.GetInputChan() - eoutput := encoder.GetOutputChan() + einput := enc.GetInputChan() + eoutput := enc.GetOutputChan() // send screenshot go func() { @@ -169,7 +170,7 @@ func (r *Room) startVideo(width, height int, videoEncoderType string) { // fanout imageChannel if webRTC.IsConnected() { // NOTE: can block here - webRTC.ImageChannel <- data + webRTC.ImageChannel <- webrtc.WebFrame{ Data: data.Data, Timestamp: data.Timestamp } } } } @@ -181,7 +182,7 @@ func (r *Room) startVideo(width, height int, videoEncoderType string) { return } if len(einput) < cap(einput) { - einput <- image + einput <- encoder.InFrame{ Image: image.Image, Timestamp: image.Timestamp } } } } diff --git a/pkg/worker/room/room.go b/pkg/worker/room/room.go index 6483a870..ddb506b5 100644 --- a/pkg/worker/room/room.go +++ b/pkg/worker/room/room.go @@ -1,7 +1,6 @@ package room import ( - "image" "io/ioutil" "log" "math" @@ -29,7 +28,7 @@ type Room struct { ID string // imageChannel is image stream received from director - imageChannel <-chan *image.RGBA + imageChannel <-chan nanoarch.GameFrame // audioChannel is audio stream received from director audioChannel <-chan []int16 // inputChannel is input stream send to director. This inputChannel is combined @@ -169,7 +168,7 @@ func resizeToAspect(ratio float64, sw int, sh int) (dw int, dh int) { } // getEmulator creates new emulator and run it -func getEmulator(emuName string, roomID string, imageChannel chan<- *image.RGBA, audioChannel chan<- []int16, inputChannel <-chan int) emulator.CloudEmulator { +func getEmulator(emuName string, roomID string, imageChannel chan<- nanoarch.GameFrame, audioChannel chan<- []int16, inputChannel <-chan int) emulator.CloudEmulator { return nanoarch.NAEmulator }