From eae8c71bb16129cc6af2b0da29f1284ac226e8aa Mon Sep 17 00:00:00 2001 From: Sergey Stepanov Date: Thu, 12 Dec 2024 11:23:59 +0300 Subject: [PATCH] Ugly audio buf --- pkg/worker/media/buffer.go | 107 ++++++++++++++++++++++++++++++++ pkg/worker/media/buffer_test.go | 10 +++ pkg/worker/media/media.go | 23 ++++--- 3 files changed, 132 insertions(+), 8 deletions(-) create mode 100644 pkg/worker/media/buffer.go create mode 100644 pkg/worker/media/buffer_test.go diff --git a/pkg/worker/media/buffer.go b/pkg/worker/media/buffer.go new file mode 100644 index 00000000..69f1c28a --- /dev/null +++ b/pkg/worker/media/buffer.go @@ -0,0 +1,107 @@ +package media + +import "fmt" + +type buffer2 struct { + s samples + wi int + dst int + stretch bool + frameHz []int + + dstHz2 int + + buckets []Bucket + cur *Bucket + + sym bool +} + +type Bucket struct { + mem samples + vol int + lv int +} + +func NewBucket(level int, size int) Bucket { + return Bucket{ + mem: make(samples, size), + vol: level, + } +} + +func (b *Bucket) Reset() { + b.lv = 0 +} + +func (b *Bucket) IsEmpty() bool { + return b.lv == 0 +} + +var frames = [...]int{10, 5} + +func newOpusBuffer(hz int) buffer2 { + buf := buffer2{} + + var fz = make([]int, 3) + sum := 0 + for i, f := range frames { + sum += f + fz[i] = frame(hz, float32(f)) + buf.buckets = append(buf.buckets, NewBucket(f, fz[i])) + } + buf.cur = &buf.buckets[0] + + //buf.enableStretch(frame(hz, float32(buf.cur.vol))) + + return buf +} + +func (b *buffer2) chooseBucket(l int) { + //b.cur = &b.buckets[0] + for _, bb := range b.buckets { + if l >= len(bb.mem) { + b.cur = &bb + b.sym = false + if b.stretch { + b.enableStretch(frame(b.dstHz2, float32(b.cur.vol))) + } + break + } + } +} + +// enableStretch adds a simple stretching of buffer to a desired size before +// the onFull callback call. +func (b *buffer2) enableStretch(l int) { b.stretch = true; b.dst = frame(b.dstHz2, float32(b.cur.vol)) } + +func (b *buffer2) dstHz(hz int) { + b.dstHz2 = hz + b.enableStretch(frame(hz, float32(b.cur.vol))) +} + +func (b *buffer2) write(s samples, onFull func(samples, int)) (r int) { + // select bucket + //b.chooseBucket(len(s)) + for r < len(s) { + buf := b.cur + + w := copy(buf.mem[buf.lv:], s[r:]) + r += w + buf.lv += w + if buf.lv == len(buf.mem) { + b.sym = true + if b.stretch { + onFull(buf.mem.stretch(b.dst), buf.vol) + } else { + onFull(buf.mem, buf.vol) + } + if !b.sym { + fmt.Printf(">>>>>>>>>") + } + b.chooseBucket(len(s) - r) + b.cur.Reset() + } + } + return +} diff --git a/pkg/worker/media/buffer_test.go b/pkg/worker/media/buffer_test.go new file mode 100644 index 00000000..6671d729 --- /dev/null +++ b/pkg/worker/media/buffer_test.go @@ -0,0 +1,10 @@ +package media + +import "testing" + +func Test_buffer2_write2(t *testing.T) { + + buf := newOpusBuffer(1000) + + t.Logf("%+v", buf) +} diff --git a/pkg/worker/media/media.go b/pkg/worker/media/media.go index ed356308..df4709fc 100644 --- a/pkg/worker/media/media.go +++ b/pkg/worker/media/media.go @@ -107,8 +107,8 @@ func (s samples) stretch(size int) []int16 { type WebrtcMediaPipe struct { a *opus.Encoder v *encoder.Video - onAudio func([]byte) - audioBuf buffer + onAudio func([]byte, int) + audioBuf buffer2 log *logger.Logger mua sync.RWMutex @@ -135,8 +135,12 @@ func NewWebRtcMediaPipe(ac config.Audio, vc config.Video, log *logger.Logger) *W } func (wmp *WebrtcMediaPipe) SetAudioCb(cb func([]byte, int32)) { - fr := int32(time.Duration(wmp.AudioFrame) * time.Millisecond) - wmp.onAudio = func(bytes []byte) { cb(bytes, fr) } + //fr := int32(time.Duration(wmp.AudioFrame) * time.Millisecond) + wmp.onAudio = func(bytes []byte, l int) { + fr := int32(time.Duration(l) * time.Millisecond) + //wmp.log.Info().Msgf(">>> %v", fr) + cb(bytes, fr) + } } func (wmp *WebrtcMediaPipe) Destroy() { v := wmp.Video() @@ -144,7 +148,9 @@ func (wmp *WebrtcMediaPipe) Destroy() { v.Stop() } } -func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { wmp.audioBuf.write(audio, wmp.encodeAudio) } +func (wmp *WebrtcMediaPipe) PushAudio(audio []int16) { + wmp.audioBuf.write(audio, wmp.encodeAudio) +} func (wmp *WebrtcMediaPipe) Init() error { if err := wmp.initAudio(wmp.AudioSrcHz, wmp.AudioFrame); err != nil { @@ -173,9 +179,10 @@ func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSize float32) error { } wmp.log.Debug().Msgf("Opus: %v", au.GetInfo()) wmp.SetAudio(au) - buf := newBuffer(frame(srcHz, frameSize)) + buf := newOpusBuffer(srcHz) //newBuffer(frame(srcHz, frameSize)) dstHz, _ := au.SampleRate() if srcHz != dstHz { + buf.dstHz(dstHz) buf.enableStretch(frame(dstHz, frameSize)) wmp.log.Debug().Msgf("Resample %vHz -> %vHz", srcHz, dstHz) } @@ -183,13 +190,13 @@ func (wmp *WebrtcMediaPipe) initAudio(srcHz int, frameSize float32) error { return nil } -func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples) { +func (wmp *WebrtcMediaPipe) encodeAudio(pcm samples, l int) { data, err := wmp.Audio().Encode(pcm) if err != nil { wmp.log.Error().Err(err).Msgf("opus encode fail") return } - wmp.onAudio(data) + wmp.onAudio(data, l) } func (wmp *WebrtcMediaPipe) initVideo(w, h int, scale float64, conf config.Video) (err error) {