From 7f8274f4316190c2f43a2ce705c52d207da9bd63 Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Thu, 3 Mar 2022 15:00:34 +0100 Subject: [PATCH] Implemented timeouts, audio sample buffers, and fast-start depending on client --- MeteorLight.go | 13 +++++-- README.md | 9 +++-- config.go | 1 + example_config.toml | 7 +++- go.mod | 2 +- go.sum | 4 +-- listener.go | 86 +++++++++++++++++++++++++++++++++++++++++++++ mount.go | 18 +++++++--- queue.go | 58 +++++++++++++++++++++++++----- 9 files changed, 176 insertions(+), 22 deletions(-) create mode 100644 listener.go diff --git a/MeteorLight.go b/MeteorLight.go index 6ddee89..eaaa63f 100644 --- a/MeteorLight.go +++ b/MeteorLight.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "time" ) func getQueueEntryFromBody(body []byte) *QueueTrackEntry { @@ -111,13 +112,13 @@ func main() { //TODO: close properly for { if e := getRandomTrack(); e != nil { - queue.QueueEmpty <- e nr = e sendNowRandom(nr) + queue.QueueEmpty <- e } else if e = getFallbackTrack(); e != nil { - queue.QueueEmpty <- e nr = e sendNowRandom(nr) + queue.QueueEmpty <- e } } }() @@ -311,7 +312,13 @@ func main() { Handler: http.HandlerFunc(queue.HandleRadioRequest), } - if err := server.ListenAndServe(); err != nil { + //setup a timeout to prevent slow clients blocking. See https://github.com/golang/go/issues/16100 + timeoutListener, err := newListener("tcp", server.Addr, time.Second*5, time.Second*5) + if err != nil { + log.Panic(err) + } + + if err = server.Serve(timeoutListener); err != nil { log.Panic(err) } }() diff --git a/README.md b/README.md index c998176..ac8e0f1 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible). This project is a Work in Progress. -`TODO: proper handling of audio queue <-> data queue, timeouts, ICY metadata` +`TODO: ICY metadata` # Improvements / differences from Kawa * Does not use libav ([see supported formats/codecs on Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika#codecs-supported)) @@ -13,9 +13,13 @@ This project is a Work in Progress. * Does not restart stream per-track, instead being a continuous stream without parameter changes. * Normalized channels / sample rates for mounts. * Implements ICY metadata -* Uses sample/timed packet buffers, instead of kawa byte buffers, which cause wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. +* Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. + * Use `queue.buffer_size` to specify number of seconds to buffer * Implements `queue.nr` and `/random` (To be Deprecated/Changed) +# Future improvements +* Allow playback of files by URL, not just by path +* Implement precise timing information side-channel ## Dependencies ### Go >= 1.18 @@ -38,6 +42,7 @@ Before continuing, you will need to install the dependencies listed above. ### From Git repository ```shell $ git clone https://git.gammaspectra.live/S.O.N.G/MeteorLight.git && cd MeteorLight +# create/edit config.toml $ go run . ``` diff --git a/config.go b/config.go index 672bcc9..b199a9d 100644 --- a/config.go +++ b/config.go @@ -13,6 +13,7 @@ type Config struct { NowRandom string `toml:"nr"` FallbackPath string `toml:"fallback"` BufferLengthInKiB int `toml:"buffer_len"` + BufferSeconds int `toml:"buffer_size"` } `toml:"queue"` Radio struct { Port int `toml:"port"` diff --git a/example_config.toml b/example_config.toml index c79b49c..520f4d6 100644 --- a/example_config.toml +++ b/example_config.toml @@ -26,13 +26,18 @@ np="http://localhost:8012/api/np" # # An HTTP POST is issued to this URL when MeteorLight fetches a random track. The body # will be identical to the JSON blob in memory. +# Can be left empty to not send nr events nr="http://localhost:8012/api/nr" # # When no tracks are available for whatever reason (such as external service # outages), this track will be played. fallback="/tmp/in.flac" # Length of buffer to maintain in KiB (not implemented) -buffer_len=4096 +# buffer_len=4096 +# Duration in seconds of buffer to maintain. Set 0 for automatic mode depending on requesting client. +# Maximum 10 seconds. +# Do note buffer is counted from start of frame, not end, for removal purposes. This depends on format and can be a second or so at times. +buffer_duration=0 [radio] # diff --git a/go.mod b/go.mod index 138d79f..dc60133 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module git.gammaspectra.live/S.O.N.G/MeteorLight go 1.18 require ( - git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303091537-9e01c732f300 + git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303134137-d0976eac62c4 github.com/BurntSushi/toml v1.0.0 ) diff --git a/go.sum b/go.sum index 8f5d03a..263aa2c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303091537-9e01c732f300 h1:tBRw3QkXCkybDhZ5lwqjNDbyB4YfI2pJ8ZXR+pkago0= -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303091537-9e01c732f300/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303134137-d0976eac62c4 h1:2o+FJxxoN4pv2h/ZnhY7p0Cnr79tVDocD3UycH3Dmmk= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303134137-d0976eac62c4/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA= git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48 h1:MaKiBfXQl0keyfdCi1PxGOKRTiWhIs8PqCal5GhKDi0= git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48/go.mod h1:pkWt//S9hLVEQaJDPu/cHHPk8vPpo/0+zHy0me4LIP4= git.gammaspectra.live/S.O.N.G/go-pus v0.0.0-20220227175608-6cc027f24dba h1:JEaxCVgdr3XXAuDCPAx7ttLFZaaHzTEzG+oRnVUtUKU= diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..baf545a --- /dev/null +++ b/listener.go @@ -0,0 +1,86 @@ +package main + +import ( + "net" + "sync/atomic" + "time" +) + +type listener struct { + net.Listener + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (l *listener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + tc := &Conn{ + Conn: c, + ReadTimeout: l.ReadTimeout, + WriteTimeout: l.WriteTimeout, + ReadThreshold: int32((l.ReadTimeout * 1024) / time.Second), + WriteThreshold: int32((l.WriteTimeout * 1024) / time.Second), + BytesReadFromDeadline: 0, + BytesWrittenFromDeadline: 0, + } + return tc, nil +} + +// Conn wraps a net.Conn, and sets a deadline for every read +// and write operation. +type Conn struct { + net.Conn + ReadTimeout time.Duration + WriteTimeout time.Duration + ReadThreshold int32 + WriteThreshold int32 + BytesReadFromDeadline int32 + BytesWrittenFromDeadline int32 +} + +func (c *Conn) Read(b []byte) (n int, err error) { + if atomic.LoadInt32(&c.BytesReadFromDeadline) > c.ReadThreshold { + atomic.StoreInt32(&c.BytesReadFromDeadline, 0) + // we set both read and write deadlines here otherwise after the request + // is read writing the response fails with an i/o timeout error + err = c.Conn.SetDeadline(time.Now().Add(c.ReadTimeout)) + if err != nil { + return 0, err + } + } + n, err = c.Conn.Read(b) + atomic.AddInt32(&c.BytesReadFromDeadline, int32(n)) + return +} + +func (c *Conn) Write(b []byte) (n int, err error) { + if atomic.LoadInt32(&c.BytesWrittenFromDeadline) > c.WriteThreshold { + atomic.StoreInt32(&c.BytesWrittenFromDeadline, 0) + // we extend the read deadline too, not sure it's necessary, + // but it doesn't hurt + err = c.Conn.SetDeadline(time.Now().Add(c.WriteTimeout)) + if err != nil { + return + } + } + n, err = c.Conn.Write(b) + atomic.AddInt32(&c.BytesWrittenFromDeadline, int32(n)) + return +} + +func newListener(network, addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) { + l, err := net.Listen(network, addr) + if err != nil { + return nil, err + } + + tl := &listener{ + Listener: l, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + } + return tl, nil +} diff --git a/mount.go b/mount.go index 98ab32e..4e69fab 100644 --- a/mount.go +++ b/mount.go @@ -34,6 +34,7 @@ type StreamMount struct { Mount string MimeType string Packetizer packetizer.Packetizer + SampleRate int listeners []*StreamListener listenersLock sync.Mutex @@ -48,6 +49,8 @@ func NewStreamMount(source audio.Source, mount string, codec string, container s reader, writer := io.Pipe() var packetizerEntry packetizer.Packetizer + sampleRate := source.SampleRate + switch codec { case "opus": encoderFormat = opus.NewFormat() @@ -127,6 +130,7 @@ func NewStreamMount(source audio.Source, mount string, codec string, container s } if opusFormat, ok := encoderFormat.(opus.Format); ok { + sampleRate = opus.FixedSampleRate go func() { defer writer.Close() if err := opusFormat.Encode(audio.NewResampleFilter(opus.FixedSampleRate, audio.Linear, 0).Process(source), writer, options); err != nil { @@ -146,14 +150,18 @@ func NewStreamMount(source audio.Source, mount string, codec string, container s Mount: mount, MimeType: mimeType, Packetizer: packetizerEntry, + SampleRate: sampleRate, } } -func (m *StreamMount) removeDiscard() { +func (m *StreamMount) removeDiscard(sampleNumber int64) { for i, p := range m.keepBuffer { - if p.KeepMode() == packetizer.Discard { + if p.KeepMode() == packetizer.Discard && p.GetSampleNumber() <= sampleNumber { m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...) - m.removeDiscard() + m.removeDiscard(sampleNumber) + break + } else if p.GetSampleNumber() > sampleNumber { + //they are placed in order break } } @@ -236,7 +244,9 @@ func (m *StreamMount) Process(group *sync.WaitGroup) { toRemove = toRemove[:0] } - m.removeDiscard() //always remove discards + sampleLimit := packet.GetSampleNumber() - int64(maxBufferSize*m.SampleRate) + + m.removeDiscard(sampleLimit) //always remove discards switch packet.KeepMode() { case packetizer.KeepLast: diff --git a/queue.go b/queue.go index 9d86366..fe9288f 100644 --- a/queue.go +++ b/queue.go @@ -15,8 +15,11 @@ import ( "path" "strings" "sync" + "time" ) +const maxBufferSize = 10 + type QueueTrackEntry struct { QueueIdentifier audio.QueueIdentifier Path string @@ -49,9 +52,8 @@ func NewQueue(config *Config) *Queue { audioQueue: audio.NewQueue(44100, 2), } blocksPerSecond := 20 - bufferSeconds := blocksPerSecond * 1 - sources := SplitAudioSource(audio.NewFilterChain(q.audioQueue.GetSource(), audio.NewRealTimeFilter(blocksPerSecond), audio.NewBufferFilter(bufferSeconds)), len(config.Streams)) + sources := SplitAudioSource(audio.NewFilterChain(q.audioQueue.GetSource(), audio.NewBufferFilter(16), audio.NewRealTimeFilter(blocksPerSecond), audio.NewBufferFilter(maxBufferSize*blocksPerSecond)), len(config.Streams)) for i, s := range q.config.Streams { mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate) if mount == nil { @@ -226,7 +228,8 @@ func (q *Queue) Remove(identifier audio.QueueIdentifier) bool { } type httpWriter struct { - writer http.ResponseWriter + timeout time.Duration + writer http.ResponseWriter } func (h *httpWriter) Write(p []byte) (n int, err error) { @@ -247,9 +250,10 @@ func (h *httpWriter) Close() (err error) { func (h *httpWriter) Flush() { if h.writer != nil { - if flusher, ok := h.writer.(http.Flusher); ok { + //TODO: not deadline aware? + /*if flusher, ok := h.writer.(http.Flusher); ok { flusher.Flush() - } + }*/ } } @@ -276,7 +280,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") writer.Header().Set("X-Content-Type-Options", "nosniff") - byteWriter := &httpWriter{writer: writer} + byteWriter := &httpWriter{writer: writer, timeout: time.Second * 2} var wgClient sync.WaitGroup @@ -311,6 +315,37 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req uriPath += "?" + request.URL.Query().Encode() } + getKnownBufferSize := func() time.Duration { + userAgent := request.Header.Get("user-agent") + if strings.Index(userAgent, "libmpv") != -1 || strings.Index(userAgent, "mpv ") != -1 { //mpv + return time.Millisecond * 2500 + } else if strings.Index(userAgent, "libvlc") != -1 { //VLC + return time.Millisecond * 2500 + } else if strings.Index(userAgent, "lavf/") != -1 { //ffplay + return time.Millisecond * 2500 + } else if strings.Index(userAgent, "gvfs/") != -1 { //gvfs + return time.Millisecond * 2500 + } else if strings.Index(userAgent, "Music Player Daemon ") != -1 { //MPD + return time.Millisecond * 2500 + } else if strings.Index(userAgent, " Chrome/") != -1 { //Chromium-based + return time.Millisecond * 5000 + } else if strings.Index(userAgent, " Safari/") != -1 { //Safari-based + return time.Millisecond * 5000 + } else if strings.Index(userAgent, " Gecko/") != -1 { //Gecko-based (Firefox) + return time.Millisecond * 5000 + } else if request.Header.Get("icy-metadata") == "1" { //other unknown players + return time.Millisecond * 5000 + } + + //fallback and provide maximum buffer + return time.Second * maxBufferSize + } + + sampleBufferLimit := int64(q.config.Queue.BufferSeconds * mount.SampleRate) + if q.config.Queue.BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers + sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate)) + } + mount.AddListener(&StreamListener{ Information: ListenerInformation{ Mount: mount.Mount, @@ -318,9 +353,14 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req Headers: headers, }, Start: func(packets []packetizer.Packet) error { - for _, p := range packets { - if err := writeCallback(p); err != nil { - return err + if len(packets) > 0 { + sampleBufferMin := packets[len(packets)-1].GetSampleNumber() - sampleBufferLimit + for _, p := range packets { + if p.KeepMode() != packetizer.Discard || p.GetSampleNumber() >= sampleBufferMin { + if err := writeCallback(p); err != nil { + return err + } + } } } return nil