package queue import ( "crypto/sha256" "encoding/hex" "encoding/json" "errors" "fmt" "git.gammaspectra.live/S.O.N.G/Kirika/audio" "git.gammaspectra.live/S.O.N.G/Kirika/audio/filter" "git.gammaspectra.live/S.O.N.G/Kirika/audio/queue" "git.gammaspectra.live/S.O.N.G/Kirika/audio/replaygain" "git.gammaspectra.live/S.O.N.G/MeteorLight/config" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener/aps1" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener/icy" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener/plain" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/track" "git.gammaspectra.live/S.O.N.G/MeteorLight/stream" "git.gammaspectra.live/S.O.N.G/MeteorLight/util" "golang.org/x/exp/slices" "log" "net/http" "strconv" "strings" "sync" "sync/atomic" "time" ) type Queue struct { NowPlaying chan *track.Entry QueueEmpty chan *track.Entry duration atomic.Int64 durationError int64 audioQueue *queue.Queue mounts []*stream.Mount queue []*track.Entry mutex sync.RWMutex config *config.Config wg sync.WaitGroup } func NewQueue(conf *config.Config) *Queue { if conf.Queue.SampleRate <= 0 { conf.Queue.SampleRate = 44100 } sampleFormat := audio.SourceInt16 bitDepth := 16 switch conf.Queue.SampleFormat { case "f32", "float", "float32", "f32le": sampleFormat = audio.SourceFloat32 bitDepth = 0 case "i32", "s32", "int32", "int", "s32le": sampleFormat = audio.SourceInt32 bitDepth = 32 case "i16", "s16", "int16", "s16le": sampleFormat = audio.SourceInt16 bitDepth = 16 } if conf.Queue.BitDepth > 0 { bitDepth = conf.Queue.BitDepth } q := &Queue{ NowPlaying: make(chan *track.Entry, 1), QueueEmpty: make(chan *track.Entry), config: conf, audioQueue: queue.NewQueue(sampleFormat, bitDepth, conf.Queue.SampleRate, 2), } blocksPerSecond := 20 sources := filter.NewFilterChain(q.audioQueue.GetSource(), filter.NewBufferFilter(16), filter.NewRealTimeFilter(blocksPerSecond), filter.NewBufferFilter(config.MaxBufferSize*blocksPerSecond)).Split(len(conf.Streams)) for i, s := range q.config.Streams { mount := stream.NewStreamMount(sources[i], s) if mount == nil { log.Panicf("could not initialize %s\n", s.MountPath) } q.mounts = append(q.mounts, mount) q.wg.Add(1) go mount.Process(&q.wg) } return q } func (q *Queue) GetDuration() time.Duration { return time.Duration(q.duration.Load()) } func (q *Queue) Wait() { q.wg.Wait() close(q.NowPlaying) } func (q *Queue) AddTrack(entry *track.Entry, tail bool) error { if err := entry.Load(); err != nil { return err } startCallback := func(queue *queue.Queue, queueEntry *queue.Entry) { if e := q.Get(queueEntry.Identifier); e != nil { //is this needed? log.Printf("now playing \"%s\": %s - %s (%s)\n", e.Path, e.Metadata.Title, e.Metadata.Artist, e.Metadata.Album) q.NowPlaying <- e for _, mount := range q.mounts { mount.QueueMetadata(&metadata.Packet{ //TODO: carry sample rate error SampleNumber: (q.duration.Load() * int64(queue.GetSampleRate())) / int64(time.Second), TrackEntry: e, }) } } else { log.Printf("now playing \"%s\": %s - %s (%s)\n", entry.Path, entry.Metadata.Title, entry.Metadata.Artist, entry.Metadata.Album) } } endCallback := func(queue *queue.Queue, entry *queue.Entry) { } removeCallback := func(queue *queue.Queue, entry *queue.Entry) { //TODO: carry sample rate error q.duration.Add(int64((time.Second * time.Duration(entry.ReadSamples.Load())) / time.Duration(entry.Source.GetSampleRate()))) q.Remove(entry.Identifier) q.HandleQueue() } q.mutex.Lock() defer q.mutex.Unlock() if q.config.Queue.Length > 0 && len(q.queue) >= q.config.Queue.Length { return errors.New("queue too long") } source := entry.Source() if q.config.Queue.ReplayGain { if entry.Metadata.ReplayGain.TrackPeak != 0 { source = replaygain.NewReplayGainFilter(entry.Metadata.ReplayGain.TrackGain, entry.Metadata.ReplayGain.TrackPeak, 0).Process(source) } else { source = replaygain.NewNormalizationFilter(5).Process(source) } } if tail { entry.Identifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback) } else { entry.Identifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback) } entry.Original["queue_id"] = entry.Identifier if tail || len(q.queue) == 0 { q.queue = append(q.queue, entry) } else { q.queue = append(q.queue[:1], append([]*track.Entry{entry}, q.queue[1:]...)...) } return nil } func (q *Queue) HandleQueue() { if q.audioQueue.GetQueueSize() == 0 { if err := q.AddTrack(<-q.QueueEmpty, true); err != nil { log.Printf("track addition error: \"%s\"", err) //TODO: maybe fail after n tries time.Sleep(time.Second) q.HandleQueue() } } } func (q *Queue) GetQueue() (result []*track.Entry) { q.mutex.RLock() defer q.mutex.RUnlock() if len(q.queue) > 1 { result = slices.Clone(q.queue[1:]) } return } func (q *Queue) Get(identifier queue.Identifier) *track.Entry { q.mutex.RLock() defer q.mutex.RUnlock() for _, e := range q.queue { if e.Identifier == identifier { return e } } return nil } func (q *Queue) GetNowPlaying() *track.Entry { if e := q.audioQueue.GetQueueHead(); e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) SkipNowPlaying() bool { if e := q.audioQueue.GetQueueHead(); e != nil { return q.Remove(e.Identifier) } return false } func (q *Queue) GetIndex(index int) *track.Entry { if e := q.audioQueue.GetQueueIndex(index + 1); e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) GetHead() *track.Entry { if e := q.audioQueue.GetQueueIndex(1); e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) GetTail() *track.Entry { if i, e := q.audioQueue.GetQueueTail(); i != 0 && e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) Remove(identifier queue.Identifier) bool { var entry *track.Entry func() { defer q.audioQueue.Remove(identifier) q.mutex.Lock() defer q.mutex.Unlock() for i, e := range q.queue { if e.Identifier == identifier { q.queue = slices.Delete(q.queue, i, i+1) entry = e return } } }() if entry != nil { _ = entry.Close() return true } return false } func (q *Queue) RemoveListener(identifier string) bool { q.mutex.RLock() defer q.mutex.RUnlock() for _, mount := range q.mounts { if mount.RemoveListener(identifier) { return true } } return false } func (q *Queue) GetListeners() (listeners []*listener.Information) { q.mutex.RLock() defer q.mutex.RUnlock() listeners = make([]*listener.Information, 0, 1) for _, mount := range q.mounts { listeners = append(listeners, mount.GetListeners()...) } return } func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) { writer.Header().Set("Server", "MeteorLight/radio") writer.Header().Set("Connection", "close") writer.Header().Set("X-Content-Type-Options", "nosniff") writer.Header().Set("Access-Control-Allow-Origin", "*") writer.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Icy-Metadata") writer.Header().Set("Accept-Ranges", "none") writer.Header().Set("Connection", "close") if strings.HasSuffix(request.URL.Path, "mounts") { writer.Header().Set("Content-Type", "application/json; charset=utf-8") writer.Header().Set("Access-Control-Expose-Headers", "Accept-Ranges, Server, Content-Type") type mountData struct { Path string `json:"mount"` MimeType string `json:"mime"` FormatDescription string `json:"formatDescription"` SampleRate int `json:"sampleRate"` Channels int `json:"channels"` Listeners int `json:"listeners"` Options map[string]interface{} `json:"options"` } var mounts []mountData for _, mount := range q.mounts { mounts = append(mounts, mountData{ Path: strings.TrimSuffix(request.URL.Path, "mounts") + mount.Mount, MimeType: mount.MimeType, SampleRate: mount.SampleRate, FormatDescription: mount.FormatDescription, Channels: mount.Channels, Listeners: len(mount.GetListeners()), Options: mount.Options, }) } jsonBytes, _ := json.MarshalIndent(mounts, "", " ") writer.WriteHeader(http.StatusOK) _, _ = writer.Write(jsonBytes) return } for _, mount := range q.mounts { if strings.HasSuffix(request.URL.Path, mount.Mount) { writer.Header().Set("Content-Type", mount.MimeType) writer.Header().Set("Cache-Control", "no-store, max-age=604800") writer.Header().Set("Access-Control-Expose-Headers", "Accept-Ranges, Server, Content-Type, Icy-MetaInt, X-Listener-Identifier") writer.Header().Set("Vary", "*") rangeHeader := request.Header.Get("range") if rangeHeader != "" && rangeHeader != "bytes=0-" { //TODO: maybe should fail in case bytes are requested if strings.Index(request.UserAgent(), " Safari/") != -1 && mount.MimeType == "audio/flac" { //Safari special case, fake Range check so it decodes afterwards. //Safari creates a request with Range for 0-1, specifically for FLAC, and expects a result supporting range. Afterwards it requests the whole file. //However the decoder is able to decode FLAC livestreams. If we fake the initial range response, then afterwards serve normal responses, Safari will happily work. //TODO: remove this AS SOON as safari works on its own //safariLargeFileValue arbitrary large value, cannot be that large or iOS Safari fails. safariLargeFileValue := 1024 * 1024 * 1024 * 1024 * 16 // 16 TiB if rangeHeader == "bytes=0-1" { //first request writer.Header().Set("Accept-Ranges", "bytes") writer.Header().Set("Content-Range", fmt.Sprintf("bytes 0-1/%d", safariLargeFileValue)) //64 TiB max fake size writer.Header().Set("Content-Length", "2") writer.WriteHeader(http.StatusPartialContent) _, _ = writer.Write([]byte{'f', 'L'}) return } else if rangeHeader == fmt.Sprintf("bytes=0-%d", safariLargeFileValue-1) { //second request, serve status 200 to keep retries to a minimum writer.Header().Set("Content-Length", fmt.Sprintf("%d", safariLargeFileValue)) writer.WriteHeader(http.StatusOK) } else if strings.HasPrefix(rangeHeader, "bytes=") && strings.HasSuffix(rangeHeader, fmt.Sprintf("-%d", safariLargeFileValue-1)) { //any other requests, these should fail writer.Header().Set("Content-Range", fmt.Sprintf("bytes %s/%d", strings.TrimPrefix(rangeHeader, "bytes="), safariLargeFileValue)) writer.Header().Set("Accept-Ranges", "bytes") writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable) return } } } bitrate := 0 if value, ok := mount.Options["bitrate"]; ok { if intValue, ok := value.(int); ok { bitrate = intValue } else if int64Value, ok := value.(int64); ok { bitrate = int(int64Value) } } //set some audiocast/icy radio headers writer.Header().Set("x-audiocast-name", q.config.Radio.Name) writer.Header().Set("x-audiocast-bitrate", fmt.Sprintf("%d", bitrate)) writer.Header().Set("icy-name", q.config.Radio.Name) writer.Header().Set("icy-version", "2") writer.Header().Set("icy-index-metadata", "1") if q.config.Radio.Description != "" { writer.Header().Set("x-audiocast-description", q.config.Radio.Description) writer.Header().Set("icy-description", q.config.Radio.Description) } if q.config.Radio.URL != "" { writer.Header().Set("x-audiocast-url", q.config.Radio.URL) writer.Header().Set("icy-url", q.config.Radio.URL) } if q.config.Radio.Logo != "" { writer.Header().Set("icy-logo", q.config.Radio.Logo) } writer.Header().Set("icy-br", fmt.Sprintf("%d", bitrate)) writer.Header().Set("icy-sr", fmt.Sprintf("%d", mount.SampleRate)) writer.Header().Set("icy-audio-info", fmt.Sprintf("ice-channels=%d;ice-samplerate=%d;ice-bitrate=%d", mount.Channels, mount.SampleRate, bitrate)) if q.config.Radio.Private { writer.Header().Set("icy-pub", "0") writer.Header().Set("icy-do-not-index", "1") writer.Header().Set("x-audiocast-public", "0") writer.Header().Set("x-robots-tag", "noindex, nofollow") } else { writer.Header().Set("icy-pub", "1") writer.Header().Set("icy-do-not-index", "0") writer.Header().Set("x-audiocast-public", "1") } requestDone := util.RequestDone{} var headers []listener.HeaderEntry for k, v := range request.Header { for _, s := range v { headers = append(headers, listener.HeaderEntry{ Name: k, Value: s, }) } } uriPath := request.URL.Path if len(request.URL.Query().Encode()) > 0 { uriPath += "?" + request.URL.Query().Encode() } getKnownBufferSize := func() time.Duration { userAgent := request.Header.Get("user-agent") if strings.Index(userAgent, "libmpv") != -1 || strings.Index(userAgent, "mpv ") != -1 { //mpv return time.Millisecond * 2500 } else if strings.Index(userAgent, "libvlc") != -1 { //VLC return time.Millisecond * 2500 } else if strings.Index(userAgent, "lavf/") != -1 { //ffplay return time.Millisecond * 2500 } else if strings.Index(userAgent, "gvfs/") != -1 { //gvfs return time.Millisecond * 2500 } else if strings.Index(userAgent, "Music Player Daemon ") != -1 { //MPD return time.Millisecond * 2500 } else if strings.Index(userAgent, " Chrome/") != -1 { //Chromium-based return time.Millisecond * 4000 } else if strings.Index(userAgent, " Safari/") != -1 { //Safari-based return time.Millisecond * 5000 } else if strings.Index(userAgent, " Gecko/") != -1 { //Gecko-based (Firefox) return time.Millisecond * 5000 } else if request.Header.Get("icy-metadata") == "1" { //other unknown players return time.Millisecond * 5000 } //fallback and provide maximum buffer return time.Second * config.MaxBufferSize } sampleBufferLimit := int64(q.config.Queue.BufferSeconds * mount.SampleRate) if q.config.Queue.BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate)) } startStamp := time.Now().Unix() hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s-%d", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), startStamp))) listenerIdentifier := hex.EncodeToString(hashSum[16:]) listenerInformation := listener.Information{ Identifier: listenerIdentifier, Mount: mount.Mount, Path: uriPath, Headers: headers, Start: startStamp, } var mountListener listener.Listener var extraHeaders map[string]string ctx := request.Context() //buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere const byteSliceChannelBuffer = 1024 * 16 writeChannel := make(chan []byte, byteSliceChannelBuffer) go func() { var flusher http.Flusher if httpFlusher, ok := writer.(http.Flusher); ok { flusher = httpFlusher } for !requestDone.Done() { select { case byteSlice := <-writeChannel: if _, err := writer.Write(byteSlice); err != nil { requestDone.Fail(err) break } //try flush if flusher != nil { flusher.Flush() } case <-ctx.Done(): requestDone.Fail(ctx.Err()) break } } }() funcWriter := func(byteSlice []byte) error { if requestDone.Done() { return requestDone.Error() } if len(byteSlice) > 0 { if len(writeChannel) >= (cap(writeChannel) - 1) { requestDone.Fail(errors.New("client ran out of writer buffer")) return requestDone.Error() } writeChannel <- byteSlice } return nil } //set X-Audio-Packet-Stream for strictly timed packets and metadata if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 { //version 1 mountListener, extraHeaders = aps1.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType) } else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 { mountListener, extraHeaders = icy.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart) } else { mountListener, extraHeaders = plain.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart) } if mountListener == nil { writer.WriteHeader(http.StatusInternalServerError) return } for k, v := range extraHeaders { writer.Header().Set(k, v) } writer.Header().Set("x-listener-identifier", listenerIdentifier) log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate)) mount.AddListener(mountListener) mountListener.Wait() requestDone.Complete() log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount) return } } writer.WriteHeader(http.StatusNotFound) return }