package queue import ( "git.gammaspectra.live/S.O.N.G/Kirika/audio" "git.gammaspectra.live/S.O.N.G/Kirika/audio/filter" "golang.org/x/exp/slices" "log" "runtime" "sync" "sync/atomic" ) type Identifier int type Queue struct { queue []*Entry output audio.Source closed atomic.Bool lock sync.RWMutex wg sync.WaitGroup identifierCounter Identifier } func NewQueue(format audio.SourceFormat, bitDepth, sampleRate, channels int) *Queue { if channels != 1 && channels != 2 { log.Panicf("not allowed channel number %d", channels) } q := &Queue{} switch format { case audio.SourceFloat32: q.output = audio.NewSource[float32](bitDepth, sampleRate, channels) queueLoopStart[float32](q) case audio.SourceInt16: q.output = audio.NewSource[int16](bitDepth, sampleRate, channels) queueLoopStart[int16](q) case audio.SourceInt32: q.output = audio.NewSource[int32](bitDepth, sampleRate, channels) queueLoopStart[int32](q) default: log.Panicf("not found source format %d", int(format)) } return q } func spliceHelper[T audio.AllowedSourceTypes](input audio.TypedSource[T]) (output audio.TypedSource[T], cancel chan struct{}) { cancel = make(chan struct{}, 1) output = audio.NewSource[T](input.GetBitDepth(), input.GetSampleRate(), input.GetChannels()) bitDepth := input.GetBitDepth() sourceChannel := input.GetBlocks() go func() { defer output.Close() L1: for { select { case <-cancel: break L1 case block, more := <-sourceChannel: if !more { //no more blocks! break L1 } else { output.IngestNative(block, bitDepth) } } } input.Unlock() //sink remaining go audio.NewNullSink().Process(input) }() return } func (q *Queue) spliceSources(input audio.Source) (audio.Source, chan struct{}) { switch q.GetSource().GetFormat() { case audio.SourceFloat32: return spliceHelper(input.ToFloat32()) case audio.SourceInt16: return spliceHelper(input.ToInt16()) case audio.SourceInt32: return spliceHelper(input.ToInt32(input.GetBitDepth())) default: log.Panicf("not found source format %d", int(input.GetFormat())) } return nil, nil } func (q *Queue) current() *Entry { q.lock.RLock() defer q.lock.RUnlock() if len(q.queue) > 0 { return q.queue[0] } return nil } func queueLoopStart[T audio.AllowedSourceTypes](q *Queue) { q.wg.Add(1) go func() { defer q.wg.Done() var current *Entry L1: for { if q.closed.Load() { q.output.Close() break L1 } if func() bool { currentEntry := q.current() if currentEntry == nil { return current == nil } else if current == nil || current.Identifier != currentEntry.Identifier { current = currentEntry } return false }() { runtime.Gosched() continue } func() { currentBlocks := current.Source.(audio.TypedSource[T]).GetBlocks() defer current.Source.Unlock() if current.StartCallback != nil && current.ReadSamples.Load() == 0 { current.StartCallback(q, current) } output := q.output.(audio.TypedSource[T]) for { select { case <-current.Done(): //song has been cancelled elsewhere return case block, more := <-currentBlocks: if !more { //no more blocks to read current.Cancel() return } else { current.ReadSamples.Add(uint64(len(block) / current.Source.GetChannels())) output.IngestNative(block, current.Source.GetBitDepth()) } } } }() { //no more blocks! skip if current.EndCallback != nil { current.EndCallback(q, current) } q.Remove(current.Identifier) } } }() } func (q *Queue) getFilterChain(source audio.Source) audio.Source { if q.GetChannels() == 1 { return filter.NewFilterChain(source, filter.MonoFilter{}, filter.NewResampleFilter(q.GetSampleRate(), filter.QualityFastest, 0), filter.SourceFormatFilter{ Format: q.GetSource().GetFormat(), }) } else { return filter.NewFilterChain(source, filter.StereoFilter{}, filter.NewResampleFilter(q.GetSampleRate(), filter.QualityFastest, 0), filter.SourceFormatFilter{ Format: q.GetSource().GetFormat(), }) } } func (q *Queue) AddHead(source audio.Source, startCallback, endCallback, removeCallback EntryCallback) (identifier Identifier) { q.lock.Lock() defer q.lock.Unlock() splicedOutput, cancel := q.spliceSources(source) identifier = q.identifierCounter entry := NewEntry(identifier, q.getFilterChain(splicedOutput), cancel, startCallback, endCallback, removeCallback) if len(q.queue) > 0 { q.queue = append(q.queue[:1], append([]*Entry{entry}, q.queue[1:]...)...) } else { q.queue = append(q.queue, entry) } q.identifierCounter++ return } func (q *Queue) AddTail(source audio.Source, startCallback, endCallback, removeCallback EntryCallback) (identifier Identifier) { q.lock.Lock() defer q.lock.Unlock() splicedOutput, cancel := q.spliceSources(source) identifier = q.identifierCounter entry := NewEntry(identifier, q.getFilterChain(splicedOutput), cancel, startCallback, endCallback, removeCallback) q.queue = append(q.queue, entry) q.identifierCounter++ return } func (q *Queue) IsClosed() bool { return q.closed.Load() } func (q *Queue) Remove(identifier Identifier) bool { var entry *Entry func() { q.lock.Lock() defer q.lock.Unlock() if i := slices.IndexFunc(q.queue, func(e *Entry) bool { return e.Identifier == identifier }); i != -1 { e := q.queue[i] e.Cancel() e.Source.Unlock() go audio.NewNullSink().Process(e.Source) q.queue = slices.Delete(q.queue, i, i+1) entry = e } }() if entry != nil { if entry.RemoveCallback != nil { entry.RemoveCallback(q, entry) } return true } return false } func (q *Queue) GetQueueHead() *Entry { q.lock.RLock() defer q.lock.RUnlock() if len(q.queue) > 0 { return q.queue[0] } return nil } func (q *Queue) GetQueueTail() (index int, entry *Entry) { q.lock.RLock() defer q.lock.RUnlock() if len(q.queue) > 0 { return len(q.queue) - 1, q.queue[len(q.queue)-1] } return 0, nil } func (q *Queue) GetQueueIndex(index int) *Entry { q.lock.RLock() defer q.lock.RUnlock() if len(q.queue) > index { return q.queue[index] } return nil } func (q *Queue) GetQueueEntry(identifier Identifier) (index int, entry *Entry) { q.lock.RLock() defer q.lock.RUnlock() for i, e := range q.queue { if e.Identifier == identifier { return i, e } } return -1, nil } func (q *Queue) GetQueueSize() int { q.lock.RLock() defer q.lock.RUnlock() return len(q.queue) } func (q *Queue) GetQueue() (entries []*Entry) { q.lock.RLock() defer q.lock.RUnlock() return slices.Clone(q.queue) } func (q *Queue) GetSource() audio.Source { return q.output } func (q *Queue) GetSampleRate() int { return q.GetSource().GetSampleRate() } func (q *Queue) GetChannels() int { return q.GetSource().GetChannels() } func (q *Queue) Close() { if !q.closed.Swap(true) { if current := q.current(); current != nil { current.Cancel() } } } func (q *Queue) Wait() { q.wg.Wait() }