From dfc2be9bf200b81c10dd5fc26b965fd2aff85e1d Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Sun, 2 Oct 2022 15:19:51 +0200 Subject: [PATCH] Cleanup how listeners write data back to client --- listener/aps1/aps1.go | 60 +++++++++++++----------------- listener/icy/icy.go | 30 ++++++--------- listener/listener.go | 11 +----- listener/plain/plain.go | 38 ++++++++----------- queue/queue.go | 81 ++++++++++++++++++++++++++--------------- 5 files changed, 106 insertions(+), 114 deletions(-) diff --git a/listener/aps1/aps1.go b/listener/aps1/aps1.go index ccdef33..ac665b4 100644 --- a/listener/aps1/aps1.go +++ b/listener/aps1/aps1.go @@ -8,7 +8,6 @@ import ( "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" - "git.gammaspectra.live/S.O.N.G/MeteorLight/util" "sync/atomic" ) @@ -18,13 +17,13 @@ type Listener struct { streamStartOffset int64 streamSamplesBuffer int64 - ctx *util.RequestDone - sliceWriter chan []byte + writer listener.WriterFunc + waiter chan struct{} offsetStart bool headerBytes []byte } -func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) { +func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) { headers := make(map[string]string) headers["x-audio-packet-stream"] = "1" headers["content-type"] = "application/x-audio-packet-stream" @@ -38,15 +37,19 @@ func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter c return &Listener{ information: info, - ctx: ctx, - sliceWriter: sliceWriter, + writer: writer, offsetStart: offsetStart, streamSamplesBuffer: samplesToBuffer, streamStartOffset: -1, headerBytes: headerBytes.Bytes(), + waiter: make(chan struct{}), }, headers } +func (l *Listener) Wait() { + <-l.waiter +} + func (l *Listener) Identifier() string { return l.information.Identifier } @@ -64,13 +67,15 @@ func (l *Listener) Start(packets []packetizer.Packet) error { return nil } - l.sliceWriter <- (&packetStreamFrame{ + if err := l.writer((&packetStreamFrame{ Type: Header, Category: 0, StartSampleNumber: 0, DurationInSamples: 0, Data: l.headerBytes, - }).Encode() + }).Encode()); err != nil { + return err + } if len(packets) > 0 { sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - l.streamSamplesBuffer @@ -86,50 +91,36 @@ func (l *Listener) Start(packets []packetizer.Packet) error { } func (l *Listener) Write(packet packetizer.Packet) error { - if l.ctx.Done() { - return l.ctx.Error() - } - if metadataPacket, ok := packet.(*metadata.Packet); ok { queueInfoBuf := make([]byte, binary.MaxVarintLen64) n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.Identifier)) - if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { - l.ctx.Fail(errors.New("client ran out of writer buffer")) - return l.ctx.Error() - } - - l.sliceWriter <- (&packetStreamFrame{ + if err := l.writer((&packetStreamFrame{ Type: TrackIdentifier, Category: packet.Category(), StartSampleNumber: packet.GetStartSampleNumber(), DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(), Data: queueInfoBuf[:n], - }).Encode() + }).Encode()); err != nil { + return err + } if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil { - if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { - l.ctx.Fail(errors.New("client ran out of writer buffer")) - return l.ctx.Error() - } - l.sliceWriter <- (&packetStreamFrame{ + if err := l.writer((&packetStreamFrame{ Type: TrackMetadata, Category: packet.Category(), StartSampleNumber: packet.GetStartSampleNumber(), DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(), Data: metadataBytes, - }).Encode() + }).Encode()); err != nil { + return err + } } return nil } - if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { - l.ctx.Fail(errors.New("client ran out of writer buffer")) - return l.ctx.Error() - } - var frameType PacketStreamType switch packet.KeepMode() { case packetizer.KeepLast: @@ -146,19 +137,18 @@ func (l *Listener) Write(packet packetizer.Packet) error { return errors.New("unknown KeepMode") } - l.sliceWriter <- (&packetStreamFrame{ + return l.writer((&packetStreamFrame{ Type: frameType, Category: packet.Category(), StartSampleNumber: packet.GetStartSampleNumber(), DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(), Data: packet.GetData(), - }).Encode() - - return nil + }).Encode()) } func (l *Listener) Close() { - close(l.sliceWriter) + l.writer(nil) + close(l.waiter) } type PacketStreamType uint64 diff --git a/listener/icy/icy.go b/listener/icy/icy.go index 922287f..415c868 100644 --- a/listener/icy/icy.go +++ b/listener/icy/icy.go @@ -1,12 +1,10 @@ package icy import ( - "errors" "fmt" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" - "git.gammaspectra.live/S.O.N.G/MeteorLight/util" "strconv" "sync/atomic" ) @@ -22,28 +20,32 @@ type Listener struct { counter int - ctx *util.RequestDone - sliceWriter chan []byte + writer listener.WriterFunc + waiter chan struct{} offsetStart bool pendingMetadata map[string]string } -func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) { +func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) { headers := make(map[string]string) headers["icy-metaint"] = strconv.Itoa(icyInterval) return &Listener{ information: info, - ctx: ctx, - sliceWriter: sliceWriter, + writer: writer, offsetStart: offsetStart, streamSamplesBuffer: samplesToBuffer, streamStartOffset: -1, pendingMetadata: make(map[string]string), + waiter: make(chan struct{}), }, headers } +func (l *Listener) Wait() { + <-l.waiter +} + func (l *Listener) Identifier() string { return l.information.Identifier } @@ -101,9 +103,6 @@ func (l *Listener) writeIcy() []byte { } func (l *Listener) Write(packet packetizer.Packet) error { - if l.ctx.Done() { - return l.ctx.Error() - } if metadataPacket, ok := packet.(*metadata.Packet); ok { if len(metadataPacket.TrackEntry.Artist()) > 0 { @@ -148,15 +147,10 @@ func (l *Listener) Write(packet packetizer.Packet) error { } } - if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { - l.ctx.Fail(errors.New("client ran out of writer buffer")) - return l.ctx.Error() - } - - l.sliceWriter <- data - return nil + return l.writer(data) } func (l *Listener) Close() { - close(l.sliceWriter) + l.writer(nil) + close(l.waiter) } diff --git a/listener/listener.go b/listener/listener.go index 29ccb54..ccd26de 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -22,14 +22,7 @@ type Listener interface { Start(packets []packetizer.Packet) error Write(packet packetizer.Packet) error Close() + Wait() } -/* -type Listener struct { - Information ListenerInformation - Start func(packets []packetizer.Packet) error - Write func(packet packetizer.Packet) error - Close func() -} - -*/ +type WriterFunc func(data []byte) error diff --git a/listener/plain/plain.go b/listener/plain/plain.go index c32aa7e..3355def 100644 --- a/listener/plain/plain.go +++ b/listener/plain/plain.go @@ -1,11 +1,9 @@ package plain import ( - "errors" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" - "git.gammaspectra.live/S.O.N.G/MeteorLight/util" "sync/atomic" ) @@ -15,22 +13,26 @@ type Listener struct { streamStartOffset int64 streamSamplesBuffer int64 - ctx *util.RequestDone - sliceWriter chan []byte + writer listener.WriterFunc + waiter chan struct{} offsetStart bool } -func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) { +func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) { return &Listener{ information: info, - ctx: ctx, - sliceWriter: sliceWriter, + writer: writer, offsetStart: offsetStart, streamSamplesBuffer: samplesToBuffer, streamStartOffset: -1, + waiter: make(chan struct{}), }, nil } +func (l *Listener) Wait() { + <-l.waiter +} + func (l *Listener) Identifier() string { return l.information.Identifier } @@ -62,37 +64,27 @@ func (l *Listener) Start(packets []packetizer.Packet) error { } func (l *Listener) Write(packet packetizer.Packet) error { - if l.ctx.Done() { - return l.ctx.Error() - } - if _, ok := packet.(*metadata.Packet); ok { return nil } - if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { - l.ctx.Fail(errors.New("client ran out of writer buffer")) - return l.ctx.Error() - } - if offsetable, ok := packet.(packetizer.OffsetablePacket); l.offsetStart && ok { if l.streamStartOffset <= -1 { if offsetable.KeepMode() != packetizer.Keep { l.streamStartOffset = offsetable.GetStartSampleNumber() - l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset) + return l.writer(offsetable.GetDataOffset(l.streamStartOffset)) } else { - l.sliceWriter <- packet.GetData() + return l.writer(packet.GetData()) } } else { - l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset) + return l.writer(offsetable.GetDataOffset(l.streamStartOffset)) } } else { - l.sliceWriter <- packet.GetData() + return l.writer(packet.GetData()) } - - return nil } func (l *Listener) Close() { - close(l.sliceWriter) + l.writer(nil) + close(l.waiter) } diff --git a/queue/queue.go b/queue/queue.go index 43bca73..88c690b 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -409,10 +409,6 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writer.Header().Set("x-audiocast-public", "1") } - //buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere - const byteSliceChannelBuffer = 1024 * 16 - writeChannel := make(chan []byte, byteSliceChannelBuffer) - requestDone := util.RequestDone{} var headers []listener.HeaderEntry @@ -476,14 +472,61 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req var mountListener listener.Listener var extraHeaders map[string]string + ctx := request.Context() + //buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere + const byteSliceChannelBuffer = 1024 * 16 + writeChannel := make(chan []byte, byteSliceChannelBuffer) + + go func() { + var flusher http.Flusher + if httpFlusher, ok := writer.(http.Flusher); ok { + flusher = httpFlusher + } + + for !requestDone.Done() { + select { + case byteSlice := <-writeChannel: + if _, err := writer.Write(byteSlice); err != nil { + requestDone.Fail(err) + break + } + //try flush + if flusher != nil { + flusher.Flush() + } + case <-ctx.Done(): + requestDone.Fail(ctx.Err()) + break + } + + } + }() + + funcWriter := func(byteSlice []byte) error { + if requestDone.Done() { + return requestDone.Error() + } + + if len(byteSlice) > 0 { + if len(writeChannel) >= (cap(writeChannel) - 1) { + requestDone.Fail(errors.New("client ran out of writer buffer")) + return requestDone.Error() + } + + writeChannel <- byteSlice + } + + return nil + } + //set X-Audio-Packet-Stream for strictly timed packets and metadata if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 { //version 1 - mountListener, extraHeaders = aps1.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType) + mountListener, extraHeaders = aps1.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType) } else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 { - mountListener, extraHeaders = icy.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart) + mountListener, extraHeaders = icy.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart) } else { - mountListener, extraHeaders = plain.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart) + mountListener, extraHeaders = plain.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart) } if mountListener == nil { @@ -500,29 +543,9 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate)) mount.AddListener(mountListener) - ctx := request.Context() - var flusher http.Flusher - if httpFlusher, ok := writer.(http.Flusher); ok { - flusher = httpFlusher - } + mountListener.Wait() - for !requestDone.Done() { - select { - case byteSlice := <-writeChannel: - if _, err := writer.Write(byteSlice); err != nil { - requestDone.Fail(err) - break - } - //try flush - if flusher != nil { - flusher.Flush() - } - case <-ctx.Done(): - requestDone.Fail(ctx.Err()) - break - } - - } + requestDone.Complete() log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount) return