DataHoarder
68e7cfca07
All checks were successful
continuous-integration/drone/push Build is passing
557 lines
17 KiB
Go
557 lines
17 KiB
Go
package queue
|
|
|
|
import (
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/queue"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/replaygain"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/config"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener/aps1"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener/icy"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener/plain"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/track"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/stream"
|
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
|
"golang.org/x/exp/slices"
|
|
"log"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
type Queue struct {
|
|
NowPlaying chan *track.Entry
|
|
QueueEmpty chan *track.Entry
|
|
duration atomic.Int64
|
|
durationError int64
|
|
audioQueue *queue.Queue
|
|
mounts []*stream.Mount
|
|
queue []*track.Entry
|
|
mutex sync.RWMutex
|
|
config *config.Config
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewQueue(conf *config.Config) *Queue {
|
|
if conf.Queue.SampleRate <= 0 {
|
|
conf.Queue.SampleRate = 44100
|
|
}
|
|
|
|
sampleFormat := audio.SourceInt16
|
|
bitDepth := 16
|
|
switch conf.Queue.SampleFormat {
|
|
case "f32", "float", "float32", "f32le":
|
|
sampleFormat = audio.SourceFloat32
|
|
bitDepth = 0
|
|
case "i32", "s32", "int32", "int", "s32le":
|
|
sampleFormat = audio.SourceInt32
|
|
bitDepth = 32
|
|
case "i16", "s16", "int16", "s16le":
|
|
sampleFormat = audio.SourceInt16
|
|
bitDepth = 16
|
|
}
|
|
if conf.Queue.BitDepth > 0 {
|
|
bitDepth = conf.Queue.BitDepth
|
|
}
|
|
|
|
q := &Queue{
|
|
NowPlaying: make(chan *track.Entry, 1),
|
|
QueueEmpty: make(chan *track.Entry),
|
|
config: conf,
|
|
audioQueue: queue.NewQueue(sampleFormat, bitDepth, conf.Queue.SampleRate, 2),
|
|
}
|
|
blocksPerSecond := 20
|
|
|
|
sources := filter.NewFilterChain(q.audioQueue.GetSource(), filter.NewBufferFilter(16), filter.NewRealTimeFilter(blocksPerSecond), filter.NewBufferFilter(config.MaxBufferSize*blocksPerSecond)).Split(len(conf.Streams))
|
|
for i, s := range q.config.Streams {
|
|
mount := stream.NewStreamMount(sources[i], s)
|
|
if mount == nil {
|
|
log.Panicf("could not initialize %s\n", s.MountPath)
|
|
}
|
|
q.mounts = append(q.mounts, mount)
|
|
q.wg.Add(1)
|
|
go mount.Process(&q.wg)
|
|
}
|
|
|
|
return q
|
|
}
|
|
|
|
func (q *Queue) GetDuration() time.Duration {
|
|
return time.Duration(q.duration.Load())
|
|
}
|
|
|
|
func (q *Queue) Wait() {
|
|
q.wg.Wait()
|
|
close(q.NowPlaying)
|
|
}
|
|
|
|
func (q *Queue) AddTrack(entry *track.Entry, tail bool) error {
|
|
|
|
if err := entry.Load(); err != nil {
|
|
return err
|
|
}
|
|
|
|
startCallback := func(queue *queue.Queue, queueEntry *queue.Entry) {
|
|
if e := q.Get(queueEntry.Identifier); e != nil { //is this needed?
|
|
log.Printf("now playing \"%s\": %s - %s (%s)\n", e.Path, e.Metadata.Title, e.Metadata.Artist, e.Metadata.Album)
|
|
q.NowPlaying <- e
|
|
for _, mount := range q.mounts {
|
|
mount.QueueMetadata(&metadata.Packet{
|
|
//TODO: carry sample rate error
|
|
SampleNumber: (q.duration.Load() * int64(queue.GetSampleRate())) / int64(time.Second),
|
|
TrackEntry: e,
|
|
})
|
|
}
|
|
} else {
|
|
log.Printf("now playing \"%s\": %s - %s (%s)\n", entry.Path, entry.Metadata.Title, entry.Metadata.Artist, entry.Metadata.Album)
|
|
}
|
|
}
|
|
|
|
endCallback := func(queue *queue.Queue, entry *queue.Entry) {
|
|
|
|
}
|
|
|
|
removeCallback := func(queue *queue.Queue, entry *queue.Entry) {
|
|
//TODO: carry sample rate error
|
|
q.duration.Add(int64((time.Second * time.Duration(entry.ReadSamples.Load())) / time.Duration(entry.Source.GetSampleRate())))
|
|
|
|
q.Remove(entry.Identifier)
|
|
q.HandleQueue()
|
|
}
|
|
|
|
q.mutex.Lock()
|
|
defer q.mutex.Unlock()
|
|
|
|
if q.config.Queue.Length > 0 && len(q.queue) >= q.config.Queue.Length {
|
|
return errors.New("queue too long")
|
|
}
|
|
|
|
source := entry.Source()
|
|
if q.config.Queue.ReplayGain {
|
|
if entry.Metadata.ReplayGain.TrackPeak != 0 {
|
|
source = replaygain.NewReplayGainFilter(entry.Metadata.ReplayGain.TrackGain, entry.Metadata.ReplayGain.TrackPeak, 0).Process(source)
|
|
} else {
|
|
source = replaygain.NewNormalizationFilter(5).Process(source)
|
|
}
|
|
}
|
|
|
|
if tail {
|
|
entry.Identifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback)
|
|
} else {
|
|
entry.Identifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback)
|
|
}
|
|
|
|
entry.Original["queue_id"] = entry.Identifier
|
|
|
|
if tail || len(q.queue) == 0 {
|
|
q.queue = append(q.queue, entry)
|
|
} else {
|
|
q.queue = append(q.queue[:1], append([]*track.Entry{entry}, q.queue[1:]...)...)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) HandleQueue() {
|
|
if q.audioQueue.GetQueueSize() == 0 {
|
|
if err := q.AddTrack(<-q.QueueEmpty, true); err != nil {
|
|
log.Printf("track addition error: \"%s\"", err)
|
|
|
|
//TODO: maybe fail after n tries
|
|
time.Sleep(time.Second)
|
|
|
|
q.HandleQueue()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *Queue) GetQueue() (result []*track.Entry) {
|
|
q.mutex.RLock()
|
|
defer q.mutex.RUnlock()
|
|
|
|
if len(q.queue) > 1 {
|
|
result = slices.Clone(q.queue[1:])
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (q *Queue) Get(identifier queue.Identifier) *track.Entry {
|
|
q.mutex.RLock()
|
|
defer q.mutex.RUnlock()
|
|
for _, e := range q.queue {
|
|
if e.Identifier == identifier {
|
|
return e
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetNowPlaying() *track.Entry {
|
|
if e := q.audioQueue.GetQueueHead(); e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) SkipNowPlaying() bool {
|
|
if e := q.audioQueue.GetQueueHead(); e != nil {
|
|
return q.Remove(e.Identifier)
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (q *Queue) GetIndex(index int) *track.Entry {
|
|
if e := q.audioQueue.GetQueueIndex(index + 1); e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetHead() *track.Entry {
|
|
if e := q.audioQueue.GetQueueIndex(1); e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetTail() *track.Entry {
|
|
if i, e := q.audioQueue.GetQueueTail(); i != 0 && e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) Remove(identifier queue.Identifier) bool {
|
|
var entry *track.Entry
|
|
func() {
|
|
defer q.audioQueue.Remove(identifier)
|
|
q.mutex.Lock()
|
|
defer q.mutex.Unlock()
|
|
|
|
for i, e := range q.queue {
|
|
if e.Identifier == identifier {
|
|
q.queue = slices.Delete(q.queue, i, i+1)
|
|
entry = e
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
if entry != nil {
|
|
_ = entry.Close()
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (q *Queue) RemoveListener(identifier string) bool {
|
|
q.mutex.RLock()
|
|
defer q.mutex.RUnlock()
|
|
for _, mount := range q.mounts {
|
|
if mount.RemoveListener(identifier) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (q *Queue) GetListeners() (listeners []*listener.Information) {
|
|
q.mutex.RLock()
|
|
defer q.mutex.RUnlock()
|
|
|
|
listeners = make([]*listener.Information, 0, 1)
|
|
|
|
for _, mount := range q.mounts {
|
|
listeners = append(listeners, mount.GetListeners()...)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) {
|
|
writer.Header().Set("Server", "MeteorLight/radio")
|
|
writer.Header().Set("Connection", "close")
|
|
writer.Header().Set("X-Content-Type-Options", "nosniff")
|
|
writer.Header().Set("Access-Control-Allow-Origin", "*")
|
|
writer.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Icy-Metadata")
|
|
writer.Header().Set("Accept-Ranges", "none")
|
|
writer.Header().Set("Connection", "close")
|
|
|
|
if strings.HasSuffix(request.URL.Path, "mounts") {
|
|
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
writer.Header().Set("Access-Control-Expose-Headers", "Accept-Ranges, Server, Content-Type")
|
|
|
|
type mountData struct {
|
|
Path string `json:"mount"`
|
|
MimeType string `json:"mime"`
|
|
FormatDescription string `json:"formatDescription"`
|
|
SampleRate int `json:"sampleRate"`
|
|
Channels int `json:"channels"`
|
|
Listeners int `json:"listeners"`
|
|
Options map[string]interface{} `json:"options"`
|
|
}
|
|
|
|
var mounts []mountData
|
|
|
|
for _, mount := range q.mounts {
|
|
mounts = append(mounts, mountData{
|
|
Path: strings.TrimSuffix(request.URL.Path, "mounts") + mount.Mount,
|
|
MimeType: mount.MimeType,
|
|
SampleRate: mount.SampleRate,
|
|
FormatDescription: mount.FormatDescription,
|
|
Channels: mount.Channels,
|
|
Listeners: len(mount.GetListeners()),
|
|
Options: mount.Options,
|
|
})
|
|
}
|
|
|
|
jsonBytes, _ := json.MarshalIndent(mounts, "", " ")
|
|
|
|
writer.WriteHeader(http.StatusOK)
|
|
_, _ = writer.Write(jsonBytes)
|
|
return
|
|
}
|
|
|
|
for _, mount := range q.mounts {
|
|
if strings.HasSuffix(request.URL.Path, mount.Mount) {
|
|
writer.Header().Set("Content-Type", mount.MimeType)
|
|
writer.Header().Set("Cache-Control", "no-store, max-age=604800")
|
|
writer.Header().Set("Access-Control-Expose-Headers", "Accept-Ranges, Server, Content-Type, Icy-MetaInt, X-Listener-Identifier")
|
|
writer.Header().Set("Vary", "*")
|
|
|
|
rangeHeader := request.Header.Get("range")
|
|
if rangeHeader != "" && rangeHeader != "bytes=0-" {
|
|
//TODO: maybe should fail in case bytes are requested
|
|
|
|
if strings.Index(request.UserAgent(), " Safari/") != -1 && mount.MimeType == "audio/flac" {
|
|
//Safari special case, fake Range check so it decodes afterwards.
|
|
//Safari creates a request with Range for 0-1, specifically for FLAC, and expects a result supporting range. Afterwards it requests the whole file.
|
|
//However the decoder is able to decode FLAC livestreams. If we fake the initial range response, then afterwards serve normal responses, Safari will happily work.
|
|
//TODO: remove this AS SOON as safari works on its own
|
|
//safariLargeFileValue arbitrary large value, cannot be that large or iOS Safari fails.
|
|
safariLargeFileValue := 1024 * 1024 * 1024 * 1024 * 16 // 16 TiB
|
|
|
|
if rangeHeader == "bytes=0-1" {
|
|
//first request
|
|
writer.Header().Set("Accept-Ranges", "bytes")
|
|
writer.Header().Set("Content-Range", fmt.Sprintf("bytes 0-1/%d", safariLargeFileValue)) //64 TiB max fake size
|
|
writer.Header().Set("Content-Length", "2")
|
|
writer.WriteHeader(http.StatusPartialContent)
|
|
_, _ = writer.Write([]byte{'f', 'L'})
|
|
return
|
|
} else if rangeHeader == fmt.Sprintf("bytes=0-%d", safariLargeFileValue-1) {
|
|
//second request, serve status 200 to keep retries to a minimum
|
|
writer.Header().Set("Content-Length", fmt.Sprintf("%d", safariLargeFileValue))
|
|
writer.WriteHeader(http.StatusOK)
|
|
} else if strings.HasPrefix(rangeHeader, "bytes=") && strings.HasSuffix(rangeHeader, fmt.Sprintf("-%d", safariLargeFileValue-1)) {
|
|
//any other requests, these should fail
|
|
writer.Header().Set("Content-Range", fmt.Sprintf("bytes %s/%d", strings.TrimPrefix(rangeHeader, "bytes="), safariLargeFileValue))
|
|
writer.Header().Set("Accept-Ranges", "bytes")
|
|
writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
bitrate := 0
|
|
if value, ok := mount.Options["bitrate"]; ok {
|
|
if intValue, ok := value.(int); ok {
|
|
bitrate = intValue
|
|
} else if int64Value, ok := value.(int64); ok {
|
|
bitrate = int(int64Value)
|
|
}
|
|
}
|
|
|
|
//set some audiocast/icy radio headers
|
|
writer.Header().Set("x-audiocast-name", q.config.Radio.Name)
|
|
writer.Header().Set("x-audiocast-bitrate", fmt.Sprintf("%d", bitrate))
|
|
writer.Header().Set("icy-name", q.config.Radio.Name)
|
|
writer.Header().Set("icy-version", "2")
|
|
writer.Header().Set("icy-index-metadata", "1")
|
|
if q.config.Radio.Description != "" {
|
|
writer.Header().Set("x-audiocast-description", q.config.Radio.Description)
|
|
writer.Header().Set("icy-description", q.config.Radio.Description)
|
|
}
|
|
if q.config.Radio.URL != "" {
|
|
writer.Header().Set("x-audiocast-url", q.config.Radio.URL)
|
|
writer.Header().Set("icy-url", q.config.Radio.URL)
|
|
}
|
|
if q.config.Radio.Logo != "" {
|
|
writer.Header().Set("icy-logo", q.config.Radio.Logo)
|
|
}
|
|
writer.Header().Set("icy-br", fmt.Sprintf("%d", bitrate))
|
|
writer.Header().Set("icy-sr", fmt.Sprintf("%d", mount.SampleRate))
|
|
writer.Header().Set("icy-audio-info", fmt.Sprintf("ice-channels=%d;ice-samplerate=%d;ice-bitrate=%d", mount.Channels, mount.SampleRate, bitrate))
|
|
if q.config.Radio.Private {
|
|
writer.Header().Set("icy-pub", "0")
|
|
writer.Header().Set("icy-do-not-index", "1")
|
|
writer.Header().Set("x-audiocast-public", "0")
|
|
writer.Header().Set("x-robots-tag", "noindex, nofollow")
|
|
} else {
|
|
writer.Header().Set("icy-pub", "1")
|
|
writer.Header().Set("icy-do-not-index", "0")
|
|
writer.Header().Set("x-audiocast-public", "1")
|
|
}
|
|
|
|
requestDone := util.RequestDone{}
|
|
|
|
var headers []listener.HeaderEntry
|
|
for k, v := range request.Header {
|
|
for _, s := range v {
|
|
headers = append(headers, listener.HeaderEntry{
|
|
Name: k,
|
|
Value: s,
|
|
})
|
|
}
|
|
}
|
|
|
|
uriPath := request.URL.Path
|
|
if len(request.URL.Query().Encode()) > 0 {
|
|
uriPath += "?" + request.URL.Query().Encode()
|
|
}
|
|
|
|
getKnownBufferSize := func() time.Duration {
|
|
userAgent := request.Header.Get("user-agent")
|
|
if strings.Index(userAgent, "libmpv") != -1 || strings.Index(userAgent, "mpv ") != -1 { //mpv
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "libvlc") != -1 { //VLC
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "lavf/") != -1 { //ffplay
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "gvfs/") != -1 { //gvfs
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "Music Player Daemon ") != -1 { //MPD
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, " Chrome/") != -1 { //Chromium-based
|
|
return time.Millisecond * 4000
|
|
} else if strings.Index(userAgent, " Safari/") != -1 { //Safari-based
|
|
return time.Millisecond * 5000
|
|
} else if strings.Index(userAgent, " Gecko/") != -1 { //Gecko-based (Firefox)
|
|
return time.Millisecond * 5000
|
|
} else if request.Header.Get("icy-metadata") == "1" { //other unknown players
|
|
return time.Millisecond * 5000
|
|
}
|
|
|
|
//fallback and provide maximum buffer
|
|
return time.Second * config.MaxBufferSize
|
|
}
|
|
|
|
sampleBufferLimit := int64(q.config.Queue.BufferSeconds * mount.SampleRate)
|
|
if q.config.Queue.BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers
|
|
sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate))
|
|
}
|
|
|
|
startStamp := time.Now().Unix()
|
|
hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s-%d", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), startStamp)))
|
|
listenerIdentifier := hex.EncodeToString(hashSum[16:])
|
|
|
|
listenerInformation := listener.Information{
|
|
Identifier: listenerIdentifier,
|
|
Mount: mount.Mount,
|
|
Path: uriPath,
|
|
Headers: headers,
|
|
Start: startStamp,
|
|
}
|
|
|
|
var mountListener listener.Listener
|
|
var extraHeaders map[string]string
|
|
|
|
ctx := request.Context()
|
|
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
|
|
const byteSliceChannelBuffer = 1024 * 16
|
|
writeChannel := make(chan []byte, byteSliceChannelBuffer)
|
|
|
|
go func() {
|
|
var flusher http.Flusher
|
|
if httpFlusher, ok := writer.(http.Flusher); ok {
|
|
flusher = httpFlusher
|
|
}
|
|
|
|
for !requestDone.Done() {
|
|
select {
|
|
case byteSlice := <-writeChannel:
|
|
if _, err := writer.Write(byteSlice); err != nil {
|
|
requestDone.Fail(err)
|
|
break
|
|
}
|
|
//try flush
|
|
if flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
case <-ctx.Done():
|
|
requestDone.Fail(ctx.Err())
|
|
break
|
|
}
|
|
|
|
}
|
|
}()
|
|
|
|
funcWriter := func(byteSlice []byte) error {
|
|
if requestDone.Done() {
|
|
return requestDone.Error()
|
|
}
|
|
|
|
if len(byteSlice) > 0 {
|
|
if len(writeChannel) >= (cap(writeChannel) - 1) {
|
|
requestDone.Fail(errors.New("client ran out of writer buffer"))
|
|
return requestDone.Error()
|
|
}
|
|
|
|
writeChannel <- byteSlice
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
//set X-Audio-Packet-Stream for strictly timed packets and metadata
|
|
if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 {
|
|
//version 1
|
|
mountListener, extraHeaders = aps1.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType)
|
|
} else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
|
|
mountListener, extraHeaders = icy.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
|
|
} else {
|
|
mountListener, extraHeaders = plain.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
|
|
}
|
|
|
|
if mountListener == nil {
|
|
writer.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
for k, v := range extraHeaders {
|
|
writer.Header().Set(k, v)
|
|
}
|
|
|
|
writer.Header().Set("x-listener-identifier", listenerIdentifier)
|
|
|
|
log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate))
|
|
mount.AddListener(mountListener)
|
|
|
|
mountListener.Wait()
|
|
|
|
requestDone.Complete()
|
|
|
|
log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount)
|
|
return
|
|
}
|
|
}
|
|
|
|
writer.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|