mirror of
https://github.com/giongto35/cloud-game.git
synced 2026-01-23 02:34:42 +00:00
Dynamic audio buf
* Ugly audio buf * Use dynamic Opus frames with config
This commit is contained in:
parent
f54089e072
commit
ed3b195b26
9 changed files with 235 additions and 154 deletions
|
|
@ -300,7 +300,12 @@ encoder:
|
|||
# audio frame duration needed for WebRTC (Opus)
|
||||
# most of the emulators have ~1400 samples per a video frame,
|
||||
# so we keep the frame buffer roughly half of that size or 2 RTC packets per frame
|
||||
# (deprecated) due to frames
|
||||
frame: 10
|
||||
# dynamic frames for Opus encoder
|
||||
frames:
|
||||
- 10
|
||||
- 5
|
||||
video:
|
||||
# h264, vpx (vp8) or vp9
|
||||
codec: h264
|
||||
|
|
|
|||
|
|
@ -9,8 +9,10 @@ import (
|
|||
func TestConfigEnv(t *testing.T) {
|
||||
var out WorkerConfig
|
||||
|
||||
_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAME", "33")
|
||||
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAME") }()
|
||||
_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]", "10")
|
||||
_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]", "5")
|
||||
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]") }()
|
||||
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]") }()
|
||||
|
||||
_ = os.Setenv("CLOUD_GAME_EMULATOR_LIBRETRO_CORES_LIST_PCSX_OPTIONS__PCSX_REARMED_DRC", "x")
|
||||
defer func() {
|
||||
|
|
@ -22,8 +24,11 @@ func TestConfigEnv(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if out.Encoder.Audio.Frame != 33 {
|
||||
t.Errorf("%v is not 33", out.Encoder.Audio.Frame)
|
||||
for i, x := range []float32{10, 5} {
|
||||
if out.Encoder.Audio.Frames[i] != x {
|
||||
t.Errorf("%v is not [10, 5]", out.Encoder.Audio.Frames)
|
||||
t.Failed()
|
||||
}
|
||||
}
|
||||
|
||||
v := out.Emulator.Libretro.Cores.List["pcsx"].Options["pcsx_rearmed_drc"]
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ type Encoder struct {
|
|||
}
|
||||
|
||||
type Audio struct {
|
||||
Frame float32
|
||||
Frames []float32
|
||||
}
|
||||
|
||||
type Video struct {
|
||||
|
|
|
|||
|
|
@ -168,7 +168,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke
|
|||
}
|
||||
|
||||
m.AudioSrcHz = app.AudioSampleRate()
|
||||
m.AudioFrame = w.conf.Encoder.Audio.Frame
|
||||
m.AudioFrames = w.conf.Encoder.Audio.Frames
|
||||
m.VideoW, m.VideoH = app.ViewportSize()
|
||||
m.VideoScale = app.Scale()
|
||||
|
||||
|
|
|
|||
119
pkg/worker/media/buffer.go
Normal file
119
pkg/worker/media/buffer.go
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
package media
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// buffer is a simple non-concurrent safe buffer for audio samples.
|
||||
type buffer struct {
|
||||
stretch bool
|
||||
frameHz []int
|
||||
|
||||
raw samples
|
||||
buckets []Bucket
|
||||
cur *Bucket
|
||||
}
|
||||
|
||||
type Bucket struct {
|
||||
mem samples
|
||||
ms float32
|
||||
lv int
|
||||
dst int
|
||||
}
|
||||
|
||||
func newBuffer(frames []float32, hz int) (*buffer, error) {
|
||||
if hz < 2000 {
|
||||
return nil, errors.New("hz should be > than 2000")
|
||||
}
|
||||
|
||||
buf := buffer{}
|
||||
|
||||
// preallocate continuous array
|
||||
s := 0
|
||||
for _, f := range frames {
|
||||
s += frame(hz, f)
|
||||
}
|
||||
buf.raw = make(samples, s)
|
||||
|
||||
next := 0
|
||||
for _, f := range frames {
|
||||
s := frame(hz, f)
|
||||
buf.buckets = append(buf.buckets, Bucket{
|
||||
mem: buf.raw[next : next+s],
|
||||
ms: f,
|
||||
})
|
||||
next += s
|
||||
}
|
||||
buf.cur = &buf.buckets[len(buf.buckets)-1]
|
||||
return &buf, nil
|
||||
}
|
||||
|
||||
func (b *buffer) choose(l int) {
|
||||
for _, bb := range b.buckets {
|
||||
if l >= len(bb.mem) {
|
||||
b.cur = &bb
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *buffer) resample(hz int) {
|
||||
b.stretch = true
|
||||
for i := range b.buckets {
|
||||
b.buckets[i].dst = frame(hz, float32(b.buckets[i].ms))
|
||||
}
|
||||
}
|
||||
|
||||
// write fills the buffer until it's full and then passes the gathered data into a callback.
|
||||
//
|
||||
// There are two cases to consider:
|
||||
// 1. Underflow, when the length of the written data is less than the buffer's available space.
|
||||
// 2. Overflow, when the length exceeds the current available buffer space.
|
||||
//
|
||||
// We overwrite any previous values in the buffer and move the internal write pointer
|
||||
// by the length of the written data.
|
||||
// In the first case, we won't call the callback, but it will be called every time
|
||||
// when the internal buffer overflows until all samples are read.
|
||||
func (b *buffer) write(s samples, onFull func(samples, float32)) (r int) {
|
||||
for r < len(s) {
|
||||
buf := b.cur
|
||||
w := copy(buf.mem[buf.lv:], s[r:])
|
||||
r += w
|
||||
buf.lv += w
|
||||
if buf.lv == len(buf.mem) {
|
||||
if b.stretch {
|
||||
onFull(buf.mem.stretch(buf.dst), buf.ms)
|
||||
} else {
|
||||
onFull(buf.mem, buf.ms)
|
||||
}
|
||||
b.choose(len(s) - r)
|
||||
b.cur.lv = 0
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// frame calculates an audio stereo frame size, i.e. 48k*frame/1000*2
|
||||
// with round(x / 2) * 2 for the closest even number
|
||||
func frame(hz int, frame float32) int {
|
||||
return int(math.Round(float64(hz)*float64(frame)/1000/2) * 2 * 2)
|
||||
}
|
||||
|
||||
// stretch does a simple stretching of audio samples.
|
||||
// something like: [1,2,3,4,5,6] -> [1,2,x,x,3,4,x,x,5,6,x,x] -> [1,2,1,2,3,4,3,4,5,6,5,6]
|
||||
func (s samples) stretch(size int) []int16 {
|
||||
out := buf[:size]
|
||||
n := len(s)
|
||||
ratio := float32(size) / float32(n)
|
||||
sPtr := unsafe.Pointer(&s[0])
|
||||
for i, l, r := 0, 0, 0; i < n; i += 2 {
|
||||
l, r = r, int(float32((i+2)>>1)*ratio)<<1 // index in src * ratio -> approximated index in dst *2 due to int16
|
||||
for j := l; j < r; j += 2 {
|
||||
*(*int32)(unsafe.Pointer(&out[j])) = *(*int32)(sPtr) // out[j] = s[i]; out[j+1] = s[i+1]
|
||||
}
|
||||
sPtr = unsafe.Add(sPtr, uintptr(4))
|
||||
}
|
||||
return out
|
||||
}
|
||||
77
pkg/worker/media/buffer_test.go
Normal file
77
pkg/worker/media/buffer_test.go
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
package media
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type bufWrite struct {
|
||||
sample int16
|
||||
len int
|
||||
}
|
||||
|
||||
func TestBufferWrite(t *testing.T) {
|
||||
tests := []struct {
|
||||
bufLen int
|
||||
writes []bufWrite
|
||||
expect samples
|
||||
}{
|
||||
{
|
||||
bufLen: 2000,
|
||||
writes: []bufWrite{
|
||||
{sample: 1, len: 10},
|
||||
{sample: 2, len: 20},
|
||||
{sample: 3, len: 30},
|
||||
},
|
||||
expect: samples{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3},
|
||||
},
|
||||
{
|
||||
bufLen: 2000,
|
||||
writes: []bufWrite{
|
||||
{sample: 1, len: 3},
|
||||
{sample: 2, len: 18},
|
||||
{sample: 3, len: 2},
|
||||
},
|
||||
expect: samples{2, 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
var lastResult samples
|
||||
buf, err := newBuffer([]float32{10, 5}, test.bufLen)
|
||||
if err != nil {
|
||||
t.Fatalf("oof, %v", err)
|
||||
}
|
||||
for _, w := range test.writes {
|
||||
buf.write(samplesOf(w.sample, w.len),
|
||||
func(s samples, ms float32) { lastResult = s },
|
||||
)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expect, lastResult) {
|
||||
t.Errorf("not expted buffer, %v != %v, %v", lastResult, test.expect, len(buf.cur.mem))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBufferWrite(b *testing.B) {
|
||||
fn := func(_ samples, _ float32) {}
|
||||
l := 2000
|
||||
buf, err := newBuffer([]float32{10}, l)
|
||||
if err != nil {
|
||||
b.Fatalf("oof: %v", err)
|
||||
}
|
||||
samples1 := samplesOf(1, l/2)
|
||||
samples2 := samplesOf(2, l*2)
|
||||
for i := 0; i < b.N; i++ {
|
||||
buf.write(samples1, fn)
|
||||
buf.write(samples2, fn)
|
||||
}
|
||||
}
|
||||
|
||||
func samplesOf(v int16, len int) (s samples) {
|
||||
s = make(samples, len)
|
||||
for i := range s {
|
||||
s[i] = v
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
@ -2,16 +2,13 @@ package media
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/giongto35/cloud-game/v3/pkg/config"
|
||||
"github.com/giongto35/cloud-game/v3/pkg/encoder"
|
||||
"github.com/giongto35/cloud-game/v3/pkg/encoder/opus"
|
||||
"github.com/giongto35/cloud-game/v3/pkg/logger"
|
||||
"github.com/giongto35/cloud-game/v3/pkg/worker/caged/app"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -19,16 +16,7 @@ const (
|
|||
sampleBufLen = 1024 * 4
|
||||
)
|
||||
|
||||
// buffer is a simple non-concurrent safe ring buffer for audio samples.
|
||||
type (
|
||||
buffer struct {
|
||||
s samples
|
||||
wi int
|
||||
dst int
|
||||
stretch bool
|
||||
}
|
||||
samples []int16
|
||||
)
|
||||
type samples []int16
|
||||
|
||||
var (
|
||||
encoderOnce = sync.Once{}
|
||||
|
|
@ -36,39 +24,6 @@ var (
|
|||
buf = make([]int16, sampleBufLen)
|
||||
)
|
||||
|
||||
func newBuffer(srcLen int) buffer { return buffer{s: make(samples, srcLen)} }
|
||||
|
||||
// enableStretch adds a simple stretching of buffer to a desired size before
|
||||
// the onFull callback call.
|
||||
func (b *buffer) enableStretch(l int) { b.stretch = true; b.dst = l }
|
||||
|
||||
// write fills the buffer until it's full and then passes the gathered data into a callback.
|
||||
//
|
||||
// There are two cases to consider:
|
||||
// 1. Underflow, when the length of the written data is less than the buffer's available space.
|
||||
// 2. Overflow, when the length exceeds the current available buffer space.
|
||||
//
|
||||
// We overwrite any previous values in the buffer and move the internal write pointer
|
||||
// by the length of the written data.
|
||||
// In the first case, we won't call the callback, but it will be called every time
|
||||
// when the internal buffer overflows until all samples are read.
|
||||
func (b *buffer) write(s samples, onFull func(samples)) (r int) {
|
||||
for r < len(s) {
|
||||
w := copy(b.s[b.wi:], s[r:])
|
||||
r += w
|
||||
b.wi += w
|
||||
if b.wi == len(b.s) {
|
||||
b.wi = 0
|
||||
if b.stretch {
|
||||
onFull(b.s.stretch(b.dst))
|
||||
} else {
|
||||
onFull(b.s)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func DefaultOpus() (*opus.Encoder, error) {
|
||||
var err error
|
||||
encoderOnce.Do(func() { opusCoder, err = opus.NewEncoder(audioHz) })
|
||||
|
|
@ -81,34 +36,11 @@ func DefaultOpus() (*opus.Encoder, error) {
|
|||
return opusCoder, nil
|
||||
}
|
||||
|
||||
// frame calculates an audio stereo frame size, i.e. 48k*frame/1000*2
|
||||
// with round(x / 2) * 2 for the closest even number
|
||||
func frame(hz int, frame float32) int {
|
||||
return int(math.Round(float64(hz)*float64(frame)/1000/2) * 2 * 2)
|
||||
}
|
||||
|
||||
// stretch does a simple stretching of audio samples.
|
||||
// something like: [1,2,3,4,5,6] -> [1,2,x,x,3,4,x,x,5,6,x,x] -> [1,2,1,2,3,4,3,4,5,6,5,6]
|
||||
func (s samples) stretch(size int) []int16 {
|
||||
out := buf[:size]
|
||||
n := len(s)
|
||||
ratio := float32(size) / float32(n)
|
||||
sPtr := unsafe.Pointer(&s[0])
|
||||
for i, l, r := 0, 0, 0; i < n; i += 2 {
|
||||
l, r = r, int(float32((i+2)>>1)*ratio)<<1 // index in src * ratio -> approximated index in dst *2 due to int16
|
||||
for j := l; j < r; j += 2 {
|
||||
*(*int32)(unsafe.Pointer(&out[j])) = *(*int32)(sPtr) // out[j] = s[i]; out[j+1] = s[i+1]
|
||||
}
|
||||
sPtr = unsafe.Add(sPtr, uintptr(4))
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
type WebrtcMediaPipe struct {
|
||||
a *opus.Encoder
|
||||
v *encoder.Video
|
||||
onAudio func([]byte)
|
||||
audioBuf buffer
|
||||
onAudio func([]byte, float32)
|
||||
audioBuf *buffer
|
||||
log *logger.Logger
|
||||
|
||||
mua sync.RWMutex
|
||||
|
|
@ -118,7 +50,7 @@ type WebrtcMediaPipe struct {
|
|||
vConf config.Video
|
||||
|
||||
AudioSrcHz int
|
||||
AudioFrame float32
|
||||
AudioFrames []float32
|
||||
VideoW, VideoH int
|
||||
VideoScale float64
|
||||
|
||||
|
|
@ -135,8 +67,9 @@ func NewWebRtcMediaPipe(ac config.Audio, vc config.Video, log *logger.Logger) *W
|
|||
}
|
||||
|
||||
func (wmp *WebrtcMediaPipe) SetAudioCb(cb func([]byte, int32)) {
|
||||
fr := int32(time.Duration(wmp.AudioFrame) * time.Millisecond)
|
||||
wmp.onAudio = func(bytes []byte) { cb(bytes, fr) }
|
||||
wmp.onAudio = func(bytes []byte, ms float32) {
|
||||
cb(bytes, int32(time.Duration(ms)*time.Millisecond))
|
||||
}
|
||||
}
|
||||
func (wmp *WebrtcMediaPipe) Destroy() {
|
||||
v := wmp.Video()
|
||||
|
|
@ -144,10 +77,12 @@ func (wmp *WebrtcMediaPipe) Destroy() {
|
|||
v.Stop()
|
||||
}
|
||||
}
|
||||
func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { wmp.audioBuf.write(audio, wmp.encodeAudio) }
|
||||
func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) {
|
||||
wmp.audioBuf.write(audio, wmp.encodeAudio)
|
||||
}
|
||||
|
||||
func (wmp *WebrtcMediaPipe) Init() error {
|
||||
if err := wmp.initAudio(wmp.AudioSrcHz, wmp.AudioFrame); err != nil {
|
||||
if err := wmp.initAudio(wmp.AudioSrcHz, wmp.AudioFrames); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := wmp.initVideo(wmp.VideoW, wmp.VideoH, wmp.VideoScale, wmp.vConf); err != nil {
|
||||
|
|
@ -166,30 +101,34 @@ func (wmp *WebrtcMediaPipe) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSize float32) error {
|
||||
func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSizes []float32) error {
|
||||
au, err := DefaultOpus()
|
||||
if err != nil {
|
||||
return fmt.Errorf("opus fail: %w", err)
|
||||
}
|
||||
wmp.log.Debug().Msgf("Opus: %v", au.GetInfo())
|
||||
wmp.SetAudio(au)
|
||||
buf := newBuffer(frame(srcHz, frameSize))
|
||||
buf, err := newBuffer(frameSizes, srcHz)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wmp.log.Debug().Msgf("Opus frames (ms): %v", frameSizes)
|
||||
dstHz, _ := au.SampleRate()
|
||||
if srcHz != dstHz {
|
||||
buf.enableStretch(frame(dstHz, frameSize))
|
||||
buf.resample(dstHz)
|
||||
wmp.log.Debug().Msgf("Resample %vHz -> %vHz", srcHz, dstHz)
|
||||
}
|
||||
wmp.audioBuf = buf
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples) {
|
||||
func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples, ms float32) {
|
||||
data, err := wmp.Audio().Encode(pcm)
|
||||
if err != nil {
|
||||
wmp.log.Error().Err(err).Msgf("opus encode fail")
|
||||
return
|
||||
}
|
||||
wmp.onAudio(data)
|
||||
wmp.onAudio(data, ms)
|
||||
}
|
||||
|
||||
func (wmp *WebrtcMediaPipe) initVideo(w, h int, scale float64, conf config.Video) (err error) {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package media
|
|||
import (
|
||||
"image"
|
||||
"math/rand/v2"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/giongto35/cloud-game/v3/pkg/config"
|
||||
|
|
@ -154,69 +153,6 @@ func gen(l int) []int16 {
|
|||
return nums
|
||||
}
|
||||
|
||||
type bufWrite struct {
|
||||
sample int16
|
||||
len int
|
||||
}
|
||||
|
||||
func TestBufferWrite(t *testing.T) {
|
||||
tests := []struct {
|
||||
bufLen int
|
||||
writes []bufWrite
|
||||
expect samples
|
||||
}{
|
||||
{
|
||||
bufLen: 20,
|
||||
writes: []bufWrite{
|
||||
{sample: 1, len: 10},
|
||||
{sample: 2, len: 20},
|
||||
{sample: 3, len: 30},
|
||||
},
|
||||
expect: samples{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3},
|
||||
},
|
||||
{
|
||||
bufLen: 11,
|
||||
writes: []bufWrite{
|
||||
{sample: 1, len: 3},
|
||||
{sample: 2, len: 18},
|
||||
{sample: 3, len: 2},
|
||||
},
|
||||
expect: samples{3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
var lastResult samples
|
||||
buf := newBuffer(test.bufLen)
|
||||
for _, w := range test.writes {
|
||||
buf.write(samplesOf(w.sample, w.len), func(s samples) { lastResult = s })
|
||||
}
|
||||
if !reflect.DeepEqual(test.expect, lastResult) {
|
||||
t.Errorf("not expted buffer, %v != %v, %v", lastResult, test.expect, buf.s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBufferWrite(b *testing.B) {
|
||||
fn := func(_ samples) {}
|
||||
l := 1920
|
||||
buf := newBuffer(l)
|
||||
samples1 := samplesOf(1, l/2)
|
||||
samples2 := samplesOf(2, l*2)
|
||||
for i := 0; i < b.N; i++ {
|
||||
buf.write(samples1, fn)
|
||||
buf.write(samples2, fn)
|
||||
}
|
||||
}
|
||||
|
||||
func samplesOf(v int16, len int) (s samples) {
|
||||
s = make(samples, len)
|
||||
for i := range s {
|
||||
s[i] = v
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func TestFrame(t *testing.T) {
|
||||
type args struct {
|
||||
hz int
|
||||
|
|
|
|||
|
|
@ -229,7 +229,7 @@ func room(cfg conf) testRoom {
|
|||
|
||||
m := media.NewWebRtcMediaPipe(conf.Encoder.Audio, conf.Encoder.Video, l)
|
||||
m.AudioSrcHz = emu.AudioSampleRate()
|
||||
m.AudioFrame = conf.Encoder.Audio.Frame
|
||||
m.AudioFrames = conf.Encoder.Audio.Frames
|
||||
m.VideoW, m.VideoH = emu.ViewportSize()
|
||||
m.VideoScale = emu.Scale()
|
||||
if err := m.Init(); err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue