diff --git a/README.md b/README.md index e5e38f5..33abc3e 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible). * Normalized channels / sample rates for mounts. * Implements ICY metadata (artist, title, url). * Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. - * Use `queue.buffer_size` to specify number of seconds to buffer. + * Use `queue.buffer_size` to specify number of seconds to buffer (by default 0, automatic per client). * Implements `queue.nr` and `/random` (to be deprecated/changed) # Future improvements diff --git a/example_config.toml b/example_config.toml index 3d8a717..3ec3412 100644 --- a/example_config.toml +++ b/example_config.toml @@ -18,6 +18,7 @@ host="127.0.0.1" # } # # The path is the path to an audio file on the filesystem you want MeteorLight to play. +# Additionally, the "title", "artist" and "art" properties can be included to be used as metadata. random_song_api="http://localhost:8012/api/random" # # An HTTP POST is issued to this URL when MeteorLight starts playing a track. The body diff --git a/queue.go b/queue.go index b25fb5d..9e68f10 100644 --- a/queue.go +++ b/queue.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "git.gammaspectra.live/S.O.N.G/Kirika/audio" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac" @@ -8,7 +9,6 @@ import ( "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" - "io" "log" "net/http" "os" @@ -16,6 +16,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -60,15 +61,16 @@ func (p *QueueMetadataPacket) GetData() []byte { } type Queue struct { - NowPlaying chan *QueueTrackEntry - QueueEmpty chan *QueueTrackEntry - Duration time.Duration - audioQueue *audio.Queue - mounts []*StreamMount - queue []*QueueTrackEntry - mutex sync.RWMutex - config *Config - wg sync.WaitGroup + NowPlaying chan *QueueTrackEntry + QueueEmpty chan *QueueTrackEntry + Duration time.Duration + durationError int64 + audioQueue *audio.Queue + mounts []*StreamMount + queue []*QueueTrackEntry + mutex sync.RWMutex + config *Config + wg sync.WaitGroup } func NewQueue(config *Config) *Queue { @@ -152,11 +154,7 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { defer f.Close() - - q.mutex.Lock() - //TODO: carry error - q.Duration += (time.Second * time.Duration(entry.ReadSamples)) / time.Duration(entry.Source.SampleRate) - q.mutex.Unlock() + atomic.AddInt64((*int64)(&q.Duration), int64((time.Second*time.Duration(entry.ReadSamples))/time.Duration(entry.Source.SampleRate))) q.Remove(entry.Identifier) q.HandleQueue() @@ -267,105 +265,6 @@ func (q *Queue) Remove(identifier audio.QueueIdentifier) bool { return false } -type httpAudioWriter struct { - timeout time.Duration - writer http.ResponseWriter - metadataToSend struct { - Title string - URL string - } - icyInterval int - icyCounter int -} - -func (h *httpAudioWriter) writeIcy() error { - packetContent := make([]byte, 1, 4096) - if len(h.metadataToSend.Title) > 0 { - //TODO: quote quotes - packetContent = append(packetContent, []byte(fmt.Sprintf("StreamTitle='%s';", strings.ReplaceAll(h.metadataToSend.Title, "'", "")))...) - h.metadataToSend.Title = "" - } - if len(h.metadataToSend.URL) > 0 { - //TODO: quote quotes - packetContent = append(packetContent, []byte(fmt.Sprintf("StreamURL='%s';", strings.ReplaceAll(h.metadataToSend.URL, "'", "")))...) - h.metadataToSend.URL = "" - } - - contentLength := len(packetContent) - 1 - if contentLength > 16*255 { - //cannot send long titles - _, err := h.writer.Write(make([]byte, 1)) - return err - } - - if (contentLength % 16) == 0 { //already padded - packetContent[0] = byte(contentLength / 16) - } else { - packetContent[0] = byte(contentLength/16) + 1 - packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...) - } - - _, err := h.writer.Write(packetContent) - return err -} - -func (h *httpAudioWriter) Write(p []byte) (n int, err error) { - if h.writer != nil { - if h.icyInterval > 0 { - var i int - for len(p) > 0 { - l := h.icyInterval - h.icyCounter - if l <= len(p) { - i, err = h.writer.Write(p[:l]) - n += i - if err != nil { - h.writer = nil - break - } - - if err = h.writeIcy(); err != nil { - h.writer = nil - break - } - - h.icyCounter = 0 - p = p[l:] - } else { - i, err = h.writer.Write(p) - n += i - if err != nil { - h.writer = nil - break - } - h.icyCounter += i - p = p[:0] - } - } - } else { - n, err = h.writer.Write(p) - if err != nil { - h.writer = nil - } - } - return - } - return 0, io.EOF -} - -func (h *httpAudioWriter) Close() (err error) { - h.writer = nil - return nil -} - -func (h *httpAudioWriter) Flush() { - if h.writer != nil { - //TODO: not deadline aware? - /*if flusher, ok := h.writer.(http.Flusher); ok { - flusher.Flush() - }*/ - } -} - func (q *Queue) GetListeners() (listeners []*ListenerInformation) { q.mutex.RLock() defer q.mutex.RUnlock() @@ -389,39 +288,122 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") writer.Header().Set("X-Content-Type-Options", "nosniff") - byteWriter := &httpAudioWriter{writer: writer, timeout: time.Second * 2} - - if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 { - byteWriter.icyInterval = 8192 - writer.Header().Set("Icy-MetaInt", fmt.Sprintf("%d", byteWriter.icyInterval)) - } + var packetWriteCallback func(packet packetizer.Packet) error + //buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere + const byteSliceChannelBuffer = 128 + writeChannel := make(chan []byte, byteSliceChannelBuffer) + var requestDone error var wgClient sync.WaitGroup - writeCallback := func(packet packetizer.Packet) error { - if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { - if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 { - byteWriter.metadataToSend.Title = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Metadata.Artist, metadataPacket.TrackEntry.Metadata.Title) - } else { - byteWriter.metadataToSend.Title = metadataPacket.TrackEntry.Metadata.Title - } - byteWriter.metadataToSend.URL = metadataPacket.TrackEntry.Metadata.Art - return nil - } - //TODO: icy - /* - select { - case <-request.Context().Done(): - // Client gave up - default: + if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 { + metadataToSend := make(map[string]string) + const icyInterval = 8192 + icyCounter := 0 + writer.Header().Set("Icy-MetaInt", fmt.Sprintf("%d", icyInterval)) + + writeIcy := func() []byte { + packetContent := make([]byte, 1, 4096) + for k, v := range metadataToSend { + packetContent = append(packetContent, []byte(fmt.Sprintf("%s='%s';", k, v))...) + delete(metadataToSend, k) + + //shouldn't send multiple properties in same packet if we want working single quotes, wait until next ICY frame + break } - */ - _, err := byteWriter.Write(packet.GetData()) - byteWriter.Flush() - return err + contentLength := len(packetContent) - 1 + if contentLength > 16*255 { + //cannot send long titles + return make([]byte, 1) + } + + if (contentLength % 16) == 0 { //already padded + packetContent[0] = byte(contentLength / 16) + } else { + packetContent[0] = byte(contentLength/16) + 1 + packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...) + } + + return packetContent + } + + packetWriteCallback = func(packet packetizer.Packet) error { + if requestDone != nil { + return requestDone + } + if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { + if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 { + metadataToSend["StreamTitle"] = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Metadata.Artist, metadataPacket.TrackEntry.Metadata.Title) + } else { + metadataToSend["StreamTitle"] = metadataPacket.TrackEntry.Metadata.Title + } + if len(metadataPacket.TrackEntry.Metadata.Art) > 0 { + metadataToSend["StreamURL"] = metadataPacket.TrackEntry.Metadata.Art + } + return nil + } + + p := packet.GetData() + var data []byte + for len(p) > 0 { + l := icyInterval - icyCounter + if l <= len(p) { + data = append(data, p[:l]...) + data = append(data, writeIcy()...) + icyCounter = 0 + p = p[l:] + } else { + data = append(data, p...) + icyCounter += len(p) + p = p[:0] + } + } + + if len(writeChannel) >= byteSliceChannelBuffer-1 { + requestDone = errors.New("client ran out of buffer") + return requestDone + } + writeChannel <- data + return nil + } + } else { + packetWriteCallback = func(packet packetizer.Packet) error { + if requestDone != nil { + return requestDone + } + if _, ok := packet.(*QueueMetadataPacket); ok { + return nil + } + + if len(writeChannel) >= byteSliceChannelBuffer-1 { + requestDone = errors.New("client ran out of buffer") + return requestDone + } + writeChannel <- packet.GetData() + return nil + } } + wgClient.Add(1) + go func() { + defer wgClient.Done() + var flusher http.Flusher + if httpFlusher, ok := writer.(http.Flusher); ok { + flusher = httpFlusher + } + + for byteSlice := range writeChannel { + if _, requestDone = writer.Write(byteSlice); requestDone != nil { + break + } + //try flush + if flusher != nil { + flusher.Flush() + } + } + + }() var headers []HeaderEntry for k, v := range request.Header { @@ -469,6 +451,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate)) } + wgClient.Add(1) mount.AddListener(&StreamListener{ Information: ListenerInformation{ Mount: mount.Mount, @@ -480,7 +463,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit for _, p := range packets { if p.KeepMode() != packetizer.Discard || p.GetEndSampleNumber() >= sampleBufferMin { - if err := writeCallback(p); err != nil { + if err := packetWriteCallback(p); err != nil { return err } } @@ -488,10 +471,10 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } return nil }, - Write: writeCallback, + Write: packetWriteCallback, Close: func() { - byteWriter.Close() - wgClient.Done() + defer wgClient.Done() + close(writeChannel) }, }) wgClient.Wait()