From 41c86cab4ae86d768b41dd3d97aabe22bf91fe45 Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Thu, 21 Jul 2022 16:58:07 +0200 Subject: [PATCH] Fix harmless data races, add connection identifier to listener information --- mount.go | 55 +++++++++++++++++++++--------------- queue.go | 85 ++++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 88 insertions(+), 52 deletions(-) diff --git a/mount.go b/mount.go index 4c5d4e2..4ba8b0b 100644 --- a/mount.go +++ b/mount.go @@ -21,9 +21,10 @@ type HeaderEntry struct { } type ListenerInformation struct { - Mount string `json:"mount"` - Path string `json:"path"` - Headers []HeaderEntry `json:"headers"` + Identifier string `json:"identifier"` + Mount string `json:"mount"` + Path string `json:"path"` + Headers []HeaderEntry `json:"headers"` } type StreamListener struct { @@ -43,7 +44,7 @@ type StreamMount struct { OffsetStart bool MetadataQueue *goconcurrentqueue.FIFO listeners []*StreamListener - listenersLock sync.Mutex + listenersLock sync.RWMutex keepBuffer []packetizer.Packet } @@ -248,28 +249,38 @@ func (m *StreamMount) GetListeners() (entries []*ListenerInformation) { func (m *StreamMount) handlePacket(packet packetizer.Packet) { var toRemove []int + //TODO: do this via goroutine messaging? - for i, l := range m.listeners { - if l.Start != nil { - l.Start(m.keepBuffer) - l.Start = nil + func() { + m.listenersLock.RLock() + defer m.listenersLock.RUnlock() + var err error + for i, l := range m.listeners { + if l.Start != nil { + l.Start(m.keepBuffer) + l.Start = nil + } + if err = l.Write(packet); err != nil { + log.Printf("failed to write data to %s client: %s\n", l.Information.Identifier, err) + toRemove = append(toRemove, i) + } } - if l.Write(packet) != nil { - toRemove = append(toRemove, i) - } - } + + }() if len(toRemove) > 0 { - m.listenersLock.Lock() - //TODO: remove more than one per iteration - for _, i := range toRemove { - l := m.listeners[i] - m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) - l.Close() - break - } - m.listenersLock.Unlock() - toRemove = toRemove[:0] + func() { + m.listenersLock.Lock() + defer m.listenersLock.Unlock() + //TODO: remove more than one per iteration + for _, i := range toRemove { + l := m.listeners[i] + m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) + l.Close() + break + } + toRemove = toRemove[:0] + }() } sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate) diff --git a/queue.go b/queue.go index 24519bd..7629547 100644 --- a/queue.go +++ b/queue.go @@ -2,7 +2,9 @@ package main import ( "bytes" + "crypto/sha256" "encoding/binary" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -414,7 +416,7 @@ func (q *Queue) GetListeners() (listeners []*ListenerInformation) { type PacketStreamType uint64 -//PacketStreamType The order of these fields is important and set on-wire protocol +// PacketStreamType The order of these fields is important and set on-wire protocol const ( Header = PacketStreamType(iota) DataKeepLast @@ -591,7 +593,12 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req //buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere const byteSliceChannelBuffer = 1024 * 16 writeChannel := make(chan []byte, byteSliceChannelBuffer) - var requestDone error + + requestDone := struct { + Done atomic.Bool + Lock sync.Mutex + Error error + }{} var wgClient sync.WaitGroup //set X-Audio-Packet-Stream for strictly timed packets and metadata @@ -602,8 +609,8 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writer.Header().Set("Content-Type", "application/x-audio-packet-stream") packetWriteCallback = func(packet packetizer.Packet) error { - if requestDone != nil { - return requestDone + if requestDone.Done.Load() { + return requestDone.Error } if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { @@ -611,9 +618,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.QueueIdentifier)) if len(writeChannel) >= (byteSliceChannelBuffer - 1) { - requestDone = errors.New("client ran out of buffer") - log.Printf("failed to write data to client: %s\n", requestDone) - return requestDone + requestDone.Lock.Lock() + defer requestDone.Lock.Unlock() + requestDone.Error = errors.New("client ran out of buffer") + requestDone.Done.Store(true) + return requestDone.Error } writeChannel <- (&packetStreamFrame{ @@ -626,9 +635,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil { if len(writeChannel) >= (byteSliceChannelBuffer - 1) { - requestDone = errors.New("client ran out of buffer") - log.Printf("failed to write data to client: %s\n", requestDone) - return requestDone + requestDone.Lock.Lock() + defer requestDone.Lock.Unlock() + requestDone.Error = errors.New("client ran out of buffer") + requestDone.Done.Store(true) + return requestDone.Error } writeChannel <- (&packetStreamFrame{ @@ -643,9 +654,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } if len(writeChannel) >= (byteSliceChannelBuffer - 1) { - requestDone = errors.New("client ran out of buffer") - log.Printf("failed to write data to client: %s\n", requestDone) - return requestDone + requestDone.Lock.Lock() + defer requestDone.Lock.Unlock() + requestDone.Error = errors.New("client ran out of buffer") + requestDone.Done.Store(true) + return requestDone.Error } //TODO: category @@ -725,8 +738,8 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req var streamStartOffset int64 = -1 packetWriteCallback = func(packet packetizer.Packet) error { - if requestDone != nil { - return requestDone + if requestDone.Done.Load() { + return requestDone.Error } if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 { @@ -772,8 +785,11 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } if len(writeChannel) >= (byteSliceChannelBuffer - 1) { - requestDone = errors.New("client ran out of buffer") - return requestDone + requestDone.Lock.Lock() + defer requestDone.Lock.Unlock() + requestDone.Error = errors.New("client ran out of buffer") + requestDone.Done.Store(true) + return requestDone.Error } writeChannel <- data return nil @@ -781,17 +797,19 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } else { var streamStartOffset int64 = -1 packetWriteCallback = func(packet packetizer.Packet) error { - if requestDone != nil { - return requestDone + if requestDone.Done.Load() { + return requestDone.Error } if _, ok := packet.(*QueueMetadataPacket); ok { return nil } if len(writeChannel) >= (byteSliceChannelBuffer - 1) { - requestDone = errors.New("client ran out of buffer") - log.Printf("failed to write data to client: %s\n", requestDone) - return requestDone + requestDone.Lock.Lock() + defer requestDone.Lock.Unlock() + requestDone.Error = errors.New("client ran out of buffer") + requestDone.Done.Store(true) + return requestDone.Error } if offsetable, ok := packet.(packetizer.OffsetablePacket); mount.OffsetStart && ok { @@ -822,9 +840,12 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } for byteSlice := range writeChannel { - if _, requestDone = writer.Write(byteSlice); requestDone != nil { - log.Printf("failed to write data to client: %s\n", requestDone) - break + if _, err := writer.Write(byteSlice); err != nil { + requestDone.Lock.Lock() + defer requestDone.Lock.Unlock() + requestDone.Error = errors.New("client ran out of buffer") + requestDone.Done.Store(true) + return } //try flush if flusher != nil { @@ -881,14 +902,18 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } wgClient.Add(1) + + hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent")))) + listenerIdentifier := hex.EncodeToString(hashSum[16:]) mount.AddListener(&StreamListener{ Information: ListenerInformation{ - Mount: mount.Mount, - Path: uriPath, - Headers: headers, + Identifier: listenerIdentifier, + Mount: mount.Mount, + Path: uriPath, + Headers: headers, }, Start: func(packets []packetizer.Packet) error { - log.Printf("adding %s client to stream %s\n", request.RemoteAddr, mount.Mount) + log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\")\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent")) if len(packets) > 0 { sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit for _, p := range packets { @@ -903,7 +928,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req }, Write: packetWriteCallback, Close: func() { - log.Printf("removing %s client from stream %s\n", request.RemoteAddr, mount.Mount) + log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount) defer wgClient.Done() close(writeChannel) },