From 0b472366bd7443298c5de7b91914e885944aa57f Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Thu, 21 Jul 2022 17:34:31 +0200 Subject: [PATCH] Add API for listener removal, report listener id under /listeners and mount header --- README.md | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- api.go | 38 +++++++++++++++++++++++++++++-------- mount.go | 51 ++++++++++++++++++++++++++++++++------------------ queue.go | 34 +++++++++++++++++++++++---------- 4 files changed, 141 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index e74b22c..ba2cf91 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,15 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible). * Implements ICY metadata (artist, title, url). * Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. * Use `queue.buffer_size` to specify number of seconds to buffer (by default 0, automatic per client). -* Implements `queue.nr` and `/random` (to be deprecated/changed) +* Implements `queue.nr` and `/random` (to be deprecated/changed). * Supports max queue length via `queue.length` config. * Supports extra encoder bitrate control settings (CBR, VBR, auto, etc.) * Can set custom sample rate / channel count / bitdepth / compression level per stream mount. * Can read and apply ReplayGain tags, or normalize audio loudness. * Can have audio sources over HTTP(s) URLs on `path` property, and supports seeking. * [Precise metadata and timing information packet stream](PACKET_STREAM.md), trigger via `x-audio-packet-stream: 1` HTTP header. -* Workaround to allow FLAC streaming under Safari +* Workaround to allow FLAC streaming under Safari. +* API additions to allow working with direct queue items or listeners. ## Dependencies ### Go >= 1.18 @@ -96,6 +97,57 @@ Same as kawa's, but `queue_id` is added to response directly. } ``` +### `NEW` DELETE /listeners/ +Drops the listener connection with `listener_id` specified as a parameter. + +#### Response +```json +{ + "success": true, + "reason": null +} +``` + +### `CHANGED` GET /listeners +Same as kawa's, but `identifier` is added to each listener entry. + +The listener `identifier` is generated based on user connection address, port, user-agent and mount. + +Additionally, a `x-listener-identifier` header is exposed to mount response. + +#### Response +```json +[ + { + "identifier": "641df131cb52f8f6381d9946cccb822e", + "mount": "stream.flac", + "path": "/stream.flac", + "headers": [ + { + "name": "User-Agent", + "value": "libmpv" + }, + { + "name": "Accept", + "value": "*/*" + }, + { + "name": "Range", + "value": "bytes=0-" + }, + { + "name": "Connection", + "value": "close" + }, + { + "name": "Icy-Metadata", + "value": "1" + } + ] + } +] +``` + ## Mount API ### `NEW` GET /mounts A simple listing of the working mounts + settings are made available. diff --git a/api.go b/api.go index a367b60..90f874e 100644 --- a/api.go +++ b/api.go @@ -5,7 +5,7 @@ import ( "encoding/json" "fmt" "git.gammaspectra.live/S.O.N.G/Kirika/audio/queue" - "io/ioutil" + "io" "log" "net/http" "strconv" @@ -83,7 +83,7 @@ func (a *API) getRandomTrack() *QueueTrackEntry { } defer response.Body.Close() - body, err := ioutil.ReadAll(response.Body) + body, err := io.ReadAll(response.Body) if err != nil { return nil } @@ -151,8 +151,8 @@ func (a *API) handleQueue() { func (a *API) listen() { type resultResponse struct { - Success bool `json:"success"` - Reason error `json:"reason"` + Success bool `json:"success"` + Reason *string `json:"reason"` } type queueResultResponse struct { @@ -174,8 +174,26 @@ func (a *API) listen() { if len(pathSegments) > 1 { switch pathSegments[1] { case "listeners": - jsonData, _ := json.Marshal(a.queue.GetListeners()) - writer.Write(jsonData) + if len(pathSegments) > 2 { + if request.Method != "DELETE" { + return + } + + result := resultResponse{ + Success: a.queue.RemoveListener(pathSegments[2]), + } + if !result.Success { + resultErr := fmt.Sprintf("listener %s not found", pathSegments[2]) + result.Reason = &resultErr + } + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } else { + jsonData, _ := json.Marshal(a.queue.GetListeners()) + writer.Write(jsonData) + } return case "np": @@ -214,12 +232,14 @@ func (a *API) listen() { case "head": if request.Method == "POST" { result := queueResultResponse{} - if body, err := ioutil.ReadAll(request.Body); err == nil { + if body, err := io.ReadAll(request.Body); err == nil { if e := a.getQueueEntryFromBody(body); e != nil { if err = a.queue.AddTrack(e, false); err == nil { result.Success = true result.QueueId = e.QueueIdentifier } else { + resultErr := err.Error() + result.Reason = &resultErr log.Printf("track addition error: \"%s\"", err) } } @@ -244,12 +264,14 @@ func (a *API) listen() { case "tail": if request.Method == "POST" { result := queueResultResponse{} - if body, err := ioutil.ReadAll(request.Body); err == nil { + if body, err := io.ReadAll(request.Body); err == nil { if e := a.getQueueEntryFromBody(body); e != nil { if err = a.queue.AddTrack(e, true); err == nil { result.Success = true result.QueueId = e.QueueIdentifier } else { + resultErr := err.Error() + result.Reason = &resultErr log.Printf("track addition error: \"%s\"", err) } } diff --git a/mount.go b/mount.go index 4ba8b0b..4891e0a 100644 --- a/mount.go +++ b/mount.go @@ -237,9 +237,34 @@ func (m *StreamMount) AddListener(listener *StreamListener) { m.listeners = append(m.listeners, listener) } +func (m *StreamMount) RemoveListener(identifier string, direct ...bool) bool { + if (len(direct) > 0 && direct[0]) || func() bool { + m.listenersLock.RLock() + defer m.listenersLock.RUnlock() + for _, l := range m.listeners { + if l.Information.Identifier == identifier { + return true + } + } + return false + }() { + m.listenersLock.Lock() + defer m.listenersLock.Unlock() + for i := range m.listeners { + l := m.listeners[i] + if l.Information.Identifier == identifier { + m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) + l.Close() + return true + } + } + } + return false +} + func (m *StreamMount) GetListeners() (entries []*ListenerInformation) { - m.listenersLock.Lock() - defer m.listenersLock.Unlock() + m.listenersLock.RLock() + defer m.listenersLock.RUnlock() for _, l := range m.listeners { entries = append(entries, &l.Information) } @@ -248,39 +273,28 @@ func (m *StreamMount) GetListeners() (entries []*ListenerInformation) { } func (m *StreamMount) handlePacket(packet packetizer.Packet) { - var toRemove []int + var toRemove []string //TODO: do this via goroutine messaging? func() { m.listenersLock.RLock() defer m.listenersLock.RUnlock() var err error - for i, l := range m.listeners { + for _, l := range m.listeners { if l.Start != nil { l.Start(m.keepBuffer) l.Start = nil } if err = l.Write(packet); err != nil { log.Printf("failed to write data to %s client: %s\n", l.Information.Identifier, err) - toRemove = append(toRemove, i) + toRemove = append(toRemove, l.Information.Identifier) } } }() - if len(toRemove) > 0 { - func() { - m.listenersLock.Lock() - defer m.listenersLock.Unlock() - //TODO: remove more than one per iteration - for _, i := range toRemove { - l := m.listeners[i] - m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) - l.Close() - break - } - toRemove = toRemove[:0] - }() + for _, id := range toRemove { + m.RemoveListener(id, true) } sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate) @@ -306,6 +320,7 @@ func (m *StreamMount) handlePacket(packet packetizer.Packet) { func (m *StreamMount) Process(group *sync.WaitGroup) { defer group.Done() defer func() { + //Teardown all listeners m.listenersLock.Lock() for _, l := range m.listeners { l.Close() diff --git a/queue.go b/queue.go index 7629547..09c1fad 100644 --- a/queue.go +++ b/queue.go @@ -402,6 +402,17 @@ func (q *Queue) Remove(identifier queue.QueueIdentifier) bool { return false } +func (q *Queue) RemoveListener(identifier string) bool { + q.mutex.RLock() + defer q.mutex.RUnlock() + for _, mount := range q.mounts { + if mount.RemoveListener(identifier) { + return true + } + } + return false +} + func (q *Queue) GetListeners() (listeners []*ListenerInformation) { q.mutex.RLock() defer q.mutex.RUnlock() @@ -595,7 +606,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writeChannel := make(chan []byte, byteSliceChannelBuffer) requestDone := struct { - Done atomic.Bool + Done uint32 Lock sync.Mutex Error error }{} @@ -609,7 +620,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writer.Header().Set("Content-Type", "application/x-audio-packet-stream") packetWriteCallback = func(packet packetizer.Packet) error { - if requestDone.Done.Load() { + if atomic.LoadUint32(&requestDone.Done) == 1 { return requestDone.Error } if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { @@ -621,7 +632,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - requestDone.Done.Store(true) + atomic.StoreUint32(&requestDone.Done, 1) return requestDone.Error } @@ -638,7 +649,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - requestDone.Done.Store(true) + atomic.StoreUint32(&requestDone.Done, 1) return requestDone.Error } @@ -657,7 +668,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - requestDone.Done.Store(true) + atomic.StoreUint32(&requestDone.Done, 1) return requestDone.Error } @@ -738,7 +749,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req var streamStartOffset int64 = -1 packetWriteCallback = func(packet packetizer.Packet) error { - if requestDone.Done.Load() { + if atomic.LoadUint32(&requestDone.Done) == 1 { return requestDone.Error } if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { @@ -788,7 +799,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - requestDone.Done.Store(true) + atomic.StoreUint32(&requestDone.Done, 1) return requestDone.Error } writeChannel <- data @@ -797,7 +808,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } else { var streamStartOffset int64 = -1 packetWriteCallback = func(packet packetizer.Packet) error { - if requestDone.Done.Load() { + if atomic.LoadUint32(&requestDone.Done) == 1 { return requestDone.Error } if _, ok := packet.(*QueueMetadataPacket); ok { @@ -808,7 +819,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - requestDone.Done.Store(true) + atomic.StoreUint32(&requestDone.Done, 1) return requestDone.Error } @@ -844,7 +855,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - requestDone.Done.Store(true) + atomic.StoreUint32(&requestDone.Done, 1) return } //try flush @@ -905,6 +916,9 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent")))) listenerIdentifier := hex.EncodeToString(hashSum[16:]) + + writer.Header().Set("x-listener-identifier", listenerIdentifier) + mount.AddListener(&StreamListener{ Information: ListenerInformation{ Identifier: listenerIdentifier,