Added Queue, buffer filter
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
DataHoarder 2022-02-23 11:41:11 +01:00
parent 53375704b4
commit 4f6b2801ea
3 changed files with 325 additions and 0 deletions

View file

@ -167,6 +167,7 @@ func TestHasher16(t *testing.T) {
}
func TestFilterChain(t *testing.T) {
t.Parallel()
fp, err := os.Open(TestSingleSample24)
if err != nil {
t.Error(err)
@ -196,3 +197,66 @@ func TestFilterChain(t *testing.T) {
t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 6284999)
}
}
func TestQueue(t *testing.T) {
const sampleRate = 41000
q := audio.NewQueue(sampleRate, 2, 2)
flacFormat := flac.NewFormat()
for _, location := range TestSampleLocations {
entries, err := os.ReadDir(location)
if err != nil {
t.Error(err)
return
}
for _, f := range entries {
if path.Ext(f.Name()) == ".flac" {
fullPath := path.Join(location, f.Name())
fp, err := os.Open(fullPath)
if err != nil {
t.Error(err)
return
}
source, err := flacFormat.Open(fp)
if err != nil {
fp.Close()
t.Error(err)
return
}
q.Add(source, func(q *audio.Queue, entry *audio.QueueEntry) {
t.Logf("Started playback of %d %s\n", entry.Identifier, fullPath)
}, func(q *audio.Queue, entry *audio.QueueEntry) {
if len(q.GetQueue()) == 0 {
t.Log("Finished playback, closing\n")
q.Close()
}
t.Logf("Finished playback of %d %s: output %d samples\n", entry.Identifier, fullPath, entry.ReadSamples)
fp.Close()
})
}
}
}
//Decode
result := q.GetSource()
sink := audio.NewForwardSink(audio.NewNullSink())
sink.Process(result)
q.Wait()
if result.SampleRate != sampleRate {
t.Errorf("Wrong SampleRate %d != %d", result.SampleRate, sampleRate)
}
if result.Channels != 2 {
t.Errorf("Wrong Channel Count %d != %d", result.SampleRate, 2)
}
if sink.SamplesRead != 218834361 {
t.Errorf("Wrong Sample Count %d != %d", sink.SamplesRead, 218834361)
}
}

View file

@ -22,6 +22,32 @@ func NewFilterChain(source Source, filters ...Filter) Source {
return source
}
type BufferFilter struct {
blockBufferSize int
}
func NewBufferFilter(blockBufferSize int) BufferFilter {
return BufferFilter{
blockBufferSize: blockBufferSize,
}
}
func (f BufferFilter) Process(source Source) Source {
outBlocks := make(chan []float32, f.blockBufferSize)
go func() {
defer close(outBlocks)
for block := range source.Blocks {
outBlocks <- block
}
}()
return Source{
Channels: source.Channels,
SampleRate: source.SampleRate,
Blocks: outBlocks,
}
}
type RealTimeFilter struct {
}

235
audio/queue.go Normal file
View file

@ -0,0 +1,235 @@
package audio
import (
"log"
"sync"
"sync/atomic"
)
type QueueIdentifier int
type QueueEntry struct {
Identifier QueueIdentifier
Source Source
ReadSamples int
cancel chan bool
StartCallback func(q *Queue, entry *QueueEntry)
EndCallback func(q *Queue, entry *QueueEntry)
}
type Queue struct {
blockBufferSize int
queue []*QueueEntry
output Source
interrupt chan bool
interruptDepth int64
closed bool
lock sync.RWMutex
wg sync.WaitGroup
identifierCounter QueueIdentifier
}
func NewQueue(sampleRate, channels, blockBufferSize int) *Queue {
if channels != 1 && channels != 2 {
log.Panicf("not allowed channel number %d", channels)
}
q := &Queue{
blockBufferSize: blockBufferSize,
interrupt: make(chan bool, 1),
output: Source{
SampleRate: sampleRate,
Channels: channels,
Blocks: make(chan []float32),
},
}
q.start()
return q
}
func (q *Queue) spliceSources(input Source) (output Source, cancel chan bool) {
cancel = make(chan bool, 1)
output = Source{
Channels: input.Channels,
SampleRate: input.SampleRate,
Blocks: make(chan []float32),
}
go func() {
defer close(output.Blocks)
L:
for range input.Blocks {
select {
case <-cancel:
break L
case block, more := <-input.Blocks:
if !more {
//no more blocks!
break L
} else {
output.Blocks <- block
}
}
}
//sink remaining
go NewNullSink().Process(input)
}()
return
}
func (q *Queue) start() {
q.wg.Add(1)
go func() {
defer q.wg.Done()
var current *QueueEntry
L:
for {
q.lock.RLock()
if q.closed {
close(q.output.Blocks)
break L
}
if len(q.queue) == 0 { //no more entries, wait for interrupt
q.lock.RUnlock()
<-q.interrupt
atomic.AddInt64(&q.interruptDepth, -1)
continue
}
current = q.queue[0]
q.lock.RUnlock()
F:
for {
select {
case <-q.interrupt:
atomic.AddInt64(&q.interruptDepth, -1)
//force recheck
break F
case block, more := <-current.Source.Blocks:
if !more {
//no more blocks! skip
q.Remove(current.Identifier)
if current.EndCallback != nil {
current.EndCallback(q, current)
}
break F
} else {
if current.StartCallback != nil && current.ReadSamples == 0 && len(block) > 0 {
current.StartCallback(q, current)
}
current.ReadSamples += len(block) / current.Source.Channels
q.output.Blocks <- block
}
}
}
}
}()
}
func (q *Queue) getFilterChain(source Source) Source {
if q.GetChannels() == 1 {
return NewFilterChain(source, MonoFilter{}, NewResampleFilter(q.GetSampleRate(), BandlimitedFastest, 0), NewBufferFilter(q.blockBufferSize))
} else {
return NewFilterChain(source, StereoFilter{}, NewResampleFilter(q.GetSampleRate(), BandlimitedFastest, 0), NewBufferFilter(q.blockBufferSize))
}
}
func (q *Queue) Add(source Source, startPlayback func(q *Queue, entry *QueueEntry), endPlayback func(q *Queue, entry *QueueEntry)) (identifier QueueIdentifier) {
q.lock.Lock()
splicedOutput, cancel := q.spliceSources(source)
identifier = q.identifierCounter
q.queue = append(q.queue, &QueueEntry{
Identifier: identifier,
Source: q.getFilterChain(splicedOutput),
cancel: cancel,
StartCallback: startPlayback,
EndCallback: endPlayback,
})
q.identifierCounter++
q.lock.Unlock()
q.sendInterrupt()
return
}
func (q *Queue) IsClosed() bool {
q.lock.RLock()
defer q.lock.RUnlock()
return q.closed
}
func (q *Queue) Remove(identifier QueueIdentifier) bool {
q.lock.Lock()
defer q.lock.Unlock()
for i, e := range q.queue {
if e.Identifier == identifier {
q.sendInterrupt()
e.cancel <- true
go NewNullSink().Process(e.Source)
//delete entry
q.queue = append(q.queue[:i], q.queue[i+1:]...)
return true
}
}
return false
}
func (q *Queue) GetQueueEntry(identifier QueueIdentifier) (int, *QueueEntry) {
q.lock.RLock()
defer q.lock.RUnlock()
for i, e := range q.queue {
if e.Identifier == identifier {
return i, e
}
}
return -1, nil
}
func (q *Queue) GetQueue() (ids []QueueIdentifier) {
q.lock.RLock()
defer q.lock.RUnlock()
for _, e := range q.queue {
ids = append(ids, e.Identifier)
}
return
}
func (q *Queue) GetSource() Source {
return q.output
}
func (q *Queue) GetSampleRate() int {
return q.output.SampleRate
}
func (q *Queue) GetChannels() int {
return q.output.Channels
}
func (q *Queue) sendInterrupt() {
if atomic.LoadInt64(&q.interruptDepth) == 0 { //not waiting on interrupt
atomic.AddInt64(&q.interruptDepth, 1)
q.interrupt <- true
}
}
func (q *Queue) Close() {
if !q.closed {
q.closed = true
q.sendInterrupt()
}
}
func (q *Queue) Wait() {
q.wg.Wait()
}