From 1e4e5b3c65d015dc5d3ebbf640dcbd6db5fe7201 Mon Sep 17 00:00:00 2001 From: sergystepanov Date: Sun, 14 Dec 2025 22:15:28 +0300 Subject: [PATCH] Clean media buffer --- pkg/worker/media/buffer.go | 176 +++++++++++++---------------- pkg/worker/media/buffer_test.go | 112 ++++++++++++------ pkg/worker/media/media_test.go | 6 +- pkg/worker/media/speex.go | 90 +++++++-------- pkg/worker/media/speex_resampler.h | 31 +++++ 5 files changed, 224 insertions(+), 191 deletions(-) diff --git a/pkg/worker/media/buffer.go b/pkg/worker/media/buffer.go index 7d528452..2228085f 100644 --- a/pkg/worker/media/buffer.go +++ b/pkg/worker/media/buffer.go @@ -1,6 +1,9 @@ package media -import "errors" +import ( + "errors" + "slices" +) type ResampleAlgo uint8 @@ -11,163 +14,138 @@ const ( ) type buffer struct { - raw samples - scratch samples - buckets []bucket + in, out samples + frames []float32 resampler *Resampler srcHz int dstHz int - bi int + fi int + p int algo ResampleAlgo } -type bucket struct { - mem samples - ms float32 - p int - dst int -} - func newBuffer(frames []float32, hz int) (*buffer, error) { if hz < 2000 || len(frames) == 0 { return nil, errors.New("invalid params") } - var totalSize int - for _, f := range frames { - totalSize += stereoSamples(hz, f) - } - if totalSize == 0 { - return nil, errors.New("zero buffer size") - } + // frames should be sorted ascending, largest last + frames = slices.Clone(frames) + slices.Sort(frames) - buf := &buffer{ - raw: make(samples, totalSize), - scratch: make(samples, 5760), - srcHz: hz, - dstHz: hz, - } + maxSize := stereoSamples(hz, frames[len(frames)-1]) - offset := 0 - for _, f := range frames { - size := stereoSamples(hz, f) - buf.buckets = append(buf.buckets, bucket{mem: buf.raw[offset : offset+size], ms: f, dst: size}) - offset += size - } - buf.bi = len(buf.buckets) - 1 - - return buf, nil + return &buffer{ + in: make(samples, maxSize), + out: make(samples, maxSize), + frames: frames, + srcHz: hz, + dstHz: hz, + fi: len(frames) - 1, // start with largest + }, nil } func (b *buffer) close() { if b.resampler != nil { b.resampler.Destroy() - b.resampler = nil } } func (b *buffer) resample(targetHz int, algo ResampleAlgo) error { - b.algo = algo - b.dstHz = targetHz - - for i := range b.buckets { - b.buckets[i].dst = stereoSamples(targetHz, b.buckets[i].ms) - } + b.algo, b.dstHz = algo, targetHz + b.out = make(samples, stereoSamples(targetHz, b.frames[len(b.frames)-1])) if algo == ResampleSpeex { var err error - if b.resampler, err = ResamplerInit(2, b.srcHz, targetHz, QualityDesktop); err != nil { - return err - } + b.resampler, err = NewResampler(2, b.srcHz, targetHz, QualityMax) + return err } return nil } -func (b *buffer) write(s samples, onFull func(samples, float32)) int { - read := 0 - for read < len(s) { - cur := &b.buckets[b.bi] - n := copy(cur.mem[cur.p:], s[read:]) - read += n - cur.p += n +func (b *buffer) write(s samples, onFull func(samples, float32)) { + for len(s) > 0 { + srcSize := stereoSamples(b.srcHz, b.frames[b.fi]) - if cur.p == len(cur.mem) { - onFull(b.stretch(cur.mem, cur.dst), cur.ms) - b.choose(len(s) - read) - b.buckets[b.bi].p = 0 + n := copy(b.in[b.p:srcSize], s) + if n == 0 { + // oof + break + } + + s = s[n:] + b.p += n + + if b.p >= srcSize { + onFull(b.stretch(srcSize), b.frames[b.fi]) + b.p = 0 + b.choose(len(s)) } } - return read + // Remaining samples stay in buffer, will be completed on next write } func (b *buffer) choose(remaining int) { - for i := len(b.buckets) - 1; i >= 0; i-- { - if remaining >= len(b.buckets[i].mem) { - b.bi = i + // Find the largest bucket that fits in remaining samples + for i := len(b.frames) - 1; i >= 0; i-- { + if remaining >= stereoSamples(b.srcHz, b.frames[i]) { + b.fi = i return } } - b.bi = 0 + // Nothing fits - use smallest and wait for more data + b.fi = 0 } -func (b *buffer) stretch(src samples, dstSize int) samples { +func (b *buffer) stretch(srcSize int) samples { + dstSize := stereoSamples(b.dstHz, b.frames[b.fi]) + src, out := b.in[:srcSize], b.out[:dstSize] + + if srcSize == dstSize { + return src + } + switch b.algo { case ResampleSpeex: - if b.resampler != nil { - if _, out, err := b.resampler.PocessIntInterleaved(src); err == nil { - if len(out) == dstSize { - return out - } - src = out // use speex output for linear correction - } + if n, _ := b.resampler.Process(src, out); n == dstSize { + return out } fallthrough case ResampleLinear: - return b.linear(src, dstSize) + return linear(src, out) case ResampleNearest: - return b.nearest(src, dstSize) - default: - return b.linear(src, dstSize) + return nearest(src, out) } + return src } -func (b *buffer) linear(src samples, dstSize int) samples { - srcLen := len(src) - if srcLen < 2 || dstSize < 2 { - return b.scratch[:dstSize] +func linear(src, out samples) samples { + sn, dn := len(src)/2, len(out)/2 + if sn < 2 || dn < 2 { + return out } - - out := b.scratch[:dstSize] - srcPairs, dstPairs := srcLen/2, dstSize/2 - ratio := ((srcPairs - 1) << 16) / (dstPairs - 1) - - for i := 0; i < dstPairs; i++ { + ratio := ((sn - 1) << 16) / (dn - 1) + for i := 0; i < dn; i++ { pos := i * ratio - idx, frac := (pos>>16)*2, pos&0xFFFF + si, frac := (pos>>16)*2, pos&0xFFFF di := i * 2 - - if idx >= srcLen-2 { - out[di], out[di+1] = src[srcLen-2], src[srcLen-1] + if si >= len(src)-2 { + out[di], out[di+1] = src[len(src)-2], src[len(src)-1] } else { - out[di] = int16(int32(src[idx]) + ((int32(src[idx+2])-int32(src[idx]))*int32(frac))>>16) - out[di+1] = int16(int32(src[idx+1]) + ((int32(src[idx+3])-int32(src[idx+1]))*int32(frac))>>16) + out[di] = int16(int32(src[si]) + ((int32(src[si+2])-int32(src[si]))*int32(frac))>>16) + out[di+1] = int16(int32(src[si+1]) + ((int32(src[si+3])-int32(src[si+1]))*int32(frac))>>16) } } return out } -func (b *buffer) nearest(src samples, dstSize int) samples { - srcLen := len(src) - if srcLen < 2 || dstSize < 2 { - return b.scratch[:dstSize] +func nearest(src, out samples) samples { + sn, dn := len(src)/2, len(out)/2 + if sn < 2 || dn < 2 { + return out } - - out := b.scratch[:dstSize] - srcPairs, dstPairs := srcLen/2, dstSize/2 - - for i := 0; i < dstPairs; i++ { - si := (i * srcPairs / dstPairs) * 2 - di := i * 2 + for i := 0; i < dn; i++ { + si, di := (i*sn/dn)*2, i*2 out[di], out[di+1] = src[si], src[si+1] } return out diff --git a/pkg/worker/media/buffer_test.go b/pkg/worker/media/buffer_test.go index 28a596ba..096b4ef9 100644 --- a/pkg/worker/media/buffer_test.go +++ b/pkg/worker/media/buffer_test.go @@ -11,71 +11,109 @@ type bufWrite struct { } func TestBufferWrite(t *testing.T) { + // At 2000Hz stereo: + // 5ms = 2000 * 0.005 * 2 = 20 samples + // 10ms = 2000 * 0.01 * 2 = 40 samples + tests := []struct { + frames []float32 bufLen int writes []bufWrite expect samples }{ { + frames: []float32{5, 10}, + bufLen: 2000, + writes: []bufWrite{ + {sample: 1, len: 20}, + {sample: 2, len: 40}, + {sample: 3, len: 60}, + }, + expect: samples(rep(1, 20).add(2, 40).add(3, 40).add(3, 20)), + }, + { + frames: []float32{5, 10}, + bufLen: 2000, + writes: []bufWrite{ + {sample: 1, len: 6}, + {sample: 2, len: 36}, + {sample: 3, len: 4}, + }, + expect: samples(rep(1, 6).add(2, 34)), + }, + { + frames: []float32{5, 10}, + bufLen: 2000, + writes: []bufWrite{ + {sample: 1, len: 40}, + }, + expect: samples(rep(1, 40)), + }, + { + frames: []float32{5, 10}, + bufLen: 2000, + writes: []bufWrite{ + {sample: 1, len: 100}, + }, + expect: samples(rep(1, 40).add(1, 40).add(1, 20)), + }, + { + frames: []float32{5}, bufLen: 2000, writes: []bufWrite{ {sample: 1, len: 10}, - {sample: 2, len: 20}, - {sample: 3, len: 30}, + {sample: 2, len: 15}, }, - expect: samples{ - 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, - 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, - 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, - }, - }, - { - bufLen: 2000, - writes: []bufWrite{ - {sample: 1, len: 3}, - {sample: 2, len: 18}, - {sample: 3, len: 2}, - }, - expect: samples{1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}, + expect: samples(rep(1, 10).add(2, 10)), }, } - for _, test := range tests { - var lastResult samples - buf, err := newBuffer([]float32{10, 5}, test.bufLen) + for i, test := range tests { + var results samples + buf, err := newBuffer(test.frames, test.bufLen) if err != nil { - t.Fatalf("oof, %v", err) + t.Fatalf("test %d: %v", i, err) } for _, w := range test.writes { - buf.write(samplesOf(w.sample, w.len), - func(s samples, ms float32) { lastResult = s }, - ) + buf.write(samplesOf(w.sample, w.len), func(s samples, ms float32) { + tmp := make(samples, len(s)) + copy(tmp, s) + results = append(results, tmp...) + }) } - if !reflect.DeepEqual(test.expect, lastResult) { - t.Errorf("not expted buffer, %v != %v, %v", lastResult, test.expect, len(buf.buckets)) + if !reflect.DeepEqual(test.expect, results) { + t.Errorf("test %d:\ngot %v (len=%d)\nwant %v (len=%d)", i, results, len(results), test.expect, len(test.expect)) } } } func BenchmarkBufferWrite(b *testing.B) { fn := func(_ samples, _ float32) {} - l := 2000 - buf, err := newBuffer([]float32{10}, l) - if err != nil { - b.Fatalf("oof: %v", err) - } - samples1 := samplesOf(1, l/2) - samples2 := samplesOf(2, l*2) + buf, _ := newBuffer([]float32{10}, 2000) + s1 := samplesOf(1, 1000) + s2 := samplesOf(2, 4000) for i := 0; i < b.N; i++ { - buf.write(samples1, fn) - buf.write(samples2, fn) + buf.write(s1, fn) + buf.write(s2, fn) } } -func samplesOf(v int16, len int) (s samples) { - s = make(samples, len) +// helpers + +func samplesOf(v int16, l int) samples { + s := make(samples, l) for i := range s { s[i] = v } - return + return s +} + +type builder samples + +func rep(v int16, n int) builder { + return builder(samplesOf(v, n)) +} + +func (b builder) add(v int16, n int) builder { + return append(b, samplesOf(v, n)...) } diff --git a/pkg/worker/media/media_test.go b/pkg/worker/media/media_test.go index 10152bf5..64659427 100644 --- a/pkg/worker/media/media_test.go +++ b/pkg/worker/media/media_test.go @@ -125,9 +125,8 @@ func TestResampleStretch(t *testing.T) { {name: "", args: args{pcm: gen(1764), size: 1920}, want: nil}, } for _, tt := range tests { - buf, _ := newBuffer([]float32{20}, 2000) t.Run(tt.name, func(t *testing.T) { - rez2 := buf.nearest(tt.args.pcm, tt.args.size) + rez2 := nearest(tt.args.pcm, make(samples, tt.args.size)) if rez2[0] != tt.args.pcm[0] || rez2[1] != tt.args.pcm[1] || rez2[len(rez2)-1] != tt.args.pcm[len(tt.args.pcm)-1] || rez2[len(rez2)-2] != tt.args.pcm[len(tt.args.pcm)-2] { @@ -141,9 +140,8 @@ func TestResampleStretch(t *testing.T) { func BenchmarkResampler(b *testing.B) { pcm := samples(gen(1764)) size := 1920 - buf, _ := newBuffer([]float32{20}, 1000) for i := 0; i < b.N; i++ { - buf.linear(pcm, size) + linear(pcm, make(samples, size)) } } diff --git a/pkg/worker/media/speex.go b/pkg/worker/media/speex.go index 3b4b18cd..5f9fa591 100644 --- a/pkg/worker/media/speex.go +++ b/pkg/worker/media/speex.go @@ -12,86 +12,74 @@ import "C" import "errors" type Resampler struct { - resampler *C.SpeexResamplerState - outBuff []int16 // one of these buffers used when typed data read - outBuffFloat []float32 - channels int - multiplier float32 + resampler *C.SpeexResamplerState + channels int } -// Quality const ( QualityMax = 10 QualityMin = 0 QualityDefault = 4 QualityDesktop = 5 - QualityVoid = 3 + QualityVoIP = 3 ) -// Errors -const ( - ErrorSuccess = iota - ErrorAllocFailed - ErrorBadState - ErrorInvalidArg - ErrorPtrOverlap - ErrorMaxError -) - -const ( - reserve = 1.1 -) - -// ResamplerInit Create a new resampler with integer input and output rates -// Resampling quality between 0 and 10, where 0 has poor quality -// and 10 has very high quality -func ResamplerInit(channels, inRate, outRate, quality int) (*Resampler, error) { - err := C.int(0) +func NewResampler(channels, inRate, outRate, quality int) (*Resampler, error) { + var err C.int r := &Resampler{channels: channels} - r.multiplier = float32(outRate) / float32(inRate) * 1.1 - r.resampler = C.speex_resampler_init(C.spx_uint32_t(channels), - C.spx_uint32_t(inRate), C.spx_uint32_t(outRate), C.int(quality), &err) + + // Use fractional init for exact ratio + g := gcd(outRate, inRate) + r.resampler = C.speex_resampler_init_frac( + C.spx_uint32_t(channels), + C.spx_uint32_t(outRate/g), + C.spx_uint32_t(inRate/g), + C.spx_uint32_t(inRate), + C.spx_uint32_t(outRate), + C.int(quality), + &err, + ) if r.resampler == nil { - return nil, StrError(int(err)) + return nil, errors.New(C.GoString(C.speex_resampler_strerror(err))) } + + C.speex_resampler_skip_zeros(r.resampler) + return r, nil } -// Destroy a resampler -func (r *Resampler) Destroy() error { +func (r *Resampler) Destroy() { if r.resampler != nil { - C.speex_resampler_destroy((*C.SpeexResamplerState)(r.resampler)) - return nil + C.speex_resampler_destroy(r.resampler) + r.resampler = nil } - return StrError(ErrorInvalidArg) } -// PocessIntInterleaved Resample an int slice interleaved -func (r *Resampler) PocessIntInterleaved(in []int16) (int, []int16, error) { - outBuffCap := int(float32(len(in)) * r.multiplier) - if outBuffCap > cap(r.outBuff) { - r.outBuff = make([]int16, int(float32(outBuffCap)*reserve)*4) +func (r *Resampler) Process(in, out []int16) (int, error) { + if r.resampler == nil || len(in) < r.channels || len(out) < r.channels { + return 0, nil } + inLen := C.spx_uint32_t(len(in) / r.channels) - outLen := C.spx_uint32_t(len(r.outBuff) / r.channels) + outLen := C.spx_uint32_t(len(out) / r.channels) + res := C.speex_resampler_process_interleaved_int( r.resampler, (*C.spx_int16_t)(&in[0]), &inLen, - (*C.spx_int16_t)(&r.outBuff[0]), + (*C.spx_int16_t)(&out[0]), &outLen, ) - if res != ErrorSuccess { - return 0, nil, StrError(ErrorInvalidArg) + if res != 0 { + return 0, errors.New(C.GoString(C.speex_resampler_strerror(res))) } - return int(inLen) * r.channels, r.outBuff[:outLen*2], nil + + return int(outLen) * r.channels, nil } -// StrError returns error message -func StrError(errorCode int) error { - cS := C.speex_resampler_strerror(C.int(errorCode)) - if cS == nil { - return nil +func gcd(a, b int) int { + for b != 0 { + a, b = b, a%b } - return errors.New(C.GoString(cS)) + return a } diff --git a/pkg/worker/media/speex_resampler.h b/pkg/worker/media/speex_resampler.h index 27d510f5..88b03fd6 100644 --- a/pkg/worker/media/speex_resampler.h +++ b/pkg/worker/media/speex_resampler.h @@ -36,11 +36,42 @@ SpeexResamplerState *speex_resampler_init(spx_uint32_t nb_channels, spx_uint32_t out_rate, int quality, int *err); +/** Create a new resampler with fractional input/output rates. The sampling + * rate ratio is an arbitrary rational number with both the numerator and + * denominator being 32-bit integers. + * @param nb_channels Number of channels to be processed + * @param ratio_num Numerator of the sampling rate ratio + * @param ratio_den Denominator of the sampling rate ratio + * @param in_rate Input sampling rate rounded to the nearest integer (in Hz). + * @param out_rate Output sampling rate rounded to the nearest integer (in Hz). + * @param quality Resampling quality between 0 and 10, where 0 has poor quality + * and 10 has very high quality. + * @return Newly created resampler state + * @retval NULL Error: not enough memory + */ +SpeexResamplerState *speex_resampler_init_frac(spx_uint32_t nb_channels, + spx_uint32_t ratio_num, + spx_uint32_t ratio_den, + spx_uint32_t in_rate, + spx_uint32_t out_rate, + int quality, + int *err); /** Destroy a resampler state. * @param st Resampler state */ void speex_resampler_destroy(SpeexResamplerState *st); + +/** Make sure that the first samples to go out of the resamplers don't have + * leading zeros. This is only useful before starting to use a newly created + * resampler. It is recommended to use that when resampling an audio file, as + * it will generate a file with the same length. For real-time processing, + * it is probably easier not to use this call (so that the output duration + * is the same for the first frame). + * @param st Resampler state + */ +int speex_resampler_skip_zeros(SpeexResamplerState *st); + /** Resample an interleaved int array. The input and output buffers must *not* overlap. * @param st Resampler state * @param in Input buffer