2022-03-02 17:54:56 +00:00
package main
import (
2022-04-20 11:20:22 +00:00
"bytes"
"encoding/binary"
"encoding/json"
2022-03-05 09:34:53 +00:00
"errors"
2022-03-02 17:54:56 +00:00
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
2022-05-15 14:42:28 +00:00
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
2022-03-06 15:29:03 +00:00
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/guess"
2022-03-02 17:54:56 +00:00
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
2022-05-15 14:42:28 +00:00
"git.gammaspectra.live/S.O.N.G/Kirika/audio/queue"
2022-03-08 14:10:06 +00:00
"git.gammaspectra.live/S.O.N.G/Kirika/audio/replaygain"
2022-03-06 15:29:03 +00:00
"github.com/dhowden/tag"
"io"
2022-03-02 17:54:56 +00:00
"log"
"net/http"
"os"
2022-03-05 10:40:06 +00:00
"runtime"
2022-03-04 12:25:59 +00:00
"strconv"
2022-03-02 17:54:56 +00:00
"strings"
"sync"
2022-03-05 09:34:53 +00:00
"sync/atomic"
2022-03-03 14:00:34 +00:00
"time"
2022-03-02 17:54:56 +00:00
)
2022-03-03 14:00:34 +00:00
// maxBufferSize is the largest amount of audio, in seconds, buffered for a
// listener: it sizes the final buffer filter of the stream chain (multiplied by
// the blocks-per-second rate) and is the fallback client buffer when the user
// agent is not recognised.
const maxBufferSize = 10
2022-03-02 17:54:56 +00:00
type QueueTrackEntry struct {
2022-05-15 14:42:28 +00:00
QueueIdentifier queue . QueueIdentifier
2022-03-02 17:54:56 +00:00
Path string
2022-03-08 14:10:06 +00:00
Metadata struct {
Title string ` json:"title" `
Album string ` json:"album" `
Artist string ` json:"artist" `
Art string ` json:"art" `
ReplayGain struct {
TrackPeak float64 ` json:"track_peak" `
TrackGain float64 ` json:"track_gain" `
AlbumPeak float64 ` json:"album_peak" `
AlbumGain float64 ` json:"album_gain" `
} ` json:"replay_gain,omitempty" `
2022-03-02 17:54:56 +00:00
}
2022-03-06 19:19:36 +00:00
reader io . ReadSeekCloser
2022-03-05 10:40:06 +00:00
source audio . Source
2022-03-02 17:54:56 +00:00
original map [ string ] interface { }
}
2022-03-05 10:40:06 +00:00
func ( e * QueueTrackEntry ) Load ( ) error {
2022-07-19 15:21:51 +00:00
if e . source != nil {
2022-03-05 10:40:06 +00:00
return nil
}
2022-03-06 16:55:34 +00:00
if len ( e . Path ) > 4 && e . Path [ : 4 ] == "http" {
s , err := NewRangeReadSeekCloser ( e . Path )
if err != nil {
return err
}
//close at end, TODO check if it runs
runtime . SetFinalizer ( s , ( * RangeReadSeekCloser ) . Close )
2022-03-06 19:19:36 +00:00
e . reader = s
2022-03-06 16:55:34 +00:00
} else {
f , err := os . Open ( e . Path )
if err != nil {
return err
}
//close at end, TODO check if it runs
runtime . SetFinalizer ( f , ( * os . File ) . Close )
2022-03-06 19:19:36 +00:00
e . reader = f
2022-03-05 10:40:06 +00:00
}
2022-03-06 19:19:36 +00:00
if e . reader == nil {
2022-03-06 16:55:34 +00:00
return errors . New ( "could not find stream opener" )
}
2022-03-06 19:19:36 +00:00
meta , err := tag . ReadFrom ( e . reader )
2022-03-06 15:29:03 +00:00
if err != nil {
err = nil
}
2022-03-06 19:19:36 +00:00
if _ , err = e . reader . Seek ( 0 , io . SeekStart ) ; err != nil {
2022-03-06 15:29:03 +00:00
return err
2022-03-05 10:40:06 +00:00
}
2022-03-06 19:19:36 +00:00
decoders , err := guess . GetDecoders ( e . reader , e . Path )
2022-03-06 15:29:03 +00:00
if err != nil {
return err
}
2022-03-06 19:19:36 +00:00
source , err := guess . Open ( e . reader , decoders )
2022-03-05 10:40:06 +00:00
if err != nil {
return err
}
2022-07-19 15:21:51 +00:00
if source == nil {
2022-03-05 10:40:06 +00:00
return fmt . Errorf ( "could not find decoder for %s" , e . Path )
}
e . source = source
2022-03-06 15:29:03 +00:00
//apply tags found on file
if meta != nil {
if e . Metadata . Title == "" {
e . Metadata . Title = meta . Title ( )
}
if e . Metadata . Album == "" {
2022-03-06 16:55:34 +00:00
e . Metadata . Album = meta . Album ( )
2022-03-06 15:29:03 +00:00
}
if e . Metadata . Artist == "" {
e . Metadata . Artist = meta . Artist ( )
}
if e . Metadata . Artist == "" {
e . Metadata . Artist = meta . AlbumArtist ( )
}
tags := meta . Raw ( )
var strValue string
var value interface { }
var ok bool
2022-03-08 14:10:06 +00:00
getDb := func ( strValue string ) ( ret float64 ) {
ret , _ = strconv . ParseFloat ( strings . TrimSpace ( strings . TrimSuffix ( strings . ToLower ( strValue ) , "db" ) ) , 64 )
return
}
if e . Metadata . ReplayGain . TrackPeak == 0 {
2022-03-06 16:55:34 +00:00
if value , ok = tags [ "replaygain_track_gain" ] ; ok {
if strValue , ok = value . ( string ) ; ok {
2022-03-08 14:10:06 +00:00
e . Metadata . ReplayGain . TrackGain = getDb ( strValue )
2022-03-06 15:29:03 +00:00
}
}
2022-03-06 16:55:34 +00:00
if value , ok = tags [ "replaygain_track_peak" ] ; ok {
if strValue , ok = value . ( string ) ; ok {
2022-03-08 14:10:06 +00:00
e . Metadata . ReplayGain . TrackPeak = getDb ( strValue )
2022-03-06 15:29:03 +00:00
}
}
2022-03-06 16:55:34 +00:00
if value , ok = tags [ "replaygain_album_gain" ] ; ok {
if strValue , ok = value . ( string ) ; ok {
2022-03-08 14:10:06 +00:00
e . Metadata . ReplayGain . AlbumGain = getDb ( strValue )
2022-03-06 16:55:34 +00:00
}
2022-03-06 15:29:03 +00:00
}
2022-03-06 16:55:34 +00:00
if value , ok = tags [ "replaygain_album_peak" ] ; ok {
if strValue , ok = value . ( string ) ; ok {
2022-03-08 14:10:06 +00:00
e . Metadata . ReplayGain . AlbumPeak = getDb ( strValue )
2022-03-06 16:55:34 +00:00
}
2022-03-06 15:29:03 +00:00
}
}
}
2022-03-05 10:40:06 +00:00
return nil
}
2022-03-04 12:25:59 +00:00
// QueueMetadataPacket is a data-less packet injected into each mount's
// metadata queue when a track starts, so that listeners can be told which
// track is playing at a given sample position.
type QueueMetadataPacket struct {
	// sampleNumber is reported as both the start and the end sample of the packet.
	sampleNumber int64
	// TrackEntry is the track the packet announces.
	TrackEntry *QueueTrackEntry
}
// KeepMode always reports packetizer.KeepLast for metadata packets.
func (p *QueueMetadataPacket) KeepMode() packetizer.KeepMode {
	return packetizer.KeepLast
}
// GetStartSampleNumber returns the sample position the packet was created with.
func (p *QueueMetadataPacket) GetStartSampleNumber() int64 {
	return p.sampleNumber
}
// GetEndSampleNumber returns the same position as GetStartSampleNumber: a
// metadata packet has no duration.
func (p *QueueMetadataPacket) GetEndSampleNumber() int64 {
	return p.sampleNumber
}
// Category always reports -1 for metadata packets.
func (p *QueueMetadataPacket) Category() int64 {
	return -1
}
// GetData returns nil: metadata packets carry no audio payload.
func (p *QueueMetadataPacket) GetData() []byte {
	return nil
}
2022-03-02 17:54:56 +00:00
type Queue struct {
2022-03-05 09:34:53 +00:00
NowPlaying chan * QueueTrackEntry
QueueEmpty chan * QueueTrackEntry
Duration time . Duration
durationError int64
2022-05-15 14:42:28 +00:00
audioQueue * queue . Queue
2022-03-05 09:34:53 +00:00
mounts [ ] * StreamMount
queue [ ] * QueueTrackEntry
mutex sync . RWMutex
config * Config
wg sync . WaitGroup
2022-03-02 17:54:56 +00:00
}
func NewQueue ( config * Config ) * Queue {
2022-05-15 14:42:28 +00:00
if config . Queue . SampleRate <= 0 {
config . Queue . SampleRate = 44100
}
2022-07-19 15:21:51 +00:00
sampleFormat := audio . SourceInt16
bitDepth := 16
switch config . Queue . SampleFormat {
case "f32" , "float" , "float32" , "f32le" :
sampleFormat = audio . SourceFloat32
bitDepth = 0
case "i32" , "s32" , "int32" , "int" , "s32le" :
sampleFormat = audio . SourceInt32
bitDepth = 32
case "i16" , "s16" , "int16" , "s16le" :
sampleFormat = audio . SourceInt16
bitDepth = 16
}
if config . Queue . BitDepth > 0 {
bitDepth = config . Queue . BitDepth
}
2022-03-02 17:54:56 +00:00
q := & Queue {
NowPlaying : make ( chan * QueueTrackEntry , 1 ) ,
QueueEmpty : make ( chan * QueueTrackEntry ) ,
config : config ,
2022-07-19 15:21:51 +00:00
audioQueue : queue . NewQueue ( sampleFormat , bitDepth , config . Queue . SampleRate , 2 ) ,
2022-03-02 17:54:56 +00:00
}
blocksPerSecond := 20
2022-07-19 15:21:51 +00:00
sources := filter . NewFilterChain ( q . audioQueue . GetSource ( ) , filter . NewBufferFilter ( 16 ) , filter . NewRealTimeFilter ( blocksPerSecond ) , filter . NewBufferFilter ( maxBufferSize * blocksPerSecond ) ) . Split ( len ( config . Streams ) )
2022-03-02 17:54:56 +00:00
for i , s := range q . config . Streams {
2022-05-15 15:07:30 +00:00
mount := NewStreamMount ( sources [ i ] , s )
2022-03-02 17:54:56 +00:00
if mount == nil {
log . Panicf ( "could not initialize %s\n" , s . MountPath )
}
q . mounts = append ( q . mounts , mount )
q . wg . Add ( 1 )
go mount . Process ( & q . wg )
}
return q
}
// Wait blocks until every mount processing goroutine has finished and then
// closes the NowPlaying channel.
func (q *Queue) Wait() {
	q.wg.Wait()
	close(q.NowPlaying)
}
func ( q * Queue ) AddTrack ( entry * QueueTrackEntry , tail bool ) error {
2022-03-05 10:40:06 +00:00
if err := entry . Load ( ) ; err != nil {
2022-03-02 17:54:56 +00:00
return err
}
2022-05-15 14:42:28 +00:00
startCallback := func ( queue * queue . Queue , queueEntry * queue . QueueEntry ) {
2022-03-05 10:40:06 +00:00
if e := q . Get ( queueEntry . Identifier ) ; e != nil { //is this needed?
log . Printf ( "now playing %s\n" , e . Path )
2022-03-02 17:54:56 +00:00
q . NowPlaying <- e
2022-03-04 12:25:59 +00:00
for _ , mount := range q . mounts {
mount . MetadataQueue . Enqueue ( & QueueMetadataPacket {
//TODO: carry error
sampleNumber : int64 ( q . Duration * time . Duration ( queue . GetSampleRate ( ) ) / time . Second ) ,
TrackEntry : e ,
} )
}
2022-03-05 10:40:06 +00:00
} else {
log . Printf ( "now playing %s\n" , entry . Path )
2022-03-02 17:54:56 +00:00
}
}
2022-05-15 14:42:28 +00:00
endCallback := func ( queue * queue . Queue , entry * queue . QueueEntry ) {
2022-03-02 17:54:56 +00:00
}
2022-05-15 14:42:28 +00:00
removeCallback := func ( queue * queue . Queue , entry * queue . QueueEntry ) {
2022-07-19 15:21:51 +00:00
atomic . AddInt64 ( ( * int64 ) ( & q . Duration ) , int64 ( ( time . Second * time . Duration ( entry . ReadSamples ) ) / time . Duration ( entry . Source . GetSampleRate ( ) ) ) )
2022-03-04 12:25:59 +00:00
2022-03-02 17:54:56 +00:00
q . Remove ( entry . Identifier )
q . HandleQueue ( )
}
q . mutex . Lock ( )
defer q . mutex . Unlock ( )
2022-04-20 09:08:33 +00:00
if q . config . Queue . Length > 0 && len ( q . queue ) >= q . config . Queue . Length {
return errors . New ( "queue too long" )
}
2022-03-06 15:29:03 +00:00
source := entry . source
2022-03-08 14:10:06 +00:00
if q . config . Queue . ReplayGain {
if entry . Metadata . ReplayGain . TrackPeak != 0 {
source = replaygain . NewReplayGainFilter ( entry . Metadata . ReplayGain . TrackGain , entry . Metadata . ReplayGain . TrackPeak , 0 ) . Process ( source )
} else {
source = replaygain . NewNormalizationFilter ( 5 ) . Process ( source )
}
2022-03-06 15:29:03 +00:00
}
2022-03-02 17:54:56 +00:00
if tail {
2022-03-06 15:29:03 +00:00
entry . QueueIdentifier = q . audioQueue . AddTail ( source , startCallback , endCallback , removeCallback )
2022-03-02 17:54:56 +00:00
} else {
2022-03-06 15:29:03 +00:00
entry . QueueIdentifier = q . audioQueue . AddHead ( source , startCallback , endCallback , removeCallback )
2022-03-02 17:54:56 +00:00
}
entry . original [ "queue_id" ] = entry . QueueIdentifier
if tail || len ( q . queue ) == 0 {
q . queue = append ( q . queue , entry )
} else {
q . queue = append ( q . queue [ : 1 ] , append ( [ ] * QueueTrackEntry { entry } , q . queue [ 1 : ] ... ) ... )
}
return nil
}
func ( q * Queue ) HandleQueue ( ) {
if q . audioQueue . GetQueueSize ( ) == 0 {
2022-04-20 09:04:53 +00:00
if err := q . AddTrack ( <- q . QueueEmpty , true ) ; err != nil {
log . Printf ( "track addition error: \"%s\"" , err )
//TODO: maybe fail after n tries
time . Sleep ( time . Second )
q . HandleQueue ( )
}
2022-03-02 17:54:56 +00:00
}
}
// GetQueue returns a copy of the queued tracks without the entry at index 0.
// The result is nil when at most one track is present.
func (q *Queue) GetQueue() (result []*QueueTrackEntry) {
	q.mutex.RLock()
	defer q.mutex.RUnlock()

	if len(q.queue) <= 1 {
		return nil
	}

	pending := q.queue[1:]
	result = make([]*QueueTrackEntry, len(pending))
	copy(result, pending)
	return result
}
2022-05-15 14:42:28 +00:00
func ( q * Queue ) Get ( identifier queue . QueueIdentifier ) * QueueTrackEntry {
2022-03-02 17:54:56 +00:00
q . mutex . RLock ( )
defer q . mutex . RUnlock ( )
for _ , e := range q . queue {
if e . QueueIdentifier == identifier {
return e
}
}
return nil
}
// GetNowPlaying returns the track entry matching the head of the audio queue,
// or nil when the audio queue has no head or the entry is unknown.
func (q *Queue) GetNowPlaying() *QueueTrackEntry {
	head := q.audioQueue.GetQueueHead()
	if head == nil {
		return nil
	}
	return q.Get(head.Identifier)
}
// SkipNowPlaying removes the track at the head of the audio queue. It reports
// whether a matching track entry was found and removed.
func (q *Queue) SkipNowPlaying() bool {
	head := q.audioQueue.GetQueueHead()
	if head == nil {
		return false
	}
	return q.Remove(head.Identifier)
}
// GetIndex returns the track at position index among the pending tracks, that
// is audio queue position index+1, or nil when there is none.
func (q *Queue) GetIndex(index int) *QueueTrackEntry {
	queued := q.audioQueue.GetQueueIndex(index + 1)
	if queued == nil {
		return nil
	}
	return q.Get(queued.Identifier)
}
// GetHead returns the first pending track (audio queue position 1), or nil
// when there is none.
func (q *Queue) GetHead() *QueueTrackEntry {
	next := q.audioQueue.GetQueueIndex(1)
	if next == nil {
		return nil
	}
	return q.Get(next.Identifier)
}
// GetTail returns the last pending track. It returns nil when the audio queue
// reports its tail at index 0 or has no tail at all.
func (q *Queue) GetTail() *QueueTrackEntry {
	index, last := q.audioQueue.GetQueueTail()
	if index == 0 || last == nil {
		return nil
	}
	return q.Get(last.Identifier)
}
2022-05-15 14:42:28 +00:00
func ( q * Queue ) Remove ( identifier queue . QueueIdentifier ) bool {
2022-03-02 17:54:56 +00:00
q . mutex . Lock ( )
for i , e := range q . queue {
if e . QueueIdentifier == identifier {
q . queue = append ( q . queue [ : i ] , q . queue [ i + 1 : ] ... )
q . mutex . Unlock ( )
q . audioQueue . Remove ( identifier )
2022-03-06 19:19:36 +00:00
if e . reader != nil {
e . reader . Close ( )
}
2022-03-02 17:54:56 +00:00
return true
}
}
q . mutex . Unlock ( )
q . audioQueue . Remove ( identifier )
return false
}
func ( q * Queue ) GetListeners ( ) ( listeners [ ] * ListenerInformation ) {
q . mutex . RLock ( )
2022-03-02 18:27:46 +00:00
defer q . mutex . RUnlock ( )
2022-03-02 17:54:56 +00:00
listeners = make ( [ ] * ListenerInformation , 0 , 1 )
for _ , mount := range q . mounts {
listeners = append ( listeners , mount . GetListeners ( ) ... )
}
return
}
2022-07-16 12:49:47 +00:00
// PacketStreamType identifies the kind of a packet stream frame; Encode writes
// it as the first field of every frame.
type PacketStreamType uint64
2022-04-20 11:20:22 +00:00
2022-07-16 12:49:47 +00:00
//PacketStreamType The order of these fields is important and set on-wire protocol
2022-04-20 11:20:22 +00:00
const (
2022-07-16 12:49:47 +00:00
Header = PacketStreamType ( iota )
2022-04-20 11:20:22 +00:00
DataKeepLast
DataKeep
DataGroupKeep
DataGroupDiscard
DataDiscard
TrackIdentifier
TrackMetadata
)
type packetStreamFrame struct {
2022-07-16 12:49:47 +00:00
Type PacketStreamType
Category int64
2022-04-20 11:20:22 +00:00
StartSampleNumber int64
DurationInSamples int64
//automatically filled based on Data
//Size uint64
Data [ ] byte
}
func ( p * packetStreamFrame ) Encode ( ) [ ] byte {
buf := new ( bytes . Buffer )
varBuf := make ( [ ] byte , binary . MaxVarintLen64 )
n := binary . PutUvarint ( varBuf , uint64 ( p . Type ) )
buf . Write ( varBuf [ : n ] )
2022-07-16 12:49:47 +00:00
n = binary . PutUvarint ( varBuf , uint64 ( p . Category ) )
buf . Write ( varBuf [ : n ] )
2022-04-20 11:20:22 +00:00
n = binary . PutVarint ( varBuf , p . StartSampleNumber )
buf . Write ( varBuf [ : n ] )
n = binary . PutVarint ( varBuf , p . DurationInSamples )
buf . Write ( varBuf [ : n ] )
n = binary . PutUvarint ( varBuf , uint64 ( len ( p . Data ) ) )
buf . Write ( varBuf [ : n ] )
buf . Write ( p . Data )
return buf . Bytes ( )
}
2022-03-02 17:54:56 +00:00
func ( q * Queue ) HandleRadioRequest ( writer http . ResponseWriter , request * http . Request ) {
2022-07-14 21:21:58 +00:00
writer . Header ( ) . Set ( "Server" , "MeteorLight/radio" )
writer . Header ( ) . Set ( "Connection" , "close" )
writer . Header ( ) . Set ( "X-Content-Type-Options" , "nosniff" )
writer . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" )
writer . Header ( ) . Set ( "Access-Control-Allow-Headers" , "Accept, Content-Type, Icy-Metadata" )
writer . Header ( ) . Set ( "Accept-Ranges" , "none" )
writer . Header ( ) . Set ( "Connection" , "close" )
2022-05-15 14:42:28 +00:00
if strings . HasSuffix ( request . URL . Path , "mounts" ) {
2022-05-15 18:42:42 +00:00
writer . Header ( ) . Set ( "Content-Type" , "application/json; charset=utf-8" )
2022-05-15 14:42:28 +00:00
writer . Header ( ) . Set ( "Access-Control-Expose-Headers" , "Accept-Ranges, Server, Content-Type" )
type mountData struct {
Path string ` json:"mount" `
MimeType string ` json:"mime" `
FormatDescription string ` json:"formatDescription" `
SampleRate int ` json:"sampleRate" `
Channels int ` json:"channels" `
Listeners int ` json:"listeners" `
Options map [ string ] interface { } ` json:"options" `
}
var mounts [ ] mountData
for _ , mount := range q . mounts {
mounts = append ( mounts , mountData {
Path : strings . TrimSuffix ( request . URL . Path , "mounts" ) + mount . Mount ,
MimeType : mount . MimeType ,
SampleRate : mount . SampleRate ,
FormatDescription : mount . FormatDescription ,
Channels : mount . Channels ,
Listeners : len ( mount . listeners ) ,
Options : mount . Options ,
} )
}
jsonBytes , _ := json . MarshalIndent ( mounts , "" , " " )
writer . WriteHeader ( http . StatusOK )
writer . Write ( jsonBytes )
return
}
2022-03-02 17:54:56 +00:00
for _ , mount := range q . mounts {
2022-03-02 18:32:41 +00:00
if strings . HasSuffix ( request . URL . Path , mount . Mount ) {
2022-03-02 17:54:56 +00:00
writer . Header ( ) . Set ( "Content-Type" , mount . MimeType )
2022-03-07 15:19:38 +00:00
writer . Header ( ) . Set ( "Cache-Control" , "no-store, max-age=604800" )
writer . Header ( ) . Set ( "Access-Control-Expose-Headers" , "Accept-Ranges, Server, Content-Type, Icy-MetaInt" )
writer . Header ( ) . Set ( "Vary" , "*" )
rangeHeader := request . Header . Get ( "range" )
if rangeHeader != "" && rangeHeader != "bytes=0-" {
//TODO: maybe should fail in case bytes are requested
2022-04-20 17:08:45 +00:00
if strings . Index ( request . UserAgent ( ) , " Safari/" ) != - 1 && mount . MimeType == "audio/flac" {
//Safari special case, fake Range check so it decodes afterwards.
//Safari creates a request with Range for 0-1, specifically for FLAC, and expects a result supporting range. Afterwards it requests the whole file.
//However the decoder is able to decode FLAC livestreams. If we fake the initial range response, then afterwards serve normal responses, Safari will happily work.
//TODO: remove this AS SOON as safari works on its own
//safariLargeFileValue arbitrary large value, cannot be that large or iOS Safari fails.
safariLargeFileValue := 1024 * 1024 * 1024 * 1024 * 16 // 16 TiB
if rangeHeader == "bytes=0-1" {
//first request
writer . Header ( ) . Set ( "Accept-Ranges" , "bytes" )
writer . Header ( ) . Set ( "Content-Range" , fmt . Sprintf ( "bytes 0-1/%d" , safariLargeFileValue ) ) //64 TiB max fake size
writer . Header ( ) . Set ( "Content-Length" , "2" )
writer . WriteHeader ( http . StatusPartialContent )
2022-07-14 21:21:58 +00:00
writer . Write ( [ ] byte { 'f' , 'L' } )
2022-04-20 17:08:45 +00:00
return
} else if rangeHeader == fmt . Sprintf ( "bytes=0-%d" , safariLargeFileValue - 1 ) {
//second request, serve status 200 to keep retries to a minimum
writer . Header ( ) . Set ( "Content-Length" , fmt . Sprintf ( "%d" , safariLargeFileValue ) )
writer . WriteHeader ( http . StatusOK )
} else if strings . HasPrefix ( rangeHeader , "bytes=" ) && strings . HasSuffix ( rangeHeader , fmt . Sprintf ( "-%d" , safariLargeFileValue - 1 ) ) {
//any other requests, these should fail
writer . Header ( ) . Set ( "Content-Range" , fmt . Sprintf ( "bytes %s/%d" , strings . TrimPrefix ( rangeHeader , "bytes=" ) , safariLargeFileValue ) )
writer . Header ( ) . Set ( "Accept-Ranges" , "bytes" )
writer . WriteHeader ( http . StatusRequestedRangeNotSatisfiable )
return
}
}
2022-03-07 15:19:38 +00:00
}
bitrate := 0
if value , ok := mount . Options [ "bitrate" ] ; ok {
if intValue , ok := value . ( int ) ; ok {
bitrate = intValue
} else if int64Value , ok := value . ( int64 ) ; ok {
bitrate = int ( int64Value )
}
}
//set some audiocast/icy radio headers
writer . Header ( ) . Set ( "x-audiocast-name" , q . config . Radio . Name )
writer . Header ( ) . Set ( "x-audiocast-bitrate" , fmt . Sprintf ( "%d" , bitrate ) )
writer . Header ( ) . Set ( "icy-name" , q . config . Radio . Name )
writer . Header ( ) . Set ( "icy-version" , "2" )
writer . Header ( ) . Set ( "icy-index-metadata" , "1" )
if q . config . Radio . Description != "" {
writer . Header ( ) . Set ( "x-audiocast-description" , q . config . Radio . Description )
writer . Header ( ) . Set ( "icy-description" , q . config . Radio . Description )
}
if q . config . Radio . URL != "" {
writer . Header ( ) . Set ( "x-audiocast-url" , q . config . Radio . URL )
writer . Header ( ) . Set ( "icy-url" , q . config . Radio . URL )
}
if q . config . Radio . Logo != "" {
writer . Header ( ) . Set ( "icy-logo" , q . config . Radio . Logo )
}
writer . Header ( ) . Set ( "icy-br" , fmt . Sprintf ( "%d" , bitrate ) )
writer . Header ( ) . Set ( "icy-sr" , fmt . Sprintf ( "%d" , mount . SampleRate ) )
2022-05-15 14:42:28 +00:00
writer . Header ( ) . Set ( "icy-audio-info" , fmt . Sprintf ( "ice-channels=%d;ice-samplerate=%d;ice-bitrate=%d" , mount . Channels , mount . SampleRate , bitrate ) )
2022-03-07 15:19:38 +00:00
if q . config . Radio . Private {
writer . Header ( ) . Set ( "icy-pub" , "0" )
writer . Header ( ) . Set ( "icy-do-not-index" , "1" )
writer . Header ( ) . Set ( "x-audiocast-public" , "0" )
writer . Header ( ) . Set ( "x-robots-tag" , "noindex, nofollow" )
} else {
writer . Header ( ) . Set ( "icy-pub" , "1" )
writer . Header ( ) . Set ( "icy-do-not-index" , "0" )
writer . Header ( ) . Set ( "x-audiocast-public" , "1" )
}
2022-03-02 17:54:56 +00:00
2022-03-05 09:34:53 +00:00
var packetWriteCallback func ( packet packetizer . Packet ) error
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
2022-03-08 16:18:31 +00:00
const byteSliceChannelBuffer = 1024 * 16
2022-03-05 09:34:53 +00:00
writeChannel := make ( chan [ ] byte , byteSliceChannelBuffer )
var requestDone error
var wgClient sync . WaitGroup
2022-03-04 12:25:59 +00:00
2022-04-20 11:20:22 +00:00
//set X-Audio-Packet-Stream for strictly timed packets and metadata
if numberValue , err := strconv . Atoi ( request . Header . Get ( "x-audio-packet-stream" ) ) ; err == nil && numberValue == 1 {
//version 1
2022-07-16 12:49:47 +00:00
writer . Header ( ) . Set ( "x-audio-packet-stream" , "1" )
writer . Header ( ) . Set ( "Content-Type" , "application/x-audio-packet-stream" )
2022-04-20 11:20:22 +00:00
packetWriteCallback = func ( packet packetizer . Packet ) error {
if requestDone != nil {
return requestDone
}
if metadataPacket , ok := packet . ( * QueueMetadataPacket ) ; ok {
queueInfoBuf := make ( [ ] byte , binary . MaxVarintLen64 )
n := binary . PutVarint ( queueInfoBuf , int64 ( metadataPacket . TrackEntry . QueueIdentifier ) )
if len ( writeChannel ) >= ( byteSliceChannelBuffer - 1 ) {
requestDone = errors . New ( "client ran out of buffer" )
log . Printf ( "failed to write data to client: %s\n" , requestDone )
return requestDone
}
writeChannel <- ( & packetStreamFrame {
Type : TrackIdentifier ,
2022-07-16 12:49:47 +00:00
Category : packet . Category ( ) ,
2022-04-20 11:20:22 +00:00
StartSampleNumber : packet . GetStartSampleNumber ( ) ,
DurationInSamples : packet . GetEndSampleNumber ( ) - packet . GetStartSampleNumber ( ) ,
Data : queueInfoBuf [ : n ] ,
} ) . Encode ( )
if metadataBytes , err := json . Marshal ( metadataPacket . TrackEntry . Metadata ) ; err == nil {
if len ( writeChannel ) >= ( byteSliceChannelBuffer - 1 ) {
requestDone = errors . New ( "client ran out of buffer" )
log . Printf ( "failed to write data to client: %s\n" , requestDone )
return requestDone
}
writeChannel <- ( & packetStreamFrame {
Type : TrackMetadata ,
2022-07-16 12:49:47 +00:00
Category : packet . Category ( ) ,
2022-04-20 11:20:22 +00:00
StartSampleNumber : packet . GetStartSampleNumber ( ) ,
DurationInSamples : packet . GetEndSampleNumber ( ) - packet . GetStartSampleNumber ( ) ,
Data : metadataBytes ,
} ) . Encode ( )
}
return nil
}
if len ( writeChannel ) >= ( byteSliceChannelBuffer - 1 ) {
requestDone = errors . New ( "client ran out of buffer" )
log . Printf ( "failed to write data to client: %s\n" , requestDone )
return requestDone
}
//TODO: category
2022-07-16 12:49:47 +00:00
var frameType PacketStreamType
2022-04-20 11:20:22 +00:00
switch packet . KeepMode ( ) {
case packetizer . KeepLast :
frameType = DataKeepLast
case packetizer . Keep :
frameType = DataKeep
case packetizer . GroupKeep :
frameType = DataGroupKeep
case packetizer . GroupDiscard :
frameType = DataGroupDiscard
case packetizer . Discard :
frameType = DataDiscard
default :
return errors . New ( "unknown KeepMode" )
}
writeChannel <- ( & packetStreamFrame {
Type : frameType ,
2022-07-16 12:49:47 +00:00
Category : packet . Category ( ) ,
2022-04-20 11:20:22 +00:00
StartSampleNumber : packet . GetStartSampleNumber ( ) ,
DurationInSamples : packet . GetEndSampleNumber ( ) - packet . GetStartSampleNumber ( ) ,
Data : packet . GetData ( ) ,
} ) . Encode ( )
return nil
}
headerBytes := new ( bytes . Buffer )
2022-05-15 14:42:28 +00:00
binary . Write ( headerBytes , binary . LittleEndian , int64 ( mount . Channels ) )
2022-04-20 11:20:22 +00:00
binary . Write ( headerBytes , binary . LittleEndian , int64 ( mount . SampleRate ) )
binary . Write ( headerBytes , binary . LittleEndian , int32 ( len ( mount . MimeType ) ) )
headerBytes . Write ( [ ] byte ( mount . MimeType ) )
writeChannel <- ( & packetStreamFrame {
Type : Header ,
2022-07-16 12:49:47 +00:00
Category : 0 ,
2022-04-20 11:20:22 +00:00
StartSampleNumber : 0 ,
DurationInSamples : 0 ,
Data : headerBytes . Bytes ( ) ,
} ) . Encode ( )
} else if numberValue , err = strconv . Atoi ( request . Header . Get ( "icy-metadata" ) ) ; err == nil && numberValue >= 1 {
2022-03-05 09:34:53 +00:00
metadataToSend := make ( map [ string ] string )
2022-03-07 15:19:38 +00:00
const icyInterval = 8192 //weird clients might not support other numbers than this
2022-03-05 09:34:53 +00:00
icyCounter := 0
2022-03-07 15:19:38 +00:00
writer . Header ( ) . Set ( "icy-metaint" , fmt . Sprintf ( "%d" , icyInterval ) )
2022-03-05 09:34:53 +00:00
writeIcy := func ( ) [ ] byte {
2022-03-07 15:19:38 +00:00
packetContent := make ( [ ] byte , 1 , 16 * 255 + 1 )
2022-03-05 09:34:53 +00:00
for k , v := range metadataToSend {
packetContent = append ( packetContent , [ ] byte ( fmt . Sprintf ( "%s='%s';" , k , v ) ) ... )
delete ( metadataToSend , k )
//shouldn't send multiple properties in same packet if we want working single quotes, wait until next ICY frame
break
}
2022-03-02 17:54:56 +00:00
2022-03-05 09:34:53 +00:00
contentLength := len ( packetContent ) - 1
if contentLength > 16 * 255 {
//cannot send long titles
return make ( [ ] byte , 1 )
}
2022-03-02 17:54:56 +00:00
2022-03-05 09:34:53 +00:00
if ( contentLength % 16 ) == 0 { //already padded
packetContent [ 0 ] = byte ( contentLength / 16 )
2022-03-04 12:25:59 +00:00
} else {
2022-03-05 09:34:53 +00:00
packetContent [ 0 ] = byte ( contentLength / 16 ) + 1
packetContent = append ( packetContent , make ( [ ] byte , 16 - ( contentLength % 16 ) ) ... )
}
return packetContent
}
2022-05-15 18:19:12 +00:00
var streamStartOffset int64 = - 1
2022-03-05 09:34:53 +00:00
packetWriteCallback = func ( packet packetizer . Packet ) error {
if requestDone != nil {
return requestDone
}
if metadataPacket , ok := packet . ( * QueueMetadataPacket ) ; ok {
if len ( metadataPacket . TrackEntry . Metadata . Artist ) > 0 {
metadataToSend [ "StreamTitle" ] = fmt . Sprintf ( "%s - %s" , metadataPacket . TrackEntry . Metadata . Artist , metadataPacket . TrackEntry . Metadata . Title )
} else {
metadataToSend [ "StreamTitle" ] = metadataPacket . TrackEntry . Metadata . Title
}
if len ( metadataPacket . TrackEntry . Metadata . Art ) > 0 {
metadataToSend [ "StreamURL" ] = metadataPacket . TrackEntry . Metadata . Art
}
return nil
}
2022-05-15 18:19:12 +00:00
var p [ ] byte
2022-07-16 13:06:21 +00:00
if offsetable , ok := packet . ( packetizer . OffsetablePacket ) ; mount . OffsetStart && ok {
2022-05-15 18:19:12 +00:00
if streamStartOffset <= - 1 {
if offsetable . KeepMode ( ) != packetizer . Keep {
streamStartOffset = offsetable . GetStartSampleNumber ( )
p = offsetable . GetDataOffset ( streamStartOffset )
} else {
p = packet . GetData ( )
}
} else {
p = offsetable . GetDataOffset ( streamStartOffset )
}
} else {
p = packet . GetData ( )
}
2022-03-05 09:34:53 +00:00
var data [ ] byte
for len ( p ) > 0 {
l := icyInterval - icyCounter
if l <= len ( p ) {
data = append ( data , p [ : l ] ... )
data = append ( data , writeIcy ( ) ... )
icyCounter = 0
p = p [ l : ]
} else {
data = append ( data , p ... )
icyCounter += len ( p )
p = p [ : 0 ]
}
2022-03-04 12:25:59 +00:00
}
2022-03-05 09:34:53 +00:00
2022-03-07 16:33:37 +00:00
if len ( writeChannel ) >= ( byteSliceChannelBuffer - 1 ) {
2022-03-05 09:34:53 +00:00
requestDone = errors . New ( "client ran out of buffer" )
return requestDone
}
writeChannel <- data
2022-03-04 12:25:59 +00:00
return nil
}
2022-03-05 09:34:53 +00:00
} else {
2022-05-15 18:19:12 +00:00
var streamStartOffset int64 = - 1
2022-03-05 09:34:53 +00:00
packetWriteCallback = func ( packet packetizer . Packet ) error {
if requestDone != nil {
return requestDone
}
if _ , ok := packet . ( * QueueMetadataPacket ) ; ok {
return nil
2022-03-02 17:54:56 +00:00
}
2022-03-07 16:33:37 +00:00
if len ( writeChannel ) >= ( byteSliceChannelBuffer - 1 ) {
2022-03-05 09:34:53 +00:00
requestDone = errors . New ( "client ran out of buffer" )
2022-03-07 16:33:37 +00:00
log . Printf ( "failed to write data to client: %s\n" , requestDone )
2022-03-05 09:34:53 +00:00
return requestDone
}
2022-05-15 18:19:12 +00:00
2022-07-16 13:06:21 +00:00
if offsetable , ok := packet . ( packetizer . OffsetablePacket ) ; mount . OffsetStart && ok {
2022-05-15 18:19:12 +00:00
if streamStartOffset <= - 1 {
if offsetable . KeepMode ( ) != packetizer . Keep {
streamStartOffset = offsetable . GetStartSampleNumber ( )
writeChannel <- offsetable . GetDataOffset ( streamStartOffset )
} else {
writeChannel <- packet . GetData ( )
}
} else {
writeChannel <- offsetable . GetDataOffset ( streamStartOffset )
}
} else {
writeChannel <- packet . GetData ( )
}
2022-03-05 09:34:53 +00:00
return nil
}
2022-03-02 17:54:56 +00:00
}
2022-03-05 09:34:53 +00:00
2022-03-02 17:54:56 +00:00
wgClient . Add ( 1 )
2022-03-05 09:34:53 +00:00
go func ( ) {
defer wgClient . Done ( )
var flusher http . Flusher
if httpFlusher , ok := writer . ( http . Flusher ) ; ok {
flusher = httpFlusher
}
for byteSlice := range writeChannel {
if _ , requestDone = writer . Write ( byteSlice ) ; requestDone != nil {
2022-03-07 16:33:37 +00:00
log . Printf ( "failed to write data to client: %s\n" , requestDone )
2022-03-05 09:34:53 +00:00
break
}
//try flush
if flusher != nil {
flusher . Flush ( )
}
}
} ( )
2022-03-02 17:54:56 +00:00
var headers [ ] HeaderEntry
for k , v := range request . Header {
for _ , s := range v {
headers = append ( headers , HeaderEntry {
Name : k ,
Value : s ,
} )
}
}
uriPath := request . URL . Path
if len ( request . URL . Query ( ) . Encode ( ) ) > 0 {
uriPath += "?" + request . URL . Query ( ) . Encode ( )
}
2022-03-03 14:00:34 +00:00
getKnownBufferSize := func ( ) time . Duration {
userAgent := request . Header . Get ( "user-agent" )
if strings . Index ( userAgent , "libmpv" ) != - 1 || strings . Index ( userAgent , "mpv " ) != - 1 { //mpv
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "libvlc" ) != - 1 { //VLC
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "lavf/" ) != - 1 { //ffplay
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "gvfs/" ) != - 1 { //gvfs
return time . Millisecond * 2500
} else if strings . Index ( userAgent , "Music Player Daemon " ) != - 1 { //MPD
return time . Millisecond * 2500
} else if strings . Index ( userAgent , " Chrome/" ) != - 1 { //Chromium-based
2022-03-07 17:05:25 +00:00
return time . Millisecond * 4000
2022-03-03 14:00:34 +00:00
} else if strings . Index ( userAgent , " Safari/" ) != - 1 { //Safari-based
return time . Millisecond * 5000
} else if strings . Index ( userAgent , " Gecko/" ) != - 1 { //Gecko-based (Firefox)
return time . Millisecond * 5000
} else if request . Header . Get ( "icy-metadata" ) == "1" { //other unknown players
return time . Millisecond * 5000
}
//fallback and provide maximum buffer
return time . Second * maxBufferSize
}
sampleBufferLimit := int64 ( q . config . Queue . BufferSeconds * mount . SampleRate )
if q . config . Queue . BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers
sampleBufferLimit = int64 ( getKnownBufferSize ( ) . Seconds ( ) * float64 ( mount . SampleRate ) )
}
2022-03-05 09:34:53 +00:00
wgClient . Add ( 1 )
2022-03-02 17:54:56 +00:00
mount . AddListener ( & StreamListener {
Information : ListenerInformation {
2022-03-02 18:32:41 +00:00
Mount : mount . Mount ,
2022-03-02 17:54:56 +00:00
Path : uriPath ,
Headers : headers ,
} ,
Start : func ( packets [ ] packetizer . Packet ) error {
2022-03-07 16:33:37 +00:00
log . Printf ( "adding %s client to stream %s\n" , request . RemoteAddr , mount . Mount )
2022-03-03 14:00:34 +00:00
if len ( packets ) > 0 {
2022-03-04 12:25:59 +00:00
sampleBufferMin := packets [ len ( packets ) - 1 ] . GetStartSampleNumber ( ) - sampleBufferLimit
2022-03-03 14:00:34 +00:00
for _ , p := range packets {
2022-03-04 12:25:59 +00:00
if p . KeepMode ( ) != packetizer . Discard || p . GetEndSampleNumber ( ) >= sampleBufferMin {
2022-03-05 09:34:53 +00:00
if err := packetWriteCallback ( p ) ; err != nil {
2022-03-03 14:00:34 +00:00
return err
}
}
2022-03-02 17:54:56 +00:00
}
}
return nil
} ,
2022-03-05 09:34:53 +00:00
Write : packetWriteCallback ,
2022-03-02 17:54:56 +00:00
Close : func ( ) {
2022-03-07 16:33:37 +00:00
log . Printf ( "removing %s client from stream %s\n" , request . RemoteAddr , mount . Mount )
2022-03-05 09:34:53 +00:00
defer wgClient . Done ( )
close ( writeChannel )
2022-03-02 17:54:56 +00:00
} ,
} )
wgClient . Wait ( )
return
}
}
writer . WriteHeader ( http . StatusNotFound )
return
}