diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..a6133bf --- /dev/null +++ b/.drone.yml @@ -0,0 +1,16 @@ +--- +kind: pipeline +type: docker +name: default + +steps: + - name: test + image: golang:1.18rc1-bullseye + commands: + - DEBIAN_FRONTEND=noninteractive apt update + - DEBIAN_FRONTEND=noninteractive apt install -y git build-essential autoconf automake libtool libflac-dev libopus-dev libopusfile-dev libsamplerate0-dev libmp3lame-dev + - git clone --depth 1 https://github.com/xiph/libopusenc.git && cd libopusenc && ./autogen.sh && ./configure --prefix /usr && make && make install && cd .. + - git clone --depth 1 https://github.com/mstorsjo/fdk-aac.git && cd fdk-aac && ./autogen.sh && ./configure --prefix /usr && make -j$(nproc) && make install && cd .. + - go build -v . + +... diff --git a/MeteorLight.go b/MeteorLight.go index 70ba439..3d00c3a 100644 --- a/MeteorLight.go +++ b/MeteorLight.go @@ -1,155 +1,19 @@ package main import ( + "bytes" "encoding/json" - "errors" "flag" "fmt" "git.gammaspectra.live/S.O.N.G/Kirika/audio" - "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac" - "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3" - "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" - "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta" - "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" - "io" "io/ioutil" "log" "net/http" - "os" - "path" + "strconv" "strings" "sync" ) -var audioQueue = audio.NewQueue(44100, 2, 64) -var flacFormat = flac.NewFormat() -var ttaFormat = tta.NewFormat() -var mp3Format = mp3.NewFormat() -var opusFormat = opus.NewFormat() - -type QueueTrackEntry struct { - QueueIdentifier audio.QueueIdentifier - Path string - Metadata struct { - Title string - Album string - Artist string - Art string - } - - original map[string]interface{} -} - -var queue []*QueueTrackEntry -var queueLock sync.RWMutex - -var config *Config - -func addTrack(entry *QueueTrackEntry, tail bool) error { - - f, err := os.Open(entry.Path) - if err != nil { - return err - } - var source audio.Source - switch strings.ToLower(path.Ext(entry.Path)) { - case ".flac": - source, err = flacFormat.Open(f) - case ".tta": - source, err = ttaFormat.Open(f) - case ".mp3": - source, err = mp3Format.Open(f) - case ".ogg", ".opus": - source, err = opusFormat.Open(f) - } - - if err != nil { - f.Close() - return err - } - - if source.Blocks == nil { - f.Close() - return fmt.Errorf("could not find decoder for %s", entry.Path) - } - - if tail { - entry.QueueIdentifier = audioQueue.AddTail(source, func(q *audio.Queue, entry *audio.QueueEntry) { - log.Printf("now playing %s\n", f.Name()) - go handleQueue() - }, func(q *audio.Queue, entry *audio.QueueEntry) { - log.Printf("finished playing %s\n", f.Name()) - f.Close() - go handleQueueRemove(entry.Identifier) - go handleQueue() - }) - } else { - entry.QueueIdentifier = audioQueue.AddHead(source, func(q *audio.Queue, entry *audio.QueueEntry) { - log.Printf("now playing %s\n", f.Name()) - go handleQueue() - }, func(q *audio.Queue, entry *audio.QueueEntry) { - log.Printf("finished playing %s\n", f.Name()) - f.Close() - go handleQueueRemove(entry.Identifier) - go handleQueue() - }) - } - - entry.original["queue_id"] = entry.QueueIdentifier - - return nil -} - -func handleQueueRemove(identifier audio.QueueIdentifier) { - queueLock.Lock() - defer queueLock.Unlock() - for i, q := range queue { - if q.QueueIdentifier == identifier { - queue = append(queue[:i], queue[i+1:]...) - return - } - } - -} - -func handleQueue() { - if len(audioQueue.GetQueue()) <= 0 { //TODO: pre-queue it, or remove existing track - queueLock.Lock() - defer queueLock.Unlock() - - if e := getRandomTrack(); e != nil { - if err := addTrack(e, true); err != nil { - addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error - } - } else { - addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error - } - } - -} - -func getRandomTrack() *QueueTrackEntry { - response, err := http.DefaultClient.Get(config.Queue.RandomSongApi) - if err != nil { - return nil - } - - body, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil - } - - return getQueueEntryFromBody(body) -} -func getFallbackTrack() *QueueTrackEntry { - m := make(map[string]interface{}) - m["path"] = config.Queue.FallbackPath - return &QueueTrackEntry{ - Path: config.Queue.FallbackPath, - original: m, - } -} - func getQueueEntryFromBody(body []byte) *QueueTrackEntry { entry := &QueueTrackEntry{} err := json.Unmarshal(body, &entry.original) @@ -193,60 +57,79 @@ func getQueueEntryFromBody(body []byte) *QueueTrackEntry { return nil } -type httpWriter struct { - io.WriteCloser - writer http.ResponseWriter -} - -func (h *httpWriter) Write(p []byte) (n int, err error) { - if h.writer != nil { - _, err = h.writer.Write(p) - if err != nil { - h.writer = nil - } - } - return len(p), nil -} - -func (h *httpWriter) Close() (err error) { - h.writer = nil - return nil -} - type resultResponse struct { Success bool `json:"success"` Reason error `json:"reason"` } +type queueResultResponse struct { + resultResponse + QueueId audio.QueueIdentifier `json:"queue_id"` +} + func main() { configPath := flag.String("config", "config.toml", "Config path") flag.Parse() - var err error - - config, err = GetConfig(*configPath) + config, err := GetConfig(*configPath) if err != nil { log.Panic(err) } var wg sync.WaitGroup - var mounts []*StreamMount + queue := NewQueue(config) - handleQueue() - - sources := SplitAudioSource((audio.NewRealTimeFilter(20)).Process(audioQueue.GetSource()), len(config.Streams)) - for i, s := range config.Streams { - mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate) - if mount == nil { - log.Panicf("could not initialize %s\n", s.MountPath) + getRandomTrack := func() *QueueTrackEntry { + response, err := http.DefaultClient.Get(config.Queue.RandomSongApi) + if err != nil { + return nil } - mounts = append(mounts, mount) - wg.Add(1) - go mount.Process() + + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil + } + + return getQueueEntryFromBody(body) } + getFallbackTrack := func() *QueueTrackEntry { + m := make(map[string]interface{}) + m["path"] = config.Queue.FallbackPath + return &QueueTrackEntry{ + Path: config.Queue.FallbackPath, + original: m, + } + } + + wg.Add(1) + go func() { + defer wg.Done() + defer close(queue.QueueEmpty) + //TODO: close properly + for { + if e := getRandomTrack(); e != nil { + queue.QueueEmpty <- e + } else if e = getFallbackTrack(); e != nil { + queue.QueueEmpty <- e + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for np := range queue.NowPlaying { + jsonData, _ := json.Marshal(np.original) + _, err := http.DefaultClient.Post(config.Queue.NowPlaying, "application/json; charset=utf-8", bytes.NewReader(jsonData)) + if err != nil { + log.Print(err) + } + } + }() + wg.Add(1) go func() { defer wg.Done() @@ -256,60 +139,130 @@ func main() { Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { writer.Header().Set("Server", "MeteorLight/api") writer.Header().Set("Content-Type", "application/json; charset=utf-8") - switch request.URL.Path { - case "/listeners": - var listeners []*ListenerInformation - for _, mount := range mounts { - listeners = append(listeners, mount.GetListeners()...) - } - jsonData, _ := json.Marshal(listeners) - writer.Write(jsonData) - case "/queue": - var blobs = make([]map[string]interface{}, 0, 1) - queueLock.RLock() - defer queueLock.RUnlock() - for _, e := range queue { - blobs = append(blobs, e.original) - } - - jsonData, _ := json.Marshal(blobs) - writer.Write(jsonData) - case "/skip": - if request.Method != "POST" { - return - } - - result := resultResponse{} - - q := audioQueue.GetQueue() - if len(q) > 0 { - result.Success = audioQueue.Remove(q[0]) - } else { - result.Reason = errors.New("queue empty") - } - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - - case "/queue/clear": - if request.Method != "POST" { - return - } - - result := resultResponse{} - - q := audioQueue.GetQueue() - if len(q) > 0 { - for _, id := range q[1:] { - audioQueue.Remove(id) + pathSegments := strings.Split(request.URL.Path, "/") + if len(pathSegments) > 1 { + switch pathSegments[1] { + case "listeners": + jsonData, _ := json.Marshal(queue.GetListeners()) + writer.Write(jsonData) + case "np": + if e := queue.GetNowPlaying(); e != nil { + jsonData, _ := json.Marshal(e.original) + writer.Write(jsonData) } - result.Success = true + case "queue": + if len(pathSegments) == 2 { + if request.Method != "POST" { + return + } + var blobs = make([]map[string]interface{}, 0, 1) + for _, e := range queue.GetQueue() { + blobs = append(blobs, e.original) + } + jsonData, _ := json.Marshal(blobs) + writer.Write(jsonData) + } else { + switch pathSegments[2] { + case "head": + if request.Method == "POST" { + result := queueResultResponse{} + if body, err := ioutil.ReadAll(request.Body); err == nil { + if e := getQueueEntryFromBody(body); e != nil { + if err = queue.AddTrack(e, false); err == nil { + result.Success = true + result.QueueId = e.QueueIdentifier + } + } + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } else if request.Method == "DELETE" { + result := resultResponse{} + if head := queue.GetHead(); head != nil { + result.Success = queue.Remove(head.QueueIdentifier) + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } + + case "tail": + if request.Method == "POST" { + result := queueResultResponse{} + if body, err := ioutil.ReadAll(request.Body); err == nil { + if e := getQueueEntryFromBody(body); e != nil { + if err = queue.AddTrack(e, true); err == nil { + result.Success = true + result.QueueId = e.QueueIdentifier + } + } + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } else if request.Method == "DELETE" { + result := resultResponse{} + if head := queue.GetTail(); head != nil { + result.Success = queue.Remove(head.QueueIdentifier) + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } + + case "clear": + if request.Method != "POST" { + return + } + + result := resultResponse{} + + for _, e := range queue.GetQueue() { + queue.Remove(e.QueueIdentifier) + } + result.Success = true + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + default: + if request.Method != "POST" { + return + } + + if i, err := strconv.ParseInt(pathSegments[2], 10, 0); err == nil { + result := resultResponse{} + + result.Success = queue.Remove(audio.QueueIdentifier(i)) + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + } + } + + } + case "skip": + if request.Method != "POST" { + return + } + + result := resultResponse{} + + result.Success = queue.SkipNowPlaying() + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) } - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - } }), } @@ -324,80 +277,8 @@ func main() { defer wg.Done() server := http.Server{ - Addr: fmt.Sprintf(":%d", config.Radio.Port), - Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - mountName := strings.TrimPrefix(request.URL.Path, "/") - for _, mount := range mounts { - if mount.Mount == mountName { - writer.Header().Set("Server", "MeteorLight/radio") - writer.Header().Set("Content-Type", mount.MimeType) - writer.Header().Set("Accept-Ranges", "none") - writer.Header().Set("Connection", "keep-alive") - writer.Header().Set("X-Audiocast-Name", config.Radio.Name) - writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") - writer.Header().Set("X-Content-Type-Options", "nosniff") - - byteWriter := &httpWriter{writer: writer} - - var wgClient sync.WaitGroup - - writeCallback := func(packet packetizer.Packet) error { - /* - select { - case <-request.Context().Done(): - // Client gave up - default: - } - - */ - _, err := byteWriter.Write(packet.GetData()) - return err - } - wgClient.Add(1) - - var headers []HeaderEntry - for k, v := range request.Header { - for _, s := range v { - headers = append(headers, HeaderEntry{ - Name: k, - Value: s, - }) - } - } - - uriPath := request.URL.Path - if len(request.URL.Query().Encode()) > 0 { - uriPath += "?" + request.URL.Query().Encode() - } - - mount.AddListener(&StreamListener{ - Information: ListenerInformation{ - Mount: mountName, - Path: uriPath, - Headers: headers, - }, - Start: func(packets []packetizer.Packet) error { - for _, p := range packets { - if err := writeCallback(p); err != nil { - return err - } - } - return nil - }, - Write: writeCallback, - Close: func() { - byteWriter.Close() - wgClient.Done() - }, - }) - wgClient.Wait() - return - } - } - - writer.WriteHeader(http.StatusNotFound) - return - }), + Addr: fmt.Sprintf(":%d", config.Radio.Port), + Handler: http.HandlerFunc(queue.HandleRadioRequest), } if err := server.ListenAndServe(); err != nil { @@ -405,5 +286,12 @@ func main() { } }() + if e := getRandomTrack(); e != nil { + queue.AddTrack(e, false) + } else if e = getFallbackTrack(); e != nil { + queue.AddTrack(e, false) + } + wg.Wait() + queue.Wait() } diff --git a/README.md b/README.md index 0f6be67..f72f1d5 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,16 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible). This project is a Work in Progress. -`TODO: np, API: np queue/head queue/tail, proper handling of audio queue <-> data queue, packetizer buffer` +`TODO: proper handling of audio queue <-> data queue, timeouts, ICY metadata` # Improvements / differences from Kawa * Does not use libav ([see supported formats/codecs on Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika#codecs-supported)) -* Supports listeners that have more than 16 HTTP headers on their requests. -* Does not restart stream per-track, instead being a continuous stream. -* Normalized channels / sample rates for endpoints. -* Implements ICY metadata (WiP) +* Supports HTTP clients that have more than 16 HTTP request headers. +* Does not restart stream per-track, instead being a continuous stream without parameter changes. +* Normalized channels / sample rates for mounts. +* Implements ICY metadata +* Uses sample/timed packet buffers, instead of kawa byte buffers, which cause wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. + ## Dependencies ### Go >= 1.18 @@ -19,4 +21,65 @@ This project is a Work in Progress. ### [Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika) dependencies Kirika is a collection of audio utilities for decoding/encoding files and streams. -Check its native dependencies that must be installed before usage. \ No newline at end of file +Check its native dependencies that must be installed before usage. + +## Usage +Start by copying example_config.toml to the location of your choice and reading through it. Of importance are `queue.fallback`, and `queue.random_song_api`. + +MeteorLight will search for `config.toml` in its working directory. Alternatively you can pass `-config "/example/path/config.toml"` to specify a different location. + +Batteries are not included - MeteorLight needs to be paired with your own software to find songs to stream. +You will have to provide an external API that MeteorLight can query for songs to play and notify as new songs being played. + +Before continuing, you will need to install the dependencies listed above. + +### From Git repository +```shell +$ git clone https://git.gammaspectra.live/S.O.N.G/MeteorLight.git && cd MeteorLight +$ go run . +``` + +### From Go run +```shell +$ go run git.gammaspectra.live/S.O.N.G/MeteorLight@ +``` + +## API +See [kawa API](https://github.com/Luminarys/kawa#api) for a general overview. Additional endpoints or changed ones are listed below. + +Track blobs returned have a `queue_id` parameter, regardless of source. + +### `NEW` DELETE /queue/ +Unqueues the track with `queue_id` specified as a parameter. + +#### Response +```json +{ + "success": true, + "reason": null +} +``` + +### `CHANGED` POST /queue/head +Same as kawa's, but `queue_id` is added to response directly. + +#### Response +```json +{ + "success": true, + "reason": null, + "queue_id": 3 +} +``` + +### `CHANGED` POST /queue/tail +Same as kawa's, but `queue_id` is added to response directly. + +#### Response +```json +{ + "success": true, + "reason": null, + "queue_id": 5 +} +``` \ No newline at end of file diff --git a/go.mod b/go.mod index 83d93aa..5730697 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module git.gammaspectra.live/S.O.N.G/MeteorLight go 1.18 require ( - git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f + git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220302151314-7b6b11dd6c5c github.com/BurntSushi/toml v1.0.0 ) diff --git a/go.sum b/go.sum index f11316f..d5c7189 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f h1:7WbbseQ4WAodtxOf1m7vT4NDphNIe7ScMkGP5HeYNyk= -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220302151314-7b6b11dd6c5c h1:8Tcq/ueYofDoeRgovGwekXHhyH0i15vm79W5wK/WwpE= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220302151314-7b6b11dd6c5c/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA= git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48 h1:MaKiBfXQl0keyfdCi1PxGOKRTiWhIs8PqCal5GhKDi0= git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48/go.mod h1:pkWt//S9hLVEQaJDPu/cHHPk8vPpo/0+zHy0me4LIP4= git.gammaspectra.live/S.O.N.G/go-pus v0.0.0-20220227175608-6cc027f24dba h1:JEaxCVgdr3XXAuDCPAx7ttLFZaaHzTEzG+oRnVUtUKU= diff --git a/mount.go b/mount.go index 9000037..608f38d 100644 --- a/mount.go +++ b/mount.go @@ -195,7 +195,8 @@ func (m *StreamMount) GetListeners() (entries []*ListenerInformation) { return } -func (m *StreamMount) Process() { +func (m *StreamMount) Process(group *sync.WaitGroup) { + defer group.Done() defer func() { m.listenersLock.Lock() for _, l := range m.listeners { diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..e1da4a1 --- /dev/null +++ b/queue.go @@ -0,0 +1,340 @@ +package main + +import ( + "fmt" + "git.gammaspectra.live/S.O.N.G/Kirika/audio" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" + "log" + "net/http" + "os" + "path" + "strings" + "sync" +) + +type QueueTrackEntry struct { + QueueIdentifier audio.QueueIdentifier + Path string + Metadata struct { + Title string + Album string + Artist string + Art string + } + + original map[string]interface{} +} + +type Queue struct { + NowPlaying chan *QueueTrackEntry + QueueEmpty chan *QueueTrackEntry + audioQueue *audio.Queue + mounts []*StreamMount + queue []*QueueTrackEntry + mutex sync.RWMutex + config *Config + wg sync.WaitGroup +} + +func NewQueue(config *Config) *Queue { + q := &Queue{ + NowPlaying: make(chan *QueueTrackEntry, 1), + QueueEmpty: make(chan *QueueTrackEntry), + config: config, + audioQueue: audio.NewQueue(44100, 2), + } + blocksPerSecond := 20 + bufferSeconds := blocksPerSecond * 1 + + sources := SplitAudioSource(audio.NewFilterChain(q.audioQueue.GetSource(), audio.NewRealTimeFilter(blocksPerSecond), audio.NewBufferFilter(bufferSeconds)), len(config.Streams)) + for i, s := range q.config.Streams { + mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate) + if mount == nil { + log.Panicf("could not initialize %s\n", s.MountPath) + } + q.mounts = append(q.mounts, mount) + q.wg.Add(1) + go mount.Process(&q.wg) + } + + return q +} + +func (q *Queue) Wait() { + q.wg.Wait() + close(q.NowPlaying) +} + +var flacFormat = flac.NewFormat() +var ttaFormat = tta.NewFormat() +var mp3Format = mp3.NewFormat() +var opusFormat = opus.NewFormat() + +func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { + + f, err := os.Open(entry.Path) + if err != nil { + return err + } + var source audio.Source + switch strings.ToLower(path.Ext(entry.Path)) { + case ".flac": + source, err = flacFormat.Open(f) + case ".tta": + source, err = ttaFormat.Open(f) + case ".mp3": + source, err = mp3Format.Open(f) + case ".ogg", ".opus": + source, err = opusFormat.Open(f) + } + + if err != nil { + f.Close() + return err + } + + if source.Blocks == nil { + f.Close() + return fmt.Errorf("could not find decoder for %s", entry.Path) + } + + startCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { + log.Printf("now playing %s\n", f.Name()) + if e := q.Get(entry.Identifier); e != nil { + q.NowPlaying <- e + } + } + + endCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { + + } + + removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { + defer f.Close() + q.Remove(entry.Identifier) + q.HandleQueue() + } + + q.mutex.Lock() + defer q.mutex.Unlock() + + if tail { + entry.QueueIdentifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback) + } else { + entry.QueueIdentifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback) + } + + entry.original["queue_id"] = entry.QueueIdentifier + + if tail || len(q.queue) == 0 { + q.queue = append(q.queue, entry) + } else { + q.queue = append(q.queue[:1], append([]*QueueTrackEntry{entry}, q.queue[1:]...)...) + } + + return nil +} + +func (q *Queue) HandleQueue() { + if q.audioQueue.GetQueueSize() == 0 { + q.AddTrack(<-q.QueueEmpty, true) + } +} + +func (q *Queue) GetQueue() (result []*QueueTrackEntry) { + q.mutex.RLock() + defer q.mutex.RUnlock() + + if len(q.queue) > 1 { + result = make([]*QueueTrackEntry, len(q.queue)-1) + copy(result, q.queue[1:]) + } + + return +} + +func (q *Queue) Get(identifier audio.QueueIdentifier) *QueueTrackEntry { + q.mutex.RLock() + defer q.mutex.RUnlock() + for _, e := range q.queue { + if e.QueueIdentifier == identifier { + return e + } + } + + return nil +} + +func (q *Queue) GetNowPlaying() *QueueTrackEntry { + if e := q.audioQueue.GetQueueHead(); e != nil { + return q.Get(e.Identifier) + } + + return nil +} + +func (q *Queue) SkipNowPlaying() bool { + if e := q.audioQueue.GetQueueHead(); e != nil { + return q.Remove(e.Identifier) + } + + return false +} + +func (q *Queue) GetIndex(index int) *QueueTrackEntry { + if e := q.audioQueue.GetQueueIndex(index + 1); e != nil { + return q.Get(e.Identifier) + } + + return nil +} + +func (q *Queue) GetHead() *QueueTrackEntry { + if e := q.audioQueue.GetQueueIndex(1); e != nil { + return q.Get(e.Identifier) + } + + return nil +} + +func (q *Queue) GetTail() *QueueTrackEntry { + if i, e := q.audioQueue.GetQueueTail(); i != 0 && e != nil { + return q.Get(e.Identifier) + } + + return nil +} + +func (q *Queue) Remove(identifier audio.QueueIdentifier) bool { + q.mutex.Lock() + for i, e := range q.queue { + if e.QueueIdentifier == identifier { + q.queue = append(q.queue[:i], q.queue[i+1:]...) + q.mutex.Unlock() + q.audioQueue.Remove(identifier) + return true + } + } + q.mutex.Unlock() + q.audioQueue.Remove(identifier) + return false +} + +type httpWriter struct { + writer http.ResponseWriter +} + +func (h *httpWriter) Write(p []byte) (n int, err error) { + if h.writer != nil { + _, err = h.writer.Write(p) + if err != nil { + h.writer = nil + } + } + return len(p), nil +} + +func (h *httpWriter) Close() (err error) { + h.writer = nil + return nil +} + +func (h *httpWriter) Flush() { + if h.writer != nil { + if flusher, ok := h.writer.(http.Flusher); ok { + flusher.Flush() + } + } +} + +func (q *Queue) GetListeners() (listeners []*ListenerInformation) { + q.mutex.RLock() + defer q.mutex.Unlock() + + listeners = make([]*ListenerInformation, 0, 1) + + for _, mount := range q.mounts { + listeners = append(listeners, mount.GetListeners()...) + } + return +} + +func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) { + mountName := strings.TrimPrefix(request.URL.Path, "/") + for _, mount := range q.mounts { + if mount.Mount == mountName { + writer.Header().Set("Server", "MeteorLight/radio") + writer.Header().Set("Content-Type", mount.MimeType) + writer.Header().Set("Accept-Ranges", "none") + writer.Header().Set("Connection", "keep-alive") + writer.Header().Set("X-Audiocast-Name", q.config.Radio.Name) + writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") + writer.Header().Set("X-Content-Type-Options", "nosniff") + + byteWriter := &httpWriter{writer: writer} + + var wgClient sync.WaitGroup + + writeCallback := func(packet packetizer.Packet) error { + //TODO: icy + /* + select { + case <-request.Context().Done(): + // Client gave up + default: + } + + */ + _, err := byteWriter.Write(packet.GetData()) + byteWriter.Flush() + return err + } + wgClient.Add(1) + + var headers []HeaderEntry + for k, v := range request.Header { + for _, s := range v { + headers = append(headers, HeaderEntry{ + Name: k, + Value: s, + }) + } + } + + uriPath := request.URL.Path + if len(request.URL.Query().Encode()) > 0 { + uriPath += "?" + request.URL.Query().Encode() + } + + mount.AddListener(&StreamListener{ + Information: ListenerInformation{ + Mount: mountName, + Path: uriPath, + Headers: headers, + }, + Start: func(packets []packetizer.Packet) error { + for _, p := range packets { + if err := writeCallback(p); err != nil { + return err + } + } + return nil + }, + Write: writeCallback, + Close: func() { + byteWriter.Close() + wgClient.Done() + }, + }) + wgClient.Wait() + return + } + } + + writer.WriteHeader(http.StatusNotFound) + return +}