DataHoarder
9ff4d5e9c5
All checks were successful
continuous-integration/drone/push Build is passing
517 lines
13 KiB
Go
517 lines
13 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/vorbis"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
const maxBufferSize = 10
|
|
|
|
type QueueTrackEntry struct {
|
|
QueueIdentifier audio.QueueIdentifier
|
|
Path string
|
|
Metadata struct {
|
|
Title string `json:"title"`
|
|
Album string `json:"album"`
|
|
Artist string `json:"artist"`
|
|
Art string `json:"art"`
|
|
}
|
|
source audio.Source
|
|
|
|
original map[string]interface{}
|
|
}
|
|
|
|
func (e *QueueTrackEntry) Load() error {
|
|
if e.source.Blocks != nil {
|
|
return nil
|
|
}
|
|
|
|
f, err := os.Open(e.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
//close at end, TODO check if it runs
|
|
runtime.SetFinalizer(f, (*os.File).Close)
|
|
|
|
var source audio.Source
|
|
switch strings.ToLower(path.Ext(e.Path)) {
|
|
case ".flac":
|
|
source, err = flacFormat.Open(f)
|
|
case ".tta":
|
|
source, err = ttaFormat.Open(f)
|
|
case ".mp3":
|
|
source, err = mp3Format.Open(f)
|
|
case ".ogg":
|
|
if source, err = opusFormat.Open(f); err != nil {
|
|
//try flac
|
|
if source, err = flacFormat.Open(f); err != nil {
|
|
//try vorbis
|
|
source, err = vorbisFormat.Open(f)
|
|
}
|
|
}
|
|
case ".opus":
|
|
source, err = opusFormat.Open(f)
|
|
case ".vorbis":
|
|
source, err = vorbisFormat.Open(f)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if source.Blocks == nil {
|
|
return fmt.Errorf("could not find decoder for %s", e.Path)
|
|
}
|
|
|
|
e.source = source
|
|
return nil
|
|
}
|
|
|
|
type QueueMetadataPacket struct {
|
|
sampleNumber int64
|
|
TrackEntry *QueueTrackEntry
|
|
}
|
|
|
|
func (p *QueueMetadataPacket) KeepMode() packetizer.KeepMode {
|
|
return packetizer.KeepLast
|
|
}
|
|
|
|
func (p *QueueMetadataPacket) GetStartSampleNumber() int64 {
|
|
return p.sampleNumber
|
|
}
|
|
|
|
func (p *QueueMetadataPacket) GetEndSampleNumber() int64 {
|
|
return p.sampleNumber
|
|
}
|
|
|
|
func (p *QueueMetadataPacket) Category() int64 {
|
|
return -1
|
|
}
|
|
|
|
func (p *QueueMetadataPacket) GetData() []byte {
|
|
return nil
|
|
}
|
|
|
|
type Queue struct {
|
|
NowPlaying chan *QueueTrackEntry
|
|
QueueEmpty chan *QueueTrackEntry
|
|
Duration time.Duration
|
|
durationError int64
|
|
audioQueue *audio.Queue
|
|
mounts []*StreamMount
|
|
queue []*QueueTrackEntry
|
|
mutex sync.RWMutex
|
|
config *Config
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewQueue(config *Config) *Queue {
|
|
q := &Queue{
|
|
NowPlaying: make(chan *QueueTrackEntry, 1),
|
|
QueueEmpty: make(chan *QueueTrackEntry),
|
|
config: config,
|
|
audioQueue: audio.NewQueue(44100, 2),
|
|
}
|
|
blocksPerSecond := 20
|
|
|
|
sources := SplitAudioSource(audio.NewFilterChain(q.audioQueue.GetSource(), audio.NewBufferFilter(16), audio.NewRealTimeFilter(blocksPerSecond), audio.NewBufferFilter(maxBufferSize*blocksPerSecond)), len(config.Streams))
|
|
for i, s := range q.config.Streams {
|
|
mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate)
|
|
if mount == nil {
|
|
log.Panicf("could not initialize %s\n", s.MountPath)
|
|
}
|
|
q.mounts = append(q.mounts, mount)
|
|
q.wg.Add(1)
|
|
go mount.Process(&q.wg)
|
|
}
|
|
|
|
return q
|
|
}
|
|
|
|
func (q *Queue) Wait() {
|
|
q.wg.Wait()
|
|
close(q.NowPlaying)
|
|
}
|
|
|
|
var flacFormat = flac.NewFormat()
|
|
var ttaFormat = tta.NewFormat()
|
|
var mp3Format = mp3.NewFormat()
|
|
var opusFormat = opus.NewFormat()
|
|
var vorbisFormat = vorbis.NewFormat()
|
|
|
|
func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error {
|
|
|
|
if err := entry.Load(); err != nil {
|
|
return err
|
|
}
|
|
|
|
startCallback := func(queue *audio.Queue, queueEntry *audio.QueueEntry) {
|
|
if e := q.Get(queueEntry.Identifier); e != nil { //is this needed?
|
|
log.Printf("now playing %s\n", e.Path)
|
|
q.NowPlaying <- e
|
|
for _, mount := range q.mounts {
|
|
mount.MetadataQueue.Enqueue(&QueueMetadataPacket{
|
|
//TODO: carry error
|
|
sampleNumber: int64(q.Duration * time.Duration(queue.GetSampleRate()) / time.Second),
|
|
TrackEntry: e,
|
|
})
|
|
}
|
|
} else {
|
|
log.Printf("now playing %s\n", entry.Path)
|
|
}
|
|
}
|
|
|
|
endCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
|
|
|
|
}
|
|
|
|
removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
|
|
atomic.AddInt64((*int64)(&q.Duration), int64((time.Second*time.Duration(entry.ReadSamples))/time.Duration(entry.Source.SampleRate)))
|
|
|
|
q.Remove(entry.Identifier)
|
|
q.HandleQueue()
|
|
}
|
|
|
|
q.mutex.Lock()
|
|
defer q.mutex.Unlock()
|
|
|
|
if tail {
|
|
entry.QueueIdentifier = q.audioQueue.AddTail(entry.source, startCallback, endCallback, removeCallback)
|
|
} else {
|
|
entry.QueueIdentifier = q.audioQueue.AddHead(entry.source, startCallback, endCallback, removeCallback)
|
|
}
|
|
|
|
entry.original["queue_id"] = entry.QueueIdentifier
|
|
|
|
if tail || len(q.queue) == 0 {
|
|
q.queue = append(q.queue, entry)
|
|
} else {
|
|
q.queue = append(q.queue[:1], append([]*QueueTrackEntry{entry}, q.queue[1:]...)...)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) HandleQueue() {
|
|
if q.audioQueue.GetQueueSize() == 0 {
|
|
q.AddTrack(<-q.QueueEmpty, true)
|
|
}
|
|
}
|
|
|
|
func (q *Queue) GetQueue() (result []*QueueTrackEntry) {
|
|
q.mutex.RLock()
|
|
defer q.mutex.RUnlock()
|
|
|
|
if len(q.queue) > 1 {
|
|
result = make([]*QueueTrackEntry, len(q.queue)-1)
|
|
copy(result, q.queue[1:])
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (q *Queue) Get(identifier audio.QueueIdentifier) *QueueTrackEntry {
|
|
q.mutex.RLock()
|
|
defer q.mutex.RUnlock()
|
|
for _, e := range q.queue {
|
|
if e.QueueIdentifier == identifier {
|
|
return e
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetNowPlaying() *QueueTrackEntry {
|
|
if e := q.audioQueue.GetQueueHead(); e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) SkipNowPlaying() bool {
|
|
if e := q.audioQueue.GetQueueHead(); e != nil {
|
|
return q.Remove(e.Identifier)
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (q *Queue) GetIndex(index int) *QueueTrackEntry {
|
|
if e := q.audioQueue.GetQueueIndex(index + 1); e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetHead() *QueueTrackEntry {
|
|
if e := q.audioQueue.GetQueueIndex(1); e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetTail() *QueueTrackEntry {
|
|
if i, e := q.audioQueue.GetQueueTail(); i != 0 && e != nil {
|
|
return q.Get(e.Identifier)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) Remove(identifier audio.QueueIdentifier) bool {
|
|
q.mutex.Lock()
|
|
for i, e := range q.queue {
|
|
if e.QueueIdentifier == identifier {
|
|
q.queue = append(q.queue[:i], q.queue[i+1:]...)
|
|
q.mutex.Unlock()
|
|
q.audioQueue.Remove(identifier)
|
|
return true
|
|
}
|
|
}
|
|
q.mutex.Unlock()
|
|
q.audioQueue.Remove(identifier)
|
|
return false
|
|
}
|
|
|
|
func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
|
|
q.mutex.RLock()
|
|
defer q.mutex.RUnlock()
|
|
|
|
listeners = make([]*ListenerInformation, 0, 1)
|
|
|
|
for _, mount := range q.mounts {
|
|
listeners = append(listeners, mount.GetListeners()...)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) {
|
|
for _, mount := range q.mounts {
|
|
if strings.HasSuffix(request.URL.Path, mount.Mount) {
|
|
writer.Header().Set("Server", "MeteorLight/radio")
|
|
writer.Header().Set("Content-Type", mount.MimeType)
|
|
writer.Header().Set("Accept-Ranges", "none")
|
|
writer.Header().Set("Connection", "keep-alive")
|
|
writer.Header().Set("X-Audiocast-Name", q.config.Radio.Name)
|
|
writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform")
|
|
writer.Header().Set("X-Content-Type-Options", "nosniff")
|
|
|
|
var packetWriteCallback func(packet packetizer.Packet) error
|
|
|
|
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
|
|
const byteSliceChannelBuffer = 128
|
|
writeChannel := make(chan []byte, byteSliceChannelBuffer)
|
|
var requestDone error
|
|
var wgClient sync.WaitGroup
|
|
|
|
if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
|
|
metadataToSend := make(map[string]string)
|
|
const icyInterval = 8192
|
|
icyCounter := 0
|
|
writer.Header().Set("Icy-MetaInt", fmt.Sprintf("%d", icyInterval))
|
|
|
|
writeIcy := func() []byte {
|
|
packetContent := make([]byte, 1, 4096)
|
|
for k, v := range metadataToSend {
|
|
packetContent = append(packetContent, []byte(fmt.Sprintf("%s='%s';", k, v))...)
|
|
delete(metadataToSend, k)
|
|
|
|
//shouldn't send multiple properties in same packet if we want working single quotes, wait until next ICY frame
|
|
break
|
|
}
|
|
|
|
contentLength := len(packetContent) - 1
|
|
if contentLength > 16*255 {
|
|
//cannot send long titles
|
|
return make([]byte, 1)
|
|
}
|
|
|
|
if (contentLength % 16) == 0 { //already padded
|
|
packetContent[0] = byte(contentLength / 16)
|
|
} else {
|
|
packetContent[0] = byte(contentLength/16) + 1
|
|
packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...)
|
|
}
|
|
|
|
return packetContent
|
|
}
|
|
|
|
packetWriteCallback = func(packet packetizer.Packet) error {
|
|
if requestDone != nil {
|
|
return requestDone
|
|
}
|
|
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
|
|
if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 {
|
|
metadataToSend["StreamTitle"] = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Metadata.Artist, metadataPacket.TrackEntry.Metadata.Title)
|
|
} else {
|
|
metadataToSend["StreamTitle"] = metadataPacket.TrackEntry.Metadata.Title
|
|
}
|
|
if len(metadataPacket.TrackEntry.Metadata.Art) > 0 {
|
|
metadataToSend["StreamURL"] = metadataPacket.TrackEntry.Metadata.Art
|
|
}
|
|
return nil
|
|
}
|
|
|
|
p := packet.GetData()
|
|
var data []byte
|
|
for len(p) > 0 {
|
|
l := icyInterval - icyCounter
|
|
if l <= len(p) {
|
|
data = append(data, p[:l]...)
|
|
data = append(data, writeIcy()...)
|
|
icyCounter = 0
|
|
p = p[l:]
|
|
} else {
|
|
data = append(data, p...)
|
|
icyCounter += len(p)
|
|
p = p[:0]
|
|
}
|
|
}
|
|
|
|
if len(writeChannel) >= byteSliceChannelBuffer-1 {
|
|
requestDone = errors.New("client ran out of buffer")
|
|
return requestDone
|
|
}
|
|
writeChannel <- data
|
|
return nil
|
|
}
|
|
} else {
|
|
packetWriteCallback = func(packet packetizer.Packet) error {
|
|
if requestDone != nil {
|
|
return requestDone
|
|
}
|
|
if _, ok := packet.(*QueueMetadataPacket); ok {
|
|
return nil
|
|
}
|
|
|
|
if len(writeChannel) >= byteSliceChannelBuffer-1 {
|
|
requestDone = errors.New("client ran out of buffer")
|
|
return requestDone
|
|
}
|
|
writeChannel <- packet.GetData()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
wgClient.Add(1)
|
|
go func() {
|
|
defer wgClient.Done()
|
|
var flusher http.Flusher
|
|
if httpFlusher, ok := writer.(http.Flusher); ok {
|
|
flusher = httpFlusher
|
|
}
|
|
|
|
for byteSlice := range writeChannel {
|
|
if _, requestDone = writer.Write(byteSlice); requestDone != nil {
|
|
break
|
|
}
|
|
//try flush
|
|
if flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
|
|
}()
|
|
|
|
var headers []HeaderEntry
|
|
for k, v := range request.Header {
|
|
for _, s := range v {
|
|
headers = append(headers, HeaderEntry{
|
|
Name: k,
|
|
Value: s,
|
|
})
|
|
}
|
|
}
|
|
|
|
uriPath := request.URL.Path
|
|
if len(request.URL.Query().Encode()) > 0 {
|
|
uriPath += "?" + request.URL.Query().Encode()
|
|
}
|
|
|
|
getKnownBufferSize := func() time.Duration {
|
|
userAgent := request.Header.Get("user-agent")
|
|
if strings.Index(userAgent, "libmpv") != -1 || strings.Index(userAgent, "mpv ") != -1 { //mpv
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "libvlc") != -1 { //VLC
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "lavf/") != -1 { //ffplay
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "gvfs/") != -1 { //gvfs
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, "Music Player Daemon ") != -1 { //MPD
|
|
return time.Millisecond * 2500
|
|
} else if strings.Index(userAgent, " Chrome/") != -1 { //Chromium-based
|
|
return time.Millisecond * 5000
|
|
} else if strings.Index(userAgent, " Safari/") != -1 { //Safari-based
|
|
return time.Millisecond * 5000
|
|
} else if strings.Index(userAgent, " Gecko/") != -1 { //Gecko-based (Firefox)
|
|
return time.Millisecond * 5000
|
|
} else if request.Header.Get("icy-metadata") == "1" { //other unknown players
|
|
return time.Millisecond * 5000
|
|
}
|
|
|
|
//fallback and provide maximum buffer
|
|
return time.Second * maxBufferSize
|
|
}
|
|
|
|
sampleBufferLimit := int64(q.config.Queue.BufferSeconds * mount.SampleRate)
|
|
if q.config.Queue.BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers
|
|
sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate))
|
|
}
|
|
|
|
wgClient.Add(1)
|
|
mount.AddListener(&StreamListener{
|
|
Information: ListenerInformation{
|
|
Mount: mount.Mount,
|
|
Path: uriPath,
|
|
Headers: headers,
|
|
},
|
|
Start: func(packets []packetizer.Packet) error {
|
|
if len(packets) > 0 {
|
|
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit
|
|
for _, p := range packets {
|
|
if p.KeepMode() != packetizer.Discard || p.GetEndSampleNumber() >= sampleBufferMin {
|
|
if err := packetWriteCallback(p); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
},
|
|
Write: packetWriteCallback,
|
|
Close: func() {
|
|
defer wgClient.Done()
|
|
close(writeChannel)
|
|
},
|
|
})
|
|
wgClient.Wait()
|
|
return
|
|
}
|
|
}
|
|
|
|
writer.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|