Update several sections to use atomic[T] instead of pointers
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
d785718551
commit
bf39ddac3a
|
@ -30,7 +30,7 @@ func TestFilterChainNoResample(t *testing.T) {
|
||||||
if result.GetChannels() != 2 {
|
if result.GetChannels() != 2 {
|
||||||
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
||||||
}
|
}
|
||||||
if sink.SamplesRead != 17323031 {
|
if sink.GetSamplesRead() != 17323031 {
|
||||||
t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 17323031)
|
t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 17323031)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestFilterChainResample(t *testing.T) {
|
||||||
if result.GetChannels() != 2 {
|
if result.GetChannels() != 2 {
|
||||||
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
||||||
}
|
}
|
||||||
if sink.SamplesRead != 6284999 {
|
if sink.GetSamplesRead() != 6284999 {
|
||||||
t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 6284999)
|
t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 6284999)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestFilterChainResample(t *testing.T) {
|
||||||
if result.GetChannels() != 2 {
|
if result.GetChannels() != 2 {
|
||||||
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
||||||
}
|
}
|
||||||
if sink.SamplesRead != 6285000 {
|
if sink.GetSamplesRead() != 6285000 {
|
||||||
t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 6285000)
|
t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 6285000)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,7 +170,7 @@ func (c AnalyzerChannel) SkipEndSamples(samples int) (channel AnalyzerChannel) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c AnalyzerChannel) SkipEndSamplesMultiple(wg *sync.WaitGroup, offset *uint32, samples int) (channel AnalyzerChannel) {
|
func (c AnalyzerChannel) SkipEndSamplesMultiple(wg *sync.WaitGroup, offset *atomic.Uint32, samples int) (channel AnalyzerChannel) {
|
||||||
channel = make(AnalyzerChannel, chanBuf)
|
channel = make(AnalyzerChannel, chanBuf)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(channel)
|
defer close(channel)
|
||||||
|
@ -194,7 +194,7 @@ func (c AnalyzerChannel) SkipEndSamplesMultiple(wg *sync.WaitGroup, offset *uint
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
totalSampleOffset := samplesRead + int(atomic.LoadUint32(offset))
|
totalSampleOffset := samplesRead + int(offset.Load())
|
||||||
|
|
||||||
if len(buffer) > 0 {
|
if len(buffer) > 0 {
|
||||||
p := &AnalyzerPacket{
|
p := &AnalyzerPacket{
|
||||||
|
|
|
@ -24,7 +24,7 @@ type Queue struct {
|
||||||
queue []*QueueEntry
|
queue []*QueueEntry
|
||||||
output audio.Source
|
output audio.Source
|
||||||
interrupt chan bool
|
interrupt chan bool
|
||||||
interruptDepth int64
|
interruptDepth atomic.Int64
|
||||||
closed bool
|
closed bool
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
@ -121,7 +121,7 @@ func queueLoopStart[T audio.AllowedSourceTypes](q *Queue) {
|
||||||
if len(q.queue) == 0 { //no more entries, wait for interrupt
|
if len(q.queue) == 0 { //no more entries, wait for interrupt
|
||||||
q.lock.RUnlock()
|
q.lock.RUnlock()
|
||||||
<-q.interrupt
|
<-q.interrupt
|
||||||
atomic.AddInt64(&q.interruptDepth, -1)
|
q.interruptDepth.Add(-1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if current == nil || current.Identifier != q.queue[0].Identifier {
|
if current == nil || current.Identifier != q.queue[0].Identifier {
|
||||||
|
@ -134,7 +134,7 @@ func queueLoopStart[T audio.AllowedSourceTypes](q *Queue) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-q.interrupt:
|
case <-q.interrupt:
|
||||||
atomic.AddInt64(&q.interruptDepth, -1)
|
q.interruptDepth.Add(-1)
|
||||||
//force recheck
|
//force recheck
|
||||||
break F1
|
break F1
|
||||||
case block, more := <-currentBlocks:
|
case block, more := <-currentBlocks:
|
||||||
|
@ -324,8 +324,8 @@ func (q *Queue) GetChannels() int {
|
||||||
|
|
||||||
func (q *Queue) sendInterrupt() {
|
func (q *Queue) sendInterrupt() {
|
||||||
//TODO: maybe use len() on channel?
|
//TODO: maybe use len() on channel?
|
||||||
if atomic.LoadInt64(&q.interruptDepth) == 0 { //not waiting on interrupt
|
if q.interruptDepth.Load() == 0 { //not waiting on interrupt
|
||||||
atomic.AddInt64(&q.interruptDepth, 1)
|
q.interruptDepth.Add(1)
|
||||||
q.interrupt <- true
|
q.interrupt <- true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,7 @@ func TestQueue(t *testing.T) {
|
||||||
if result.GetChannels() != 2 {
|
if result.GetChannels() != 2 {
|
||||||
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
t.Errorf("Wrong Channel Count %d != %d", result.GetChannels(), 2)
|
||||||
}
|
}
|
||||||
if sink.SamplesRead != 470828559 {
|
if sink.GetSamplesRead() != 470828559 {
|
||||||
t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 470828559)
|
t.Errorf("Wrong Sample Count %d != %d", sink.GetSamplesRead(), 470828559)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,8 +40,8 @@ func (n *NullSink) Process(source Source) {
|
||||||
type ForwardSink struct {
|
type ForwardSink struct {
|
||||||
Sink
|
Sink
|
||||||
Target Sink
|
Target Sink
|
||||||
SamplesRead int64
|
SamplesRead atomic.Uint64
|
||||||
Duration time.Duration
|
Duration atomic.Int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewForwardSink(target Sink) *ForwardSink {
|
func NewForwardSink(target Sink) *ForwardSink {
|
||||||
|
@ -51,11 +51,11 @@ func NewForwardSink(target Sink) *ForwardSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *ForwardSink) GetDuration() time.Duration {
|
func (n *ForwardSink) GetDuration() time.Duration {
|
||||||
return time.Duration(atomic.LoadInt64((*int64)(&n.Duration)))
|
return time.Duration(n.Duration.Load())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *ForwardSink) GetSamplesRead() int64 {
|
func (n *ForwardSink) GetSamplesRead() uint64 {
|
||||||
return atomic.LoadInt64(&n.SamplesRead)
|
return n.SamplesRead.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func forwardSinkProcess[T AllowedSourceTypes](n *ForwardSink, source TypedSource[T]) {
|
func forwardSinkProcess[T AllowedSourceTypes](n *ForwardSink, source TypedSource[T]) {
|
||||||
|
@ -63,8 +63,8 @@ func forwardSinkProcess[T AllowedSourceTypes](n *ForwardSink, source TypedSource
|
||||||
go func() {
|
go func() {
|
||||||
defer processor.Close()
|
defer processor.Close()
|
||||||
for block := range source.GetBlocks() {
|
for block := range source.GetBlocks() {
|
||||||
atomic.AddInt64((*int64)(&n.Duration), int64((time.Second*time.Duration(len(block)/source.GetChannels()))/time.Duration(source.GetSampleRate())))
|
n.Duration.Add(int64((time.Second * time.Duration(len(block)/source.GetChannels())) / time.Duration(source.GetSampleRate())))
|
||||||
atomic.AddInt64(&n.SamplesRead, int64(len(block)/source.GetChannels()))
|
n.SamplesRead.Add(uint64(len(block) / source.GetChannels()))
|
||||||
processor.IngestNative(block, source.GetBitDepth())
|
processor.IngestNative(block, source.GetBitDepth())
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in a new issue