diff --git a/audio/filter/filter_test.go b/audio/filter/filter_test.go index 44b651f..9558e3b 100644 --- a/audio/filter/filter_test.go +++ b/audio/filter/filter_test.go @@ -30,7 +30,7 @@ func TestFilterChainNoResample(t *testing.T) { if result.GetChannels() != 2 { t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2) } - if sink.SamplesRead != 17323031 { - t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 17323031) + if sink.GetSamplesRead() != 17323031 { + t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 17323031) } } diff --git a/audio/filter/resample_filter_cgo_test.go b/audio/filter/resample_filter_cgo_test.go index 261ee72..d4d6b06 100644 --- a/audio/filter/resample_filter_cgo_test.go +++ b/audio/filter/resample_filter_cgo_test.go @@ -37,7 +37,7 @@ func TestFilterChainResample(t *testing.T) { if result.GetChannels() != 2 { t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2) } - if sink.SamplesRead != 6284999 { - t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 6284999) + if sink.GetSamplesRead() != 6284999 { + t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 6284999) } } diff --git a/audio/filter/resample_filter_nocgo_test.go b/audio/filter/resample_filter_nocgo_test.go index e9387c2..6f9c4c4 100644 --- a/audio/filter/resample_filter_nocgo_test.go +++ b/audio/filter/resample_filter_nocgo_test.go @@ -37,7 +37,7 @@ func TestFilterChainResample(t *testing.T) { if result.GetChannels() != 2 { t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2) } - if sink.SamplesRead != 6285000 { - t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 6285000) + if sink.GetSamplesRead() != 6285000 { + t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 6285000) } } diff --git a/audio/format/analyzer.go b/audio/format/analyzer.go index 3835578..a4602b9 100644 --- a/audio/format/analyzer.go +++ b/audio/format/analyzer.go @@ -170,7 +170,7 @@ func (c AnalyzerChannel) SkipEndSamples(samples int) (channel AnalyzerChannel) { return } -func (c AnalyzerChannel) SkipEndSamplesMultiple(wg *sync.WaitGroup, offset *uint32, samples int) (channel AnalyzerChannel) { +func (c AnalyzerChannel) SkipEndSamplesMultiple(wg *sync.WaitGroup, offset *atomic.Uint32, samples int) (channel AnalyzerChannel) { channel = make(AnalyzerChannel, chanBuf) go func() { defer close(channel) @@ -194,7 +194,7 @@ func (c AnalyzerChannel) SkipEndSamplesMultiple(wg *sync.WaitGroup, offset *uint } wg.Wait() - totalSampleOffset := samplesRead + int(atomic.LoadUint32(offset)) + totalSampleOffset := samplesRead + int(offset.Load()) if len(buffer) > 0 { p := &AnalyzerPacket{ diff --git a/audio/queue/queue.go b/audio/queue/queue.go index 7afa0c8..e1f4d7f 100644 --- a/audio/queue/queue.go +++ b/audio/queue/queue.go @@ -24,7 +24,7 @@ type Queue struct { queue []*QueueEntry output audio.Source interrupt chan bool - interruptDepth int64 + interruptDepth atomic.Int64 closed bool lock sync.RWMutex wg sync.WaitGroup @@ -121,7 +121,7 @@ func queueLoopStart[T audio.AllowedSourceTypes](q *Queue) { if len(q.queue) == 0 { //no more entries, wait for interrupt q.lock.RUnlock() <-q.interrupt - atomic.AddInt64(&q.interruptDepth, -1) + q.interruptDepth.Add(-1) continue } if current == nil || current.Identifier != q.queue[0].Identifier { @@ -134,7 +134,7 @@ func queueLoopStart[T audio.AllowedSourceTypes](q *Queue) { for { select { case <-q.interrupt: - atomic.AddInt64(&q.interruptDepth, -1) + q.interruptDepth.Add(-1) //force recheck break F1 case block, more := <-currentBlocks: @@ -324,8 +324,8 @@ func (q *Queue) GetChannels() int { func (q *Queue) sendInterrupt() { //TODO: maybe use len() on channel? - if atomic.LoadInt64(&q.interruptDepth) == 0 { //not waiting on interrupt - atomic.AddInt64(&q.interruptDepth, 1) + if q.interruptDepth.Load() == 0 { //not waiting on interrupt + q.interruptDepth.Add(1) q.interrupt <- true } } diff --git a/audio/queue/queue_test.go b/audio/queue/queue_test.go index b5438e9..865e974 100644 --- a/audio/queue/queue_test.go +++ b/audio/queue/queue_test.go @@ -68,7 +68,7 @@ func TestQueue(t *testing.T) { if result.GetChannels() != 2 { t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2) } - if sink.SamplesRead != 470828559 { - t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 470828559) + if sink.GetSamplesRead() != 470828559 { + t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 470828559) } } diff --git a/audio/sink.go b/audio/sink.go index 12e760e..eb88b3b 100644 --- a/audio/sink.go +++ b/audio/sink.go @@ -40,8 +40,8 @@ func (n *NullSink) Process(source Source) { type ForwardSink struct { Sink Target Sink - SamplesRead int64 - Duration time.Duration + SamplesRead atomic.Uint64 + Duration atomic.Int64 } func NewForwardSink(target Sink) *ForwardSink { @@ -51,11 +51,11 @@ func NewForwardSink(target Sink) *ForwardSink { } func (n *ForwardSink) GetDuration() time.Duration { - return time.Duration(atomic.LoadInt64((*int64)(&n.Duration))) + return time.Duration(n.Duration.Load()) } -func (n *ForwardSink) GetSamplesRead() int64 { - return atomic.LoadInt64(&n.SamplesRead) +func (n *ForwardSink) GetSamplesRead() uint64 { + return n.SamplesRead.Load() } func forwardSinkProcess[T AllowedSourceTypes](n *ForwardSink, source TypedSource[T]) { @@ -63,8 +63,8 @@ func forwardSinkProcess[T AllowedSourceTypes](n *ForwardSink, source TypedSource go func() { defer processor.Close() for block := range source.GetBlocks() { - atomic.AddInt64((*int64)(&n.Duration), int64((time.Second*time.Duration(len(block)/source.GetChannels()))/time.Duration(source.GetSampleRate()))) - atomic.AddInt64(&n.SamplesRead, int64(len(block)/source.GetChannels())) + n.Duration.Add(int64((time.Second * time.Duration(len(block)/source.GetChannels())) / time.Duration(source.GetSampleRate()))) + n.SamplesRead.Add(uint64(len(block) / source.GetChannels())) processor.IngestNative(block, source.GetBitDepth()) } }()