MeteorLight/queue.go

505 lines
12 KiB
Go
Raw Normal View History

2022-03-02 17:54:56 +00:00
package main
import (
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
2022-03-02 19:26:48 +00:00
"io"
2022-03-02 17:54:56 +00:00
"log"
"net/http"
"os"
"path"
2022-03-04 12:25:59 +00:00
"strconv"
2022-03-02 17:54:56 +00:00
"strings"
"sync"
"time"
2022-03-02 17:54:56 +00:00
)
const maxBufferSize = 10
2022-03-02 17:54:56 +00:00
type QueueTrackEntry struct {
QueueIdentifier audio.QueueIdentifier
Path string
Metadata struct {
Title string `json:"title"`
Album string `json:"album"`
Artist string `json:"artist"`
Art string `json:"art"`
2022-03-02 17:54:56 +00:00
}
original map[string]interface{}
}
2022-03-04 12:25:59 +00:00
type QueueMetadataPacket struct {
sampleNumber int64
TrackEntry *QueueTrackEntry
}
func (p *QueueMetadataPacket) KeepMode() packetizer.KeepMode {
return packetizer.KeepLast
}
func (p *QueueMetadataPacket) GetStartSampleNumber() int64 {
return p.sampleNumber
}
func (p *QueueMetadataPacket) GetEndSampleNumber() int64 {
return p.sampleNumber
}
func (p *QueueMetadataPacket) Category() int64 {
return -1
}
func (p *QueueMetadataPacket) GetData() []byte {
return nil
}
2022-03-02 17:54:56 +00:00
type Queue struct {
NowPlaying chan *QueueTrackEntry
QueueEmpty chan *QueueTrackEntry
2022-03-04 12:25:59 +00:00
Duration time.Duration
2022-03-02 17:54:56 +00:00
audioQueue *audio.Queue
mounts []*StreamMount
queue []*QueueTrackEntry
mutex sync.RWMutex
config *Config
wg sync.WaitGroup
}
func NewQueue(config *Config) *Queue {
q := &Queue{
NowPlaying: make(chan *QueueTrackEntry, 1),
QueueEmpty: make(chan *QueueTrackEntry),
config: config,
audioQueue: audio.NewQueue(44100, 2),
}
blocksPerSecond := 20
sources := SplitAudioSource(audio.NewFilterChain(q.audioQueue.GetSource(), audio.NewBufferFilter(16), audio.NewRealTimeFilter(blocksPerSecond), audio.NewBufferFilter(maxBufferSize*blocksPerSecond)), len(config.Streams))
2022-03-02 17:54:56 +00:00
for i, s := range q.config.Streams {
mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate)
if mount == nil {
log.Panicf("could not initialize %s\n", s.MountPath)
}
q.mounts = append(q.mounts, mount)
q.wg.Add(1)
go mount.Process(&q.wg)
}
return q
}
func (q *Queue) Wait() {
q.wg.Wait()
close(q.NowPlaying)
}
var flacFormat = flac.NewFormat()
var ttaFormat = tta.NewFormat()
var mp3Format = mp3.NewFormat()
var opusFormat = opus.NewFormat()
func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error {
f, err := os.Open(entry.Path)
if err != nil {
return err
}
var source audio.Source
switch strings.ToLower(path.Ext(entry.Path)) {
case ".flac":
source, err = flacFormat.Open(f)
case ".tta":
source, err = ttaFormat.Open(f)
case ".mp3":
source, err = mp3Format.Open(f)
case ".ogg", ".opus":
source, err = opusFormat.Open(f)
}
if err != nil {
f.Close()
return err
}
if source.Blocks == nil {
f.Close()
return fmt.Errorf("could not find decoder for %s", entry.Path)
}
startCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
log.Printf("now playing %s\n", f.Name())
if e := q.Get(entry.Identifier); e != nil {
q.NowPlaying <- e
2022-03-04 12:25:59 +00:00
for _, mount := range q.mounts {
mount.MetadataQueue.Enqueue(&QueueMetadataPacket{
//TODO: carry error
sampleNumber: int64(q.Duration * time.Duration(queue.GetSampleRate()) / time.Second),
TrackEntry: e,
})
}
2022-03-02 17:54:56 +00:00
}
}
endCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
}
removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
defer f.Close()
2022-03-04 12:25:59 +00:00
q.mutex.Lock()
//TODO: carry error
q.Duration += (time.Second * time.Duration(entry.ReadSamples)) / time.Duration(entry.Source.SampleRate)
q.mutex.Unlock()
2022-03-02 17:54:56 +00:00
q.Remove(entry.Identifier)
q.HandleQueue()
}
q.mutex.Lock()
defer q.mutex.Unlock()
if tail {
entry.QueueIdentifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback)
} else {
entry.QueueIdentifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback)
}
entry.original["queue_id"] = entry.QueueIdentifier
if tail || len(q.queue) == 0 {
q.queue = append(q.queue, entry)
} else {
q.queue = append(q.queue[:1], append([]*QueueTrackEntry{entry}, q.queue[1:]...)...)
}
return nil
}
func (q *Queue) HandleQueue() {
if q.audioQueue.GetQueueSize() == 0 {
q.AddTrack(<-q.QueueEmpty, true)
}
}
func (q *Queue) GetQueue() (result []*QueueTrackEntry) {
q.mutex.RLock()
defer q.mutex.RUnlock()
if len(q.queue) > 1 {
result = make([]*QueueTrackEntry, len(q.queue)-1)
copy(result, q.queue[1:])
}
return
}
func (q *Queue) Get(identifier audio.QueueIdentifier) *QueueTrackEntry {
q.mutex.RLock()
defer q.mutex.RUnlock()
for _, e := range q.queue {
if e.QueueIdentifier == identifier {
return e
}
}
return nil
}
func (q *Queue) GetNowPlaying() *QueueTrackEntry {
if e := q.audioQueue.GetQueueHead(); e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) SkipNowPlaying() bool {
if e := q.audioQueue.GetQueueHead(); e != nil {
return q.Remove(e.Identifier)
}
return false
}
func (q *Queue) GetIndex(index int) *QueueTrackEntry {
if e := q.audioQueue.GetQueueIndex(index + 1); e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) GetHead() *QueueTrackEntry {
if e := q.audioQueue.GetQueueIndex(1); e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) GetTail() *QueueTrackEntry {
if i, e := q.audioQueue.GetQueueTail(); i != 0 && e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) Remove(identifier audio.QueueIdentifier) bool {
q.mutex.Lock()
for i, e := range q.queue {
if e.QueueIdentifier == identifier {
q.queue = append(q.queue[:i], q.queue[i+1:]...)
q.mutex.Unlock()
q.audioQueue.Remove(identifier)
return true
}
}
q.mutex.Unlock()
q.audioQueue.Remove(identifier)
return false
}
2022-03-04 12:25:59 +00:00
type httpAudioWriter struct {
timeout time.Duration
writer http.ResponseWriter
metadataToSend struct {
Title string
URL string
}
icyInterval int
icyCounter int
2022-03-02 17:54:56 +00:00
}
2022-03-04 12:25:59 +00:00
func (h *httpAudioWriter) writeIcy() error {
packetContent := make([]byte, 1, 4096)
if len(h.metadataToSend.Title) > 0 {
//TODO: quote quotes
packetContent = append(packetContent, []byte(fmt.Sprintf("StreamTitle='%s';", strings.ReplaceAll(h.metadataToSend.Title, "'", "")))...)
h.metadataToSend.Title = ""
}
if len(h.metadataToSend.URL) > 0 {
//TODO: quote quotes
packetContent = append(packetContent, []byte(fmt.Sprintf("StreamURL='%s';", strings.ReplaceAll(h.metadataToSend.URL, "'", "")))...)
h.metadataToSend.URL = ""
}
contentLength := len(packetContent) - 1
if contentLength > 16*255 {
//cannot send long titles
_, err := h.writer.Write(make([]byte, 1))
return err
}
if (contentLength % 16) == 0 { //already padded
packetContent[0] = byte(contentLength / 16)
} else {
packetContent[0] = byte(contentLength/16) + 1
packetContent = append(packetContent, make([]byte, 16-(contentLength%16))...)
}
_, err := h.writer.Write(packetContent)
return err
}
func (h *httpAudioWriter) Write(p []byte) (n int, err error) {
2022-03-02 17:54:56 +00:00
if h.writer != nil {
2022-03-04 12:25:59 +00:00
if h.icyInterval > 0 {
var i int
for len(p) > 0 {
l := h.icyInterval - h.icyCounter
if l <= len(p) {
i, err = h.writer.Write(p[:l])
n += i
if err != nil {
h.writer = nil
break
}
if err = h.writeIcy(); err != nil {
h.writer = nil
break
}
h.icyCounter = 0
p = p[l:]
} else {
i, err = h.writer.Write(p)
n += i
if err != nil {
h.writer = nil
break
}
h.icyCounter += i
p = p[:0]
}
}
} else {
n, err = h.writer.Write(p)
if err != nil {
h.writer = nil
}
2022-03-02 17:54:56 +00:00
}
2022-03-02 19:26:48 +00:00
return
2022-03-02 17:54:56 +00:00
}
2022-03-02 19:26:48 +00:00
return 0, io.EOF
2022-03-02 17:54:56 +00:00
}
2022-03-04 12:25:59 +00:00
func (h *httpAudioWriter) Close() (err error) {
2022-03-02 17:54:56 +00:00
h.writer = nil
return nil
}
2022-03-04 12:25:59 +00:00
func (h *httpAudioWriter) Flush() {
2022-03-02 17:54:56 +00:00
if h.writer != nil {
//TODO: not deadline aware?
/*if flusher, ok := h.writer.(http.Flusher); ok {
2022-03-02 17:54:56 +00:00
flusher.Flush()
}*/
2022-03-02 17:54:56 +00:00
}
}
func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
q.mutex.RLock()
2022-03-02 18:27:46 +00:00
defer q.mutex.RUnlock()
2022-03-02 17:54:56 +00:00
listeners = make([]*ListenerInformation, 0, 1)
for _, mount := range q.mounts {
listeners = append(listeners, mount.GetListeners()...)
}
return
}
func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) {
for _, mount := range q.mounts {
if strings.HasSuffix(request.URL.Path, mount.Mount) {
2022-03-02 17:54:56 +00:00
writer.Header().Set("Server", "MeteorLight/radio")
writer.Header().Set("Content-Type", mount.MimeType)
writer.Header().Set("Accept-Ranges", "none")
writer.Header().Set("Connection", "keep-alive")
writer.Header().Set("X-Audiocast-Name", q.config.Radio.Name)
writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform")
writer.Header().Set("X-Content-Type-Options", "nosniff")
2022-03-04 12:25:59 +00:00
byteWriter := &httpAudioWriter{writer: writer, timeout: time.Second * 2}
if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
byteWriter.icyInterval = 8192
writer.Header().Set("Icy-MetaInt", fmt.Sprintf("%d", byteWriter.icyInterval))
}
2022-03-02 17:54:56 +00:00
var wgClient sync.WaitGroup
writeCallback := func(packet packetizer.Packet) error {
2022-03-04 12:25:59 +00:00
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
if len(metadataPacket.TrackEntry.Metadata.Artist) > 0 {
byteWriter.metadataToSend.Title = fmt.Sprintf("%s - %s", metadataPacket.TrackEntry.Metadata.Artist, metadataPacket.TrackEntry.Metadata.Title)
} else {
byteWriter.metadataToSend.Title = metadataPacket.TrackEntry.Metadata.Title
}
byteWriter.metadataToSend.URL = metadataPacket.TrackEntry.Metadata.Art
return nil
}
2022-03-02 17:54:56 +00:00
//TODO: icy
/*
select {
case <-request.Context().Done():
// Client gave up
default:
}
*/
_, err := byteWriter.Write(packet.GetData())
byteWriter.Flush()
return err
}
wgClient.Add(1)
var headers []HeaderEntry
for k, v := range request.Header {
for _, s := range v {
headers = append(headers, HeaderEntry{
Name: k,
Value: s,
})
}
}
uriPath := request.URL.Path
if len(request.URL.Query().Encode()) > 0 {
uriPath += "?" + request.URL.Query().Encode()
}
getKnownBufferSize := func() time.Duration {
userAgent := request.Header.Get("user-agent")
if strings.Index(userAgent, "libmpv") != -1 || strings.Index(userAgent, "mpv ") != -1 { //mpv
return time.Millisecond * 2500
} else if strings.Index(userAgent, "libvlc") != -1 { //VLC
return time.Millisecond * 2500
} else if strings.Index(userAgent, "lavf/") != -1 { //ffplay
return time.Millisecond * 2500
} else if strings.Index(userAgent, "gvfs/") != -1 { //gvfs
return time.Millisecond * 2500
} else if strings.Index(userAgent, "Music Player Daemon ") != -1 { //MPD
return time.Millisecond * 2500
} else if strings.Index(userAgent, " Chrome/") != -1 { //Chromium-based
return time.Millisecond * 5000
} else if strings.Index(userAgent, " Safari/") != -1 { //Safari-based
return time.Millisecond * 5000
} else if strings.Index(userAgent, " Gecko/") != -1 { //Gecko-based (Firefox)
return time.Millisecond * 5000
} else if request.Header.Get("icy-metadata") == "1" { //other unknown players
return time.Millisecond * 5000
}
//fallback and provide maximum buffer
return time.Second * maxBufferSize
}
sampleBufferLimit := int64(q.config.Queue.BufferSeconds * mount.SampleRate)
if q.config.Queue.BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers
sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate))
}
2022-03-02 17:54:56 +00:00
mount.AddListener(&StreamListener{
Information: ListenerInformation{
Mount: mount.Mount,
2022-03-02 17:54:56 +00:00
Path: uriPath,
Headers: headers,
},
Start: func(packets []packetizer.Packet) error {
if len(packets) > 0 {
2022-03-04 12:25:59 +00:00
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit
for _, p := range packets {
2022-03-04 12:25:59 +00:00
if p.KeepMode() != packetizer.Discard || p.GetEndSampleNumber() >= sampleBufferMin {
if err := writeCallback(p); err != nil {
return err
}
}
2022-03-02 17:54:56 +00:00
}
}
return nil
},
Write: writeCallback,
Close: func() {
byteWriter.Close()
wgClient.Done()
},
})
wgClient.Wait()
return
}
}
writer.WriteHeader(http.StatusNotFound)
return
}