package main import ( "fmt" "git.gammaspectra.live/S.O.N.G/Kirika/audio" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "io" "log" "net/http" "os" "path" "strings" "sync" "time" ) const maxBufferSize = 10 type QueueTrackEntry struct { QueueIdentifier audio.QueueIdentifier Path string Metadata struct { Title string `json:"title"` Album string `json:"album"` Artist string `json:"artist"` Art string `json:"art"` } original map[string]interface{} } type Queue struct { NowPlaying chan *QueueTrackEntry QueueEmpty chan *QueueTrackEntry audioQueue *audio.Queue mounts []*StreamMount queue []*QueueTrackEntry mutex sync.RWMutex config *Config wg sync.WaitGroup } func NewQueue(config *Config) *Queue { q := &Queue{ NowPlaying: make(chan *QueueTrackEntry, 1), QueueEmpty: make(chan *QueueTrackEntry), config: config, audioQueue: audio.NewQueue(44100, 2), } blocksPerSecond := 20 sources := SplitAudioSource(audio.NewFilterChain(q.audioQueue.GetSource(), audio.NewBufferFilter(16), audio.NewRealTimeFilter(blocksPerSecond), audio.NewBufferFilter(maxBufferSize*blocksPerSecond)), len(config.Streams)) for i, s := range q.config.Streams { mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate) if mount == nil { log.Panicf("could not initialize %s\n", s.MountPath) } q.mounts = append(q.mounts, mount) q.wg.Add(1) go mount.Process(&q.wg) } return q } func (q *Queue) Wait() { q.wg.Wait() close(q.NowPlaying) } var flacFormat = flac.NewFormat() var ttaFormat = tta.NewFormat() var mp3Format = mp3.NewFormat() var opusFormat = opus.NewFormat() func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { f, err := os.Open(entry.Path) if err != nil { return err } var source audio.Source switch strings.ToLower(path.Ext(entry.Path)) { case ".flac": source, err = flacFormat.Open(f) case ".tta": source, err = ttaFormat.Open(f) case ".mp3": source, err = mp3Format.Open(f) case ".ogg", ".opus": source, err = opusFormat.Open(f) } if err != nil { f.Close() return err } if source.Blocks == nil { f.Close() return fmt.Errorf("could not find decoder for %s", entry.Path) } startCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { log.Printf("now playing %s\n", f.Name()) if e := q.Get(entry.Identifier); e != nil { q.NowPlaying <- e } } endCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { } removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { defer f.Close() q.Remove(entry.Identifier) q.HandleQueue() } q.mutex.Lock() defer q.mutex.Unlock() if tail { entry.QueueIdentifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback) } else { entry.QueueIdentifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback) } entry.original["queue_id"] = entry.QueueIdentifier if tail || len(q.queue) == 0 { q.queue = append(q.queue, entry) } else { q.queue = append(q.queue[:1], append([]*QueueTrackEntry{entry}, q.queue[1:]...)...) } return nil } func (q *Queue) HandleQueue() { if q.audioQueue.GetQueueSize() == 0 { q.AddTrack(<-q.QueueEmpty, true) } } func (q *Queue) GetQueue() (result []*QueueTrackEntry) { q.mutex.RLock() defer q.mutex.RUnlock() if len(q.queue) > 1 { result = make([]*QueueTrackEntry, len(q.queue)-1) copy(result, q.queue[1:]) } return } func (q *Queue) Get(identifier audio.QueueIdentifier) *QueueTrackEntry { q.mutex.RLock() defer q.mutex.RUnlock() for _, e := range q.queue { if e.QueueIdentifier == identifier { return e } } return nil } func (q *Queue) GetNowPlaying() *QueueTrackEntry { if e := q.audioQueue.GetQueueHead(); e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) SkipNowPlaying() bool { if e := q.audioQueue.GetQueueHead(); e != nil { return q.Remove(e.Identifier) } return false } func (q *Queue) GetIndex(index int) *QueueTrackEntry { if e := q.audioQueue.GetQueueIndex(index + 1); e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) GetHead() *QueueTrackEntry { if e := q.audioQueue.GetQueueIndex(1); e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) GetTail() *QueueTrackEntry { if i, e := q.audioQueue.GetQueueTail(); i != 0 && e != nil { return q.Get(e.Identifier) } return nil } func (q *Queue) Remove(identifier audio.QueueIdentifier) bool { q.mutex.Lock() for i, e := range q.queue { if e.QueueIdentifier == identifier { q.queue = append(q.queue[:i], q.queue[i+1:]...) q.mutex.Unlock() q.audioQueue.Remove(identifier) return true } } q.mutex.Unlock() q.audioQueue.Remove(identifier) return false } type httpWriter struct { timeout time.Duration writer http.ResponseWriter } func (h *httpWriter) Write(p []byte) (n int, err error) { if h.writer != nil { n, err = h.writer.Write(p) if err != nil { h.writer = nil } return } return 0, io.EOF } func (h *httpWriter) Close() (err error) { h.writer = nil return nil } func (h *httpWriter) Flush() { if h.writer != nil { //TODO: not deadline aware? /*if flusher, ok := h.writer.(http.Flusher); ok { flusher.Flush() }*/ } } func (q *Queue) GetListeners() (listeners []*ListenerInformation) { q.mutex.RLock() defer q.mutex.RUnlock() listeners = make([]*ListenerInformation, 0, 1) for _, mount := range q.mounts { listeners = append(listeners, mount.GetListeners()...) } return } func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) { for _, mount := range q.mounts { if strings.HasSuffix(request.URL.Path, mount.Mount) { writer.Header().Set("Server", "MeteorLight/radio") writer.Header().Set("Content-Type", mount.MimeType) writer.Header().Set("Accept-Ranges", "none") writer.Header().Set("Connection", "keep-alive") writer.Header().Set("X-Audiocast-Name", q.config.Radio.Name) writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") writer.Header().Set("X-Content-Type-Options", "nosniff") byteWriter := &httpWriter{writer: writer, timeout: time.Second * 2} var wgClient sync.WaitGroup writeCallback := func(packet packetizer.Packet) error { //TODO: icy /* select { case <-request.Context().Done(): // Client gave up default: } */ _, err := byteWriter.Write(packet.GetData()) byteWriter.Flush() return err } wgClient.Add(1) var headers []HeaderEntry for k, v := range request.Header { for _, s := range v { headers = append(headers, HeaderEntry{ Name: k, Value: s, }) } } uriPath := request.URL.Path if len(request.URL.Query().Encode()) > 0 { uriPath += "?" + request.URL.Query().Encode() } getKnownBufferSize := func() time.Duration { userAgent := request.Header.Get("user-agent") if strings.Index(userAgent, "libmpv") != -1 || strings.Index(userAgent, "mpv ") != -1 { //mpv return time.Millisecond * 2500 } else if strings.Index(userAgent, "libvlc") != -1 { //VLC return time.Millisecond * 2500 } else if strings.Index(userAgent, "lavf/") != -1 { //ffplay return time.Millisecond * 2500 } else if strings.Index(userAgent, "gvfs/") != -1 { //gvfs return time.Millisecond * 2500 } else if strings.Index(userAgent, "Music Player Daemon ") != -1 { //MPD return time.Millisecond * 2500 } else if strings.Index(userAgent, " Chrome/") != -1 { //Chromium-based return time.Millisecond * 5000 } else if strings.Index(userAgent, " Safari/") != -1 { //Safari-based return time.Millisecond * 5000 } else if strings.Index(userAgent, " Gecko/") != -1 { //Gecko-based (Firefox) return time.Millisecond * 5000 } else if request.Header.Get("icy-metadata") == "1" { //other unknown players return time.Millisecond * 5000 } //fallback and provide maximum buffer return time.Second * maxBufferSize } sampleBufferLimit := int64(q.config.Queue.BufferSeconds * mount.SampleRate) if q.config.Queue.BufferSeconds == 0 { //auto buffer setup based on user agent and other client headers sampleBufferLimit = int64(getKnownBufferSize().Seconds() * float64(mount.SampleRate)) } mount.AddListener(&StreamListener{ Information: ListenerInformation{ Mount: mount.Mount, Path: uriPath, Headers: headers, }, Start: func(packets []packetizer.Packet) error { if len(packets) > 0 { sampleBufferMin := packets[len(packets)-1].GetSampleNumber() - sampleBufferLimit for _, p := range packets { if p.KeepMode() != packetizer.Discard || p.GetSampleNumber() >= sampleBufferMin { if err := writeCallback(p); err != nil { return err } } } } return nil }, Write: writeCallback, Close: func() { byteWriter.Close() wgClient.Done() }, }) wgClient.Wait() return } } writer.WriteHeader(http.StatusNotFound) return }