Added proper queue system
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
DataHoarder 2022-03-02 18:54:56 +01:00
parent 39baf9a58f
commit 7b3e844f6d
7 changed files with 615 additions and 307 deletions

16
.drone.yml Normal file
View file

@ -0,0 +1,16 @@
---
kind: pipeline
type: docker
name: default
steps:
- name: test
image: golang:1.18rc1-bullseye
commands:
- DEBIAN_FRONTEND=noninteractive apt update
- DEBIAN_FRONTEND=noninteractive apt install -y git build-essential autoconf automake libtool libflac-dev libopus-dev libopusfile-dev libsamplerate0-dev libmp3lame-dev
- git clone --depth 1 https://github.com/xiph/libopusenc.git && cd libopusenc && ./autogen.sh && ./configure --prefix /usr && make && make install && cd ..
- git clone --depth 1 https://github.com/mstorsjo/fdk-aac.git && cd fdk-aac && ./autogen.sh && ./configure --prefix /usr && make -j$(nproc) && make install && cd ..
- go build -v .
...

View file

@ -1,155 +1,19 @@
package main
import (
"bytes"
"encoding/json"
"errors"
"flag"
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"strconv"
"strings"
"sync"
)
var audioQueue = audio.NewQueue(44100, 2, 64)
var flacFormat = flac.NewFormat()
var ttaFormat = tta.NewFormat()
var mp3Format = mp3.NewFormat()
var opusFormat = opus.NewFormat()
type QueueTrackEntry struct {
QueueIdentifier audio.QueueIdentifier
Path string
Metadata struct {
Title string
Album string
Artist string
Art string
}
original map[string]interface{}
}
var queue []*QueueTrackEntry
var queueLock sync.RWMutex
var config *Config
func addTrack(entry *QueueTrackEntry, tail bool) error {
f, err := os.Open(entry.Path)
if err != nil {
return err
}
var source audio.Source
switch strings.ToLower(path.Ext(entry.Path)) {
case ".flac":
source, err = flacFormat.Open(f)
case ".tta":
source, err = ttaFormat.Open(f)
case ".mp3":
source, err = mp3Format.Open(f)
case ".ogg", ".opus":
source, err = opusFormat.Open(f)
}
if err != nil {
f.Close()
return err
}
if source.Blocks == nil {
f.Close()
return fmt.Errorf("could not find decoder for %s", entry.Path)
}
if tail {
entry.QueueIdentifier = audioQueue.AddTail(source, func(q *audio.Queue, entry *audio.QueueEntry) {
log.Printf("now playing %s\n", f.Name())
go handleQueue()
}, func(q *audio.Queue, entry *audio.QueueEntry) {
log.Printf("finished playing %s\n", f.Name())
f.Close()
go handleQueueRemove(entry.Identifier)
go handleQueue()
})
} else {
entry.QueueIdentifier = audioQueue.AddHead(source, func(q *audio.Queue, entry *audio.QueueEntry) {
log.Printf("now playing %s\n", f.Name())
go handleQueue()
}, func(q *audio.Queue, entry *audio.QueueEntry) {
log.Printf("finished playing %s\n", f.Name())
f.Close()
go handleQueueRemove(entry.Identifier)
go handleQueue()
})
}
entry.original["queue_id"] = entry.QueueIdentifier
return nil
}
func handleQueueRemove(identifier audio.QueueIdentifier) {
queueLock.Lock()
defer queueLock.Unlock()
for i, q := range queue {
if q.QueueIdentifier == identifier {
queue = append(queue[:i], queue[i+1:]...)
return
}
}
}
func handleQueue() {
if len(audioQueue.GetQueue()) <= 0 { //TODO: pre-queue it, or remove existing track
queueLock.Lock()
defer queueLock.Unlock()
if e := getRandomTrack(); e != nil {
if err := addTrack(e, true); err != nil {
addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error
}
} else {
addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error
}
}
}
func getRandomTrack() *QueueTrackEntry {
response, err := http.DefaultClient.Get(config.Queue.RandomSongApi)
if err != nil {
return nil
}
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil
}
return getQueueEntryFromBody(body)
}
func getFallbackTrack() *QueueTrackEntry {
m := make(map[string]interface{})
m["path"] = config.Queue.FallbackPath
return &QueueTrackEntry{
Path: config.Queue.FallbackPath,
original: m,
}
}
func getQueueEntryFromBody(body []byte) *QueueTrackEntry {
entry := &QueueTrackEntry{}
err := json.Unmarshal(body, &entry.original)
@ -193,60 +57,79 @@ func getQueueEntryFromBody(body []byte) *QueueTrackEntry {
return nil
}
type httpWriter struct {
io.WriteCloser
writer http.ResponseWriter
}
func (h *httpWriter) Write(p []byte) (n int, err error) {
if h.writer != nil {
_, err = h.writer.Write(p)
if err != nil {
h.writer = nil
}
}
return len(p), nil
}
func (h *httpWriter) Close() (err error) {
h.writer = nil
return nil
}
type resultResponse struct {
Success bool `json:"success"`
Reason error `json:"reason"`
}
type queueResultResponse struct {
resultResponse
QueueId audio.QueueIdentifier `json:"queue_id"`
}
func main() {
configPath := flag.String("config", "config.toml", "Config path")
flag.Parse()
var err error
config, err = GetConfig(*configPath)
config, err := GetConfig(*configPath)
if err != nil {
log.Panic(err)
}
var wg sync.WaitGroup
var mounts []*StreamMount
queue := NewQueue(config)
handleQueue()
sources := SplitAudioSource((audio.NewRealTimeFilter(20)).Process(audioQueue.GetSource()), len(config.Streams))
for i, s := range config.Streams {
mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate)
if mount == nil {
log.Panicf("could not initialize %s\n", s.MountPath)
getRandomTrack := func() *QueueTrackEntry {
response, err := http.DefaultClient.Get(config.Queue.RandomSongApi)
if err != nil {
return nil
}
mounts = append(mounts, mount)
wg.Add(1)
go mount.Process()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil
}
return getQueueEntryFromBody(body)
}
getFallbackTrack := func() *QueueTrackEntry {
m := make(map[string]interface{})
m["path"] = config.Queue.FallbackPath
return &QueueTrackEntry{
Path: config.Queue.FallbackPath,
original: m,
}
}
wg.Add(1)
go func() {
defer wg.Done()
defer close(queue.QueueEmpty)
//TODO: close properly
for {
if e := getRandomTrack(); e != nil {
queue.QueueEmpty <- e
} else if e = getFallbackTrack(); e != nil {
queue.QueueEmpty <- e
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for np := range queue.NowPlaying {
jsonData, _ := json.Marshal(np.original)
_, err := http.DefaultClient.Post(config.Queue.NowPlaying, "application/json; charset=utf-8", bytes.NewReader(jsonData))
if err != nil {
log.Print(err)
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
@ -256,60 +139,130 @@ func main() {
Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.Header().Set("Server", "MeteorLight/api")
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
switch request.URL.Path {
case "/listeners":
var listeners []*ListenerInformation
for _, mount := range mounts {
listeners = append(listeners, mount.GetListeners()...)
}
jsonData, _ := json.Marshal(listeners)
writer.Write(jsonData)
case "/queue":
var blobs = make([]map[string]interface{}, 0, 1)
queueLock.RLock()
defer queueLock.RUnlock()
for _, e := range queue {
blobs = append(blobs, e.original)
}
jsonData, _ := json.Marshal(blobs)
writer.Write(jsonData)
case "/skip":
if request.Method != "POST" {
return
}
result := resultResponse{}
q := audioQueue.GetQueue()
if len(q) > 0 {
result.Success = audioQueue.Remove(q[0])
} else {
result.Reason = errors.New("queue empty")
}
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
case "/queue/clear":
if request.Method != "POST" {
return
}
result := resultResponse{}
q := audioQueue.GetQueue()
if len(q) > 0 {
for _, id := range q[1:] {
audioQueue.Remove(id)
pathSegments := strings.Split(request.URL.Path, "/")
if len(pathSegments) > 1 {
switch pathSegments[1] {
case "listeners":
jsonData, _ := json.Marshal(queue.GetListeners())
writer.Write(jsonData)
case "np":
if e := queue.GetNowPlaying(); e != nil {
jsonData, _ := json.Marshal(e.original)
writer.Write(jsonData)
}
result.Success = true
case "queue":
if len(pathSegments) == 2 {
if request.Method != "POST" {
return
}
var blobs = make([]map[string]interface{}, 0, 1)
for _, e := range queue.GetQueue() {
blobs = append(blobs, e.original)
}
jsonData, _ := json.Marshal(blobs)
writer.Write(jsonData)
} else {
switch pathSegments[2] {
case "head":
if request.Method == "POST" {
result := queueResultResponse{}
if body, err := ioutil.ReadAll(request.Body); err == nil {
if e := getQueueEntryFromBody(body); e != nil {
if err = queue.AddTrack(e, false); err == nil {
result.Success = true
result.QueueId = e.QueueIdentifier
}
}
}
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
return
} else if request.Method == "DELETE" {
result := resultResponse{}
if head := queue.GetHead(); head != nil {
result.Success = queue.Remove(head.QueueIdentifier)
}
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
return
}
case "tail":
if request.Method == "POST" {
result := queueResultResponse{}
if body, err := ioutil.ReadAll(request.Body); err == nil {
if e := getQueueEntryFromBody(body); e != nil {
if err = queue.AddTrack(e, true); err == nil {
result.Success = true
result.QueueId = e.QueueIdentifier
}
}
}
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
return
} else if request.Method == "DELETE" {
result := resultResponse{}
if head := queue.GetTail(); head != nil {
result.Success = queue.Remove(head.QueueIdentifier)
}
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
return
}
case "clear":
if request.Method != "POST" {
return
}
result := resultResponse{}
for _, e := range queue.GetQueue() {
queue.Remove(e.QueueIdentifier)
}
result.Success = true
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
default:
if request.Method != "POST" {
return
}
if i, err := strconv.ParseInt(pathSegments[2], 10, 0); err == nil {
result := resultResponse{}
result.Success = queue.Remove(audio.QueueIdentifier(i))
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
}
}
}
case "skip":
if request.Method != "POST" {
return
}
result := resultResponse{}
result.Success = queue.SkipNowPlaying()
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
}
jsonData, _ := json.Marshal(result)
writer.Write(jsonData)
}
}),
}
@ -324,80 +277,8 @@ func main() {
defer wg.Done()
server := http.Server{
Addr: fmt.Sprintf(":%d", config.Radio.Port),
Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
mountName := strings.TrimPrefix(request.URL.Path, "/")
for _, mount := range mounts {
if mount.Mount == mountName {
writer.Header().Set("Server", "MeteorLight/radio")
writer.Header().Set("Content-Type", mount.MimeType)
writer.Header().Set("Accept-Ranges", "none")
writer.Header().Set("Connection", "keep-alive")
writer.Header().Set("X-Audiocast-Name", config.Radio.Name)
writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform")
writer.Header().Set("X-Content-Type-Options", "nosniff")
byteWriter := &httpWriter{writer: writer}
var wgClient sync.WaitGroup
writeCallback := func(packet packetizer.Packet) error {
/*
select {
case <-request.Context().Done():
// Client gave up
default:
}
*/
_, err := byteWriter.Write(packet.GetData())
return err
}
wgClient.Add(1)
var headers []HeaderEntry
for k, v := range request.Header {
for _, s := range v {
headers = append(headers, HeaderEntry{
Name: k,
Value: s,
})
}
}
uriPath := request.URL.Path
if len(request.URL.Query().Encode()) > 0 {
uriPath += "?" + request.URL.Query().Encode()
}
mount.AddListener(&StreamListener{
Information: ListenerInformation{
Mount: mountName,
Path: uriPath,
Headers: headers,
},
Start: func(packets []packetizer.Packet) error {
for _, p := range packets {
if err := writeCallback(p); err != nil {
return err
}
}
return nil
},
Write: writeCallback,
Close: func() {
byteWriter.Close()
wgClient.Done()
},
})
wgClient.Wait()
return
}
}
writer.WriteHeader(http.StatusNotFound)
return
}),
Addr: fmt.Sprintf(":%d", config.Radio.Port),
Handler: http.HandlerFunc(queue.HandleRadioRequest),
}
if err := server.ListenAndServe(); err != nil {
@ -405,5 +286,12 @@ func main() {
}
}()
if e := getRandomTrack(); e != nil {
queue.AddTrack(e, false)
} else if e = getFallbackTrack(); e != nil {
queue.AddTrack(e, false)
}
wg.Wait()
queue.Wait()
}

View file

@ -4,14 +4,16 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible).
This project is a Work in Progress.
`TODO: np, API: np queue/head queue/tail, proper handling of audio queue <-> data queue, packetizer buffer`
`TODO: proper handling of audio queue <-> data queue, timeouts, ICY metadata`
# Improvements / differences from Kawa
* Does not use libav ([see supported formats/codecs on Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika#codecs-supported))
* Supports listeners that have more than 16 HTTP headers on their requests.
* Does not restart stream per-track, instead being a continuous stream.
* Normalized channels / sample rates for endpoints.
* Implements ICY metadata (WiP)
* Supports HTTP clients that have more than 16 HTTP request headers.
* Does not restart stream per-track, instead being a continuous stream without parameter changes.
* Normalized channels / sample rates for mounts.
* Implements ICY metadata
* Uses sample/timed packet buffers, instead of kawa byte buffers, which cause wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client.
## Dependencies
### Go >= 1.18
@ -19,4 +21,65 @@ This project is a Work in Progress.
### [Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika) dependencies
Kirika is a collection of audio utilities for decoding/encoding files and streams.
Check its native dependencies that must be installed before usage.
Check its native dependencies that must be installed before usage.
## Usage
Start by copying example_config.toml to the location of your choice and reading through it. Of importance are `queue.fallback`, and `queue.random_song_api`.
MeteorLight will search for `config.toml` in its working directory. Alternatively you can pass `-config "/example/path/config.toml"` to specify a different location.
Batteries are not included - MeteorLight needs to be paired with your own software to find songs to stream.
You will have to provide an external API that MeteorLight can query for songs to play and notify as new songs being played.
Before continuing, you will need to install the dependencies listed above.
### From Git repository
```shell
$ git clone https://git.gammaspectra.live/S.O.N.G/MeteorLight.git && cd MeteorLight
$ go run .
```
### From Go run
```shell
$ go run git.gammaspectra.live/S.O.N.G/MeteorLight@<commit_hash>
```
## API
See [kawa API](https://github.com/Luminarys/kawa#api) for a general overview. Additional endpoints or changed ones are listed below.
Track blobs returned have a `queue_id` parameter, regardless of source.
### `NEW` DELETE /queue/<queue_id>
Unqueues the track with `queue_id` specified as a parameter.
#### Response
```json
{
"success": true,
"reason": null
}
```
### `CHANGED` POST /queue/head
Same as kawa's, but `queue_id` is added to response directly.
#### Response
```json
{
"success": true,
"reason": null,
"queue_id": 3
}
```
### `CHANGED` POST /queue/tail
Same as kawa's, but `queue_id` is added to response directly.
#### Response
```json
{
"success": true,
"reason": null,
"queue_id": 5
}
```

2
go.mod
View file

@ -3,7 +3,7 @@ module git.gammaspectra.live/S.O.N.G/MeteorLight
go 1.18
require (
git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f
git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220302151314-7b6b11dd6c5c
github.com/BurntSushi/toml v1.0.0
)

4
go.sum
View file

@ -1,5 +1,5 @@
git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f h1:7WbbseQ4WAodtxOf1m7vT4NDphNIe7ScMkGP5HeYNyk=
git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA=
git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220302151314-7b6b11dd6c5c h1:8Tcq/ueYofDoeRgovGwekXHhyH0i15vm79W5wK/WwpE=
git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220302151314-7b6b11dd6c5c/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA=
git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48 h1:MaKiBfXQl0keyfdCi1PxGOKRTiWhIs8PqCal5GhKDi0=
git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48/go.mod h1:pkWt//S9hLVEQaJDPu/cHHPk8vPpo/0+zHy0me4LIP4=
git.gammaspectra.live/S.O.N.G/go-pus v0.0.0-20220227175608-6cc027f24dba h1:JEaxCVgdr3XXAuDCPAx7ttLFZaaHzTEzG+oRnVUtUKU=

View file

@ -195,7 +195,8 @@ func (m *StreamMount) GetListeners() (entries []*ListenerInformation) {
return
}
func (m *StreamMount) Process() {
func (m *StreamMount) Process(group *sync.WaitGroup) {
defer group.Done()
defer func() {
m.listenersLock.Lock()
for _, l := range m.listeners {

340
queue.go Normal file
View file

@ -0,0 +1,340 @@
package main
import (
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"log"
"net/http"
"os"
"path"
"strings"
"sync"
)
type QueueTrackEntry struct {
QueueIdentifier audio.QueueIdentifier
Path string
Metadata struct {
Title string
Album string
Artist string
Art string
}
original map[string]interface{}
}
type Queue struct {
NowPlaying chan *QueueTrackEntry
QueueEmpty chan *QueueTrackEntry
audioQueue *audio.Queue
mounts []*StreamMount
queue []*QueueTrackEntry
mutex sync.RWMutex
config *Config
wg sync.WaitGroup
}
func NewQueue(config *Config) *Queue {
q := &Queue{
NowPlaying: make(chan *QueueTrackEntry, 1),
QueueEmpty: make(chan *QueueTrackEntry),
config: config,
audioQueue: audio.NewQueue(44100, 2),
}
blocksPerSecond := 20
bufferSeconds := blocksPerSecond * 1
sources := SplitAudioSource(audio.NewFilterChain(q.audioQueue.GetSource(), audio.NewRealTimeFilter(blocksPerSecond), audio.NewBufferFilter(bufferSeconds)), len(config.Streams))
for i, s := range q.config.Streams {
mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate)
if mount == nil {
log.Panicf("could not initialize %s\n", s.MountPath)
}
q.mounts = append(q.mounts, mount)
q.wg.Add(1)
go mount.Process(&q.wg)
}
return q
}
func (q *Queue) Wait() {
q.wg.Wait()
close(q.NowPlaying)
}
var flacFormat = flac.NewFormat()
var ttaFormat = tta.NewFormat()
var mp3Format = mp3.NewFormat()
var opusFormat = opus.NewFormat()
func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error {
f, err := os.Open(entry.Path)
if err != nil {
return err
}
var source audio.Source
switch strings.ToLower(path.Ext(entry.Path)) {
case ".flac":
source, err = flacFormat.Open(f)
case ".tta":
source, err = ttaFormat.Open(f)
case ".mp3":
source, err = mp3Format.Open(f)
case ".ogg", ".opus":
source, err = opusFormat.Open(f)
}
if err != nil {
f.Close()
return err
}
if source.Blocks == nil {
f.Close()
return fmt.Errorf("could not find decoder for %s", entry.Path)
}
startCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
log.Printf("now playing %s\n", f.Name())
if e := q.Get(entry.Identifier); e != nil {
q.NowPlaying <- e
}
}
endCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
}
removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) {
defer f.Close()
q.Remove(entry.Identifier)
q.HandleQueue()
}
q.mutex.Lock()
defer q.mutex.Unlock()
if tail {
entry.QueueIdentifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback)
} else {
entry.QueueIdentifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback)
}
entry.original["queue_id"] = entry.QueueIdentifier
if tail || len(q.queue) == 0 {
q.queue = append(q.queue, entry)
} else {
q.queue = append(q.queue[:1], append([]*QueueTrackEntry{entry}, q.queue[1:]...)...)
}
return nil
}
func (q *Queue) HandleQueue() {
if q.audioQueue.GetQueueSize() == 0 {
q.AddTrack(<-q.QueueEmpty, true)
}
}
func (q *Queue) GetQueue() (result []*QueueTrackEntry) {
q.mutex.RLock()
defer q.mutex.RUnlock()
if len(q.queue) > 1 {
result = make([]*QueueTrackEntry, len(q.queue)-1)
copy(result, q.queue[1:])
}
return
}
func (q *Queue) Get(identifier audio.QueueIdentifier) *QueueTrackEntry {
q.mutex.RLock()
defer q.mutex.RUnlock()
for _, e := range q.queue {
if e.QueueIdentifier == identifier {
return e
}
}
return nil
}
func (q *Queue) GetNowPlaying() *QueueTrackEntry {
if e := q.audioQueue.GetQueueHead(); e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) SkipNowPlaying() bool {
if e := q.audioQueue.GetQueueHead(); e != nil {
return q.Remove(e.Identifier)
}
return false
}
func (q *Queue) GetIndex(index int) *QueueTrackEntry {
if e := q.audioQueue.GetQueueIndex(index + 1); e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) GetHead() *QueueTrackEntry {
if e := q.audioQueue.GetQueueIndex(1); e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) GetTail() *QueueTrackEntry {
if i, e := q.audioQueue.GetQueueTail(); i != 0 && e != nil {
return q.Get(e.Identifier)
}
return nil
}
func (q *Queue) Remove(identifier audio.QueueIdentifier) bool {
q.mutex.Lock()
for i, e := range q.queue {
if e.QueueIdentifier == identifier {
q.queue = append(q.queue[:i], q.queue[i+1:]...)
q.mutex.Unlock()
q.audioQueue.Remove(identifier)
return true
}
}
q.mutex.Unlock()
q.audioQueue.Remove(identifier)
return false
}
type httpWriter struct {
writer http.ResponseWriter
}
func (h *httpWriter) Write(p []byte) (n int, err error) {
if h.writer != nil {
_, err = h.writer.Write(p)
if err != nil {
h.writer = nil
}
}
return len(p), nil
}
func (h *httpWriter) Close() (err error) {
h.writer = nil
return nil
}
func (h *httpWriter) Flush() {
if h.writer != nil {
if flusher, ok := h.writer.(http.Flusher); ok {
flusher.Flush()
}
}
}
func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
q.mutex.RLock()
defer q.mutex.Unlock()
listeners = make([]*ListenerInformation, 0, 1)
for _, mount := range q.mounts {
listeners = append(listeners, mount.GetListeners()...)
}
return
}
func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) {
mountName := strings.TrimPrefix(request.URL.Path, "/")
for _, mount := range q.mounts {
if mount.Mount == mountName {
writer.Header().Set("Server", "MeteorLight/radio")
writer.Header().Set("Content-Type", mount.MimeType)
writer.Header().Set("Accept-Ranges", "none")
writer.Header().Set("Connection", "keep-alive")
writer.Header().Set("X-Audiocast-Name", q.config.Radio.Name)
writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform")
writer.Header().Set("X-Content-Type-Options", "nosniff")
byteWriter := &httpWriter{writer: writer}
var wgClient sync.WaitGroup
writeCallback := func(packet packetizer.Packet) error {
//TODO: icy
/*
select {
case <-request.Context().Done():
// Client gave up
default:
}
*/
_, err := byteWriter.Write(packet.GetData())
byteWriter.Flush()
return err
}
wgClient.Add(1)
var headers []HeaderEntry
for k, v := range request.Header {
for _, s := range v {
headers = append(headers, HeaderEntry{
Name: k,
Value: s,
})
}
}
uriPath := request.URL.Path
if len(request.URL.Query().Encode()) > 0 {
uriPath += "?" + request.URL.Query().Encode()
}
mount.AddListener(&StreamListener{
Information: ListenerInformation{
Mount: mountName,
Path: uriPath,
Headers: headers,
},
Start: func(packets []packetizer.Packet) error {
for _, p := range packets {
if err := writeCallback(p); err != nil {
return err
}
}
return nil
},
Write: writeCallback,
Close: func() {
byteWriter.Close()
wgClient.Done()
},
})
wgClient.Wait()
return
}
}
writer.WriteHeader(http.StatusNotFound)
return
}