Remove buffer parameter on queue, added RemoveCallback
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
DataHoarder 2022-03-02 10:56:41 +01:00
parent 8254100fb5
commit 0d8d4f40bb
2 changed files with 19 additions and 16 deletions

View file

@ -661,9 +661,9 @@ func TestEncodeAACHE(t *testing.T) {
func TestQueue(t *testing.T) {
t.Parallel()
const sampleRate = 41000
const sampleRate = 44100
q := audio.NewQueue(sampleRate, 2, 2)
q := audio.NewQueue(sampleRate, 2)
flacFormat := flac.NewFormat()
for _, location := range TestSampleLocations {

View file

@ -9,16 +9,16 @@ import (
type QueueIdentifier int
type QueueEntry struct {
Identifier QueueIdentifier
Source Source
ReadSamples int
cancel chan bool
StartCallback func(q *Queue, entry *QueueEntry)
EndCallback func(q *Queue, entry *QueueEntry)
Identifier QueueIdentifier
Source Source
ReadSamples int
cancel chan bool
StartCallback func(q *Queue, entry *QueueEntry)
EndCallback func(q *Queue, entry *QueueEntry)
RemoveCallback func(q *Queue, entry *QueueEntry)
}
type Queue struct {
blockBufferSize int
queue []*QueueEntry
output Source
interrupt chan bool
@ -29,14 +29,13 @@ type Queue struct {
identifierCounter QueueIdentifier
}
func NewQueue(sampleRate, channels, blockBufferSize int) *Queue {
func NewQueue(sampleRate, channels int) *Queue {
if channels != 1 && channels != 2 {
log.Panicf("not allowed channel number %d", channels)
}
q := &Queue{
blockBufferSize: blockBufferSize,
interrupt: make(chan bool, 1),
interrupt: make(chan bool, 1),
output: Source{
SampleRate: sampleRate,
Channels: channels,
@ -110,10 +109,10 @@ func (q *Queue) start() {
case block, more := <-current.Source.Blocks:
if !more {
//no more blocks! skip
q.Remove(current.Identifier)
if current.EndCallback != nil {
current.EndCallback(q, current)
}
q.Remove(current.Identifier)
break F
} else {
if current.StartCallback != nil && current.ReadSamples == 0 && len(block) > 0 {
@ -132,9 +131,9 @@ func (q *Queue) start() {
}
func (q *Queue) getFilterChain(source Source) Source {
if q.GetChannels() == 1 {
return NewFilterChain(source, MonoFilter{}, NewResampleFilter(q.GetSampleRate(), BandlimitedFastest, 0), NewBufferFilter(q.blockBufferSize))
return NewFilterChain(source, MonoFilter{}, NewResampleFilter(q.GetSampleRate(), BandlimitedFastest, 0))
} else {
return NewFilterChain(source, StereoFilter{}, NewResampleFilter(q.GetSampleRate(), BandlimitedFastest, 0), NewBufferFilter(q.blockBufferSize))
return NewFilterChain(source, StereoFilter{}, NewResampleFilter(q.GetSampleRate(), BandlimitedFastest, 0))
}
}
@ -194,7 +193,6 @@ func (q *Queue) IsClosed() bool {
func (q *Queue) Remove(identifier QueueIdentifier) bool {
q.lock.Lock()
defer q.lock.Unlock()
for i, e := range q.queue {
if e.Identifier == identifier {
@ -203,9 +201,14 @@ func (q *Queue) Remove(identifier QueueIdentifier) bool {
go NewNullSink().Process(e.Source)
//delete entry
q.queue = append(q.queue[:i], q.queue[i+1:]...)
q.lock.Unlock()
if e.RemoveCallback != nil {
e.RemoveCallback(q, e)
}
return true
}
}
q.lock.Unlock()
return false