Fix harmless data races, add connection identifier to listener information

This commit is contained in:
DataHoarder 2022-07-21 16:58:07 +02:00
parent afdde985f2
commit 41c86cab4a
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
2 changed files with 88 additions and 52 deletions

View file

@ -21,9 +21,10 @@ type HeaderEntry struct {
} }
type ListenerInformation struct { type ListenerInformation struct {
Mount string `json:"mount"` Identifier string `json:"identifier"`
Path string `json:"path"` Mount string `json:"mount"`
Headers []HeaderEntry `json:"headers"` Path string `json:"path"`
Headers []HeaderEntry `json:"headers"`
} }
type StreamListener struct { type StreamListener struct {
@ -43,7 +44,7 @@ type StreamMount struct {
OffsetStart bool OffsetStart bool
MetadataQueue *goconcurrentqueue.FIFO MetadataQueue *goconcurrentqueue.FIFO
listeners []*StreamListener listeners []*StreamListener
listenersLock sync.Mutex listenersLock sync.RWMutex
keepBuffer []packetizer.Packet keepBuffer []packetizer.Packet
} }
@ -248,28 +249,38 @@ func (m *StreamMount) GetListeners() (entries []*ListenerInformation) {
func (m *StreamMount) handlePacket(packet packetizer.Packet) { func (m *StreamMount) handlePacket(packet packetizer.Packet) {
var toRemove []int var toRemove []int
//TODO: do this via goroutine messaging? //TODO: do this via goroutine messaging?
for i, l := range m.listeners { func() {
if l.Start != nil { m.listenersLock.RLock()
l.Start(m.keepBuffer) defer m.listenersLock.RUnlock()
l.Start = nil var err error
for i, l := range m.listeners {
if l.Start != nil {
l.Start(m.keepBuffer)
l.Start = nil
}
if err = l.Write(packet); err != nil {
log.Printf("failed to write data to %s client: %s\n", l.Information.Identifier, err)
toRemove = append(toRemove, i)
}
} }
if l.Write(packet) != nil {
toRemove = append(toRemove, i) }()
}
}
if len(toRemove) > 0 { if len(toRemove) > 0 {
m.listenersLock.Lock() func() {
//TODO: remove more than one per iteration m.listenersLock.Lock()
for _, i := range toRemove { defer m.listenersLock.Unlock()
l := m.listeners[i] //TODO: remove more than one per iteration
m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) for _, i := range toRemove {
l.Close() l := m.listeners[i]
break m.listeners = append(m.listeners[:i], m.listeners[i+1:]...)
} l.Close()
m.listenersLock.Unlock() break
toRemove = toRemove[:0] }
toRemove = toRemove[:0]
}()
} }
sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate) sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate)

View file

@ -2,7 +2,9 @@ package main
import ( import (
"bytes" "bytes"
"crypto/sha256"
"encoding/binary" "encoding/binary"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -414,7 +416,7 @@ func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
type PacketStreamType uint64 type PacketStreamType uint64
//PacketStreamType The order of these fields is important and set on-wire protocol // PacketStreamType The order of these fields is important and set on-wire protocol
const ( const (
Header = PacketStreamType(iota) Header = PacketStreamType(iota)
DataKeepLast DataKeepLast
@ -591,7 +593,12 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere //buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
const byteSliceChannelBuffer = 1024 * 16 const byteSliceChannelBuffer = 1024 * 16
writeChannel := make(chan []byte, byteSliceChannelBuffer) writeChannel := make(chan []byte, byteSliceChannelBuffer)
var requestDone error
requestDone := struct {
Done atomic.Bool
Lock sync.Mutex
Error error
}{}
var wgClient sync.WaitGroup var wgClient sync.WaitGroup
//set X-Audio-Packet-Stream for strictly timed packets and metadata //set X-Audio-Packet-Stream for strictly timed packets and metadata
@ -602,8 +609,8 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
writer.Header().Set("Content-Type", "application/x-audio-packet-stream") writer.Header().Set("Content-Type", "application/x-audio-packet-stream")
packetWriteCallback = func(packet packetizer.Packet) error { packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone != nil { if requestDone.Done.Load() {
return requestDone return requestDone.Error
} }
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
@ -611,9 +618,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.QueueIdentifier)) n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.QueueIdentifier))
if len(writeChannel) >= (byteSliceChannelBuffer - 1) { if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer") requestDone.Lock.Lock()
log.Printf("failed to write data to client: %s\n", requestDone) defer requestDone.Lock.Unlock()
return requestDone requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true)
return requestDone.Error
} }
writeChannel <- (&packetStreamFrame{ writeChannel <- (&packetStreamFrame{
@ -626,9 +635,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil { if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil {
if len(writeChannel) >= (byteSliceChannelBuffer - 1) { if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer") requestDone.Lock.Lock()
log.Printf("failed to write data to client: %s\n", requestDone) defer requestDone.Lock.Unlock()
return requestDone requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true)
return requestDone.Error
} }
writeChannel <- (&packetStreamFrame{ writeChannel <- (&packetStreamFrame{
@ -643,9 +654,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} }
if len(writeChannel) >= (byteSliceChannelBuffer - 1) { if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer") requestDone.Lock.Lock()
log.Printf("failed to write data to client: %s\n", requestDone) defer requestDone.Lock.Unlock()
return requestDone requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true)
return requestDone.Error
} }
//TODO: category //TODO: category
@ -725,8 +738,8 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
var streamStartOffset int64 = -1 var streamStartOffset int64 = -1
packetWriteCallback = func(packet packetizer.Packet) error { packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone != nil { if requestDone.Done.Load() {
return requestDone return requestDone.Error
} }
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 { if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 {
@ -772,8 +785,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} }
if len(writeChannel) >= (byteSliceChannelBuffer - 1) { if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer") requestDone.Lock.Lock()
return requestDone defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true)
return requestDone.Error
} }
writeChannel <- data writeChannel <- data
return nil return nil
@ -781,17 +797,19 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} else { } else {
var streamStartOffset int64 = -1 var streamStartOffset int64 = -1
packetWriteCallback = func(packet packetizer.Packet) error { packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone != nil { if requestDone.Done.Load() {
return requestDone return requestDone.Error
} }
if _, ok := packet.(*QueueMetadataPacket); ok { if _, ok := packet.(*QueueMetadataPacket); ok {
return nil return nil
} }
if len(writeChannel) >= (byteSliceChannelBuffer - 1) { if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer") requestDone.Lock.Lock()
log.Printf("failed to write data to client: %s\n", requestDone) defer requestDone.Lock.Unlock()
return requestDone requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true)
return requestDone.Error
} }
if offsetable, ok := packet.(packetizer.OffsetablePacket); mount.OffsetStart && ok { if offsetable, ok := packet.(packetizer.OffsetablePacket); mount.OffsetStart && ok {
@ -822,9 +840,12 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} }
for byteSlice := range writeChannel { for byteSlice := range writeChannel {
if _, requestDone = writer.Write(byteSlice); requestDone != nil { if _, err := writer.Write(byteSlice); err != nil {
log.Printf("failed to write data to client: %s\n", requestDone) requestDone.Lock.Lock()
break defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
requestDone.Done.Store(true)
return
} }
//try flush //try flush
if flusher != nil { if flusher != nil {
@ -881,14 +902,18 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} }
wgClient.Add(1) wgClient.Add(1)
hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"))))
listenerIdentifier := hex.EncodeToString(hashSum[16:])
mount.AddListener(&StreamListener{ mount.AddListener(&StreamListener{
Information: ListenerInformation{ Information: ListenerInformation{
Mount: mount.Mount, Identifier: listenerIdentifier,
Path: uriPath, Mount: mount.Mount,
Headers: headers, Path: uriPath,
Headers: headers,
}, },
Start: func(packets []packetizer.Packet) error { Start: func(packets []packetizer.Packet) error {
log.Printf("adding %s client to stream %s\n", request.RemoteAddr, mount.Mount) log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\")\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"))
if len(packets) > 0 { if len(packets) > 0 {
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit
for _, p := range packets { for _, p := range packets {
@ -903,7 +928,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
}, },
Write: packetWriteCallback, Write: packetWriteCallback,
Close: func() { Close: func() {
log.Printf("removing %s client from stream %s\n", request.RemoteAddr, mount.Mount) log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount)
defer wgClient.Done() defer wgClient.Done()
close(writeChannel) close(writeChannel)
}, },