Add API for listener removal, report listener id under /listeners and mount header
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
DataHoarder 2022-07-21 17:34:31 +02:00
parent 41c86cab4a
commit 0b472366bd
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
4 changed files with 141 additions and 38 deletions

View file

@ -11,14 +11,15 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible).
* Implements ICY metadata (artist, title, url). * Implements ICY metadata (artist, title, url).
* Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. * Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client.
* Use `queue.buffer_size` to specify number of seconds to buffer (by default 0, automatic per client). * Use `queue.buffer_size` to specify number of seconds to buffer (by default 0, automatic per client).
* Implements `queue.nr` and `/random` (to be deprecated/changed) * Implements `queue.nr` and `/random` (to be deprecated/changed).
* Supports max queue length via `queue.length` config. * Supports max queue length via `queue.length` config.
* Supports extra encoder bitrate control settings (CBR, VBR, auto, etc.) * Supports extra encoder bitrate control settings (CBR, VBR, auto, etc.)
* Can set custom sample rate / channel count / bitdepth / compression level per stream mount. * Can set custom sample rate / channel count / bitdepth / compression level per stream mount.
* Can read and apply ReplayGain tags, or normalize audio loudness. * Can read and apply ReplayGain tags, or normalize audio loudness.
* Can have audio sources over HTTP(s) URLs on `path` property, and supports seeking. * Can have audio sources over HTTP(s) URLs on `path` property, and supports seeking.
* [Precise metadata and timing information packet stream](PACKET_STREAM.md), trigger via `x-audio-packet-stream: 1` HTTP header. * [Precise metadata and timing information packet stream](PACKET_STREAM.md), trigger via `x-audio-packet-stream: 1` HTTP header.
* Workaround to allow FLAC streaming under Safari * Workaround to allow FLAC streaming under Safari.
* API additions to allow working with direct queue items or listeners.
## Dependencies ## Dependencies
### Go >= 1.18 ### Go >= 1.18
@ -96,6 +97,57 @@ Same as kawa's, but `queue_id` is added to response directly.
} }
``` ```
### `NEW` DELETE /listeners/<listener_id>
Drops the listener connection with `listener_id` specified as a parameter.
#### Response
```json
{
"success": true,
"reason": null
}
```
### `CHANGED` GET /listeners
Same as kawa's, but `identifier` is added to each listener entry.
The listener `identifier` is generated based on user connection address, port, user-agent and mount.
Additionally, a `x-listener-identifier` header is exposed to mount response.
#### Response
```json
[
{
"identifier": "641df131cb52f8f6381d9946cccb822e",
"mount": "stream.flac",
"path": "/stream.flac",
"headers": [
{
"name": "User-Agent",
"value": "libmpv"
},
{
"name": "Accept",
"value": "*/*"
},
{
"name": "Range",
"value": "bytes=0-"
},
{
"name": "Connection",
"value": "close"
},
{
"name": "Icy-Metadata",
"value": "1"
}
]
}
]
```
## Mount API ## Mount API
### `NEW` GET /mounts ### `NEW` GET /mounts
A simple listing of the working mounts + settings are made available. A simple listing of the working mounts + settings are made available.

38
api.go
View file

@ -5,7 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/queue" "git.gammaspectra.live/S.O.N.G/Kirika/audio/queue"
"io/ioutil" "io"
"log" "log"
"net/http" "net/http"
"strconv" "strconv"
@ -83,7 +83,7 @@ func (a *API) getRandomTrack() *QueueTrackEntry {
} }
defer response.Body.Close() defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body) body, err := io.ReadAll(response.Body)
if err != nil { if err != nil {
return nil return nil
} }
@ -151,8 +151,8 @@ func (a *API) handleQueue() {
func (a *API) listen() { func (a *API) listen() {
type resultResponse struct { type resultResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
Reason error `json:"reason"` Reason *string `json:"reason"`
} }
type queueResultResponse struct { type queueResultResponse struct {
@ -174,8 +174,26 @@ func (a *API) listen() {
if len(pathSegments) > 1 { if len(pathSegments) > 1 {
switch pathSegments[1] { switch pathSegments[1] {
case "listeners": case "listeners":
jsonData, _ := json.Marshal(a.queue.GetListeners()) if len(pathSegments) > 2 {
writer.Write(jsonData) if request.Method != "DELETE" {
return
}
result := resultResponse{
Success: a.queue.RemoveListener(pathSegments[2]),
}
if !result.Success {
resultErr := fmt.Sprintf("listener %s not found", pathSegments[2])
result.Reason = &resultErr
}
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
return
} else {
jsonData, _ := json.Marshal(a.queue.GetListeners())
writer.Write(jsonData)
}
return return
case "np": case "np":
@ -214,12 +232,14 @@ func (a *API) listen() {
case "head": case "head":
if request.Method == "POST" { if request.Method == "POST" {
result := queueResultResponse{} result := queueResultResponse{}
if body, err := ioutil.ReadAll(request.Body); err == nil { if body, err := io.ReadAll(request.Body); err == nil {
if e := a.getQueueEntryFromBody(body); e != nil { if e := a.getQueueEntryFromBody(body); e != nil {
if err = a.queue.AddTrack(e, false); err == nil { if err = a.queue.AddTrack(e, false); err == nil {
result.Success = true result.Success = true
result.QueueId = e.QueueIdentifier result.QueueId = e.QueueIdentifier
} else { } else {
resultErr := err.Error()
result.Reason = &resultErr
log.Printf("track addition error: \"%s\"", err) log.Printf("track addition error: \"%s\"", err)
} }
} }
@ -244,12 +264,14 @@ func (a *API) listen() {
case "tail": case "tail":
if request.Method == "POST" { if request.Method == "POST" {
result := queueResultResponse{} result := queueResultResponse{}
if body, err := ioutil.ReadAll(request.Body); err == nil { if body, err := io.ReadAll(request.Body); err == nil {
if e := a.getQueueEntryFromBody(body); e != nil { if e := a.getQueueEntryFromBody(body); e != nil {
if err = a.queue.AddTrack(e, true); err == nil { if err = a.queue.AddTrack(e, true); err == nil {
result.Success = true result.Success = true
result.QueueId = e.QueueIdentifier result.QueueId = e.QueueIdentifier
} else { } else {
resultErr := err.Error()
result.Reason = &resultErr
log.Printf("track addition error: \"%s\"", err) log.Printf("track addition error: \"%s\"", err)
} }
} }

View file

@ -237,9 +237,34 @@ func (m *StreamMount) AddListener(listener *StreamListener) {
m.listeners = append(m.listeners, listener) m.listeners = append(m.listeners, listener)
} }
func (m *StreamMount) RemoveListener(identifier string, direct ...bool) bool {
if (len(direct) > 0 && direct[0]) || func() bool {
m.listenersLock.RLock()
defer m.listenersLock.RUnlock()
for _, l := range m.listeners {
if l.Information.Identifier == identifier {
return true
}
}
return false
}() {
m.listenersLock.Lock()
defer m.listenersLock.Unlock()
for i := range m.listeners {
l := m.listeners[i]
if l.Information.Identifier == identifier {
m.listeners = append(m.listeners[:i], m.listeners[i+1:]...)
l.Close()
return true
}
}
}
return false
}
func (m *StreamMount) GetListeners() (entries []*ListenerInformation) { func (m *StreamMount) GetListeners() (entries []*ListenerInformation) {
m.listenersLock.Lock() m.listenersLock.RLock()
defer m.listenersLock.Unlock() defer m.listenersLock.RUnlock()
for _, l := range m.listeners { for _, l := range m.listeners {
entries = append(entries, &l.Information) entries = append(entries, &l.Information)
} }
@ -248,39 +273,28 @@ func (m *StreamMount) GetListeners() (entries []*ListenerInformation) {
} }
func (m *StreamMount) handlePacket(packet packetizer.Packet) { func (m *StreamMount) handlePacket(packet packetizer.Packet) {
var toRemove []int var toRemove []string
//TODO: do this via goroutine messaging? //TODO: do this via goroutine messaging?
func() { func() {
m.listenersLock.RLock() m.listenersLock.RLock()
defer m.listenersLock.RUnlock() defer m.listenersLock.RUnlock()
var err error var err error
for i, l := range m.listeners { for _, l := range m.listeners {
if l.Start != nil { if l.Start != nil {
l.Start(m.keepBuffer) l.Start(m.keepBuffer)
l.Start = nil l.Start = nil
} }
if err = l.Write(packet); err != nil { if err = l.Write(packet); err != nil {
log.Printf("failed to write data to %s client: %s\n", l.Information.Identifier, err) log.Printf("failed to write data to %s client: %s\n", l.Information.Identifier, err)
toRemove = append(toRemove, i) toRemove = append(toRemove, l.Information.Identifier)
} }
} }
}() }()
if len(toRemove) > 0 { for _, id := range toRemove {
func() { m.RemoveListener(id, true)
m.listenersLock.Lock()
defer m.listenersLock.Unlock()
//TODO: remove more than one per iteration
for _, i := range toRemove {
l := m.listeners[i]
m.listeners = append(m.listeners[:i], m.listeners[i+1:]...)
l.Close()
break
}
toRemove = toRemove[:0]
}()
} }
sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate) sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate)
@ -306,6 +320,7 @@ func (m *StreamMount) handlePacket(packet packetizer.Packet) {
func (m *StreamMount) Process(group *sync.WaitGroup) { func (m *StreamMount) Process(group *sync.WaitGroup) {
defer group.Done() defer group.Done()
defer func() { defer func() {
//Teardown all listeners
m.listenersLock.Lock() m.listenersLock.Lock()
for _, l := range m.listeners { for _, l := range m.listeners {
l.Close() l.Close()

View file

@ -402,6 +402,17 @@ func (q *Queue) Remove(identifier queue.QueueIdentifier) bool {
return false return false
} }
func (q *Queue) RemoveListener(identifier string) bool {
q.mutex.RLock()
defer q.mutex.RUnlock()
for _, mount := range q.mounts {
if mount.RemoveListener(identifier) {
return true
}
}
return false
}
func (q *Queue) GetListeners() (listeners []*ListenerInformation) { func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
q.mutex.RLock() q.mutex.RLock()
defer q.mutex.RUnlock() defer q.mutex.RUnlock()
@ -595,7 +606,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
writeChannel := make(chan []byte, byteSliceChannelBuffer) writeChannel := make(chan []byte, byteSliceChannelBuffer)
requestDone := struct { requestDone := struct {
Done atomic.Bool Done uint32
Lock sync.Mutex Lock sync.Mutex
Error error Error error
}{} }{}
@ -609,7 +620,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
writer.Header().Set("Content-Type", "application/x-audio-packet-stream") writer.Header().Set("Content-Type", "application/x-audio-packet-stream")
packetWriteCallback = func(packet packetizer.Packet) error { packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone.Done.Load() { if atomic.LoadUint32(&requestDone.Done) == 1 {
return requestDone.Error return requestDone.Error
} }
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
@ -621,7 +632,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock() requestDone.Lock.Lock()
defer requestDone.Lock.Unlock() defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer") requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true) atomic.StoreUint32(&requestDone.Done, 1)
return requestDone.Error return requestDone.Error
} }
@ -638,7 +649,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock() requestDone.Lock.Lock()
defer requestDone.Lock.Unlock() defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer") requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true) atomic.StoreUint32(&requestDone.Done, 1)
return requestDone.Error return requestDone.Error
} }
@ -657,7 +668,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock() requestDone.Lock.Lock()
defer requestDone.Lock.Unlock() defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer") requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true) atomic.StoreUint32(&requestDone.Done, 1)
return requestDone.Error return requestDone.Error
} }
@ -738,7 +749,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
var streamStartOffset int64 = -1 var streamStartOffset int64 = -1
packetWriteCallback = func(packet packetizer.Packet) error { packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone.Done.Load() { if atomic.LoadUint32(&requestDone.Done) == 1 {
return requestDone.Error return requestDone.Error
} }
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
@ -788,7 +799,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock() requestDone.Lock.Lock()
defer requestDone.Lock.Unlock() defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer") requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true) atomic.StoreUint32(&requestDone.Done, 1)
return requestDone.Error return requestDone.Error
} }
writeChannel <- data writeChannel <- data
@ -797,7 +808,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} else { } else {
var streamStartOffset int64 = -1 var streamStartOffset int64 = -1
packetWriteCallback = func(packet packetizer.Packet) error { packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone.Done.Load() { if atomic.LoadUint32(&requestDone.Done) == 1 {
return requestDone.Error return requestDone.Error
} }
if _, ok := packet.(*QueueMetadataPacket); ok { if _, ok := packet.(*QueueMetadataPacket); ok {
@ -808,7 +819,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock() requestDone.Lock.Lock()
defer requestDone.Lock.Unlock() defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer") requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true) atomic.StoreUint32(&requestDone.Done, 1)
return requestDone.Error return requestDone.Error
} }
@ -844,7 +855,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock() requestDone.Lock.Lock()
defer requestDone.Lock.Unlock() defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer") requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true) atomic.StoreUint32(&requestDone.Done, 1)
return return
} }
//try flush //try flush
@ -905,6 +916,9 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent")))) hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"))))
listenerIdentifier := hex.EncodeToString(hashSum[16:]) listenerIdentifier := hex.EncodeToString(hashSum[16:])
writer.Header().Set("x-listener-identifier", listenerIdentifier)
mount.AddListener(&StreamListener{ mount.AddListener(&StreamListener{
Information: ListenerInformation{ Information: ListenerInformation{
Identifier: listenerIdentifier, Identifier: listenerIdentifier,