2022-09-03 14:20:40 +00:00
package queue
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/queue"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/replaygain"
"git.gammaspectra.live/S.O.N.G/MeteorLight/config"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener/aps1"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener/icy"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener/plain"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/track"
"git.gammaspectra.live/S.O.N.G/MeteorLight/stream"
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
2022-10-03 09:56:54 +00:00
"golang.org/x/exp/slices"
2022-09-03 14:20:40 +00:00
"log"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
type Queue struct {
NowPlaying chan * track . Entry
QueueEmpty chan * track . Entry
duration atomic . Int64
durationError int64
audioQueue * queue . Queue
mounts [ ] * stream . Mount
queue [ ] * track . Entry
mutex sync . RWMutex
config * config . Config
wg sync . WaitGroup
}
func NewQueue ( conf * config . Config ) * Queue {
if conf . Queue . SampleRate <= 0 {
conf . Queue . SampleRate = 44100
}
sampleFormat := audio . SourceInt16
bitDepth := 16
switch conf . Queue . SampleFormat {
case "f32" , "float" , "float32" , "f32le" :
sampleFormat = audio . SourceFloat32
bitDepth = 0
case "i32" , "s32" , "int32" , "int" , "s32le" :
sampleFormat = audio . SourceInt32
bitDepth = 32
case "i16" , "s16" , "int16" , "s16le" :
sampleFormat = audio . SourceInt16
bitDepth = 16
}
if conf . Queue . BitDepth > 0 {
bitDepth = conf . Queue . BitDepth
}
q := & Queue {
NowPlaying : make ( chan * track . Entry , 1 ) ,
QueueEmpty : make ( chan * track . Entry ) ,
config : conf ,
audioQueue : queue . NewQueue ( sampleFormat , bitDepth , conf . Queue . SampleRate , 2 ) ,
}
blocksPerSecond := 20
sources := filter . NewFilterChain ( q . audioQueue . GetSource ( ) , filter . NewBufferFilter ( 16 ) , filter . NewRealTimeFilter ( blocksPerSecond ) , filter . NewBufferFilter ( config . MaxBufferSize * blocksPerSecond ) ) . Split ( len ( conf . Streams ) )
for i , s := range q . config . Streams {
mount := stream . NewStreamMount ( sources [ i ] , s )
if mount == nil {
log . Panicf ( "could not initialize %s\n" , s . MountPath )
}
q . mounts = append ( q . mounts , mount )
q . wg . Add ( 1 )
go mount . Process ( & q . wg )
}
return q
}
func ( q * Queue ) GetDuration ( ) time . Duration {
return time . Duration ( q . duration . Load ( ) )
}
func ( q * Queue ) Wait ( ) {
q . wg . Wait ( )
close ( q . NowPlaying )
}
func ( q * Queue ) AddTrack ( entry * track . Entry , tail bool ) error {
if err := entry . Load ( ) ; err != nil {
return err
}
2022-09-05 15:11:49 +00:00
startCallback := func ( queue * queue . Queue , queueEntry * queue . Entry ) {
2022-09-03 14:20:40 +00:00
if e := q . Get ( queueEntry . Identifier ) ; e != nil { //is this needed?
log . Printf ( "now playing \"%s\": %s - %s (%s)\n" , e . Path , e . Metadata . Title , e . Metadata . Artist , e . Metadata . Album )
q . NowPlaying <- e
for _ , mount := range q . mounts {
2022-10-02 12:28:41 +00:00
mount . QueueMetadata ( & metadata . Packet {
2022-09-03 14:20:40 +00:00
//TODO: carry sample rate error
SampleNumber : ( q . duration . Load ( ) * int64 ( queue . GetSampleRate ( ) ) ) / int64 ( time . Second ) ,
TrackEntry : e ,
} )
}
} else {
log . Printf ( "now playing \"%s\": %s - %s (%s)\n" , entry . Path , entry . Metadata . Title , entry . Metadata . Artist , entry . Metadata . Album )
}
}
2022-09-05 15:11:49 +00:00
endCallback := func ( queue * queue . Queue , entry * queue . Entry ) {
2022-09-03 14:20:40 +00:00
}
2022-09-05 15:11:49 +00:00
removeCallback := func ( queue * queue . Queue , entry * queue . Entry ) {
2022-09-03 14:20:40 +00:00
//TODO: carry sample rate error
q . duration . Add ( int64 ( ( time . Second * time . Duration ( entry . ReadSamples . Load ( ) ) ) / time . Duration ( entry . Source . GetSampleRate ( ) ) ) )
q . Remove ( entry . Identifier )
q . HandleQueue ( )
}
q . mutex . Lock ( )
defer q . mutex . Unlock ( )
if q . config . Queue . Length > 0 && len ( q . queue ) >= q . config . Queue . Length {
return errors . New ( "queue too long" )
}
source := entry . Source ( )
if q . config . Queue . ReplayGain {
if entry . Metadata . ReplayGain . TrackPeak != 0 {
source = replaygain . NewReplayGainFilter ( entry . Metadata . ReplayGain . TrackGain , entry . Metadata . ReplayGain . TrackPeak , 0 ) . Process ( source )
} else {
source = replaygain . NewNormalizationFilter ( 5 ) . Process ( source )
}
}
if tail {
2022-09-05 15:11:49 +00:00
entry . Identifier = q . audioQueue . AddTail ( source , startCallback , endCallback , removeCallback )
2022-09-03 14:20:40 +00:00
} else {
2022-09-05 15:11:49 +00:00
entry . Identifier = q . audioQueue . AddHead ( source , startCallback , endCallback , removeCallback )
2022-09-03 14:20:40 +00:00
}
2022-09-05 15:11:49 +00:00
entry . Original [ "queue_id" ] = entry . Identifier
2022-09-03 14:20:40 +00:00
if tail || len ( q . queue ) == 0 {
q . queue = append ( q . queue , entry )
} else {
q . queue = append ( q . queue [ : 1 ] , append ( [ ] * track . Entry { entry } , q . queue [ 1 : ] ... ) ... )
}
return nil
}
func ( q * Queue ) HandleQueue ( ) {
if q . audioQueue . GetQueueSize ( ) == 0 {
if err := q . AddTrack ( <- q . QueueEmpty , true ) ; err != nil {
log . Printf ( "track addition error: \"%s\"" , err )
//TODO: maybe fail after n tries
time . Sleep ( time . Second )
q . HandleQueue ( )
}
}
}
func ( q * Queue ) GetQueue ( ) ( result [ ] * track . Entry ) {
q . mutex . RLock ( )
defer q . mutex . RUnlock ( )
if len ( q . queue ) > 1 {
2022-10-03 09:56:54 +00:00
result = slices . Clone ( q . queue [ 1 : ] )
2022-09-03 14:20:40 +00:00
}
return
}
2022-09-05 15:11:49 +00:00
func ( q * Queue ) Get ( identifier queue . Identifier ) * track . Entry {
2022-09-03 14:20:40 +00:00
q . mutex . RLock ( )
defer q . mutex . RUnlock ( )
for _ , e := range q . queue {
2022-09-05 15:11:49 +00:00
if e . Identifier == identifier {
2022-09-03 14:20:40 +00:00
return e
}
}
return nil
}
func ( q * Queue ) GetNowPlaying ( ) * track . Entry {
if e := q . audioQueue . GetQueueHead ( ) ; e != nil {
return q . Get ( e . Identifier )
}
return nil
}
func ( q * Queue ) SkipNowPlaying ( ) bool {
if e := q . audioQueue . GetQueueHead ( ) ; e != nil {
return q . Remove ( e . Identifier )
}
return false
}
func ( q * Queue ) GetIndex ( index int ) * track . Entry {
if e := q . audioQueue . GetQueueIndex ( index + 1 ) ; e != nil {
return q . Get ( e . Identifier )
}
return nil
}
func ( q * Queue ) GetHead ( ) * track . Entry {
if e := q . audioQueue . GetQueueIndex ( 1 ) ; e != nil {
return q . Get ( e . Identifier )
}
return nil
}
func ( q * Queue ) GetTail ( ) * track . Entry {
if i , e := q . audioQueue . GetQueueTail ( ) ; i != 0 && e != nil {
return q . Get ( e . Identifier )
}
return nil
}
2022-09-05 15:11:49 +00:00
func ( q * Queue ) Remove ( identifier queue . Identifier ) bool {
var entry * track . Entry
func ( ) {
defer q . audioQueue . Remove ( identifier )
q . mutex . Lock ( )
defer q . mutex . Unlock ( )
for i , e := range q . queue {
if e . Identifier == identifier {
2022-10-03 09:56:54 +00:00
q . queue = slices . Delete ( q . queue , i , i + 1 )
2022-09-05 15:11:49 +00:00
entry = e
return
}
2022-09-03 14:20:40 +00:00
}
2022-09-05 15:11:49 +00:00
} ( )
if entry != nil {
_ = entry . Close ( )
return true
2022-09-03 14:20:40 +00:00
}
return false
}
func ( q * Queue ) RemoveListener ( identifier string ) bool {
q . mutex . RLock ( )
defer q . mutex . RUnlock ( )
for _ , mount := range q . mounts {
if mount . RemoveListener ( identifier ) {
return true
}
}
return false
}
func ( q * Queue ) GetListeners ( ) ( listeners [ ] * listener . Information ) {
q . mutex . RLock ( )
defer q . mutex . RUnlock ( )
listeners = make ( [ ] * listener . Information , 0 , 1 )
for _ , mount := range q . mounts {
listeners = append ( listeners , mount . GetListeners ( ) ... )
}
return
}
func ( q * Queue ) HandleRadioRequest ( writer http . ResponseWriter , request * http . Request ) {
writer . Header ( ) . Set ( "Server" , "MeteorLight/radio" )
writer . Header ( ) . Set ( "Connection" , "close" )
writer . Header ( ) . Set ( "X-Content-Type-Options" , "nosniff" )
writer . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" )
writer . Header ( ) . Set ( "Access-Control-Allow-Headers" , "Accept, Content-Type, Icy-Metadata" )
writer . Header ( ) . Set ( "Accept-Ranges" , "none" )
writer . Header ( ) . Set ( "Connection" , "close" )
if strings . HasSuffix ( request . URL . Path , "mounts" ) {
writer . Header ( ) . Set ( "Content-Type" , "application/json; charset=utf-8" )
writer . Header ( ) . Set ( "Access-Control-Expose-Headers" , "Accept-Ranges, Server, Content-Type" )
type mountData struct {
Path string ` json:"mount" `
MimeType string ` json:"mime" `
FormatDescription string ` json:"formatDescription" `
SampleRate int ` json:"sampleRate" `
Channels int ` json:"channels" `
Listeners int ` json:"listeners" `
Options map [ string ] interface { } ` json:"options" `
}
var mounts [ ] mountData
for _ , mount := range q . mounts {
mounts = append ( mounts , mountData {
Path : strings . TrimSuffix ( request . URL . Path , "mounts" ) + mount . Mount ,
MimeType : mount . MimeType ,
SampleRate : mount . SampleRate ,
FormatDescription : mount . FormatDescription ,
Channels : mount . Channels ,
Listeners : len ( mount . GetListeners ( ) ) ,
Options : mount . Options ,
} )
}
jsonBytes , _ := json . MarshalIndent ( mounts , "" , " " )
writer . WriteHeader ( http . StatusOK )
2022-10-03 09:56:54 +00:00
_ , _ = writer . Write ( jsonBytes )
2022-09-03 14:20:40 +00:00
return
}
for _ , mount := range q . mounts {
if strings . HasSuffix ( request . URL . Path , mount . Mount ) {
writer . Header ( ) . Set ( "Content-Type" , mount . MimeType )
writer . Header ( ) . Set ( "Cache-Control" , "no-store, max-age=604800" )
writer . Header ( ) . Set ( "Access-Control-Expose-Headers" , "Accept-Ranges, Server, Content-Type, Icy-MetaInt, X-Listener-Identifier" )
writer . Header ( ) . Set ( "Vary" , "*" )
rangeHeader := request . Header . Get ( "range" )
if rangeHeader != "" && rangeHeader != "bytes=0-" {
//TODO: maybe should fail in case bytes are requested
if strings . Index ( request . UserAgent ( ) , " Safari/" ) != - 1 && mount . MimeType == "audio/flac" {
//Safari special case, fake Range check so it decodes afterwards.
//Safari creates a request with Range for 0-1, specifically for FLAC, and expects a result supporting range. Afterwards it requests the whole file.
//However the decoder is able to decode FLAC livestreams. If we fake the initial range response, then afterwards serve normal responses, Safari will happily work.
//TODO: remove this AS SOON as safari works on its own
//safariLargeFileValue arbitrary large value, cannot be that large or iOS Safari fails.
safariLargeFileValue := 1024 * 1024 * 1024 * 1024 * 16 // 16 TiB
if rangeHeader == "bytes=0-1" {
//first request
writer . Header ( ) . Set ( "Accept-Ranges" , "bytes" )
writer . Header ( ) . Set ( "Content-Range" , fmt . Sprintf ( "bytes 0-1/%d" , safariLargeFileValue ) ) //64 TiB max fake size
writer . Header ( ) . Set ( "Content-Length" , "2" )
writer . WriteHeader ( http . StatusPartialContent )
2022-10-03 09:56:54 +00:00
_ , _ = writer . Write ( [ ] byte { 'f' , 'L' } )
2022-09-03 14:20:40 +00:00
return
} else if rangeHeader == fmt . Sprintf ( "bytes=0-%d" , safariLargeFileValue - 1 ) {
//second request, serve status 200 to keep retries to a minimum
writer . Header ( ) . Set ( "Content-Length" , fmt . Sprintf ( "%d" , safariLargeFileValue ) )
writer . WriteHeader ( http . StatusOK )
} else if strings . HasPrefix ( rangeHeader , "bytes=" ) && strings . HasSuffix ( rangeHeader , fmt . Sprintf ( "-%d" , safariLargeFileValue - 1 ) ) {
//any other requests, these should fail
writer . Header ( ) . Set ( "Content-Range" , fmt . Sprintf ( "bytes %s/%d" , strings . TrimPrefix ( rangeHeader , "bytes=" ) , safariLargeFileValue ) )
writer . Header ( ) . Set ( "Accept-Ranges" , "bytes" )
writer . WriteHeader ( http . StatusRequestedRangeNotSatisfiable )
return
}
}
}
bitrate := 0
if value , ok := mount . Options [ "bitrate" ] ; ok {
if intValue , ok := value . ( int ) ; ok {
bitrate = intValue
} else if int64Value , ok := value . ( int64 ) ; ok {
bitrate = int ( int64Value )
}
}
//set some audiocast/icy radio headers
writer . Header ( ) . Set ( "x-audiocast-name" , q . config . Radio . Name )
writer . Header ( ) . Set ( "x-audiocast-bitrate" , fmt . Sprintf ( "%d" , bitrate ) )
writer . Header ( ) . Set ( "icy-name" , q . config . Radio . Name )
writer . Header ( ) . Set ( "icy-version" , "2" )
writer . Header ( ) . Set ( "icy-index-metadata" , "1" )
if q . config . Radio . Description != "" {
writer . Header ( ) . Set ( "x-audiocast-description" , q . config . Radio . Description )
writer . Header ( ) . Set ( "icy-description" , q . config . Radio . Description )
}
if q . config . Radio . URL != "" {
writer . Header ( ) . Set ( "x-audiocast-url" , q . config . Radio . URL )
writer . Header ( ) . Set ( "icy-url" , q . config . Radio . URL )
}
if q . config . Radio . Logo != "" {
writer . Header ( ) . Set ( "icy-logo" , q . config . Radio . Logo )
}
writer . Header ( ) . Set ( "icy-br" , fmt . Sprintf ( "%d" , bitrate ) )
writer . Header ( ) . Set ( "icy-sr" , fmt . Sprintf ( "%d" , mount . SampleRate ) )
writer . Header ( ) . Set ( "icy-audio-info" , fmt . Sprintf ( "ice-channels=%d;ice-samplerate=%d;ice-bitrate=%d" , mount . Channels , mount . SampleRate , bitrate ) )
if q . config . Radio . Private {
writer . Header ( ) . Set ( "icy-pub" , "0" )
writer . Header ( ) . Set ( "icy-do-not-index" , "1" )
writer . Header ( ) . Set ( "x-audiocast-public" , "0" )
writer . Header ( ) . Set ( "x-robots-tag" , "noindex, nofollow" )
} else {
writer . Header ( ) . Set ( "icy-pub" , "1" )
writer . Header ( ) . Set ( "icy-do-not-index" , "0" )
writer . Header ( ) . Set ( "x-audiocast-public" , "1" )
}
requestDone := util . RequestDone { }
var headers [ ] listener . HeaderEntry
for k , v := range request . Header {
for _ , s := range v {
headers = append ( headers , listener . HeaderEntry {
Name : k ,
Value : s ,
} )
}
}
uriPath := request . URL . Path
if len ( request . URL . Query ( ) . Encode ( ) ) > 0 {
uriPath += "?" + request . URL . Query ( ) . Encode ( )
}
getKnownBufferSize := func ( ) time . Duration {
userAgent := request . Header . Get ( "user-agent" )
if strings . Index ( userAgent , "libmpv" ) != - 1 || strings . Index ( userAgent , "mpv " ) != - 1 { //mpv
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "libvlc" ) != - 1 { //VLC
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "lavf/" ) != - 1 { //ffplay
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "gvfs/" ) != - 1 { //gvfs
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "Music Player Daemon " ) != - 1 { //MPD
return time . Millisecond * 2500
} else if strings . Index ( userAgent , " Chrome/" ) != - 1 { //Chromium-based
return time . Millisecond * 4000
} else if strings . Index ( userAgent , " Safari/" ) != - 1 { //Safari-based
return time . Millisecond * 5000
} else if strings . Index ( userAgent , " Gecko/" ) != - 1 { //Gecko-based (Firefox)
return time . Millisecond * 5000
} else if request . Header . Get ( "icy-metadata" ) == "1" { //other unknown players
return time . Millisecond * 5000
}
//fallback and provide maximum buffer
return time . Second * config . MaxBufferSize
}
sampleBufferLimit := int64 ( q . config . Queue . BufferSeconds * mount . SampleRate )
if q . config . Queue . BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers
sampleBufferLimit = int64 ( getKnownBufferSize ( ) . Seconds ( ) * float64 ( mount . SampleRate ) )
}
startStamp := time . Now ( ) . Unix ( )
hashSum := sha256 . Sum256 ( [ ] byte ( fmt . Sprintf ( "%s-%s-%s-%s-%d" , request . RequestURI , request . RemoteAddr , request . Proto , request . Header . Get ( "user-agent" ) , startStamp ) ) )
listenerIdentifier := hex . EncodeToString ( hashSum [ 16 : ] )
listenerInformation := listener . Information {
Identifier : listenerIdentifier ,
Mount : mount . Mount ,
Path : uriPath ,
Headers : headers ,
Start : startStamp ,
}
var mountListener listener . Listener
var extraHeaders map [ string ] string
2022-10-02 13:19:51 +00:00
ctx := request . Context ( )
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
const byteSliceChannelBuffer = 1024 * 16
writeChannel := make ( chan [ ] byte , byteSliceChannelBuffer )
go func ( ) {
var flusher http . Flusher
if httpFlusher , ok := writer . ( http . Flusher ) ; ok {
flusher = httpFlusher
}
for ! requestDone . Done ( ) {
select {
case byteSlice := <- writeChannel :
if _ , err := writer . Write ( byteSlice ) ; err != nil {
requestDone . Fail ( err )
break
}
//try flush
if flusher != nil {
flusher . Flush ( )
}
case <- ctx . Done ( ) :
requestDone . Fail ( ctx . Err ( ) )
break
}
}
} ( )
funcWriter := func ( byteSlice [ ] byte ) error {
if requestDone . Done ( ) {
return requestDone . Error ( )
}
if len ( byteSlice ) > 0 {
if len ( writeChannel ) >= ( cap ( writeChannel ) - 1 ) {
requestDone . Fail ( errors . New ( "client ran out of writer buffer" ) )
return requestDone . Error ( )
}
writeChannel <- byteSlice
}
return nil
}
2022-09-03 14:20:40 +00:00
//set X-Audio-Packet-Stream for strictly timed packets and metadata
if numberValue , err := strconv . Atoi ( request . Header . Get ( "x-audio-packet-stream" ) ) ; err == nil && numberValue == 1 {
//version 1
2022-10-02 13:19:51 +00:00
mountListener , extraHeaders = aps1 . NewListener ( listenerInformation , funcWriter , sampleBufferLimit , mount . OffsetStart , mount . Channels , mount . SampleRate , mount . MimeType )
2022-09-03 14:20:40 +00:00
} else if numberValue , err = strconv . Atoi ( request . Header . Get ( "icy-metadata" ) ) ; err == nil && numberValue >= 1 {
2022-10-02 13:19:51 +00:00
mountListener , extraHeaders = icy . NewListener ( listenerInformation , funcWriter , sampleBufferLimit , mount . OffsetStart )
2022-09-03 14:20:40 +00:00
} else {
2022-10-02 13:19:51 +00:00
mountListener , extraHeaders = plain . NewListener ( listenerInformation , funcWriter , sampleBufferLimit , mount . OffsetStart )
2022-09-03 14:20:40 +00:00
}
if mountListener == nil {
writer . WriteHeader ( http . StatusInternalServerError )
return
}
for k , v := range extraHeaders {
writer . Header ( ) . Set ( k , v )
}
writer . Header ( ) . Set ( "x-listener-identifier" , listenerIdentifier )
log . Printf ( "adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n" , listenerIdentifier , mount . Mount , request . RemoteAddr , request . Proto , request . Header . Get ( "user-agent" ) , float64 ( sampleBufferLimit ) / float64 ( mount . SampleRate ) )
mount . AddListener ( mountListener )
2022-10-02 13:19:51 +00:00
mountListener . Wait ( )
2022-09-03 14:20:40 +00:00
2022-10-02 13:19:51 +00:00
requestDone . Complete ( )
2022-09-03 14:20:40 +00:00
log . Printf ( "removing %s client from stream %s\n" , listenerIdentifier , mount . Mount )
return
}
}
writer . WriteHeader ( http . StatusNotFound )
return
}