MeteorLight/queue.go

995 lines
29 KiB
Go

package main
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/guess"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/queue"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/replaygain"
"github.com/dhowden/tag"
"io"
"log"
"net/http"
"os"
"path"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// maxBufferSize is a buffer length expressed in seconds: NewQueue multiplies it by
// the blocks-per-second rate to size the last buffer filter, and HandleRadioRequest
// uses it (times time.Second) as the fallback per-client buffer duration.
const maxBufferSize = 10
// QueueTrackEntry describes one track handled by Queue: where it is loaded from,
// its display metadata, and the reader/decoder state created by Load.
type QueueTrackEntry struct {
// QueueIdentifier is assigned by Queue.AddTrack from the underlying audio queue.
QueueIdentifier queue.QueueIdentifier
// Path is a local file path, or a remote location when it starts with "http" (see Load).
Path string
// Metadata is loosely typed: Title, Album and Artist may hold non-string values,
// use the Title(), Album() and Artist() accessors to read them as strings.
Metadata struct {
Title interface{} `json:"title"`
Album interface{} `json:"album"`
Artist interface{} `json:"artist"`
// Art is sent as the ICY StreamURL by HandleRadioRequest when non-empty.
Art string `json:"art"`
// ReplayGain values; Load fills them from file tags while TrackPeak is still zero.
ReplayGain struct {
TrackPeak float64 `json:"track_peak"`
TrackGain float64 `json:"track_gain"`
AlbumPeak float64 `json:"album_peak"`
AlbumGain float64 `json:"album_gain"`
} `json:"replay_gain,omitempty"`
}
// reader is the opened file or remote stream backing source; Queue.Remove closes it.
reader io.ReadSeekCloser
// source is the decoded audio source; non-nil once Load has succeeded.
source audio.Source
// original receives the "queue_id" key in Queue.AddTrack.
// NOTE(review): it is populated outside this view, presumably with the raw request data - confirm.
original map[string]interface{}
}
// queueMetadataString renders a loosely-typed metadata value as a string.
//
// Strings are returned as-is. Numeric values are formatted in plain decimal
// notation; this includes float64 and json.Number, which is what encoding/json
// produces when a JSON number is decoded into an interface value. Any other
// type (including nil) yields the empty string.
func queueMetadataString(value interface{}) string {
	switch v := value.(type) {
	case string:
		return v
	case int:
		return strconv.Itoa(v)
	case int64:
		return strconv.FormatInt(v, 10)
	case float64:
		// -1 precision prints the shortest exact form, so 1984.0 becomes "1984".
		return strconv.FormatFloat(v, 'f', -1, 64)
	case json.Number:
		return v.String()
	}
	return ""
}

// Title returns the track title as a string, or "" when it is unset or of an
// unsupported type.
func (e *QueueTrackEntry) Title() string {
	return queueMetadataString(e.Metadata.Title)
}

// Album returns the album name as a string, or "" when it is unset or of an
// unsupported type.
func (e *QueueTrackEntry) Album() string {
	return queueMetadataString(e.Metadata.Album)
}

// Artist returns the artist name as a string, or "" when it is unset or of an
// unsupported type.
func (e *QueueTrackEntry) Artist() string {
	return queueMetadataString(e.Metadata.Artist)
}
// Load opens the track at e.Path and prepares its decoder.
//
// Paths longer than four bytes that start with "http" are opened through
// NewRangeReadSeekCloser, anything else as a local file. Tags are read on a
// best-effort basis and only fill metadata fields that are still empty;
// ReplayGain tags are only applied while TrackPeak is zero.
//
// Load is a no-op once a source has been created. On failure the opened reader
// is closed and cleared so that nothing stays open and a later call starts
// from a clean state.
func (e *QueueTrackEntry) Load() error {
	if e.source != nil {
		return nil
	}

	fileName := path.Base(e.Path)
	if len(e.Path) > 4 && strings.HasPrefix(e.Path, "http") {
		s, err := NewRangeReadSeekCloser(e.Path)
		if err != nil {
			return err
		}
		// Check the concrete pointer: once stored in the interface field a nil
		// pointer would no longer compare equal to nil.
		if s == nil {
			return errors.New("could not find stream opener")
		}
		fileName = s.GetFileName()
		runtime.SetFinalizer(s, (*RangeReadSeekCloser).Close)
		e.reader = s
	} else {
		f, err := os.Open(e.Path)
		if err != nil {
			return err
		}
		runtime.SetFinalizer(f, (*os.File).Close)
		e.reader = f
	}

	// abort releases the reader before reporting err.
	abort := func(err error) error {
		_ = e.reader.Close() // best effort, the original error is the one that matters
		e.reader = nil
		return err
	}

	// Tags are optional: a read failure only means no metadata gets applied.
	meta, _ := tag.ReadFrom(e.reader)

	if _, err := e.reader.Seek(0, io.SeekStart); err != nil {
		return abort(err)
	}

	decoders, err := guess.GetDecoders(e.reader, fileName)
	if err != nil {
		return abort(err)
	}

	source, err := guess.Open(e.reader, decoders)
	if err != nil {
		return abort(err)
	}
	if source == nil {
		return abort(fmt.Errorf("could not find decoder for %s", e.Path))
	}
	e.source = source

	if meta == nil {
		return nil
	}

	// Apply tags found on file, never overriding values that were already provided.
	if e.Title() == "" {
		e.Metadata.Title = meta.Title()
	}
	if e.Album() == "" {
		e.Metadata.Album = meta.Album()
	}
	if e.Artist() == "" {
		e.Metadata.Artist = meta.Artist()
	}
	if e.Artist() == "" {
		e.Metadata.Artist = meta.AlbumArtist()
	}

	if e.Metadata.ReplayGain.TrackPeak == 0 {
		tags := meta.Raw()
		// readTag stores the numeric value of a string tag into dst. A trailing
		// "dB" unit is accepted; an unparsable value is stored as returned by
		// strconv.ParseFloat (zero for syntax errors).
		readTag := func(key string, dst *float64) {
			if text, ok := tags[key].(string); ok {
				*dst, _ = strconv.ParseFloat(strings.TrimSpace(strings.TrimSuffix(strings.ToLower(text), "db")), 64)
			}
		}
		readTag("replaygain_track_gain", &e.Metadata.ReplayGain.TrackGain)
		readTag("replaygain_track_peak", &e.Metadata.ReplayGain.TrackPeak)
		readTag("replaygain_album_gain", &e.Metadata.ReplayGain.AlbumGain)
		readTag("replaygain_album_peak", &e.Metadata.ReplayGain.AlbumPeak)
	}

	return nil
}
// QueueMetadataPacket is a data-less packet enqueued on every mount's metadata
// queue when a track starts playing (see Queue.AddTrack). HandleRadioRequest
// type-asserts for it to emit track information instead of audio bytes.
type QueueMetadataPacket struct {
// sampleNumber is the stream position, in samples, at which the track starts.
sampleNumber int64
// TrackEntry is the track that started playing.
TrackEntry *QueueTrackEntry
}
// KeepMode always reports packetizer.KeepLast.
func (p *QueueMetadataPacket) KeepMode() packetizer.KeepMode {
return packetizer.KeepLast
}
// GetStartSampleNumber returns the sample position the packet was created with.
func (p *QueueMetadataPacket) GetStartSampleNumber() int64 {
return p.sampleNumber
}
// GetEndSampleNumber returns the same position as GetStartSampleNumber, so the
// packet has a duration of zero samples.
func (p *QueueMetadataPacket) GetEndSampleNumber() int64 {
return p.sampleNumber
}
// Category always returns -1.
func (p *QueueMetadataPacket) Category() int64 {
return -1
}
// GetData returns nil: the packet carries no audio payload.
func (p *QueueMetadataPacket) GetData() []byte {
return nil
}
// Queue ties the audio queue, the track entries describing its contents, and
// the stream mounts that listeners connect to.
type Queue struct {
// NowPlaying receives the entry of every track that starts playing; closed by Wait.
NowPlaying chan *QueueTrackEntry
// QueueEmpty is read by HandleQueue to obtain a track when the audio queue is empty.
QueueEmpty chan *QueueTrackEntry
// duration accumulates the played time of removed tracks, stored as time.Duration nanoseconds.
duration atomic.Int64
// NOTE(review): durationError is not used in the visible code; presumably reserved
// for the "carry sample rate error" TODOs in AddTrack - confirm.
durationError int64
audioQueue *queue.Queue
mounts []*StreamMount
// queue holds the entries added through AddTrack; access is guarded by mutex.
queue []*QueueTrackEntry
mutex sync.RWMutex
config *Config
// wg tracks the mount.Process goroutines started by NewQueue.
wg sync.WaitGroup
}
// NewQueue builds a Queue from config and starts one processing goroutine per
// configured stream.
//
// A non-positive sample rate is replaced by 44100 (written back into config).
// The sample format defaults to 16-bit integers; config.Queue.BitDepth, when
// positive, overrides the bit depth implied by the format. NewQueue panics if
// a stream mount cannot be initialized.
func NewQueue(config *Config) *Queue {
	if config.Queue.SampleRate <= 0 {
		config.Queue.SampleRate = 44100
	}

	// 16-bit integer samples unless the configuration names another format.
	format, depth := audio.SourceInt16, 16
	switch config.Queue.SampleFormat {
	case "i16", "s16", "int16", "s16le":
		// already the default
	case "i32", "s32", "int32", "int", "s32le":
		format, depth = audio.SourceInt32, 32
	case "f32", "float", "float32", "f32le":
		format, depth = audio.SourceFloat32, 0
	}
	if config.Queue.BitDepth > 0 {
		depth = config.Queue.BitDepth
	}

	result := &Queue{
		NowPlaying: make(chan *QueueTrackEntry, 1),
		QueueEmpty: make(chan *QueueTrackEntry),
		config:     config,
		audioQueue: queue.NewQueue(format, depth, config.Queue.SampleRate, 2),
	}

	const blocksPerSecond = 20
	chain := filter.NewFilterChain(
		result.audioQueue.GetSource(),
		filter.NewBufferFilter(16),
		filter.NewRealTimeFilter(blocksPerSecond),
		filter.NewBufferFilter(maxBufferSize*blocksPerSecond),
	)
	// One independent copy of the filtered source per configured stream.
	outputs := chain.Split(len(config.Streams))

	for index, streamConfig := range result.config.Streams {
		streamMount := NewStreamMount(outputs[index], streamConfig)
		if streamMount == nil {
			log.Panicf("could not initialize %s\n", streamConfig.MountPath)
		}
		result.mounts = append(result.mounts, streamMount)
		result.wg.Add(1)
		go streamMount.Process(&result.wg)
	}

	return result
}
// GetDuration reports the accumulated playback time of all tracks that have
// been removed from the queue so far.
func (q *Queue) GetDuration() time.Duration {
	nanoseconds := q.duration.Load()
	return time.Duration(nanoseconds)
}
// Wait blocks until every mount processing goroutine has finished and then
// closes NowPlaying so that its consumers terminate.
func (q *Queue) Wait() {
	// Close only after the wait has returned.
	defer close(q.NowPlaying)
	q.wg.Wait()
}
// AddTrack loads entry and schedules it on the audio queue.
//
// With tail set the track is appended at the end of the queue, otherwise it is
// inserted right behind the first entry. When ReplayGain is enabled in the
// configuration the source is wrapped in a ReplayGain filter (if the entry
// carries a track peak) or in a normalization filter.
//
// It returns the error from entry.Load, or "queue too long" when the
// configured queue length is reached.
func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error {
	if err := entry.Load(); err != nil {
		return err
	}

	// Callback parameters are deliberately not named "queue" to avoid shadowing the package.
	startCallback := func(audioQueue *queue.Queue, queueEntry *queue.QueueEntry) {
		if e := q.Get(queueEntry.Identifier); e != nil { //is this needed?
			log.Printf("now playing \"%s\": %s - %s (%s)\n", e.Path, e.Metadata.Title, e.Metadata.Artist, e.Metadata.Album)
			q.NowPlaying <- e

			//TODO: carry sample rate error
			// Convert the elapsed duration to samples in two steps (whole seconds
			// and remainder). Multiplying nanoseconds by the sample rate directly
			// overflows int64 after roughly 58 hours of playback at 44.1 kHz.
			elapsed := q.duration.Load()
			rate := int64(audioQueue.GetSampleRate())
			second := int64(time.Second)
			sampleNumber := (elapsed/second)*rate + ((elapsed%second)*rate)/second

			for _, mount := range q.mounts {
				_ = mount.MetadataQueue.Enqueue(&QueueMetadataPacket{
					sampleNumber: sampleNumber,
					TrackEntry:   e,
				})
			}
		} else {
			log.Printf("now playing \"%s\": %s - %s (%s)\n", entry.Path, entry.Metadata.Title, entry.Metadata.Artist, entry.Metadata.Album)
		}
	}

	endCallback := func(audioQueue *queue.Queue, queueEntry *queue.QueueEntry) {
	}

	removeCallback := func(audioQueue *queue.Queue, queueEntry *queue.QueueEntry) {
		//TODO: carry sample rate error
		// Same two-step conversion as above, samples to nanoseconds this time.
		// A zero sample rate would divide by zero, so such entries add no time.
		samples := int64(queueEntry.ReadSamples.Load())
		if rate := int64(queueEntry.Source.GetSampleRate()); rate > 0 {
			second := int64(time.Second)
			q.duration.Add((samples/rate)*second + ((samples%rate)*second)/rate)
		}
		q.Remove(queueEntry.Identifier)
		q.HandleQueue()
	}

	q.mutex.Lock()
	defer q.mutex.Unlock()

	if q.config.Queue.Length > 0 && len(q.queue) >= q.config.Queue.Length {
		return errors.New("queue too long")
	}

	source := entry.source
	if q.config.Queue.ReplayGain {
		if entry.Metadata.ReplayGain.TrackPeak != 0 {
			source = replaygain.NewReplayGainFilter(entry.Metadata.ReplayGain.TrackGain, entry.Metadata.ReplayGain.TrackPeak, 0).Process(source)
		} else {
			source = replaygain.NewNormalizationFilter(5).Process(source)
		}
	}

	if tail {
		entry.QueueIdentifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback)
	} else {
		entry.QueueIdentifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback)
	}

	// Entries built without their original map must not panic on assignment.
	if entry.original == nil {
		entry.original = make(map[string]interface{})
	}
	entry.original["queue_id"] = entry.QueueIdentifier

	if tail || len(q.queue) == 0 {
		q.queue = append(q.queue, entry)
	} else {
		// Insert at index 1, directly behind the first entry.
		q.queue = append(q.queue, nil)
		copy(q.queue[2:], q.queue[1:])
		q.queue[1] = entry
	}

	return nil
}
// HandleQueue refills the audio queue when it has run empty.
//
// It blocks on QueueEmpty for the next track and adds it at the tail. A failed
// addition is logged and retried with a new track after one second, for as
// long as the audio queue stays empty. Retrying is done in a loop, so repeated
// failures do not grow the call stack. HandleQueue returns without adding
// anything when QueueEmpty has been closed.
func (q *Queue) HandleQueue() {
	for q.audioQueue.GetQueueSize() == 0 {
		entry, ok := <-q.QueueEmpty
		if !ok {
			// No producer left, nothing can be queued anymore.
			return
		}

		// A nil entry is treated like any other failed addition.
		err := errors.New("nil track entry")
		if entry != nil {
			err = q.AddTrack(entry, true)
		}
		if err == nil {
			return
		}

		log.Printf("track addition error: \"%s\"", err)
		//TODO: maybe fail after n tries
		time.Sleep(time.Second)
	}
}
// GetQueue returns a copy of the queued entries without the first one.
// The result is nil when the queue holds at most one entry.
func (q *Queue) GetQueue() (result []*QueueTrackEntry) {
	q.mutex.RLock()
	defer q.mutex.RUnlock()

	if len(q.queue) <= 1 {
		return nil
	}

	upcoming := q.queue[1:]
	result = make([]*QueueTrackEntry, len(upcoming))
	copy(result, upcoming)
	return result
}
// Get looks up the entry registered under identifier.
// It returns nil when no such entry is queued.
func (q *Queue) Get(identifier queue.QueueIdentifier) *QueueTrackEntry {
	q.mutex.RLock()
	defer q.mutex.RUnlock()

	for i := range q.queue {
		if candidate := q.queue[i]; candidate.QueueIdentifier == identifier {
			return candidate
		}
	}
	return nil
}
// GetNowPlaying returns the entry matching the head of the audio queue, or nil
// when the audio queue has no head or the entry is unknown.
func (q *Queue) GetNowPlaying() *QueueTrackEntry {
	head := q.audioQueue.GetQueueHead()
	if head == nil {
		return nil
	}
	return q.Get(head.Identifier)
}
// SkipNowPlaying removes the head of the audio queue. It reports whether a
// matching track entry was removed.
func (q *Queue) SkipNowPlaying() bool {
	head := q.audioQueue.GetQueueHead()
	if head == nil {
		return false
	}
	return q.Remove(head.Identifier)
}
// GetIndex returns the entry at position index, counted from the entry after
// the head of the audio queue. It returns nil when there is none.
func (q *Queue) GetIndex(index int) *QueueTrackEntry {
	// Offset by one to skip the head of the audio queue.
	queued := q.audioQueue.GetQueueIndex(index + 1)
	if queued == nil {
		return nil
	}
	return q.Get(queued.Identifier)
}
// GetHead returns the entry at audio queue index 1, i.e. the one right behind
// the head of the audio queue, or nil when there is none.
func (q *Queue) GetHead() *QueueTrackEntry {
	next := q.audioQueue.GetQueueIndex(1)
	if next == nil {
		return nil
	}
	return q.Get(next.Identifier)
}
// GetTail returns the last entry of the audio queue. It returns nil when the
// tail sits at index 0 or when the audio queue is empty.
func (q *Queue) GetTail() *QueueTrackEntry {
	index, last := q.audioQueue.GetQueueTail()
	if index == 0 || last == nil {
		return nil
	}
	return q.Get(last.Identifier)
}
// Remove drops the entry registered under identifier from the track list and
// from the audio queue, closing the entry's reader afterwards.
//
// The audio queue removal is attempted even when no entry matches. The return
// value reports whether a track entry was found.
func (q *Queue) Remove(identifier queue.QueueIdentifier) bool {
	var removed *QueueTrackEntry

	q.mutex.Lock()
	for index := range q.queue {
		if q.queue[index].QueueIdentifier != identifier {
			continue
		}
		removed = q.queue[index]
		q.queue = append(q.queue[:index], q.queue[index+1:]...)
		break
	}
	q.mutex.Unlock()

	// Performed outside of the lock.
	q.audioQueue.Remove(identifier)

	if removed == nil {
		return false
	}
	if removed.reader != nil {
		removed.reader.Close()
	}
	return true
}
// RemoveListener asks each mount in turn to remove the listener known as
// identifier and stops at the first mount that reports success.
func (q *Queue) RemoveListener(identifier string) bool {
	q.mutex.RLock()
	defer q.mutex.RUnlock()

	for i := range q.mounts {
		if removed := q.mounts[i].RemoveListener(identifier); removed {
			return true
		}
	}
	return false
}
// GetListeners collects the listener information of every mount. The returned
// slice is never nil, even when nobody is listening.
func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
	listeners = make([]*ListenerInformation, 0, 1)

	q.mutex.RLock()
	defer q.mutex.RUnlock()

	for i := range q.mounts {
		perMount := q.mounts[i].GetListeners()
		listeners = append(listeners, perMount...)
	}
	return listeners
}
// PacketStreamType identifies the kind of a frame in the packet stream served
// by HandleRadioRequest; packetStreamFrame.Encode writes it as the first field.
type PacketStreamType uint64
// The order of these constants is important: their numeric values are part of
// the on-wire protocol, so new values may only be appended.
const (
Header = PacketStreamType(iota)
DataKeepLast
DataKeep
DataGroupKeep
DataGroupDiscard
DataDiscard
TrackIdentifier
TrackMetadata
)
// packetStreamFrame is one frame of the packet stream. Fields are listed in
// the order Encode writes them.
type packetStreamFrame struct {
Type PacketStreamType
Category int64
StartSampleNumber int64
DurationInSamples int64
// The payload size is not a field: Encode derives it from len(Data) and
// writes it right before the payload.
//automatically filled based on Data
//Size uint64
Data []byte
}
// Encode serializes the frame to its wire format: Type and Category as
// unsigned varints (Category is reinterpreted as uint64), StartSampleNumber
// and DurationInSamples as signed varints, the payload length as an unsigned
// varint, followed by the payload itself.
func (p *packetStreamFrame) Encode() []byte {
	// Five varints at most, plus the payload.
	encoded := make([]byte, 0, 5*binary.MaxVarintLen64+len(p.Data))

	encoded = binary.AppendUvarint(encoded, uint64(p.Type))
	encoded = binary.AppendUvarint(encoded, uint64(p.Category))
	encoded = binary.AppendVarint(encoded, p.StartSampleNumber)
	encoded = binary.AppendVarint(encoded, p.DurationInSamples)
	encoded = binary.AppendUvarint(encoded, uint64(len(p.Data)))

	return append(encoded, p.Data...)
}
// HandleRadioRequest serves the radio HTTP endpoints.
//
// A path ending in "mounts" returns a JSON description of every mount. A path
// ending in a mount name attaches the client as a listener to that mount and
// streams until the mount closes the listener. Any other path yields 404.
//
// Three output formats exist for a stream, selected by request headers:
//   - "x-audio-packet-stream: 1": framed packets (see packetStreamFrame) with
//     track identifier and metadata frames;
//   - "icy-metadata" >= 1: raw audio with ICY metadata blocks every 8192 bytes;
//   - otherwise: raw audio only.
//
// Packets are handed to a per-client writer goroutine through a buffered
// channel. A client whose channel is nearly full, or whose connection fails on
// write, is marked as done so that the mount receives an error on its next
// write to this listener.
func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) {
	writer.Header().Set("Server", "MeteorLight/radio")
	writer.Header().Set("Connection", "close")
	writer.Header().Set("X-Content-Type-Options", "nosniff")
	writer.Header().Set("Access-Control-Allow-Origin", "*")
	writer.Header().Set("Access-Control-Allow-Headers", "Accept, Content-Type, Icy-Metadata")
	writer.Header().Set("Accept-Ranges", "none")

	if strings.HasSuffix(request.URL.Path, "mounts") {
		writer.Header().Set("Content-Type", "application/json; charset=utf-8")
		writer.Header().Set("Access-Control-Expose-Headers", "Accept-Ranges, Server, Content-Type")

		type mountData struct {
			Path              string                 `json:"mount"`
			MimeType          string                 `json:"mime"`
			FormatDescription string                 `json:"formatDescription"`
			SampleRate        int                    `json:"sampleRate"`
			Channels          int                    `json:"channels"`
			Listeners         int                    `json:"listeners"`
			Options           map[string]interface{} `json:"options"`
		}

		// Non-nil so that an empty list is encoded as [] rather than null.
		mounts := make([]mountData, 0, len(q.mounts))
		for _, mount := range q.mounts {
			mounts = append(mounts, mountData{
				Path:              strings.TrimSuffix(request.URL.Path, "mounts") + mount.Mount,
				MimeType:          mount.MimeType,
				SampleRate:        mount.SampleRate,
				FormatDescription: mount.FormatDescription,
				Channels:          mount.Channels,
				Listeners:         len(mount.listeners),
				Options:           mount.Options,
			})
		}

		jsonBytes, err := json.MarshalIndent(mounts, "", " ")
		if err != nil {
			writer.WriteHeader(http.StatusInternalServerError)
			return
		}
		writer.WriteHeader(http.StatusOK)
		_, _ = writer.Write(jsonBytes) // nothing useful can be done if the client is gone
		return
	}

	// The first mount whose name is a suffix of the request path serves the request.
	var mount *StreamMount
	for _, candidate := range q.mounts {
		if strings.HasSuffix(request.URL.Path, candidate.Mount) {
			mount = candidate
			break
		}
	}
	if mount == nil {
		writer.WriteHeader(http.StatusNotFound)
		return
	}

	writer.Header().Set("Content-Type", mount.MimeType)
	writer.Header().Set("Cache-Control", "no-store, max-age=604800")
	writer.Header().Set("Access-Control-Expose-Headers", "Accept-Ranges, Server, Content-Type, Icy-MetaInt, X-Listener-Identifier")
	writer.Header().Set("Vary", "*")

	rangeHeader := request.Header.Get("range")
	if rangeHeader != "" && rangeHeader != "bytes=0-" {
		//TODO: maybe should fail in case bytes are requested
		if strings.Contains(request.UserAgent(), " Safari/") && mount.MimeType == "audio/flac" {
			//Safari special case, fake Range check so it decodes afterwards.
			//Safari creates a request with Range for 0-1, specifically for FLAC, and expects a result supporting range. Afterwards it requests the whole file.
			//However the decoder is able to decode FLAC livestreams. If we fake the initial range response, then afterwards serve normal responses, Safari will happily work.
			//TODO: remove this AS SOON as safari works on its own

			//safariLargeFileValue arbitrary large value, cannot be that large or iOS Safari fails.
			//Declared as int64 so the constant also fits on 32-bit builds.
			const safariLargeFileValue int64 = 1024 * 1024 * 1024 * 1024 * 16 // 16 TiB

			if rangeHeader == "bytes=0-1" {
				//first request
				writer.Header().Set("Accept-Ranges", "bytes")
				writer.Header().Set("Content-Range", fmt.Sprintf("bytes 0-1/%d", safariLargeFileValue)) //16 TiB fake size
				writer.Header().Set("Content-Length", "2")
				writer.WriteHeader(http.StatusPartialContent)
				writer.Write([]byte{'f', 'L'})
				return
			} else if rangeHeader == fmt.Sprintf("bytes=0-%d", safariLargeFileValue-1) {
				//second request, serve status 200 to keep retries to a minimum
				//NOTE: headers set after this WriteHeader call are not sent for this request.
				writer.Header().Set("Content-Length", fmt.Sprintf("%d", safariLargeFileValue))
				writer.WriteHeader(http.StatusOK)
			} else if strings.HasPrefix(rangeHeader, "bytes=") && strings.HasSuffix(rangeHeader, fmt.Sprintf("-%d", safariLargeFileValue-1)) {
				//any other requests, these should fail
				writer.Header().Set("Content-Range", fmt.Sprintf("bytes %s/%d", strings.TrimPrefix(rangeHeader, "bytes="), safariLargeFileValue))
				writer.Header().Set("Accept-Ranges", "bytes")
				writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
				return
			}
		}
	}

	bitrate := 0
	if value, ok := mount.Options["bitrate"]; ok {
		if intValue, ok := value.(int); ok {
			bitrate = intValue
		} else if int64Value, ok := value.(int64); ok {
			bitrate = int(int64Value)
		}
	}

	//set some audiocast/icy radio headers
	writer.Header().Set("x-audiocast-name", q.config.Radio.Name)
	writer.Header().Set("x-audiocast-bitrate", fmt.Sprintf("%d", bitrate))
	writer.Header().Set("icy-name", q.config.Radio.Name)
	writer.Header().Set("icy-version", "2")
	writer.Header().Set("icy-index-metadata", "1")
	if q.config.Radio.Description != "" {
		writer.Header().Set("x-audiocast-description", q.config.Radio.Description)
		writer.Header().Set("icy-description", q.config.Radio.Description)
	}
	if q.config.Radio.URL != "" {
		writer.Header().Set("x-audiocast-url", q.config.Radio.URL)
		writer.Header().Set("icy-url", q.config.Radio.URL)
	}
	if q.config.Radio.Logo != "" {
		writer.Header().Set("icy-logo", q.config.Radio.Logo)
	}
	writer.Header().Set("icy-br", fmt.Sprintf("%d", bitrate))
	writer.Header().Set("icy-sr", fmt.Sprintf("%d", mount.SampleRate))
	writer.Header().Set("icy-audio-info", fmt.Sprintf("ice-channels=%d;ice-samplerate=%d;ice-bitrate=%d", mount.Channels, mount.SampleRate, bitrate))
	if q.config.Radio.Private {
		writer.Header().Set("icy-pub", "0")
		writer.Header().Set("icy-do-not-index", "1")
		writer.Header().Set("x-audiocast-public", "0")
		writer.Header().Set("x-robots-tag", "noindex, nofollow")
	} else {
		writer.Header().Set("icy-pub", "1")
		writer.Header().Set("icy-do-not-index", "0")
		writer.Header().Set("x-audiocast-public", "1")
	}

	var packetWriteCallback func(packet packetizer.Packet) error

	//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
	const byteSliceChannelBuffer = 1024 * 16
	writeChannel := make(chan []byte, byteSliceChannelBuffer)

	requestDone := struct {
		Done  atomic.Bool
		Lock  sync.Mutex
		Error error
	}{}

	// failClient records err as the terminal state of this client and returns it.
	failClient := func(err error) error {
		requestDone.Lock.Lock()
		defer requestDone.Lock.Unlock()
		requestDone.Error = err
		requestDone.Done.Store(true)
		return err
	}

	// bufferExhausted reports whether the client has fallen too far behind to
	// accept another chunk without risking a blocking send.
	bufferExhausted := func() bool {
		return len(writeChannel) >= (byteSliceChannelBuffer - 1)
	}

	var wgClient sync.WaitGroup

	//set X-Audio-Packet-Stream for strictly timed packets and metadata
	if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 {
		//version 1
		writer.Header().Set("x-audio-packet-stream", "1")
		writer.Header().Set("Content-Type", "application/x-audio-packet-stream")

		packetWriteCallback = func(packet packetizer.Packet) error {
			if requestDone.Done.Load() {
				return requestDone.Error
			}

			if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
				queueInfoBuf := make([]byte, binary.MaxVarintLen64)
				n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.QueueIdentifier))

				if bufferExhausted() {
					return failClient(errors.New("client ran out of buffer"))
				}
				writeChannel <- (&packetStreamFrame{
					Type:              TrackIdentifier,
					Category:          packet.Category(),
					StartSampleNumber: packet.GetStartSampleNumber(),
					DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
					Data:              queueInfoBuf[:n],
				}).Encode()

				// Metadata that cannot be encoded is skipped, the identifier frame is enough.
				if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil {
					if bufferExhausted() {
						return failClient(errors.New("client ran out of buffer"))
					}
					writeChannel <- (&packetStreamFrame{
						Type:              TrackMetadata,
						Category:          packet.Category(),
						StartSampleNumber: packet.GetStartSampleNumber(),
						DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
						Data:              metadataBytes,
					}).Encode()
				}
				return nil
			}

			if bufferExhausted() {
				return failClient(errors.New("client ran out of buffer"))
			}

			var frameType PacketStreamType
			switch packet.KeepMode() {
			case packetizer.KeepLast:
				frameType = DataKeepLast
			case packetizer.Keep:
				frameType = DataKeep
			case packetizer.GroupKeep:
				frameType = DataGroupKeep
			case packetizer.GroupDiscard:
				frameType = DataGroupDiscard
			case packetizer.Discard:
				frameType = DataDiscard
			default:
				return errors.New("unknown KeepMode")
			}

			writeChannel <- (&packetStreamFrame{
				Type:              frameType,
				Category:          packet.Category(),
				StartSampleNumber: packet.GetStartSampleNumber(),
				DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
				Data:              packet.GetData(),
			}).Encode()
			return nil
		}

		// Stream header frame: channels, sample rate and mime type, queued ahead of any packet.
		headerBytes := new(bytes.Buffer)
		binary.Write(headerBytes, binary.LittleEndian, int64(mount.Channels))
		binary.Write(headerBytes, binary.LittleEndian, int64(mount.SampleRate))
		binary.Write(headerBytes, binary.LittleEndian, int32(len(mount.MimeType)))
		headerBytes.Write([]byte(mount.MimeType))

		writeChannel <- (&packetStreamFrame{
			Type:              Header,
			Category:          0,
			StartSampleNumber: 0,
			DurationInSamples: 0,
			Data:              headerBytes.Bytes(),
		}).Encode()
	} else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
		metadataToSend := make(map[string]string)
		const icyInterval = 8192 //weird clients might not support other numbers than this
		icyCounter := 0
		writer.Header().Set("icy-metaint", fmt.Sprintf("%d", icyInterval))

		// writeIcy builds one ICY metadata block: a length byte (in units of 16
		// bytes) followed by at most one pending property, zero-padded.
		writeIcy := func() []byte {
			packetContent := make([]byte, 1, 16*255+1)
			for k, v := range metadataToSend {
				packetContent = append(packetContent, []byte(fmt.Sprintf("%s='%s';", k, v))...)
				delete(metadataToSend, k)
				//shouldn't send multiple properties in same packet if we want working single quotes, wait until next ICY frame
				break
			}

			contentLength := len(packetContent) - 1
			if contentLength > 16*255 {
				//cannot send long titles
				return make([]byte, 1)
			}

			if (contentLength % 16) == 0 { //already padded
				packetContent[0] = byte(contentLength / 16)
			} else {
				packetContent[0] = byte(contentLength/16) + 1
				packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...)
			}
			return packetContent
		}

		var streamStartOffset int64 = -1

		packetWriteCallback = func(packet packetizer.Packet) error {
			if requestDone.Done.Load() {
				return requestDone.Error
			}

			if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
				// Only remembered here, sent with the next ICY block.
				if len(metadataPacket.TrackEntry.Artist()) > 0 {
					metadataToSend["StreamTitle"] = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Artist(), metadataPacket.TrackEntry.Title())
				} else {
					metadataToSend["StreamTitle"] = metadataPacket.TrackEntry.Title()
				}
				if len(metadataPacket.TrackEntry.Metadata.Art) > 0 {
					metadataToSend["StreamURL"] = metadataPacket.TrackEntry.Metadata.Art
				}
				return nil
			}

			var p []byte
			if offsetable, ok := packet.(packetizer.OffsetablePacket); mount.OffsetStart && ok {
				if streamStartOffset <= -1 {
					if offsetable.KeepMode() != packetizer.Keep {
						streamStartOffset = offsetable.GetStartSampleNumber()
						p = offsetable.GetDataOffset(streamStartOffset)
					} else {
						p = packet.GetData()
					}
				} else {
					p = offsetable.GetDataOffset(streamStartOffset)
				}
			} else {
				p = packet.GetData()
			}

			// Interleave a metadata block after every icyInterval bytes of audio.
			var data []byte
			for len(p) > 0 {
				l := icyInterval - icyCounter
				if l <= len(p) {
					data = append(data, p[:l]...)
					data = append(data, writeIcy()...)
					icyCounter = 0
					p = p[l:]
				} else {
					data = append(data, p...)
					icyCounter += len(p)
					p = p[:0]
				}
			}

			if bufferExhausted() {
				return failClient(errors.New("client ran out of buffer"))
			}
			writeChannel <- data
			return nil
		}
	} else {
		var streamStartOffset int64 = -1

		packetWriteCallback = func(packet packetizer.Packet) error {
			if requestDone.Done.Load() {
				return requestDone.Error
			}
			if _, ok := packet.(*QueueMetadataPacket); ok {
				return nil
			}
			if bufferExhausted() {
				return failClient(errors.New("client ran out of buffer"))
			}

			if offsetable, ok := packet.(packetizer.OffsetablePacket); mount.OffsetStart && ok {
				if streamStartOffset <= -1 {
					if offsetable.KeepMode() != packetizer.Keep {
						streamStartOffset = offsetable.GetStartSampleNumber()
						writeChannel <- offsetable.GetDataOffset(streamStartOffset)
					} else {
						writeChannel <- packet.GetData()
					}
				} else {
					writeChannel <- offsetable.GetDataOffset(streamStartOffset)
				}
			} else {
				writeChannel <- packet.GetData()
			}
			return nil
		}
	}

	var headers []HeaderEntry
	for k, v := range request.Header {
		for _, s := range v {
			headers = append(headers, HeaderEntry{
				Name:  k,
				Value: s,
			})
		}
	}

	uriPath := request.URL.Path
	if query := request.URL.Query().Encode(); len(query) > 0 {
		uriPath += "?" + query
	}

	// getKnownBufferSize picks a buffer duration suited to the client software.
	getKnownBufferSize := func() time.Duration {
		userAgent := request.Header.Get("user-agent")
		switch {
		case strings.Contains(userAgent, "libmpv") || strings.Contains(userAgent, "mpv "): //mpv
			return time.Millisecond * 2500
		case strings.Contains(userAgent, "libvlc"): //VLC
			return time.Millisecond * 2500
		case strings.Contains(userAgent, "lavf/"): //ffplay
			return time.Millisecond * 2500
		case strings.Contains(userAgent, "gvfs/"): //gvfs
			return time.Millisecond * 2500
		case strings.Contains(userAgent, "Music Player Daemon "): //MPD
			return time.Millisecond * 2500
		case strings.Contains(userAgent, " Chrome/"): //Chromium-based
			return time.Millisecond * 4000
		case strings.Contains(userAgent, " Safari/"): //Safari-based
			return time.Millisecond * 5000
		case strings.Contains(userAgent, " Gecko/"): //Gecko-based (Firefox)
			return time.Millisecond * 5000
		case request.Header.Get("icy-metadata") == "1": //other unknown players
			return time.Millisecond * 5000
		}
		//fallback and provide maximum buffer
		return time.Second * maxBufferSize
	}

	sampleBufferLimit := int64(q.config.Queue.BufferSeconds * mount.SampleRate)
	if q.config.Queue.BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers
		sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate))
	}

	startStamp := time.Now().Unix()
	hashSum := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%s-%s-%d", request.RequestURI, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), startStamp)))
	listenerIdentifier := hex.EncodeToString(hashSum[16:])

	// Every response header has to be in place before the writer goroutine
	// starts: in packet stream mode a frame is already queued, so the first
	// Write (which sends the headers) can happen immediately.
	writer.Header().Set("x-listener-identifier", listenerIdentifier)

	wgClient.Add(1)
	go func() {
		defer wgClient.Done()

		flusher, _ := writer.(http.Flusher)

		for byteSlice := range writeChannel {
			if _, err := writer.Write(byteSlice); err != nil {
				// Report the real cause, the buffer is not what failed here.
				failClient(fmt.Errorf("client write failed: %w", err))
				return
			}
			//try flush
			if flusher != nil {
				flusher.Flush()
			}
		}
	}()

	// Released by the listener's Close callback.
	wgClient.Add(1)

	mount.AddListener(&StreamListener{
		Information: ListenerInformation{
			Identifier: listenerIdentifier,
			Mount:      mount.Mount,
			Path:       uriPath,
			Headers:    headers,
			Start:      startStamp,
		},
		Start: func(packets []packetizer.Packet) error {
			log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate))
			if len(packets) > 0 {
				// Discardable packets older than the buffer limit are not sent.
				sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit
				for _, p := range packets {
					if p.KeepMode() != packetizer.Discard || p.GetEndSampleNumber() >= sampleBufferMin {
						if err := packetWriteCallback(p); err != nil {
							return err
						}
					}
				}
			}
			return nil
		},
		Write: packetWriteCallback,
		Close: func() {
			log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount)
			defer wgClient.Done()
			close(writeChannel)
		},
	})

	// Keep the handler (and with it the connection) alive until the listener
	// is closed and the writer goroutine has finished.
	wgClient.Wait()
}