diff --git a/README.md b/README.md index ac8e0f1..e5e38f5 100644 --- a/README.md +++ b/README.md @@ -2,20 +2,16 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible). -This project is a Work in Progress. - -`TODO: ICY metadata` - # Improvements / differences from Kawa * Does not use libav ([see supported formats/codecs on Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika#codecs-supported)) * No Vorbis support. -* Supports HTTP clients that have more than 16 HTTP request headers. +* Supports HTTP clients that have more than 16 HTTP request headers or longer than 64 bytes per header. * Does not restart stream per-track, instead being a continuous stream without parameter changes. * Normalized channels / sample rates for mounts. -* Implements ICY metadata +* Implements ICY metadata (artist, title, url). * Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. - * Use `queue.buffer_size` to specify number of seconds to buffer -* Implements `queue.nr` and `/random` (To be Deprecated/Changed) + * Use `queue.buffer_size` to specify number of seconds to buffer. +* Implements `queue.nr` and `/random` (to be deprecated/changed) # Future improvements * Allow playback of files by URL, not just by path diff --git a/example_config.toml b/example_config.toml index 520f4d6..3d8a717 100644 --- a/example_config.toml +++ b/example_config.toml @@ -36,7 +36,7 @@ fallback="/tmp/in.flac" # buffer_len=4096 # Duration in seconds of buffer to maintain. Set 0 for automatic mode depending on requesting client. # Maximum 10 seconds. -# Do note buffer is counted from start of frame, not end, for removal purposes. This depends on format and can be a second or so at times. +# Do note buffer is counted from end of frame, not start, for removal purposes. This depends on format and can be a second or so at times. buffer_duration=0 [radio] diff --git a/go.mod b/go.mod index dc60133..219f24a 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module git.gammaspectra.live/S.O.N.G/MeteorLight go 1.18 require ( - git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303134137-d0976eac62c4 + git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220304112513-f1e808b8f144 github.com/BurntSushi/toml v1.0.0 + github.com/enriquebris/goconcurrentqueue v0.6.3 ) require ( diff --git a/go.sum b/go.sum index 263aa2c..b11cf8c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303134137-d0976eac62c4 h1:2o+FJxxoN4pv2h/ZnhY7p0Cnr79tVDocD3UycH3Dmmk= -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220303134137-d0976eac62c4/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220304112513-f1e808b8f144 h1:2bJiVqiDQw36CKp6taE9MIl9tJ0j0E7Hp9n/QNgTdLo= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220304112513-f1e808b8f144/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA= git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48 h1:MaKiBfXQl0keyfdCi1PxGOKRTiWhIs8PqCal5GhKDi0= git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48/go.mod h1:pkWt//S9hLVEQaJDPu/cHHPk8vPpo/0+zHy0me4LIP4= git.gammaspectra.live/S.O.N.G/go-pus v0.0.0-20220227175608-6cc027f24dba h1:JEaxCVgdr3XXAuDCPAx7ttLFZaaHzTEzG+oRnVUtUKU= @@ -12,10 +12,13 @@ github.com/BurntSushi/toml v1.0.0 h1:dtDWrepsVPfW9H/4y7dDgFc2MBUSeJhlaDtK13CxFlU github.com/BurntSushi/toml v1.0.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/cocoonlife/testify v0.0.0-20160218172820-792cc1faeb64 h1:LjPYdzoFSAJ5Tr/ElL8kzTJghXgpnOjJVbgd1UvZB1o= github.com/d4l3k/messagediff v1.2.2-0.20190829033028-7e0a312ae40b/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/dh1tw/gosamplerate v0.1.2 h1:oyqtZk67xB9B4l+vIZCZ3F0RYV/z66W58VOah11/ktI= github.com/dh1tw/gosamplerate v0.1.2/go.mod h1:zooTyHpoR7hE+FLfdE3yjLHb2QA2NpMusNfuaZqEACM= github.com/edgeware/mp4ff v0.26.1 h1:tH+TIesZZmrA8BN5HuiKWp3sv5NF4N1A2cFxTSCNL8E= github.com/edgeware/mp4ff v0.26.1/go.mod h1:6VHE5CTkpDseIg775+rh8BfnTvqjMnVbz5EDU4QwSdc= +github.com/enriquebris/goconcurrentqueue v0.6.3 h1:+ma7EEEFMDmJBIS6Q4KNJruChctgwYQFqlxdveIoEE4= +github.com/enriquebris/goconcurrentqueue v0.6.3/go.mod h1:OZ+KC2BcRYzjg0vgoUs1GFqdAjkD9mz2Ots7Jbm1yS4= github.com/go-audio/audio v1.0.0/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= github.com/go-audio/riff v1.0.0/go.mod h1:l3cQwc85y79NQFCRB7TiPoNiaijp6q8Z0Uv38rVG498= github.com/go-audio/wav v1.0.0/go.mod h1:3yoReyQOsiARkvPl3ERCi8JFjihzG6WhjYpZCf5zAWE= @@ -34,10 +37,13 @@ github.com/mewkiz/flac v1.0.7/go.mod h1:yU74UH277dBUpqxPouHSQIar3G1X/QIclVbFahSd github.com/mewkiz/pkg v0.0.0-20190919212034-518ade7978e2 h1:EyTNMdePWaoWsRSGQnXiSoQu0r6RS1eA557AwJhlzHU= github.com/mewkiz/pkg v0.0.0-20190919212034-518ade7978e2/go.mod h1:3E2FUC/qYUfM8+r9zAwpeHJzqRVVMIYnpzD/clwWxyA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/sssgun/mp3 v0.0.0-20170810093403-85f2ec632081 h1:Qo/HswJzVywl0podyXMD62HIohsj/Ij2oXbD26aUIxM= github.com/sssgun/mp3 v0.0.0-20170810093403-85f2ec632081/go.mod h1:ExwZtltybPz8zLO8c2lKRfpPk1HAxhrkp038QIBs+yg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/viert/go-lame v0.0.0-20201108052322-bb552596b11d h1:LptdD7GTUZeklomtW5vZ1AHwBvDBUCZ2Ftpaz7uEI7g= github.com/viert/go-lame v0.0.0-20201108052322-bb552596b11d/go.mod h1:EqTcYM7y4JlSfeTI47pmNu3EZQuCuLQefsQyg1Imlz8= golang.org/x/image v0.0.0-20190220214146-31aff87c08e9/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= diff --git a/mount.go b/mount.go index 4e69fab..68d7ccf 100644 --- a/mount.go +++ b/mount.go @@ -8,6 +8,7 @@ import ( "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" + "github.com/enriquebris/goconcurrentqueue" "io" "log" "sync" @@ -31,11 +32,11 @@ type StreamListener struct { Close func() } type StreamMount struct { - Mount string - MimeType string - Packetizer packetizer.Packetizer - SampleRate int - + Mount string + MimeType string + Packetizer packetizer.Packetizer + SampleRate int + MetadataQueue *goconcurrentqueue.FIFO listeners []*StreamListener listenersLock sync.Mutex keepBuffer []packetizer.Packet @@ -147,41 +148,42 @@ func NewStreamMount(source audio.Source, mount string, codec string, container s } return &StreamMount{ - Mount: mount, - MimeType: mimeType, - Packetizer: packetizerEntry, - SampleRate: sampleRate, + Mount: mount, + MimeType: mimeType, + Packetizer: packetizerEntry, + SampleRate: sampleRate, + MetadataQueue: goconcurrentqueue.NewFIFO(), } } func (m *StreamMount) removeDiscard(sampleNumber int64) { for i, p := range m.keepBuffer { - if p.KeepMode() == packetizer.Discard && p.GetSampleNumber() <= sampleNumber { + if p.KeepMode() == packetizer.Discard && p.GetEndSampleNumber() <= sampleNumber { m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...) m.removeDiscard(sampleNumber) break - } else if p.GetSampleNumber() > sampleNumber { + } else if p.GetEndSampleNumber() > sampleNumber { //they are placed in order break } } } -func (m *StreamMount) removeKeepLast() { +func (m *StreamMount) removeKeepLast(category int64) { for i, p := range m.keepBuffer { - if p.KeepMode() == packetizer.KeepLast { + if p.Category() == category && p.KeepMode() == packetizer.KeepLast { m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...) - m.removeKeepLast() + m.removeKeepLast(category) break } } } -func (m *StreamMount) removeGroupKeep() { +func (m *StreamMount) removeGroupKeep(category int64) { for i, p := range m.keepBuffer { - if p.KeepMode() == packetizer.GroupKeep { + if p.Category() == category && p.KeepMode() == packetizer.GroupKeep { m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...) - m.removeGroupKeep() + m.removeGroupKeep(category) break } } @@ -203,6 +205,52 @@ func (m *StreamMount) GetListeners() (entries []*ListenerInformation) { return } +func (m *StreamMount) handlePacket(packet packetizer.Packet) { + var toRemove []int + //TODO: do this via goroutine messaging? + for i, l := range m.listeners { + if l.Start != nil { + l.Start(m.keepBuffer) + l.Start = nil + } + if l.Write(packet) != nil { + toRemove = append(toRemove, i) + } + } + + if len(toRemove) > 0 { + m.listenersLock.Lock() + //TODO: remove more than one per iteration + for _, i := range toRemove { + l := m.listeners[i] + m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) + l.Close() + break + } + m.listenersLock.Unlock() + toRemove = toRemove[:0] + } + + sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate) + + m.removeDiscard(sampleLimit) //always remove discards + + switch packet.KeepMode() { + case packetizer.KeepLast: + m.removeKeepLast(packet.Category()) + fallthrough + case packetizer.Keep: + m.keepBuffer = append(m.keepBuffer, packet) + case packetizer.GroupKeep: + m.keepBuffer = append(m.keepBuffer, packet) + case packetizer.GroupDiscard: + m.removeGroupKeep(packet.Category()) + case packetizer.Discard: + m.keepBuffer = append(m.keepBuffer, packet) + + } +} + func (m *StreamMount) Process(group *sync.WaitGroup) { defer group.Done() defer func() { @@ -213,55 +261,24 @@ func (m *StreamMount) Process(group *sync.WaitGroup) { m.listeners = m.listeners[:0] m.listenersLock.Unlock() }() - var toRemove []int + for { packet := m.Packetizer.GetPacket() if packet == nil { return } - //TODO: do this via goroutine messaging? - for i, l := range m.listeners { - if l.Start != nil { - l.Start(m.keepBuffer) - l.Start = nil - } - if l.Write(packet) != nil { - toRemove = append(toRemove, i) + if item, err := m.MetadataQueue.Get(0); err == nil { + if metadataPacket, ok := item.(*QueueMetadataPacket); ok { + if packet.GetEndSampleNumber() > metadataPacket.GetStartSampleNumber() { + m.MetadataQueue.Dequeue() + m.handlePacket(metadataPacket) + } + } } - if len(toRemove) > 0 { - m.listenersLock.Lock() - //TODO: remove more than one per iteration - for _, i := range toRemove { - l := m.listeners[i] - m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) - l.Close() - break - } - m.listenersLock.Unlock() - toRemove = toRemove[:0] - } - - sampleLimit := packet.GetSampleNumber() - int64(maxBufferSize*m.SampleRate) - - m.removeDiscard(sampleLimit) //always remove discards - - switch packet.KeepMode() { - case packetizer.KeepLast: - m.removeKeepLast() - fallthrough - case packetizer.Keep: - m.keepBuffer = append(m.keepBuffer, packet) - case packetizer.GroupKeep: - m.keepBuffer = append(m.keepBuffer, packet) - case packetizer.GroupDiscard: - m.removeGroupKeep() - case packetizer.Discard: - m.keepBuffer = append(m.keepBuffer, packet) - - } + m.handlePacket(packet) } diff --git a/queue.go b/queue.go index fe9288f..b25fb5d 100644 --- a/queue.go +++ b/queue.go @@ -13,6 +13,7 @@ import ( "net/http" "os" "path" + "strconv" "strings" "sync" "time" @@ -33,9 +34,35 @@ type QueueTrackEntry struct { original map[string]interface{} } +type QueueMetadataPacket struct { + sampleNumber int64 + TrackEntry *QueueTrackEntry +} + +func (p *QueueMetadataPacket) KeepMode() packetizer.KeepMode { + return packetizer.KeepLast +} + +func (p *QueueMetadataPacket) GetStartSampleNumber() int64 { + return p.sampleNumber +} + +func (p *QueueMetadataPacket) GetEndSampleNumber() int64 { + return p.sampleNumber +} + +func (p *QueueMetadataPacket) Category() int64 { + return -1 +} + +func (p *QueueMetadataPacket) GetData() []byte { + return nil +} + type Queue struct { NowPlaying chan *QueueTrackEntry QueueEmpty chan *QueueTrackEntry + Duration time.Duration audioQueue *audio.Queue mounts []*StreamMount queue []*QueueTrackEntry @@ -109,6 +136,13 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { log.Printf("now playing %s\n", f.Name()) if e := q.Get(entry.Identifier); e != nil { q.NowPlaying <- e + for _, mount := range q.mounts { + mount.MetadataQueue.Enqueue(&QueueMetadataPacket{ + //TODO: carry error + sampleNumber: int64(q.Duration * time.Duration(queue.GetSampleRate()) / time.Second), + TrackEntry: e, + }) + } } } @@ -118,6 +152,12 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { defer f.Close() + + q.mutex.Lock() + //TODO: carry error + q.Duration += (time.Second * time.Duration(entry.ReadSamples)) / time.Duration(entry.Source.SampleRate) + q.mutex.Unlock() + q.Remove(entry.Identifier) q.HandleQueue() } @@ -227,28 +267,97 @@ func (q *Queue) Remove(identifier audio.QueueIdentifier) bool { return false } -type httpWriter struct { - timeout time.Duration - writer http.ResponseWriter +type httpAudioWriter struct { + timeout time.Duration + writer http.ResponseWriter + metadataToSend struct { + Title string + URL string + } + icyInterval int + icyCounter int } -func (h *httpWriter) Write(p []byte) (n int, err error) { +func (h *httpAudioWriter) writeIcy() error { + packetContent := make([]byte, 1, 4096) + if len(h.metadataToSend.Title) > 0 { + //TODO: quote quotes + packetContent = append(packetContent, []byte(fmt.Sprintf("StreamTitle='%s';", strings.ReplaceAll(h.metadataToSend.Title, "'", "")))...) + h.metadataToSend.Title = "" + } + if len(h.metadataToSend.URL) > 0 { + //TODO: quote quotes + packetContent = append(packetContent, []byte(fmt.Sprintf("StreamURL='%s';", strings.ReplaceAll(h.metadataToSend.URL, "'", "")))...) + h.metadataToSend.URL = "" + } + + contentLength := len(packetContent) - 1 + if contentLength > 16*255 { + //cannot send long titles + _, err := h.writer.Write(make([]byte, 1)) + return err + } + + if (contentLength % 16) == 0 { //already padded + packetContent[0] = byte(contentLength / 16) + } else { + packetContent[0] = byte(contentLength/16) + 1 + packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...) + } + + _, err := h.writer.Write(packetContent) + return err +} + +func (h *httpAudioWriter) Write(p []byte) (n int, err error) { if h.writer != nil { - n, err = h.writer.Write(p) - if err != nil { - h.writer = nil + if h.icyInterval > 0 { + var i int + for len(p) > 0 { + l := h.icyInterval - h.icyCounter + if l <= len(p) { + i, err = h.writer.Write(p[:l]) + n += i + if err != nil { + h.writer = nil + break + } + + if err = h.writeIcy(); err != nil { + h.writer = nil + break + } + + h.icyCounter = 0 + p = p[l:] + } else { + i, err = h.writer.Write(p) + n += i + if err != nil { + h.writer = nil + break + } + h.icyCounter += i + p = p[:0] + } + } + } else { + n, err = h.writer.Write(p) + if err != nil { + h.writer = nil + } } return } return 0, io.EOF } -func (h *httpWriter) Close() (err error) { +func (h *httpAudioWriter) Close() (err error) { h.writer = nil return nil } -func (h *httpWriter) Flush() { +func (h *httpAudioWriter) Flush() { if h.writer != nil { //TODO: not deadline aware? /*if flusher, ok := h.writer.(http.Flusher); ok { @@ -280,11 +389,25 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") writer.Header().Set("X-Content-Type-Options", "nosniff") - byteWriter := &httpWriter{writer: writer, timeout: time.Second * 2} + byteWriter := &httpAudioWriter{writer: writer, timeout: time.Second * 2} + + if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 { + byteWriter.icyInterval = 8192 + writer.Header().Set("Icy-MetaInt", fmt.Sprintf("%d", byteWriter.icyInterval)) + } var wgClient sync.WaitGroup writeCallback := func(packet packetizer.Packet) error { + if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { + if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 { + byteWriter.metadataToSend.Title = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Metadata.Artist, metadataPacket.TrackEntry.Metadata.Title) + } else { + byteWriter.metadataToSend.Title = metadataPacket.TrackEntry.Metadata.Title + } + byteWriter.metadataToSend.URL = metadataPacket.TrackEntry.Metadata.Art + return nil + } //TODO: icy /* select { @@ -354,9 +477,9 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req }, Start: func(packets []packetizer.Packet) error { if len(packets) > 0 { - sampleBufferMin := packets[len(packets)-1].GetSampleNumber() - sampleBufferLimit + sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit for _, p := range packets { - if p.KeepMode() != packetizer.Discard || p.GetSampleNumber() >= sampleBufferMin { + if p.KeepMode() != packetizer.Discard || p.GetEndSampleNumber() >= sampleBufferMin { if err := writeCallback(p); err != nil { return err }