Cleanup how listeners write data back to client
This commit is contained in:
parent
0e5c9c69c7
commit
dfc2be9bf2
|
@ -8,7 +8,6 @@ import (
|
||||||
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -18,13 +17,13 @@ type Listener struct {
|
||||||
streamStartOffset int64
|
streamStartOffset int64
|
||||||
streamSamplesBuffer int64
|
streamSamplesBuffer int64
|
||||||
|
|
||||||
ctx *util.RequestDone
|
writer listener.WriterFunc
|
||||||
sliceWriter chan []byte
|
waiter chan struct{}
|
||||||
offsetStart bool
|
offsetStart bool
|
||||||
headerBytes []byte
|
headerBytes []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) {
|
func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) {
|
||||||
headers := make(map[string]string)
|
headers := make(map[string]string)
|
||||||
headers["x-audio-packet-stream"] = "1"
|
headers["x-audio-packet-stream"] = "1"
|
||||||
headers["content-type"] = "application/x-audio-packet-stream"
|
headers["content-type"] = "application/x-audio-packet-stream"
|
||||||
|
@ -38,15 +37,19 @@ func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter c
|
||||||
|
|
||||||
return &Listener{
|
return &Listener{
|
||||||
information: info,
|
information: info,
|
||||||
ctx: ctx,
|
writer: writer,
|
||||||
sliceWriter: sliceWriter,
|
|
||||||
offsetStart: offsetStart,
|
offsetStart: offsetStart,
|
||||||
streamSamplesBuffer: samplesToBuffer,
|
streamSamplesBuffer: samplesToBuffer,
|
||||||
streamStartOffset: -1,
|
streamStartOffset: -1,
|
||||||
headerBytes: headerBytes.Bytes(),
|
headerBytes: headerBytes.Bytes(),
|
||||||
|
waiter: make(chan struct{}),
|
||||||
}, headers
|
}, headers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Listener) Wait() {
|
||||||
|
<-l.waiter
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Listener) Identifier() string {
|
func (l *Listener) Identifier() string {
|
||||||
return l.information.Identifier
|
return l.information.Identifier
|
||||||
}
|
}
|
||||||
|
@ -64,13 +67,15 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
l.sliceWriter <- (&packetStreamFrame{
|
if err := l.writer((&packetStreamFrame{
|
||||||
Type: Header,
|
Type: Header,
|
||||||
Category: 0,
|
Category: 0,
|
||||||
StartSampleNumber: 0,
|
StartSampleNumber: 0,
|
||||||
DurationInSamples: 0,
|
DurationInSamples: 0,
|
||||||
Data: l.headerBytes,
|
Data: l.headerBytes,
|
||||||
}).Encode()
|
}).Encode()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if len(packets) > 0 {
|
if len(packets) > 0 {
|
||||||
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - l.streamSamplesBuffer
|
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - l.streamSamplesBuffer
|
||||||
|
@ -86,50 +91,36 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) Write(packet packetizer.Packet) error {
|
func (l *Listener) Write(packet packetizer.Packet) error {
|
||||||
if l.ctx.Done() {
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
if metadataPacket, ok := packet.(*metadata.Packet); ok {
|
if metadataPacket, ok := packet.(*metadata.Packet); ok {
|
||||||
|
|
||||||
queueInfoBuf := make([]byte, binary.MaxVarintLen64)
|
queueInfoBuf := make([]byte, binary.MaxVarintLen64)
|
||||||
n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.Identifier))
|
n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.Identifier))
|
||||||
|
|
||||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
if err := l.writer((&packetStreamFrame{
|
||||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
l.sliceWriter <- (&packetStreamFrame{
|
|
||||||
Type: TrackIdentifier,
|
Type: TrackIdentifier,
|
||||||
Category: packet.Category(),
|
Category: packet.Category(),
|
||||||
StartSampleNumber: packet.GetStartSampleNumber(),
|
StartSampleNumber: packet.GetStartSampleNumber(),
|
||||||
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
||||||
Data: queueInfoBuf[:n],
|
Data: queueInfoBuf[:n],
|
||||||
}).Encode()
|
}).Encode()); err != nil {
|
||||||
|
return err
|
||||||
if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil {
|
|
||||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
|
||||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
l.sliceWriter <- (&packetStreamFrame{
|
if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil {
|
||||||
|
|
||||||
|
if err := l.writer((&packetStreamFrame{
|
||||||
Type: TrackMetadata,
|
Type: TrackMetadata,
|
||||||
Category: packet.Category(),
|
Category: packet.Category(),
|
||||||
StartSampleNumber: packet.GetStartSampleNumber(),
|
StartSampleNumber: packet.GetStartSampleNumber(),
|
||||||
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
||||||
Data: metadataBytes,
|
Data: metadataBytes,
|
||||||
}).Encode()
|
}).Encode()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
|
||||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
var frameType PacketStreamType
|
var frameType PacketStreamType
|
||||||
switch packet.KeepMode() {
|
switch packet.KeepMode() {
|
||||||
case packetizer.KeepLast:
|
case packetizer.KeepLast:
|
||||||
|
@ -146,19 +137,18 @@ func (l *Listener) Write(packet packetizer.Packet) error {
|
||||||
return errors.New("unknown KeepMode")
|
return errors.New("unknown KeepMode")
|
||||||
}
|
}
|
||||||
|
|
||||||
l.sliceWriter <- (&packetStreamFrame{
|
return l.writer((&packetStreamFrame{
|
||||||
Type: frameType,
|
Type: frameType,
|
||||||
Category: packet.Category(),
|
Category: packet.Category(),
|
||||||
StartSampleNumber: packet.GetStartSampleNumber(),
|
StartSampleNumber: packet.GetStartSampleNumber(),
|
||||||
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
||||||
Data: packet.GetData(),
|
Data: packet.GetData(),
|
||||||
}).Encode()
|
}).Encode())
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) Close() {
|
func (l *Listener) Close() {
|
||||||
close(l.sliceWriter)
|
l.writer(nil)
|
||||||
|
close(l.waiter)
|
||||||
}
|
}
|
||||||
|
|
||||||
type PacketStreamType uint64
|
type PacketStreamType uint64
|
||||||
|
|
|
@ -1,12 +1,10 @@
|
||||||
package icy
|
package icy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
@ -22,28 +20,32 @@ type Listener struct {
|
||||||
|
|
||||||
counter int
|
counter int
|
||||||
|
|
||||||
ctx *util.RequestDone
|
writer listener.WriterFunc
|
||||||
sliceWriter chan []byte
|
waiter chan struct{}
|
||||||
offsetStart bool
|
offsetStart bool
|
||||||
|
|
||||||
pendingMetadata map[string]string
|
pendingMetadata map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
||||||
headers := make(map[string]string)
|
headers := make(map[string]string)
|
||||||
headers["icy-metaint"] = strconv.Itoa(icyInterval)
|
headers["icy-metaint"] = strconv.Itoa(icyInterval)
|
||||||
|
|
||||||
return &Listener{
|
return &Listener{
|
||||||
information: info,
|
information: info,
|
||||||
ctx: ctx,
|
writer: writer,
|
||||||
sliceWriter: sliceWriter,
|
|
||||||
offsetStart: offsetStart,
|
offsetStart: offsetStart,
|
||||||
streamSamplesBuffer: samplesToBuffer,
|
streamSamplesBuffer: samplesToBuffer,
|
||||||
streamStartOffset: -1,
|
streamStartOffset: -1,
|
||||||
pendingMetadata: make(map[string]string),
|
pendingMetadata: make(map[string]string),
|
||||||
|
waiter: make(chan struct{}),
|
||||||
}, headers
|
}, headers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Listener) Wait() {
|
||||||
|
<-l.waiter
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Listener) Identifier() string {
|
func (l *Listener) Identifier() string {
|
||||||
return l.information.Identifier
|
return l.information.Identifier
|
||||||
}
|
}
|
||||||
|
@ -101,9 +103,6 @@ func (l *Listener) writeIcy() []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) Write(packet packetizer.Packet) error {
|
func (l *Listener) Write(packet packetizer.Packet) error {
|
||||||
if l.ctx.Done() {
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
if metadataPacket, ok := packet.(*metadata.Packet); ok {
|
if metadataPacket, ok := packet.(*metadata.Packet); ok {
|
||||||
if len(metadataPacket.TrackEntry.Artist()) > 0 {
|
if len(metadataPacket.TrackEntry.Artist()) > 0 {
|
||||||
|
@ -148,15 +147,10 @@ func (l *Listener) Write(packet packetizer.Packet) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
return l.writer(data)
|
||||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
l.sliceWriter <- data
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) Close() {
|
func (l *Listener) Close() {
|
||||||
close(l.sliceWriter)
|
l.writer(nil)
|
||||||
|
close(l.waiter)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,14 +22,7 @@ type Listener interface {
|
||||||
Start(packets []packetizer.Packet) error
|
Start(packets []packetizer.Packet) error
|
||||||
Write(packet packetizer.Packet) error
|
Write(packet packetizer.Packet) error
|
||||||
Close()
|
Close()
|
||||||
|
Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
type WriterFunc func(data []byte) error
|
||||||
type Listener struct {
|
|
||||||
Information ListenerInformation
|
|
||||||
Start func(packets []packetizer.Packet) error
|
|
||||||
Write func(packet packetizer.Packet) error
|
|
||||||
Close func()
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
|
@ -1,11 +1,9 @@
|
||||||
package plain
|
package plain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
||||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,22 +13,26 @@ type Listener struct {
|
||||||
streamStartOffset int64
|
streamStartOffset int64
|
||||||
streamSamplesBuffer int64
|
streamSamplesBuffer int64
|
||||||
|
|
||||||
ctx *util.RequestDone
|
writer listener.WriterFunc
|
||||||
sliceWriter chan []byte
|
waiter chan struct{}
|
||||||
offsetStart bool
|
offsetStart bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
||||||
return &Listener{
|
return &Listener{
|
||||||
information: info,
|
information: info,
|
||||||
ctx: ctx,
|
writer: writer,
|
||||||
sliceWriter: sliceWriter,
|
|
||||||
offsetStart: offsetStart,
|
offsetStart: offsetStart,
|
||||||
streamSamplesBuffer: samplesToBuffer,
|
streamSamplesBuffer: samplesToBuffer,
|
||||||
streamStartOffset: -1,
|
streamStartOffset: -1,
|
||||||
|
waiter: make(chan struct{}),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Listener) Wait() {
|
||||||
|
<-l.waiter
|
||||||
|
}
|
||||||
|
|
||||||
func (l *Listener) Identifier() string {
|
func (l *Listener) Identifier() string {
|
||||||
return l.information.Identifier
|
return l.information.Identifier
|
||||||
}
|
}
|
||||||
|
@ -62,37 +64,27 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) Write(packet packetizer.Packet) error {
|
func (l *Listener) Write(packet packetizer.Packet) error {
|
||||||
if l.ctx.Done() {
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, ok := packet.(*metadata.Packet); ok {
|
if _, ok := packet.(*metadata.Packet); ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
|
||||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
|
||||||
return l.ctx.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
if offsetable, ok := packet.(packetizer.OffsetablePacket); l.offsetStart && ok {
|
if offsetable, ok := packet.(packetizer.OffsetablePacket); l.offsetStart && ok {
|
||||||
if l.streamStartOffset <= -1 {
|
if l.streamStartOffset <= -1 {
|
||||||
if offsetable.KeepMode() != packetizer.Keep {
|
if offsetable.KeepMode() != packetizer.Keep {
|
||||||
l.streamStartOffset = offsetable.GetStartSampleNumber()
|
l.streamStartOffset = offsetable.GetStartSampleNumber()
|
||||||
l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset)
|
return l.writer(offsetable.GetDataOffset(l.streamStartOffset))
|
||||||
} else {
|
} else {
|
||||||
l.sliceWriter <- packet.GetData()
|
return l.writer(packet.GetData())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset)
|
return l.writer(offsetable.GetDataOffset(l.streamStartOffset))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
l.sliceWriter <- packet.GetData()
|
return l.writer(packet.GetData())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Listener) Close() {
|
func (l *Listener) Close() {
|
||||||
close(l.sliceWriter)
|
l.writer(nil)
|
||||||
|
close(l.waiter)
|
||||||
}
|
}
|
||||||
|
|
|
@ -409,10 +409,6 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
|
||||||
writer.Header().Set("x-audiocast-public", "1")
|
writer.Header().Set("x-audiocast-public", "1")
|
||||||
}
|
}
|
||||||
|
|
||||||
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
|
|
||||||
const byteSliceChannelBuffer = 1024 * 16
|
|
||||||
writeChannel := make(chan []byte, byteSliceChannelBuffer)
|
|
||||||
|
|
||||||
requestDone := util.RequestDone{}
|
requestDone := util.RequestDone{}
|
||||||
|
|
||||||
var headers []listener.HeaderEntry
|
var headers []listener.HeaderEntry
|
||||||
|
@ -476,31 +472,12 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
|
||||||
var mountListener listener.Listener
|
var mountListener listener.Listener
|
||||||
var extraHeaders map[string]string
|
var extraHeaders map[string]string
|
||||||
|
|
||||||
//set X-Audio-Packet-Stream for strictly timed packets and metadata
|
|
||||||
if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 {
|
|
||||||
//version 1
|
|
||||||
mountListener, extraHeaders = aps1.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType)
|
|
||||||
} else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
|
|
||||||
mountListener, extraHeaders = icy.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart)
|
|
||||||
} else {
|
|
||||||
mountListener, extraHeaders = plain.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart)
|
|
||||||
}
|
|
||||||
|
|
||||||
if mountListener == nil {
|
|
||||||
writer.WriteHeader(http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range extraHeaders {
|
|
||||||
writer.Header().Set(k, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
writer.Header().Set("x-listener-identifier", listenerIdentifier)
|
|
||||||
|
|
||||||
log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate))
|
|
||||||
mount.AddListener(mountListener)
|
|
||||||
|
|
||||||
ctx := request.Context()
|
ctx := request.Context()
|
||||||
|
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
|
||||||
|
const byteSliceChannelBuffer = 1024 * 16
|
||||||
|
writeChannel := make(chan []byte, byteSliceChannelBuffer)
|
||||||
|
|
||||||
|
go func() {
|
||||||
var flusher http.Flusher
|
var flusher http.Flusher
|
||||||
if httpFlusher, ok := writer.(http.Flusher); ok {
|
if httpFlusher, ok := writer.(http.Flusher); ok {
|
||||||
flusher = httpFlusher
|
flusher = httpFlusher
|
||||||
|
@ -523,6 +500,52 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
funcWriter := func(byteSlice []byte) error {
|
||||||
|
if requestDone.Done() {
|
||||||
|
return requestDone.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(byteSlice) > 0 {
|
||||||
|
if len(writeChannel) >= (cap(writeChannel) - 1) {
|
||||||
|
requestDone.Fail(errors.New("client ran out of writer buffer"))
|
||||||
|
return requestDone.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
writeChannel <- byteSlice
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//set X-Audio-Packet-Stream for strictly timed packets and metadata
|
||||||
|
if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 {
|
||||||
|
//version 1
|
||||||
|
mountListener, extraHeaders = aps1.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType)
|
||||||
|
} else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
|
||||||
|
mountListener, extraHeaders = icy.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
|
||||||
|
} else {
|
||||||
|
mountListener, extraHeaders = plain.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mountListener == nil {
|
||||||
|
writer.WriteHeader(http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range extraHeaders {
|
||||||
|
writer.Header().Set(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.Header().Set("x-listener-identifier", listenerIdentifier)
|
||||||
|
|
||||||
|
log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate))
|
||||||
|
mount.AddListener(mountListener)
|
||||||
|
|
||||||
|
mountListener.Wait()
|
||||||
|
|
||||||
|
requestDone.Complete()
|
||||||
|
|
||||||
log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount)
|
log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue