325 lines
7 KiB
Go
325 lines
7 KiB
Go
package queue
|
|
|
|
import (
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
|
|
"golang.org/x/exp/slices"
|
|
"log"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
type Identifier int
|
|
|
|
type Queue struct {
|
|
queue []*Entry
|
|
output audio.Source
|
|
closed atomic.Bool
|
|
lock sync.RWMutex
|
|
wg sync.WaitGroup
|
|
identifierCounter Identifier
|
|
}
|
|
|
|
func NewQueue(format audio.SourceFormat, bitDepth, sampleRate, channels int) *Queue {
|
|
if channels != 1 && channels != 2 {
|
|
log.Panicf("not allowed channel number %d", channels)
|
|
}
|
|
|
|
q := &Queue{}
|
|
|
|
switch format {
|
|
case audio.SourceFloat32:
|
|
q.output = audio.NewSource[float32](bitDepth, sampleRate, channels)
|
|
queueLoopStart[float32](q)
|
|
case audio.SourceInt16:
|
|
q.output = audio.NewSource[int16](bitDepth, sampleRate, channels)
|
|
queueLoopStart[int16](q)
|
|
case audio.SourceInt32:
|
|
q.output = audio.NewSource[int32](bitDepth, sampleRate, channels)
|
|
queueLoopStart[int32](q)
|
|
default:
|
|
log.Panicf("not found source format %d", int(format))
|
|
}
|
|
return q
|
|
}
|
|
|
|
func spliceHelper[T audio.AllowedSourceTypes](input audio.TypedSource[T]) (output audio.TypedSource[T], cancel chan struct{}) {
|
|
cancel = make(chan struct{}, 1)
|
|
output = audio.NewSource[T](input.GetBitDepth(), input.GetSampleRate(), input.GetChannels())
|
|
|
|
bitDepth := input.GetBitDepth()
|
|
|
|
sourceChannel := input.GetBlocks()
|
|
go func() {
|
|
defer output.Close()
|
|
L1:
|
|
for {
|
|
select {
|
|
case <-cancel:
|
|
break L1
|
|
case block, more := <-sourceChannel:
|
|
if !more {
|
|
//no more blocks!
|
|
break L1
|
|
} else {
|
|
output.IngestNative(block, bitDepth)
|
|
}
|
|
}
|
|
}
|
|
|
|
input.Unlock()
|
|
|
|
//sink remaining
|
|
go audio.NewNullSink().Process(input)
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
func (q *Queue) spliceSources(input audio.Source) (audio.Source, chan struct{}) {
|
|
|
|
switch q.GetSource().GetFormat() {
|
|
case audio.SourceFloat32:
|
|
return spliceHelper(input.ToFloat32())
|
|
case audio.SourceInt16:
|
|
return spliceHelper(input.ToInt16())
|
|
case audio.SourceInt32:
|
|
return spliceHelper(input.ToInt32(input.GetBitDepth()))
|
|
default:
|
|
log.Panicf("not found source format %d", int(input.GetFormat()))
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func (q *Queue) current() *Entry {
|
|
q.lock.RLock()
|
|
defer q.lock.RUnlock()
|
|
if len(q.queue) > 0 {
|
|
return q.queue[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func queueLoopStart[T audio.AllowedSourceTypes](q *Queue) {
|
|
q.wg.Add(1)
|
|
go func() {
|
|
defer q.wg.Done()
|
|
var current *Entry
|
|
L1:
|
|
for {
|
|
if q.closed.Load() {
|
|
q.output.Close()
|
|
break L1
|
|
}
|
|
|
|
if func() bool {
|
|
currentEntry := q.current()
|
|
if currentEntry == nil {
|
|
return current == nil
|
|
} else if current == nil || current.Identifier != currentEntry.Identifier {
|
|
current = currentEntry
|
|
}
|
|
return false
|
|
}() {
|
|
runtime.Gosched()
|
|
continue
|
|
}
|
|
|
|
func() {
|
|
currentBlocks := current.Source.(audio.TypedSource[T]).GetBlocks()
|
|
defer current.Source.Unlock()
|
|
|
|
if current.StartCallback != nil && current.ReadSamples.Load() == 0 {
|
|
current.StartCallback(q, current)
|
|
}
|
|
|
|
output := q.output.(audio.TypedSource[T])
|
|
|
|
for {
|
|
select {
|
|
case <-current.Done():
|
|
//song has been cancelled elsewhere
|
|
return
|
|
case block, more := <-currentBlocks:
|
|
if !more {
|
|
//no more blocks to read
|
|
current.Cancel()
|
|
return
|
|
} else {
|
|
current.ReadSamples.Add(uint64(len(block) / current.Source.GetChannels()))
|
|
output.IngestNative(block, current.Source.GetBitDepth())
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
{
|
|
//no more blocks! skip
|
|
if current.EndCallback != nil {
|
|
current.EndCallback(q, current)
|
|
}
|
|
q.Remove(current.Identifier)
|
|
}
|
|
}
|
|
|
|
}()
|
|
}
|
|
|
|
func (q *Queue) getFilterChain(source audio.Source) audio.Source {
|
|
if q.GetChannels() == 1 {
|
|
return filter.NewFilterChain(source, filter.MonoFilter{}, filter.NewResampleFilter(q.GetSampleRate(), filter.QualityFastest, 0), filter.SourceFormatFilter{
|
|
Format: q.GetSource().GetFormat(),
|
|
})
|
|
} else {
|
|
return filter.NewFilterChain(source, filter.StereoFilter{}, filter.NewResampleFilter(q.GetSampleRate(), filter.QualityFastest, 0), filter.SourceFormatFilter{
|
|
Format: q.GetSource().GetFormat(),
|
|
})
|
|
}
|
|
}
|
|
|
|
func (q *Queue) AddHead(source audio.Source, startCallback, endCallback, removeCallback EntryCallback) (identifier Identifier) {
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
|
|
splicedOutput, cancel := q.spliceSources(source)
|
|
identifier = q.identifierCounter
|
|
|
|
entry := NewEntry(identifier, q.getFilterChain(splicedOutput), cancel, startCallback, endCallback, removeCallback)
|
|
if len(q.queue) > 0 {
|
|
q.queue = append(q.queue[:1], append([]*Entry{entry}, q.queue[1:]...)...)
|
|
} else {
|
|
q.queue = append(q.queue, entry)
|
|
}
|
|
q.identifierCounter++
|
|
|
|
return
|
|
}
|
|
|
|
func (q *Queue) AddTail(source audio.Source, startCallback, endCallback, removeCallback EntryCallback) (identifier Identifier) {
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
|
|
splicedOutput, cancel := q.spliceSources(source)
|
|
identifier = q.identifierCounter
|
|
|
|
entry := NewEntry(identifier, q.getFilterChain(splicedOutput), cancel, startCallback, endCallback, removeCallback)
|
|
|
|
q.queue = append(q.queue, entry)
|
|
q.identifierCounter++
|
|
|
|
return
|
|
}
|
|
|
|
func (q *Queue) IsClosed() bool {
|
|
return q.closed.Load()
|
|
}
|
|
|
|
func (q *Queue) Remove(identifier Identifier) bool {
|
|
var entry *Entry
|
|
|
|
func() {
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
|
|
if i := slices.IndexFunc(q.queue, func(e *Entry) bool {
|
|
return e.Identifier == identifier
|
|
}); i != -1 {
|
|
e := q.queue[i]
|
|
e.Cancel()
|
|
|
|
e.Source.Unlock()
|
|
|
|
go audio.NewNullSink().Process(e.Source)
|
|
q.queue = slices.Delete(q.queue, i, i+1)
|
|
entry = e
|
|
}
|
|
}()
|
|
|
|
if entry != nil {
|
|
if entry.RemoveCallback != nil {
|
|
entry.RemoveCallback(q, entry)
|
|
}
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (q *Queue) GetQueueHead() *Entry {
|
|
q.lock.RLock()
|
|
defer q.lock.RUnlock()
|
|
if len(q.queue) > 0 {
|
|
return q.queue[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetQueueTail() (index int, entry *Entry) {
|
|
q.lock.RLock()
|
|
defer q.lock.RUnlock()
|
|
if len(q.queue) > 0 {
|
|
return len(q.queue) - 1, q.queue[len(q.queue)-1]
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
func (q *Queue) GetQueueIndex(index int) *Entry {
|
|
q.lock.RLock()
|
|
defer q.lock.RUnlock()
|
|
if len(q.queue) > index {
|
|
return q.queue[index]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetQueueEntry(identifier Identifier) (index int, entry *Entry) {
|
|
q.lock.RLock()
|
|
defer q.lock.RUnlock()
|
|
for i, e := range q.queue {
|
|
if e.Identifier == identifier {
|
|
return i, e
|
|
}
|
|
}
|
|
return -1, nil
|
|
}
|
|
|
|
func (q *Queue) GetQueueSize() int {
|
|
q.lock.RLock()
|
|
defer q.lock.RUnlock()
|
|
|
|
return len(q.queue)
|
|
}
|
|
|
|
func (q *Queue) GetQueue() (entries []*Entry) {
|
|
q.lock.RLock()
|
|
defer q.lock.RUnlock()
|
|
|
|
return slices.Clone(q.queue)
|
|
}
|
|
|
|
func (q *Queue) GetSource() audio.Source {
|
|
return q.output
|
|
}
|
|
|
|
func (q *Queue) GetSampleRate() int {
|
|
return q.GetSource().GetSampleRate()
|
|
}
|
|
|
|
func (q *Queue) GetChannels() int {
|
|
return q.GetSource().GetChannels()
|
|
}
|
|
|
|
func (q *Queue) Close() {
|
|
if !q.closed.Swap(true) {
|
|
if current := q.current(); current != nil {
|
|
current.Cancel()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *Queue) Wait() {
|
|
q.wg.Wait()
|
|
}
|