Cleanup http writer/flusher/ICY, use channel-based approach
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
DataHoarder 2022-03-05 10:34:53 +01:00
parent 4f354bd813
commit 5396dfc036
3 changed files with 129 additions and 145 deletions

View file

@ -10,7 +10,7 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible).
* Normalized channels / sample rates for mounts.
* Implements ICY metadata (artist, title, url).
* Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client.
* Use `queue.buffer_size` to specify number of seconds to buffer.
* Use `queue.buffer_size` to specify number of seconds to buffer (by default 0, automatic per client).
* Implements `queue.nr` and `/random` (to be deprecated/changed)
# Future improvements

View file

@ -18,6 +18,7 @@ host="127.0.0.1"
# }
#
# The path is the path to an audio file on the filesystem you want MeteorLight to play.
# Additionally, the "title", "artist" and "art" properties can be included to be used as metadata.
random_song_api="http://localhost:8012/api/random"
#
# An HTTP POST is issued to this URL when MeteorLight starts playing a track. The body

271
queue.go
View file

@ -1,6 +1,7 @@
package main
import (
"errors"
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
@ -8,7 +9,6 @@ import (
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"io"
"log"
"net/http"
"os"
@ -16,6 +16,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -60,15 +61,16 @@ func (p *QueueMetadataPacket) GetData() []byte {
}
type Queue struct {
NowPlaying chan *QueueTrackEntry
QueueEmpty chan *QueueTrackEntry
Duration time.Duration
audioQueue *audio.Queue
mounts []*StreamMount
queue []*QueueTrackEntry
mutex sync.RWMutex
config *Config
wg sync.WaitGroup
NowPlaying chan *QueueTrackEntry
QueueEmpty chan *QueueTrackEntry
Duration time.Duration
durationError int64
audioQueue *audio.Queue
mounts []*StreamMount
queue []*QueueTrackEntry
mutex sync.RWMutex
config *Config
wg sync.WaitGroup
}
func NewQueue(config *Config) *Queue {
@ -152,11 +154,7 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error {
removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
defer f.Close()
q.mutex.Lock()
//TODO: carry error
q.Duration += (time.Second * time.Duration(entry.ReadSamples)) / time.Duration(entry.Source.SampleRate)
q.mutex.Unlock()
atomic.AddInt64((*int64)(&q.Duration), int64((time.Second*time.Duration(entry.ReadSamples))/time.Duration(entry.Source.SampleRate)))
q.Remove(entry.Identifier)
q.HandleQueue()
@ -267,105 +265,6 @@ func (q *Queue) Remove(identifier audio.QueueIdentifier) bool {
return false
}
type httpAudioWriter struct {
timeout time.Duration
writer http.ResponseWriter
metadataToSend struct {
Title string
URL string
}
icyInterval int
icyCounter int
}
func (h *httpAudioWriter) writeIcy() error {
packetContent := make([]byte, 1, 4096)
if len(h.metadataToSend.Title) > 0 {
//TODO: quote quotes
packetContent = append(packetContent, []byte(fmt.Sprintf("StreamTitle='%s';", strings.ReplaceAll(h.metadataToSend.Title, "'", "")))...)
h.metadataToSend.Title = ""
}
if len(h.metadataToSend.URL) > 0 {
//TODO: quote quotes
packetContent = append(packetContent, []byte(fmt.Sprintf("StreamURL='%s';", strings.ReplaceAll(h.metadataToSend.URL, "'", "")))...)
h.metadataToSend.URL = ""
}
contentLength := len(packetContent) - 1
if contentLength > 16*255 {
//cannot send long titles
_, err := h.writer.Write(make([]byte, 1))
return err
}
if (contentLength % 16) == 0 { //already padded
packetContent[0] = byte(contentLength / 16)
} else {
packetContent[0] = byte(contentLength/16) + 1
packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...)
}
_, err := h.writer.Write(packetContent)
return err
}
func (h *httpAudioWriter) Write(p []byte) (n int, err error) {
if h.writer != nil {
if h.icyInterval > 0 {
var i int
for len(p) > 0 {
l := h.icyInterval - h.icyCounter
if l <= len(p) {
i, err = h.writer.Write(p[:l])
n += i
if err != nil {
h.writer = nil
break
}
if err = h.writeIcy(); err != nil {
h.writer = nil
break
}
h.icyCounter = 0
p = p[l:]
} else {
i, err = h.writer.Write(p)
n += i
if err != nil {
h.writer = nil
break
}
h.icyCounter += i
p = p[:0]
}
}
} else {
n, err = h.writer.Write(p)
if err != nil {
h.writer = nil
}
}
return
}
return 0, io.EOF
}
func (h *httpAudioWriter) Close() (err error) {
h.writer = nil
return nil
}
func (h *httpAudioWriter) Flush() {
if h.writer != nil {
//TODO: not deadline aware?
/*if flusher, ok := h.writer.(http.Flusher); ok {
flusher.Flush()
}*/
}
}
func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -389,39 +288,122 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform")
writer.Header().Set("X-Content-Type-Options", "nosniff")
byteWriter := &httpAudioWriter{writer: writer, timeout: time.Second * 2}
if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
byteWriter.icyInterval = 8192
writer.Header().Set("Icy-MetaInt", fmt.Sprintf("%d", byteWriter.icyInterval))
}
var packetWriteCallback func(packet packetizer.Packet) error
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
const byteSliceChannelBuffer = 128
writeChannel := make(chan []byte, byteSliceChannelBuffer)
var requestDone error
var wgClient sync.WaitGroup
writeCallback := func(packet packetizer.Packet) error {
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 {
byteWriter.metadataToSend.Title = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Metadata.Artist, metadataPacket.TrackEntry.Metadata.Title)
} else {
byteWriter.metadataToSend.Title = metadataPacket.TrackEntry.Metadata.Title
}
byteWriter.metadataToSend.URL = metadataPacket.TrackEntry.Metadata.Art
return nil
}
//TODO: icy
/*
select {
case <-request.Context().Done():
// Client gave up
default:
if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
metadataToSend := make(map[string]string)
const icyInterval = 8192
icyCounter := 0
writer.Header().Set("Icy-MetaInt", fmt.Sprintf("%d", icyInterval))
writeIcy := func() []byte {
packetContent := make([]byte, 1, 4096)
for k, v := range metadataToSend {
packetContent = append(packetContent, []byte(fmt.Sprintf("%s='%s';", k, v))...)
delete(metadataToSend, k)
//shouldn't send multiple properties in same packet if we want working single quotes, wait until next ICY frame
break
}
*/
_, err := byteWriter.Write(packet.GetData())
byteWriter.Flush()
return err
contentLength := len(packetContent) - 1
if contentLength > 16*255 {
//cannot send long titles
return make([]byte, 1)
}
if (contentLength % 16) == 0 { //already padded
packetContent[0] = byte(contentLength / 16)
} else {
packetContent[0] = byte(contentLength/16) + 1
packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...)
}
return packetContent
}
packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone != nil {
return requestDone
}
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 {
metadataToSend["StreamTitle"] = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Metadata.Artist, metadataPacket.TrackEntry.Metadata.Title)
} else {
metadataToSend["StreamTitle"] = metadataPacket.TrackEntry.Metadata.Title
}
if len(metadataPacket.TrackEntry.Metadata.Art) > 0 {
metadataToSend["StreamURL"] = metadataPacket.TrackEntry.Metadata.Art
}
return nil
}
p := packet.GetData()
var data []byte
for len(p) > 0 {
l := icyInterval - icyCounter
if l <= len(p) {
data = append(data, p[:l]...)
data = append(data, writeIcy()...)
icyCounter = 0
p = p[l:]
} else {
data = append(data, p...)
icyCounter += len(p)
p = p[:0]
}
}
if len(writeChannel) >= byteSliceChannelBuffer-1 {
requestDone = errors.New("client ran out of buffer")
return requestDone
}
writeChannel <- data
return nil
}
} else {
packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone != nil {
return requestDone
}
if _, ok := packet.(*QueueMetadataPacket); ok {
return nil
}
if len(writeChannel) >= byteSliceChannelBuffer-1 {
requestDone = errors.New("client ran out of buffer")
return requestDone
}
writeChannel <- packet.GetData()
return nil
}
}
wgClient.Add(1)
go func() {
defer wgClient.Done()
var flusher http.Flusher
if httpFlusher, ok := writer.(http.Flusher); ok {
flusher = httpFlusher
}
for byteSlice := range writeChannel {
if _, requestDone = writer.Write(byteSlice); requestDone != nil {
break
}
//try flush
if flusher != nil {
flusher.Flush()
}
}
}()
var headers []HeaderEntry
for k, v := range request.Header {
@ -469,6 +451,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate))
}
wgClient.Add(1)
mount.AddListener(&StreamListener{
Information: ListenerInformation{
Mount: mount.Mount,
@ -480,7 +463,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit
for _, p := range packets {
if p.KeepMode() != packetizer.Discard || p.GetEndSampleNumber() >= sampleBufferMin {
if err := writeCallback(p); err != nil {
if err := packetWriteCallback(p); err != nil {
return err
}
}
@ -488,10 +471,10 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
}
return nil
},
Write: writeCallback,
Write: packetWriteCallback,
Close: func() {
byteWriter.Close()
wgClient.Done()
defer wgClient.Done()
close(writeChannel)
},
})
wgClient.Wait()