diff --git a/MeteorLight.go b/MeteorLight.go index eaaa63f..f5a9984 100644 --- a/MeteorLight.go +++ b/MeteorLight.go @@ -1,57 +1,14 @@ package main import ( - "bytes" - "encoding/json" "flag" "fmt" - "git.gammaspectra.live/S.O.N.G/Kirika/audio" - "io/ioutil" "log" "net/http" - "strconv" - "strings" "sync" "time" ) -func getQueueEntryFromBody(body []byte) *QueueTrackEntry { - entry := &QueueTrackEntry{} - err := json.Unmarshal(body, &entry.original) - if err != nil { - return nil - } - err = json.Unmarshal(body, &entry.Metadata) - if err != nil { - return nil - } - - var val interface{} - var strVal string - var ok bool - if val, ok = entry.original["path"]; ok { - if strVal, ok = val.(string); ok { - entry.Path = strVal - } - } - - if len(entry.Path) > 0 { - return entry - } - - return nil -} - -type resultResponse struct { - Success bool `json:"success"` - Reason error `json:"reason"` -} - -type queueResultResponse struct { - resultResponse - QueueId audio.QueueIdentifier `json:"queue_id"` -} - func main() { configPath := flag.String("config", "config.toml", "Config path") @@ -66,243 +23,6 @@ func main() { queue := NewQueue(config) - var nr *QueueTrackEntry - - getRandomTrack := func() *QueueTrackEntry { - response, err := http.DefaultClient.Get(config.Queue.RandomSongApi) - if err != nil { - return nil - } - - defer response.Body.Close() - body, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil - } - - return getQueueEntryFromBody(body) - } - - getFallbackTrack := func() *QueueTrackEntry { - m := make(map[string]interface{}) - m["path"] = config.Queue.FallbackPath - return &QueueTrackEntry{ - Path: config.Queue.FallbackPath, - original: m, - } - } - - sendNowRandom := func(nr *QueueTrackEntry) { - if config.Queue.NowRandom != "" { - jsonData, _ := json.Marshal(nr.original) - response, err := http.DefaultClient.Post(config.Queue.NowRandom, "application/json; charset=utf-8", bytes.NewReader(jsonData)) - if err != nil { - log.Print(err) - } - if response != nil { - defer response.Body.Close() - } - } - } - - wg.Add(1) - go func() { - defer wg.Done() - defer close(queue.QueueEmpty) - //TODO: close properly - for { - if e := getRandomTrack(); e != nil { - nr = e - sendNowRandom(nr) - queue.QueueEmpty <- e - } else if e = getFallbackTrack(); e != nil { - nr = e - sendNowRandom(nr) - queue.QueueEmpty <- e - } - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - for np := range queue.NowPlaying { - jsonData, _ := json.Marshal(np.original) - response, err := http.DefaultClient.Post(config.Queue.NowPlaying, "application/json; charset=utf-8", bytes.NewReader(jsonData)) - if err != nil { - log.Print(err) - } - if response != nil { - response.Body.Close() - } - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - - server := http.Server{ - Addr: fmt.Sprintf("%s:%d", config.Api.Host, config.Api.Port), - Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - writer.Header().Set("Server", "MeteorLight/api") - writer.Header().Set("Content-Type", "application/json; charset=utf-8") - - pathSegments := strings.Split(request.URL.Path, "/") - if len(pathSegments) > 1 { - switch pathSegments[1] { - case "listeners": - jsonData, _ := json.Marshal(queue.GetListeners()) - writer.Write(jsonData) - - return - case "np": - if e := queue.GetNowPlaying(); e != nil { - jsonData, _ := json.Marshal(e.original) - writer.Write(jsonData) - } else { - writer.Write([]byte{'{', '}'}) - } - - return - case "random": - if nr != nil { - jsonData, _ := json.Marshal(nr.original) - writer.Write(jsonData) - } else { - writer.Write([]byte{'{', '}'}) - } - - return - case "queue": - if len(pathSegments) == 2 { - if request.Method != "GET" { - return - } - var blobs = make([]map[string]interface{}, 0, 1) - for _, e := range queue.GetQueue() { - blobs = append(blobs, e.original) - } - jsonData, _ := json.Marshal(blobs) - writer.Write(jsonData) - - return - } else { - switch pathSegments[2] { - case "head": - if request.Method == "POST" { - result := queueResultResponse{} - if body, err := ioutil.ReadAll(request.Body); err == nil { - if e := getQueueEntryFromBody(body); e != nil { - if err = queue.AddTrack(e, false); err == nil { - result.Success = true - result.QueueId = e.QueueIdentifier - } - } - } - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - - return - } else if request.Method == "DELETE" { - result := resultResponse{} - if head := queue.GetHead(); head != nil { - result.Success = queue.Remove(head.QueueIdentifier) - } - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - - return - } - - case "tail": - if request.Method == "POST" { - result := queueResultResponse{} - if body, err := ioutil.ReadAll(request.Body); err == nil { - if e := getQueueEntryFromBody(body); e != nil { - if err = queue.AddTrack(e, true); err == nil { - result.Success = true - result.QueueId = e.QueueIdentifier - } - } - } - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - - return - } else if request.Method == "DELETE" { - result := resultResponse{} - if head := queue.GetTail(); head != nil { - result.Success = queue.Remove(head.QueueIdentifier) - } - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - - return - } - - case "clear": - if request.Method != "POST" { - return - } - - result := resultResponse{} - - for _, e := range queue.GetQueue() { - queue.Remove(e.QueueIdentifier) - } - result.Success = true - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - return - - default: - if request.Method != "POST" { - return - } - - if i, err := strconv.ParseInt(pathSegments[2], 10, 0); err == nil { - result := resultResponse{} - - result.Success = queue.Remove(audio.QueueIdentifier(i)) - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - return - } - } - - } - case "skip": - if request.Method != "POST" { - return - } - - result := resultResponse{} - - result.Success = queue.SkipNowPlaying() - - jsonData, _ := json.Marshal(result) - writer.Write(jsonData) - return - } - } - - writer.WriteHeader(http.StatusNotFound) - writer.Write([]byte{'{', '}'}) - }), - } - - if err := server.ListenAndServe(); err != nil { - log.Panic(err) - } - }() - wg.Add(1) go func() { defer wg.Done() @@ -323,12 +43,9 @@ func main() { } }() - if e := getRandomTrack(); e != nil { - queue.AddTrack(e, false) - } else if e = getFallbackTrack(); e != nil { - queue.AddTrack(e, false) - } + api := NewAPI(config, queue) + api.Wait() wg.Wait() queue.Wait() } diff --git a/README.md b/README.md index 40e8629..c9a090f 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible). * No Vorbis encoding support. * Supports HTTP clients that have more than 16 HTTP request headers or longer than 64 bytes per header. * Does not restart stream per-track, instead being a continuous stream without parameter changes. -* Normalized channels / sample rates for mounts. +* Normalized channels / sample rates for all mounts. * Implements ICY metadata (artist, title, url). * Uses sample/timed packet buffers, instead of kawa byte buffers, which caused wild differences between endpoints. Mounts usually align within 0.2s of each other, depending on client. * Use `queue.buffer_size` to specify number of seconds to buffer (by default 0, automatic per client). diff --git a/api.go b/api.go new file mode 100644 index 0000000..5a0dba1 --- /dev/null +++ b/api.go @@ -0,0 +1,322 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "git.gammaspectra.live/S.O.N.G/Kirika/audio" + "io/ioutil" + "log" + "net/http" + "strconv" + "strings" + "sync" +) + +type API struct { + config *Config + queue *Queue + wg sync.WaitGroup + nr *QueueTrackEntry +} + +func NewAPI(config *Config, queue *Queue) *API { + api := &API{ + config: config, + queue: queue, + } + + api.listen() + api.handleQueue() + + return api +} + +func (a *API) Wait() { + a.wg.Wait() +} + +func getQueueEntryFromBody(body []byte) *QueueTrackEntry { + entry := &QueueTrackEntry{} + err := json.Unmarshal(body, &entry.original) + if err != nil { + return nil + } + err = json.Unmarshal(body, &entry.Metadata) + if err != nil { + return nil + } + + var val interface{} + var strVal string + var ok bool + if val, ok = entry.original["path"]; ok { + if strVal, ok = val.(string); ok { + entry.Path = strVal + } + } + + if len(entry.Path) > 0 { + return entry + } + + return nil +} + +func (a *API) getFallbackTrack() *QueueTrackEntry { + m := make(map[string]interface{}) + m["path"] = a.config.Queue.FallbackPath + return &QueueTrackEntry{ + Path: a.config.Queue.FallbackPath, + original: m, + } +} +func (a *API) getRandomTrack() *QueueTrackEntry { + response, err := http.DefaultClient.Get(a.config.Queue.RandomSongApi) + if err != nil { + return nil + } + + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil + } + + return getQueueEntryFromBody(body) +} +func (a *API) setNowRandom(nr *QueueTrackEntry) { + a.nr = nr + if a.config.Queue.NowRandom != "" { + jsonData, _ := json.Marshal(nr.original) + response, err := http.DefaultClient.Post(a.config.Queue.NowRandom, "application/json; charset=utf-8", bytes.NewReader(jsonData)) + if err != nil { + log.Print(err) + } + if response != nil { + defer response.Body.Close() + } + } +} + +func (a *API) handleQueue() { + a.wg.Add(1) + go func() { + defer a.wg.Done() + defer close(a.queue.QueueEmpty) + //TODO: close properly + for { + if e := a.getRandomTrack(); e != nil { + //preload + if err := e.Load(); err != nil { + continue + } + a.setNowRandom(e) + a.queue.QueueEmpty <- e + } else if e = a.getFallbackTrack(); e != nil { + //preload + if err := e.Load(); err != nil { + continue + } + a.setNowRandom(e) + a.queue.QueueEmpty <- e + } + } + }() + + a.wg.Add(1) + go func() { + defer a.wg.Done() + for np := range a.queue.NowPlaying { + jsonData, _ := json.Marshal(np.original) + response, err := http.DefaultClient.Post(a.config.Queue.NowPlaying, "application/json; charset=utf-8", bytes.NewReader(jsonData)) + if err != nil { + log.Print(err) + } + if response != nil { + response.Body.Close() + } + } + }() + + //insert first track + a.queue.HandleQueue() +} + +func (a *API) listen() { + + type resultResponse struct { + Success bool `json:"success"` + Reason error `json:"reason"` + } + + type queueResultResponse struct { + resultResponse + QueueId audio.QueueIdentifier `json:"queue_id"` + } + + a.wg.Add(1) + go func() { + defer a.wg.Done() + + server := http.Server{ + Addr: fmt.Sprintf("%s:%d", a.config.Api.Host, a.config.Api.Port), + Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Server", "MeteorLight/api") + writer.Header().Set("Content-Type", "application/json; charset=utf-8") + + pathSegments := strings.Split(request.URL.Path, "/") + if len(pathSegments) > 1 { + switch pathSegments[1] { + case "listeners": + jsonData, _ := json.Marshal(a.queue.GetListeners()) + writer.Write(jsonData) + + return + case "np": + if e := a.queue.GetNowPlaying(); e != nil { + jsonData, _ := json.Marshal(e.original) + writer.Write(jsonData) + } else { + writer.Write([]byte{'{', '}'}) + } + + return + case "random": + if a.nr != nil { + jsonData, _ := json.Marshal(a.nr.original) + writer.Write(jsonData) + } else { + writer.Write([]byte{'{', '}'}) + } + + return + case "queue": + if len(pathSegments) == 2 { + if request.Method != "GET" { + return + } + var blobs = make([]map[string]interface{}, 0, 1) + for _, e := range a.queue.GetQueue() { + blobs = append(blobs, e.original) + } + jsonData, _ := json.Marshal(blobs) + writer.Write(jsonData) + + return + } else { + switch pathSegments[2] { + case "head": + if request.Method == "POST" { + result := queueResultResponse{} + if body, err := ioutil.ReadAll(request.Body); err == nil { + if e := getQueueEntryFromBody(body); e != nil { + if err = a.queue.AddTrack(e, false); err == nil { + result.Success = true + result.QueueId = e.QueueIdentifier + } + } + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } else if request.Method == "DELETE" { + result := resultResponse{} + if head := a.queue.GetHead(); head != nil { + result.Success = a.queue.Remove(head.QueueIdentifier) + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } + + case "tail": + if request.Method == "POST" { + result := queueResultResponse{} + if body, err := ioutil.ReadAll(request.Body); err == nil { + if e := getQueueEntryFromBody(body); e != nil { + if err = a.queue.AddTrack(e, true); err == nil { + result.Success = true + result.QueueId = e.QueueIdentifier + } + } + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } else if request.Method == "DELETE" { + result := resultResponse{} + if head := a.queue.GetTail(); head != nil { + result.Success = a.queue.Remove(head.QueueIdentifier) + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + return + } + + case "clear": + if request.Method != "POST" { + return + } + + result := resultResponse{} + + for _, e := range a.queue.GetQueue() { + a.queue.Remove(e.QueueIdentifier) + } + result.Success = true + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + return + + default: + if request.Method != "POST" { + return + } + + if i, err := strconv.ParseInt(pathSegments[2], 10, 0); err == nil { + result := resultResponse{} + + result.Success = a.queue.Remove(audio.QueueIdentifier(i)) + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + return + } + } + + } + case "skip": + if request.Method != "POST" { + return + } + + result := resultResponse{} + + result.Success = a.queue.SkipNowPlaying() + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + return + } + } + + writer.WriteHeader(http.StatusNotFound) + writer.Write([]byte{'{', '}'}) + }), + } + + if err := server.ListenAndServe(); err != nil { + log.Panic(err) + } + }() +} diff --git a/queue.go b/queue.go index e079731..ec54926 100644 --- a/queue.go +++ b/queue.go @@ -14,6 +14,7 @@ import ( "net/http" "os" "path" + "runtime" "strconv" "strings" "sync" @@ -32,10 +33,57 @@ type QueueTrackEntry struct { Artist string `json:"artist"` Art string `json:"art"` } + source audio.Source original map[string]interface{} } +func (e *QueueTrackEntry) Load() error { + if e.source.Blocks != nil { + return nil + } + + f, err := os.Open(e.Path) + if err != nil { + return err + } + //close at end, TODO check if it runs + runtime.SetFinalizer(f, (*os.File).Close) + + var source audio.Source + switch strings.ToLower(path.Ext(e.Path)) { + case ".flac": + source, err = flacFormat.Open(f) + case ".tta": + source, err = ttaFormat.Open(f) + case ".mp3": + source, err = mp3Format.Open(f) + case ".ogg": + if source, err = opusFormat.Open(f); err != nil { + //try flac + if source, err = flacFormat.Open(f); err != nil { + //try vorbis + source, err = vorbisFormat.Open(f) + } + } + case ".opus": + source, err = opusFormat.Open(f) + case ".vorbis": + source, err = vorbisFormat.Open(f) + } + + if err != nil { + return err + } + + if source.Blocks == nil { + return fmt.Errorf("could not find decoder for %s", e.Path) + } + + e.source = source + return nil +} + type QueueMetadataPacket struct { sampleNumber int64 TrackEntry *QueueTrackEntry @@ -110,45 +158,13 @@ var vorbisFormat = vorbis.NewFormat() func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { - f, err := os.Open(entry.Path) - if err != nil { - return err - } - var source audio.Source - switch strings.ToLower(path.Ext(entry.Path)) { - case ".flac": - source, err = flacFormat.Open(f) - case ".tta": - source, err = ttaFormat.Open(f) - case ".mp3": - source, err = mp3Format.Open(f) - case ".ogg": - if source, err = opusFormat.Open(f); err != nil { - //try flac - if source, err = flacFormat.Open(f); err != nil { - //try vorbis - source, err = vorbisFormat.Open(f) - } - } - case ".opus": - source, err = opusFormat.Open(f) - case ".vorbis": - source, err = vorbisFormat.Open(f) - } - - if err != nil { - f.Close() + if err := entry.Load(); err != nil { return err } - if source.Blocks == nil { - f.Close() - return fmt.Errorf("could not find decoder for %s", entry.Path) - } - - startCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { - log.Printf("now playing %s\n", f.Name()) - if e := q.Get(entry.Identifier); e != nil { + startCallback := func(queue *audio.Queue, queueEntry *audio.QueueEntry) { + if e := q.Get(queueEntry.Identifier); e != nil { //is this needed? + log.Printf("now playing %s\n", e.Path) q.NowPlaying <- e for _, mount := range q.mounts { mount.MetadataQueue.Enqueue(&QueueMetadataPacket{ @@ -157,6 +173,8 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { TrackEntry: e, }) } + } else { + log.Printf("now playing %s\n", entry.Path) } } @@ -165,7 +183,6 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { } removeCallback := func(queue *audio.Queue, entry *audio.QueueEntry) { - defer f.Close() atomic.AddInt64((*int64)(&q.Duration), int64((time.Second*time.Duration(entry.ReadSamples))/time.Duration(entry.Source.SampleRate))) q.Remove(entry.Identifier) @@ -176,9 +193,9 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { defer q.mutex.Unlock() if tail { - entry.QueueIdentifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback) + entry.QueueIdentifier = q.audioQueue.AddTail(entry.source, startCallback, endCallback, removeCallback) } else { - entry.QueueIdentifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback) + entry.QueueIdentifier = q.audioQueue.AddHead(entry.source, startCallback, endCallback, removeCallback) } entry.original["queue_id"] = entry.QueueIdentifier