diff --git a/pkg/worker/media/buffer.go b/pkg/worker/media/buffer.go index 2228085f..57adeb90 100644 --- a/pkg/worker/media/buffer.go +++ b/pkg/worker/media/buffer.go @@ -1,9 +1,6 @@ package media -import ( - "errors" - "slices" -) +import "errors" type ResampleAlgo uint8 @@ -14,138 +11,163 @@ const ( ) type buffer struct { - in, out samples - frames []float32 + raw samples + scratch samples + buckets []bucket resampler *Resampler srcHz int dstHz int - fi int - p int + bi int algo ResampleAlgo } +type bucket struct { + mem samples + ms float32 + p int + dst int +} + func newBuffer(frames []float32, hz int) (*buffer, error) { if hz < 2000 || len(frames) == 0 { return nil, errors.New("invalid params") } - // frames should be sorted ascending, largest last - frames = slices.Clone(frames) - slices.Sort(frames) + var totalSize int + for _, f := range frames { + totalSize += stereoSamples(hz, f) + } + if totalSize == 0 { + return nil, errors.New("zero buffer size") + } - maxSize := stereoSamples(hz, frames[len(frames)-1]) + buf := &buffer{ + raw: make(samples, totalSize), + scratch: make(samples, 5760), + srcHz: hz, + dstHz: hz, + } - return &buffer{ - in: make(samples, maxSize), - out: make(samples, maxSize), - frames: frames, - srcHz: hz, - dstHz: hz, - fi: len(frames) - 1, // start with largest - }, nil + offset := 0 + for _, f := range frames { + size := stereoSamples(hz, f) + buf.buckets = append(buf.buckets, bucket{mem: buf.raw[offset : offset+size], ms: f, dst: size}) + offset += size + } + buf.bi = len(buf.buckets) - 1 + + return buf, nil } func (b *buffer) close() { if b.resampler != nil { b.resampler.Destroy() + b.resampler = nil } } func (b *buffer) resample(targetHz int, algo ResampleAlgo) error { - b.algo, b.dstHz = algo, targetHz - b.out = make(samples, stereoSamples(targetHz, b.frames[len(b.frames)-1])) + b.algo = algo + b.dstHz = targetHz + + for i := range b.buckets { + b.buckets[i].dst = stereoSamples(targetHz, b.buckets[i].ms) + } if algo == ResampleSpeex { var err error - b.resampler, err = NewResampler(2, b.srcHz, targetHz, QualityMax) - return err + if b.resampler, err = ResamplerInit(2, b.srcHz, targetHz, QualityMax); err != nil { + return err + } } return nil } -func (b *buffer) write(s samples, onFull func(samples, float32)) { - for len(s) > 0 { - srcSize := stereoSamples(b.srcHz, b.frames[b.fi]) +func (b *buffer) write(s samples, onFull func(samples, float32)) int { + read := 0 + for read < len(s) { + cur := &b.buckets[b.bi] + n := copy(cur.mem[cur.p:], s[read:]) + read += n + cur.p += n - n := copy(b.in[b.p:srcSize], s) - if n == 0 { - // oof - break - } - - s = s[n:] - b.p += n - - if b.p >= srcSize { - onFull(b.stretch(srcSize), b.frames[b.fi]) - b.p = 0 - b.choose(len(s)) + if cur.p == len(cur.mem) { + onFull(b.stretch(cur.mem, cur.dst), cur.ms) + b.choose(len(s) - read) + b.buckets[b.bi].p = 0 } } - // Remaining samples stay in buffer, will be completed on next write + return read } func (b *buffer) choose(remaining int) { - // Find the largest bucket that fits in remaining samples - for i := len(b.frames) - 1; i >= 0; i-- { - if remaining >= stereoSamples(b.srcHz, b.frames[i]) { - b.fi = i + for i := len(b.buckets) - 1; i >= 0; i-- { + if remaining >= len(b.buckets[i].mem) { + b.bi = i return } } - // Nothing fits - use smallest and wait for more data - b.fi = 0 + b.bi = 0 } -func (b *buffer) stretch(srcSize int) samples { - dstSize := stereoSamples(b.dstHz, b.frames[b.fi]) - src, out := b.in[:srcSize], b.out[:dstSize] - - if srcSize == dstSize { - return src - } - +func (b *buffer) stretch(src samples, dstSize int) samples { switch b.algo { case ResampleSpeex: - if n, _ := b.resampler.Process(src, out); n == dstSize { - return out + if b.resampler != nil { + if _, out, err := b.resampler.ProcessIntInterleaved(src); err == nil { + if len(out) == dstSize { + return out + } + src = out // use speex output for linear correction + } } fallthrough case ResampleLinear: - return linear(src, out) + return b.linear(src, dstSize) case ResampleNearest: - return nearest(src, out) + return b.nearest(src, dstSize) + default: + return b.linear(src, dstSize) } - return src } -func linear(src, out samples) samples { - sn, dn := len(src)/2, len(out)/2 - if sn < 2 || dn < 2 { - return out +func (b *buffer) linear(src samples, dstSize int) samples { + srcLen := len(src) + if srcLen < 2 || dstSize < 2 { + return b.scratch[:dstSize] } - ratio := ((sn - 1) << 16) / (dn - 1) - for i := 0; i < dn; i++ { + + out := b.scratch[:dstSize] + srcPairs, dstPairs := srcLen/2, dstSize/2 + ratio := ((srcPairs - 1) << 16) / (dstPairs - 1) + + for i := 0; i < dstPairs; i++ { pos := i * ratio - si, frac := (pos>>16)*2, pos&0xFFFF + idx, frac := (pos>>16)*2, pos&0xFFFF di := i * 2 - if si >= len(src)-2 { - out[di], out[di+1] = src[len(src)-2], src[len(src)-1] + + if idx >= srcLen-2 { + out[di], out[di+1] = src[srcLen-2], src[srcLen-1] } else { - out[di] = int16(int32(src[si]) + ((int32(src[si+2])-int32(src[si]))*int32(frac))>>16) - out[di+1] = int16(int32(src[si+1]) + ((int32(src[si+3])-int32(src[si+1]))*int32(frac))>>16) + out[di] = int16(int32(src[idx]) + ((int32(src[idx+2])-int32(src[idx]))*int32(frac))>>16) + out[di+1] = int16(int32(src[idx+1]) + ((int32(src[idx+3])-int32(src[idx+1]))*int32(frac))>>16) } } return out } -func nearest(src, out samples) samples { - sn, dn := len(src)/2, len(out)/2 - if sn < 2 || dn < 2 { - return out +func (b *buffer) nearest(src samples, dstSize int) samples { + srcLen := len(src) + if srcLen < 2 || dstSize < 2 { + return b.scratch[:dstSize] } - for i := 0; i < dn; i++ { - si, di := (i*sn/dn)*2, i*2 + + out := b.scratch[:dstSize] + srcPairs, dstPairs := srcLen/2, dstSize/2 + + for i := 0; i < dstPairs; i++ { + si := (i * srcPairs / dstPairs) * 2 + di := i * 2 out[di], out[di+1] = src[si], src[si+1] } return out diff --git a/pkg/worker/media/buffer_test.go b/pkg/worker/media/buffer_test.go index 096b4ef9..28a596ba 100644 --- a/pkg/worker/media/buffer_test.go +++ b/pkg/worker/media/buffer_test.go @@ -11,109 +11,71 @@ type bufWrite struct { } func TestBufferWrite(t *testing.T) { - // At 2000Hz stereo: - // 5ms = 2000 * 0.005 * 2 = 20 samples - // 10ms = 2000 * 0.01 * 2 = 40 samples - tests := []struct { - frames []float32 bufLen int writes []bufWrite expect samples }{ { - frames: []float32{5, 10}, - bufLen: 2000, - writes: []bufWrite{ - {sample: 1, len: 20}, - {sample: 2, len: 40}, - {sample: 3, len: 60}, - }, - expect: samples(rep(1, 20).add(2, 40).add(3, 40).add(3, 20)), - }, - { - frames: []float32{5, 10}, - bufLen: 2000, - writes: []bufWrite{ - {sample: 1, len: 6}, - {sample: 2, len: 36}, - {sample: 3, len: 4}, - }, - expect: samples(rep(1, 6).add(2, 34)), - }, - { - frames: []float32{5, 10}, - bufLen: 2000, - writes: []bufWrite{ - {sample: 1, len: 40}, - }, - expect: samples(rep(1, 40)), - }, - { - frames: []float32{5, 10}, - bufLen: 2000, - writes: []bufWrite{ - {sample: 1, len: 100}, - }, - expect: samples(rep(1, 40).add(1, 40).add(1, 20)), - }, - { - frames: []float32{5}, bufLen: 2000, writes: []bufWrite{ {sample: 1, len: 10}, - {sample: 2, len: 15}, + {sample: 2, len: 20}, + {sample: 3, len: 30}, }, - expect: samples(rep(1, 10).add(2, 10)), + expect: samples{ + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, + }, + }, + { + bufLen: 2000, + writes: []bufWrite{ + {sample: 1, len: 3}, + {sample: 2, len: 18}, + {sample: 3, len: 2}, + }, + expect: samples{1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}, }, } - for i, test := range tests { - var results samples - buf, err := newBuffer(test.frames, test.bufLen) + for _, test := range tests { + var lastResult samples + buf, err := newBuffer([]float32{10, 5}, test.bufLen) if err != nil { - t.Fatalf("test %d: %v", i, err) + t.Fatalf("oof, %v", err) } for _, w := range test.writes { - buf.write(samplesOf(w.sample, w.len), func(s samples, ms float32) { - tmp := make(samples, len(s)) - copy(tmp, s) - results = append(results, tmp...) - }) + buf.write(samplesOf(w.sample, w.len), + func(s samples, ms float32) { lastResult = s }, + ) } - if !reflect.DeepEqual(test.expect, results) { - t.Errorf("test %d:\ngot %v (len=%d)\nwant %v (len=%d)", i, results, len(results), test.expect, len(test.expect)) + if !reflect.DeepEqual(test.expect, lastResult) { + t.Errorf("not expted buffer, %v != %v, %v", lastResult, test.expect, len(buf.buckets)) } } } func BenchmarkBufferWrite(b *testing.B) { fn := func(_ samples, _ float32) {} - buf, _ := newBuffer([]float32{10}, 2000) - s1 := samplesOf(1, 1000) - s2 := samplesOf(2, 4000) + l := 2000 + buf, err := newBuffer([]float32{10}, l) + if err != nil { + b.Fatalf("oof: %v", err) + } + samples1 := samplesOf(1, l/2) + samples2 := samplesOf(2, l*2) for i := 0; i < b.N; i++ { - buf.write(s1, fn) - buf.write(s2, fn) + buf.write(samples1, fn) + buf.write(samples2, fn) } } -// helpers - -func samplesOf(v int16, l int) samples { - s := make(samples, l) +func samplesOf(v int16, len int) (s samples) { + s = make(samples, len) for i := range s { s[i] = v } - return s -} - -type builder samples - -func rep(v int16, n int) builder { - return builder(samplesOf(v, n)) -} - -func (b builder) add(v int16, n int) builder { - return append(b, samplesOf(v, n)...) + return } diff --git a/pkg/worker/media/media_test.go b/pkg/worker/media/media_test.go index 64659427..10152bf5 100644 --- a/pkg/worker/media/media_test.go +++ b/pkg/worker/media/media_test.go @@ -125,8 +125,9 @@ func TestResampleStretch(t *testing.T) { {name: "", args: args{pcm: gen(1764), size: 1920}, want: nil}, } for _, tt := range tests { + buf, _ := newBuffer([]float32{20}, 2000) t.Run(tt.name, func(t *testing.T) { - rez2 := nearest(tt.args.pcm, make(samples, tt.args.size)) + rez2 := buf.nearest(tt.args.pcm, tt.args.size) if rez2[0] != tt.args.pcm[0] || rez2[1] != tt.args.pcm[1] || rez2[len(rez2)-1] != tt.args.pcm[len(tt.args.pcm)-1] || rez2[len(rez2)-2] != tt.args.pcm[len(tt.args.pcm)-2] { @@ -140,8 +141,9 @@ func TestResampleStretch(t *testing.T) { func BenchmarkResampler(b *testing.B) { pcm := samples(gen(1764)) size := 1920 + buf, _ := newBuffer([]float32{20}, 1000) for i := 0; i < b.N; i++ { - linear(pcm, make(samples, size)) + buf.linear(pcm, size) } } diff --git a/pkg/worker/media/speex.go b/pkg/worker/media/speex.go index 5f9fa591..a306fd2b 100644 --- a/pkg/worker/media/speex.go +++ b/pkg/worker/media/speex.go @@ -12,74 +12,86 @@ import "C" import "errors" type Resampler struct { - resampler *C.SpeexResamplerState - channels int + resampler *C.SpeexResamplerState + outBuff []int16 // one of these buffers used when typed data read + outBuffFloat []float32 + channels int + multiplier float32 } +// Quality const ( QualityMax = 10 QualityMin = 0 QualityDefault = 4 QualityDesktop = 5 - QualityVoIP = 3 + QualityVoid = 3 ) -func NewResampler(channels, inRate, outRate, quality int) (*Resampler, error) { - var err C.int +// Errors +const ( + ErrorSuccess = iota + ErrorAllocFailed + ErrorBadState + ErrorInvalidArg + ErrorPtrOverlap + ErrorMaxError +) + +const ( + reserve = 1.1 +) + +// ResamplerInit Create a new resampler with integer input and output rates +// Resampling quality between 0 and 10, where 0 has poor quality +// and 10 has very high quality +func ResamplerInit(channels, inRate, outRate, quality int) (*Resampler, error) { + err := C.int(0) r := &Resampler{channels: channels} - - // Use fractional init for exact ratio - g := gcd(outRate, inRate) - r.resampler = C.speex_resampler_init_frac( - C.spx_uint32_t(channels), - C.spx_uint32_t(outRate/g), - C.spx_uint32_t(inRate/g), - C.spx_uint32_t(inRate), - C.spx_uint32_t(outRate), - C.int(quality), - &err, - ) + r.multiplier = float32(outRate) / float32(inRate) * 1.1 + r.resampler = C.speex_resampler_init(C.spx_uint32_t(channels), + C.spx_uint32_t(inRate), C.spx_uint32_t(outRate), C.int(quality), &err) if r.resampler == nil { - return nil, errors.New(C.GoString(C.speex_resampler_strerror(err))) + return nil, StrError(int(err)) } - - C.speex_resampler_skip_zeros(r.resampler) - return r, nil } -func (r *Resampler) Destroy() { +// Destroy a resampler +func (r *Resampler) Destroy() error { if r.resampler != nil { - C.speex_resampler_destroy(r.resampler) - r.resampler = nil + C.speex_resampler_destroy((*C.SpeexResamplerState)(r.resampler)) + return nil } + return StrError(ErrorInvalidArg) } -func (r *Resampler) Process(in, out []int16) (int, error) { - if r.resampler == nil || len(in) < r.channels || len(out) < r.channels { - return 0, nil +// ProcessIntInterleaved Resample an int slice interleaved +func (r *Resampler) ProcessIntInterleaved(in []int16) (int, []int16, error) { + outBuffCap := int(float32(len(in)) * r.multiplier) + if outBuffCap > cap(r.outBuff) { + r.outBuff = make([]int16, int(float32(outBuffCap)*reserve)*4) } - inLen := C.spx_uint32_t(len(in) / r.channels) - outLen := C.spx_uint32_t(len(out) / r.channels) - + outLen := C.spx_uint32_t(len(r.outBuff) / r.channels) res := C.speex_resampler_process_interleaved_int( r.resampler, (*C.spx_int16_t)(&in[0]), &inLen, - (*C.spx_int16_t)(&out[0]), + (*C.spx_int16_t)(&r.outBuff[0]), &outLen, ) - if res != 0 { - return 0, errors.New(C.GoString(C.speex_resampler_strerror(res))) + if res != ErrorSuccess { + return 0, nil, StrError(ErrorInvalidArg) } - - return int(outLen) * r.channels, nil + return int(inLen) * r.channels, r.outBuff[:outLen*2], nil } -func gcd(a, b int) int { - for b != 0 { - a, b = b, a%b +// StrError returns error message +func StrError(errorCode int) error { + cS := C.speex_resampler_strerror(C.int(errorCode)) + if cS == nil { + return nil } - return a + return errors.New(C.GoString(cS)) } diff --git a/pkg/worker/media/speex_resampler.h b/pkg/worker/media/speex_resampler.h index 88b03fd6..27d510f5 100644 --- a/pkg/worker/media/speex_resampler.h +++ b/pkg/worker/media/speex_resampler.h @@ -36,42 +36,11 @@ SpeexResamplerState *speex_resampler_init(spx_uint32_t nb_channels, spx_uint32_t out_rate, int quality, int *err); -/** Create a new resampler with fractional input/output rates. The sampling - * rate ratio is an arbitrary rational number with both the numerator and - * denominator being 32-bit integers. - * @param nb_channels Number of channels to be processed - * @param ratio_num Numerator of the sampling rate ratio - * @param ratio_den Denominator of the sampling rate ratio - * @param in_rate Input sampling rate rounded to the nearest integer (in Hz). - * @param out_rate Output sampling rate rounded to the nearest integer (in Hz). - * @param quality Resampling quality between 0 and 10, where 0 has poor quality - * and 10 has very high quality. - * @return Newly created resampler state - * @retval NULL Error: not enough memory - */ -SpeexResamplerState *speex_resampler_init_frac(spx_uint32_t nb_channels, - spx_uint32_t ratio_num, - spx_uint32_t ratio_den, - spx_uint32_t in_rate, - spx_uint32_t out_rate, - int quality, - int *err); /** Destroy a resampler state. * @param st Resampler state */ void speex_resampler_destroy(SpeexResamplerState *st); - -/** Make sure that the first samples to go out of the resamplers don't have - * leading zeros. This is only useful before starting to use a newly created - * resampler. It is recommended to use that when resampling an audio file, as - * it will generate a file with the same length. For real-time processing, - * it is probably easier not to use this call (so that the output duration - * is the same for the first frame). - * @param st Resampler state - */ -int speex_resampler_skip_zeros(SpeexResamplerState *st); - /** Resample an interleaved int array. The input and output buffers must *not* overlap. * @param st Resampler state * @param in Input buffer