Kirika/audio/queue/queue.go
2022-05-20 17:23:50 +02:00

307 lines
6.6 KiB
Go

package queue
import (
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
"log"
"sync"
"sync/atomic"
)
// QueueIdentifier identifies a single entry of a Queue. Values are handed out
// by AddHead and AddTail from a per-queue counter.
type QueueIdentifier int
// QueueEntry is one source scheduled on a Queue, together with its playback
// bookkeeping and optional lifecycle callbacks.
type QueueEntry struct {
// Identifier is the value AddHead/AddTail returned for this entry.
Identifier QueueIdentifier
// Source is the filter-chain output the queue reads blocks from.
Source audio.Source
// ReadSamples counts what the queue has consumed so far; every block read
// adds len(block) / Source.Channels.
ReadSamples int
// cancel receives a value from Remove, which stops the goroutine that
// forwards blocks from the originally supplied source.
cancel chan bool
// StartCallback, if set, is called when a non-empty block is read while
// ReadSamples is still zero.
StartCallback func(q *Queue, entry *QueueEntry)
// EndCallback, if set, is called when Source.Blocks is found closed, right
// before the entry is removed from the queue.
EndCallback func(q *Queue, entry *QueueEntry)
// RemoveCallback, if set, is called by Remove after the entry has been
// deleted from the queue and the queue lock has been released.
RemoveCallback func(q *Queue, entry *QueueEntry)
}
// Queue reads its entries one after another and forwards their blocks to a
// single output source. Create instances with NewQueue.
type Queue struct {
	// interruptDepth tracks whether an interrupt is pending. It is accessed
	// only through sync/atomic and is deliberately the first field: the
	// 64-bit atomic functions need 64-bit alignment, which on 32-bit
	// platforms is only guaranteed for the first word of an allocated struct.
	interruptDepth int64

	// queue holds the entries in playback order; queue[0] is the entry the
	// worker goroutine currently reads from. Guarded by lock.
	queue []*QueueEntry
	// output is the source handed out by GetSource; the worker goroutine
	// writes to and finally closes output.Blocks.
	output audio.Source
	// interrupt wakes the worker goroutine so it re-examines queue/closed.
	interrupt chan bool
	// closed is set by Close and makes the worker goroutine shut down.
	closed bool
	// lock guards queue, closed and identifierCounter.
	lock sync.RWMutex
	// wg tracks the worker goroutine started by start; see Wait.
	wg sync.WaitGroup
	// identifierCounter is the identifier the next added entry receives.
	identifierCounter QueueIdentifier
}
// NewQueue creates a Queue whose output has the given sample rate and channel
// count, and starts its worker goroutine.
//
// Only mono (1) and stereo (2) outputs are accepted; any other channel count
// panics via log.Panicf.
func NewQueue(sampleRate, channels int) *Queue {
	if !(channels == 1 || channels == 2) {
		log.Panicf("not allowed channel number %d", channels)
	}

	// The output channel is unbuffered, so the worker only advances as fast
	// as the consumer of GetSource reads.
	out := audio.Source{
		SampleRate: sampleRate,
		Channels:   channels,
		Blocks:     make(chan []float32),
	}

	queue := &Queue{
		interrupt: make(chan bool, 1),
		output:    out,
	}
	queue.start()

	return queue
}
// spliceSources wraps input in a cancellable pass-through source.
//
// The returned output has input's channel count and sample rate. A goroutine
// forwards every block from input to output until either input.Blocks is
// closed or a value arrives on the returned cancel channel (buffered, size 1).
// When forwarding stops, whatever is left on input is handed to a null sink
// and output.Blocks is closed. The receiver is not used.
func (q *Queue) spliceSources(input audio.Source) (output audio.Source, cancel chan bool) {
	cancel = make(chan bool, 1)
	output = audio.Source{
		Channels:   input.Channels,
		SampleRate: input.SampleRate,
		Blocks:     make(chan []float32),
	}

	// forward copies blocks until cancellation or end of input.
	forward := func() {
		for {
			select {
			case <-cancel:
				return
			case block, open := <-input.Blocks:
				if !open {
					// no more blocks
					return
				}
				output.Blocks <- block
			}
		}
	}

	go func() {
		defer close(output.Blocks)
		forward()
		// sink whatever the producer still has to deliver
		go audio.NewNullSink().Process(input)
	}()

	return
}
// start launches the worker goroutine that drives the queue. The goroutine is
// registered on q.wg, so Wait returns once it has exited.
//
// Each iteration of the outer loop takes the read lock to look at the queue:
//   - if the queue is closed, output.Blocks is closed and the goroutine ends;
//   - if the queue is empty, it sleeps until an interrupt arrives;
//   - otherwise it reads blocks from the first entry and forwards them to the
//     output until that entry's source is closed or an interrupt arrives.
//
// The read lock is released on every path before blocking or returning.
// Previously the closed path left it held, which made every later write lock
// (AddHead, AddTail, Remove) block forever.
func (q *Queue) start() {
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()

		var current *QueueEntry
		for {
			q.lock.RLock()
			if q.closed {
				// Release before leaving: nothing else will ever unlock it.
				q.lock.RUnlock()
				close(q.output.Blocks)
				return
			}
			if len(q.queue) == 0 { // no more entries, wait for interrupt
				q.lock.RUnlock()
				<-q.interrupt
				atomic.AddInt64(&q.interruptDepth, -1)
				continue
			}
			current = q.queue[0]
			q.lock.RUnlock()

		F:
			for {
				select {
				case <-q.interrupt:
					atomic.AddInt64(&q.interruptDepth, -1)
					// Queue or closed state may have changed: force a recheck.
					break F
				case block, more := <-current.Source.Blocks:
					if !more {
						// Source exhausted: notify, drop the entry, move on.
						if current.EndCallback != nil {
							current.EndCallback(q, current)
						}
						q.Remove(current.Identifier)
						break F
					}
					// First non-empty block of this entry.
					if current.StartCallback != nil && current.ReadSamples == 0 && len(block) > 0 {
						current.StartCallback(q, current)
					}
					current.ReadSamples += len(block) / current.Source.Channels
					// Blocking send: interrupts are only observed between blocks.
					q.output.Blocks <- block
				}
			}
		}
	}()
}
// getFilterChain wraps source in the filter chain that adapts it to the
// queue's output format: filter.MonoFilter when the queue has one channel,
// filter.StereoFilter otherwise, followed by a resample filter targeting the
// queue's sample rate at filter.QualityFastest.
func (q *Queue) getFilterChain(source audio.Source) audio.Source {
	resampler := filter.NewResampleFilter(q.GetSampleRate(), filter.QualityFastest, 0)

	if q.GetChannels() == 1 {
		return filter.NewFilterChain(source, filter.MonoFilter{}, resampler)
	}
	return filter.NewFilterChain(source, filter.StereoFilter{}, resampler)
}
// AddHead schedules source near the front of the queue and returns the
// identifier assigned to the new entry.
//
// When the queue already has entries, the new one is placed at index 1,
// directly behind the current first entry (presumably so the entry being
// played is not displaced; confirm with callers). On an empty queue it
// becomes the only entry. The source is wrapped so it can be cancelled and is
// converted to the queue's output format. The worker goroutine is interrupted
// after the lock has been released.
//
// The callbacks may be nil; see QueueEntry for when each one fires.
func (q *Queue) AddHead(source audio.Source, startCallback func(q *Queue, entry *QueueEntry), endCallback func(q *Queue, entry *QueueEntry), removeCallback func(q *Queue, entry *QueueEntry)) (identifier QueueIdentifier) {
	q.lock.Lock()

	spliced, cancel := q.spliceSources(source)
	identifier = q.identifierCounter
	entry := &QueueEntry{
		Identifier:     identifier,
		Source:         q.getFilterChain(spliced),
		cancel:         cancel,
		StartCallback:  startCallback,
		EndCallback:    endCallback,
		RemoveCallback: removeCallback,
	}

	if len(q.queue) == 0 {
		q.queue = append(q.queue, entry)
	} else {
		// Build the new tail first so the old elements are copied before
		// the backing array of q.queue is overwritten.
		tail := append([]*QueueEntry{entry}, q.queue[1:]...)
		q.queue = append(q.queue[:1], tail...)
	}
	q.identifierCounter++

	q.lock.Unlock()
	q.sendInterrupt()

	return identifier
}
// AddTail appends source to the end of the queue and returns the identifier
// assigned to the new entry.
//
// The source is wrapped so it can be cancelled and is converted to the
// queue's output format. The worker goroutine is interrupted after the lock
// has been released.
//
// The callbacks may be nil; see QueueEntry for when each one fires.
func (q *Queue) AddTail(source audio.Source, startCallback func(q *Queue, entry *QueueEntry), endCallback func(q *Queue, entry *QueueEntry), removeCallback func(q *Queue, entry *QueueEntry)) (identifier QueueIdentifier) {
	q.lock.Lock()

	spliced, cancel := q.spliceSources(source)
	identifier = q.identifierCounter
	entry := &QueueEntry{
		Identifier:     identifier,
		Source:         q.getFilterChain(spliced),
		cancel:         cancel,
		StartCallback:  startCallback,
		EndCallback:    endCallback,
		RemoveCallback: removeCallback,
	}
	q.queue = append(q.queue, entry)
	q.identifierCounter++

	q.lock.Unlock()
	q.sendInterrupt()

	return identifier
}
// IsClosed reports whether the queue's closed flag is set. The flag is read
// under the read lock.
func (q *Queue) IsClosed() bool {
	q.lock.RLock()
	closed := q.closed
	q.lock.RUnlock()

	return closed
}
// Remove deletes the entry with the given identifier from the queue and
// reports whether such an entry existed.
//
// For a matching entry, while holding the write lock, it interrupts the
// worker goroutine, signals the entry's cancel channel, starts a null sink
// on the entry's source and removes the entry from the slice. The entry's
// RemoveCallback, if set, runs after the lock has been released.
func (q *Queue) Remove(identifier QueueIdentifier) bool {
	q.lock.Lock()

	position := -1
	var target *QueueEntry
	for i := range q.queue {
		if q.queue[i].Identifier == identifier {
			position, target = i, q.queue[i]
			break
		}
	}
	if target == nil {
		q.lock.Unlock()
		return false
	}

	// Order matters: wake the worker first, then stop and drain the source.
	q.sendInterrupt()
	target.cancel <- true
	go audio.NewNullSink().Process(target.Source)

	// delete entry
	q.queue = append(q.queue[:position], q.queue[position+1:]...)
	q.lock.Unlock()

	if target.RemoveCallback != nil {
		target.RemoveCallback(q, target)
	}
	return true
}
// GetQueueHead returns the first entry of the queue, or nil when the queue is
// empty.
func (q *Queue) GetQueueHead() *QueueEntry {
	q.lock.RLock()
	defer q.lock.RUnlock()

	if len(q.queue) == 0 {
		return nil
	}
	return q.queue[0]
}
// GetQueueTail returns the last entry of the queue together with its index.
// For an empty queue it returns index 0 and a nil entry, so callers must
// check the entry rather than the index.
func (q *Queue) GetQueueTail() (index int, entry *QueueEntry) {
	q.lock.RLock()
	defer q.lock.RUnlock()

	last := len(q.queue) - 1
	if last < 0 {
		return 0, nil
	}
	return last, q.queue[last]
}
// GetQueueIndex returns the entry at the given position of the queue, or nil
// when index is outside [0, queue size). Negative indexes are rejected
// explicitly; they previously passed the upper-bound check and caused an
// index-out-of-range panic.
func (q *Queue) GetQueueIndex(index int) *QueueEntry {
	q.lock.RLock()
	defer q.lock.RUnlock()

	if index < 0 || index >= len(q.queue) {
		return nil
	}
	return q.queue[index]
}
// GetQueueEntry looks up an entry by identifier and returns it together with
// its current position in the queue. When no entry matches it returns -1 and
// nil.
func (q *Queue) GetQueueEntry(identifier QueueIdentifier) (index int, entry *QueueEntry) {
	q.lock.RLock()
	defer q.lock.RUnlock()

	for position := range q.queue {
		if candidate := q.queue[position]; candidate.Identifier == identifier {
			return position, candidate
		}
	}
	return -1, nil
}
// GetQueueSize returns the number of entries currently in the queue.
func (q *Queue) GetQueueSize() int {
	q.lock.RLock()
	size := len(q.queue)
	q.lock.RUnlock()

	return size
}
// GetQueue returns a snapshot of the queue in order. The returned slice is a
// copy, so modifying it does not affect the queue; the entries themselves are
// the same pointers the queue holds.
func (q *Queue) GetQueue() (entries []*QueueEntry) {
	q.lock.RLock()
	defer q.lock.RUnlock()

	entries = append(make([]*QueueEntry, 0, len(q.queue)), q.queue...)
	return entries
}
// GetSource returns the queue's output source by value. The Blocks channel
// in it is the one the worker goroutine writes to and closes once the queue
// has been closed.
func (q *Queue) GetSource() audio.Source {
	out := q.output
	return out
}
// GetSampleRate returns the sample rate of the queue's output, as passed to
// NewQueue.
func (q *Queue) GetSampleRate() int {
	rate := q.output.SampleRate
	return rate
}
// GetChannels returns the channel count of the queue's output, as passed to
// NewQueue.
func (q *Queue) GetChannels() int {
	channels := q.output.Channels
	return channels
}
// sendInterrupt wakes the worker goroutine so it re-examines the queue,
// unless an interrupt is already pending.
//
// interruptDepth is claimed with a single compare-and-swap (0 -> 1). Because
// only the caller that wins the swap sends, at most one value is ever in
// flight and the send into the 1-slot interrupt channel cannot block. The
// earlier separate load and add allowed two concurrent callers to both
// observe 0; the second send could then block, possibly while Remove was
// holding the queue's write lock. The worker resets the depth after it has
// received the interrupt.
func (q *Queue) sendInterrupt() {
	if atomic.CompareAndSwapInt64(&q.interruptDepth, 0, 1) {
		q.interrupt <- true
	}
}
// Close marks the queue as closed and interrupts the worker goroutine so it
// can shut down. Calling Close on an already closed queue does nothing.
//
// NOTE(review): closed is read and written here without holding q.lock,
// unlike the other accessors of that field in this file.
func (q *Queue) Close() {
	if q.closed {
		return
	}
	q.closed = true
	q.sendInterrupt()
}
// Wait blocks until the worker goroutine launched by start has returned,
// which it does after observing that the queue has been closed.
func (q *Queue) Wait() {
q.wg.Wait()
}