package main import ( "encoding/json" "errors" "flag" "fmt" "git.gammaspectra.live/S.O.N.G/Kirika/audio" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "io" "io/ioutil" "log" "net/http" "os" "path" "strings" "sync" ) var audioQueue = audio.NewQueue(44100, 2, 64) var flacFormat = flac.NewFormat() var ttaFormat = tta.NewFormat() var mp3Format = mp3.NewFormat() var opusFormat = opus.NewFormat() type QueueTrackEntry struct { QueueIdentifier audio.QueueIdentifier Path string Metadata struct { Title string Album string Artist string Art string } original map[string]interface{} } var queue []*QueueTrackEntry var queueLock sync.RWMutex var config *Config func addTrack(entry *QueueTrackEntry, tail bool) error { f, err := os.Open(entry.Path) if err != nil { return err } var source audio.Source switch strings.ToLower(path.Ext(entry.Path)) { case ".flac": source, err = flacFormat.Open(f) case ".tta": source, err = ttaFormat.Open(f) case ".mp3": source, err = mp3Format.Open(f) case ".ogg", ".opus": source, err = opusFormat.Open(f) } if err != nil { f.Close() return err } if source.Blocks == nil { f.Close() return fmt.Errorf("could not find decoder for %s", entry.Path) } if tail { entry.QueueIdentifier = audioQueue.AddTail(source, func(q *audio.Queue, entry *audio.QueueEntry) { log.Printf("now playing %s\n", f.Name()) go handleQueue() }, func(q *audio.Queue, entry *audio.QueueEntry) { log.Printf("finished playing %s\n", f.Name()) f.Close() go handleQueueRemove(entry.Identifier) go handleQueue() }) } else { entry.QueueIdentifier = audioQueue.AddHead(source, func(q *audio.Queue, entry *audio.QueueEntry) { log.Printf("now playing %s\n", f.Name()) go handleQueue() }, func(q *audio.Queue, entry *audio.QueueEntry) { log.Printf("finished playing %s\n", f.Name()) f.Close() go handleQueueRemove(entry.Identifier) go handleQueue() }) } entry.original["queue_id"] = entry.QueueIdentifier return nil } func handleQueueRemove(identifier audio.QueueIdentifier) { queueLock.Lock() defer queueLock.Unlock() for i, q := range queue { if q.QueueIdentifier == identifier { queue = append(queue[:i], queue[i+1:]...) return } } } func handleQueue() { if len(audioQueue.GetQueue()) <= 0 { //TODO: pre-queue it, or remove existing track queueLock.Lock() defer queueLock.Unlock() if e := getRandomTrack(); e != nil { if err := addTrack(e, true); err != nil { addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error } } else { addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error } } } func getRandomTrack() *QueueTrackEntry { response, err := http.DefaultClient.Get(config.Queue.RandomSongApi) if err != nil { return nil } body, err := ioutil.ReadAll(response.Body) if err != nil { return nil } return getQueueEntryFromBody(body) } func getFallbackTrack() *QueueTrackEntry { m := make(map[string]interface{}) m["path"] = config.Queue.FallbackPath return &QueueTrackEntry{ Path: config.Queue.FallbackPath, original: m, } } func getQueueEntryFromBody(body []byte) *QueueTrackEntry { entry := &QueueTrackEntry{} err := json.Unmarshal(body, &entry.original) if err != nil { return nil } var val interface{} var strVal string var ok bool if val, ok = entry.original["path"]; ok { if strVal, ok = val.(string); ok { entry.Path = strVal } } if val, ok = entry.original["title"]; ok { if strVal, ok = val.(string); ok { entry.Metadata.Title = strVal } } if val, ok = entry.original["album"]; ok { if strVal, ok = val.(string); ok { entry.Metadata.Album = strVal } } if val, ok = entry.original["artist"]; ok { if strVal, ok = val.(string); ok { entry.Metadata.Artist = strVal } } if val, ok = entry.original["art"]; ok { if strVal, ok = val.(string); ok { entry.Metadata.Art = strVal } } if len(entry.Path) > 0 { return entry } return nil } type httpWriter struct { io.WriteCloser writer http.ResponseWriter } func (h *httpWriter) Write(p []byte) (n int, err error) { if h.writer != nil { _, err = h.writer.Write(p) if err != nil { h.writer = nil } } return len(p), nil } func (h *httpWriter) Close() (err error) { h.writer = nil return nil } type resultResponse struct { Success bool `json:"success"` Reason error `json:"reason"` } func main() { configPath := flag.String("config", "config.toml", "Config path") flag.Parse() var err error config, err = GetConfig(*configPath) if err != nil { log.Panic(err) } var wg sync.WaitGroup var mounts []*StreamMount handleQueue() sources := SplitAudioSource((audio.NewRealTimeFilter(20)).Process(audioQueue.GetSource()), len(config.Streams)) for i, s := range config.Streams { mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate) if mount == nil { log.Panicf("could not initialize %s\n", s.MountPath) } mounts = append(mounts, mount) wg.Add(1) go mount.Process() } wg.Add(1) go func() { defer wg.Done() server := http.Server{ Addr: fmt.Sprintf(":%d", config.Api.Port), Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { writer.Header().Set("Server", "MeteorLight/api") writer.Header().Set("Content-Type", "application/json; charset=utf-8") switch request.URL.Path { case "/listeners": var listeners []*ListenerInformation for _, mount := range mounts { listeners = append(listeners, mount.GetListeners()...) } jsonData, _ := json.Marshal(listeners) writer.Write(jsonData) case "/queue": var blobs = make([]map[string]interface{}, 0, 1) queueLock.RLock() defer queueLock.RUnlock() for _, e := range queue { blobs = append(blobs, e.original) } jsonData, _ := json.Marshal(blobs) writer.Write(jsonData) case "/skip": if request.Method != "POST" { return } result := resultResponse{} q := audioQueue.GetQueue() if len(q) > 0 { result.Success = audioQueue.Remove(q[0]) } else { result.Reason = errors.New("queue empty") } jsonData, _ := json.Marshal(result) writer.Write(jsonData) case "/queue/clear": if request.Method != "POST" { return } result := resultResponse{} q := audioQueue.GetQueue() if len(q) > 0 { for _, id := range q[1:] { audioQueue.Remove(id) } result.Success = true } jsonData, _ := json.Marshal(result) writer.Write(jsonData) } }), } if err := server.ListenAndServe(); err != nil { log.Panic(err) } }() wg.Add(1) go func() { defer wg.Done() server := http.Server{ Addr: fmt.Sprintf(":%d", config.Radio.Port), Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { mountName := strings.TrimPrefix(request.URL.Path, "/") for _, mount := range mounts { if mount.Mount == mountName { writer.Header().Set("Server", "MeteorLight/radio") writer.Header().Set("Content-Type", mount.MimeType) writer.Header().Set("Accept-Ranges", "none") writer.Header().Set("Connection", "keep-alive") writer.Header().Set("X-Audiocast-Name", config.Radio.Name) writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") writer.Header().Set("X-Content-Type-Options", "nosniff") byteWriter := &httpWriter{writer: writer} var wgClient sync.WaitGroup writeCallback := func(packet packetizer.Packet) error { /* select { case <-request.Context().Done(): // Client gave up default: } */ _, err := byteWriter.Write(packet.GetData()) return err } wgClient.Add(1) var headers []HeaderEntry for k, v := range request.Header { for _, s := range v { headers = append(headers, HeaderEntry{ Name: k, Value: s, }) } } uriPath := request.URL.Path if len(request.URL.Query().Encode()) > 0 { uriPath += "?" + request.URL.Query().Encode() } mount.AddListener(&StreamListener{ Information: ListenerInformation{ Mount: mountName, Path: uriPath, Headers: headers, }, Start: func(packets []packetizer.Packet) error { for _, p := range packets { if err := writeCallback(p); err != nil { return err } } return nil }, Write: writeCallback, Close: func() { byteWriter.Close() wgClient.Done() }, }) wgClient.Wait() return } } writer.WriteHeader(http.StatusNotFound) return }), } if err := server.ListenAndServe(); err != nil { log.Panic(err) } }() wg.Wait() }