// Source: MeteorLight/MeteorLight.go (410 lines, 9.0 KiB, Go)

package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"strings"
"sync"
)
// Shared playback queue and the per-format decoders addTrack selects from.
//
// NOTE(review): the NewQueue arguments are presumably sample rate, channel
// count and a buffer size — confirm against audio.NewQueue.
var (
	audioQueue = audio.NewQueue(44100, 2, 64)

	flacFormat = flac.NewFormat()
	ttaFormat  = tta.NewFormat()
	mp3Format  = mp3.NewFormat()
	opusFormat = opus.NewFormat()
)
// QueueTrackEntry describes one track that is handed to the audio queue.
type QueueTrackEntry struct {
// QueueIdentifier is the identifier returned by audioQueue.AddTail/AddHead
// when addTrack queues the entry.
QueueIdentifier audio.QueueIdentifier
// Path is the file addTrack opens; its extension selects the decoder.
Path string
// Metadata is filled by getQueueEntryFromBody from the "title", "album",
// "artist" and "art" keys of the JSON object, when those are strings.
Metadata struct {
Title string
Album string
Artist string
Art string
}
// original is the decoded JSON object (or the map built by getFallbackTrack).
// addTrack stores "queue_id" in it and the /queue API endpoint serializes it.
original map[string]interface{}
}
var (
	// queue is the list of entries reported by the /queue API endpoint.
	// It is read and modified while holding queueLock.
	queue []*QueueTrackEntry
	// queueLock guards queue.
	queueLock sync.RWMutex

	// config is the configuration loaded by main through GetConfig.
	config *Config
)
// addTrack opens entry.Path, selects a decoder from the file extension
// (.flac, .tta, .mp3, .ogg/.opus) and adds the decoded source to audioQueue:
// at the tail when tail is true, at the head otherwise.
//
// On success the identifier returned by the queue is stored in
// entry.QueueIdentifier and mirrored into entry.original["queue_id"].
// An error is returned when the file cannot be opened, the decoder fails, or
// no decoder matches the extension; the file is closed on each of those paths.
// After a successful add the file stays open and is closed by the second
// queue callback.
//
// NOTE(review): nothing in this function adds entry to the package-level
// queue slice, although handleQueueRemove removes from it — confirm where
// entries are expected to be appended.
func addTrack(entry *QueueTrackEntry, tail bool) error {
	f, err := os.Open(entry.Path)
	if err != nil {
		return err
	}

	var source audio.Source
	switch strings.ToLower(path.Ext(entry.Path)) {
	case ".flac":
		source, err = flacFormat.Open(f)
	case ".tta":
		source, err = ttaFormat.Open(f)
	case ".mp3":
		source, err = mp3Format.Open(f)
	case ".ogg", ".opus":
		source, err = opusFormat.Open(f)
	}
	if err != nil {
		f.Close()
		return err
	}
	// An unknown extension leaves source at its zero value.
	if source.Blocks == nil {
		f.Close()
		return fmt.Errorf("could not find decoder for %s", entry.Path)
	}

	// Both insertion modes share the same pair of callbacks. The first logs
	// "now playing"; the second logs "finished playing", releases the file and
	// drops the entry from the tracked queue. Each re-runs handleQueue in its
	// own goroutine so the queue can be refilled.
	startCallback := func(q *audio.Queue, qe *audio.QueueEntry) {
		log.Printf("now playing %s\n", f.Name())
		go handleQueue()
	}
	removeCallback := func(q *audio.Queue, qe *audio.QueueEntry) {
		log.Printf("finished playing %s\n", f.Name())
		f.Close()
		go handleQueueRemove(qe.Identifier)
		go handleQueue()
	}

	if tail {
		entry.QueueIdentifier = audioQueue.AddTail(source, startCallback, removeCallback)
	} else {
		entry.QueueIdentifier = audioQueue.AddHead(source, startCallback, removeCallback)
	}

	// Writing to a nil map panics; an entry built without its map must not
	// crash the process after its track has already been queued.
	if entry.original == nil {
		entry.original = make(map[string]interface{})
	}
	entry.original["queue_id"] = entry.QueueIdentifier
	return nil
}
// handleQueueRemove deletes the first entry of queue whose QueueIdentifier
// equals identifier. It holds queueLock for writing while it searches and
// does nothing when no entry matches.
func handleQueueRemove(identifier audio.QueueIdentifier) {
	queueLock.Lock()
	defer queueLock.Unlock()

	for idx := 0; idx < len(queue); idx++ {
		if queue[idx].QueueIdentifier != identifier {
			continue
		}
		queue = append(queue[:idx], queue[idx+1:]...)
		return
	}
}
func handleQueue() {
if len(audioQueue.GetQueue()) <= 0 { //TODO: pre-queue it, or remove existing track
queueLock.Lock()
defer queueLock.Unlock()
if e := getRandomTrack(); e != nil {
if err := addTrack(e, true); err != nil {
addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error
}
} else {
addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error
}
}
}
// getRandomTrack requests config.Queue.RandomSongApi and converts the
// response body into a queue entry via getQueueEntryFromBody.
//
// It returns nil when the request fails, the body cannot be read, or the body
// does not describe a usable entry. The response body is always closed.
//
// NOTE(review): http.DefaultClient has no timeout and handleQueue calls this
// while holding queueLock — consider a client with a timeout.
func getRandomTrack() *QueueTrackEntry {
	response, err := http.DefaultClient.Get(config.Queue.RandomSongApi)
	if err != nil {
		log.Printf("could not request random track: %v\n", err)
		return nil
	}
	defer response.Body.Close()

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Printf("could not read random track response: %v\n", err)
		return nil
	}
	return getQueueEntryFromBody(body)
}
// getFallbackTrack builds a queue entry for config.Queue.FallbackPath. The
// entry carries no metadata; its original map contains only the "path" key.
func getFallbackTrack() *QueueTrackEntry {
	fallbackPath := config.Queue.FallbackPath

	fallback := &QueueTrackEntry{Path: fallbackPath}
	fallback.original = map[string]interface{}{
		"path": fallbackPath,
	}
	return fallback
}
// getQueueEntryFromBody decodes body as a JSON object and builds a queue
// entry from it. The "path", "title", "album", "artist" and "art" keys are
// copied into the entry when their values are strings; the whole decoded
// object is kept in entry.original.
//
// It returns nil when body is not valid JSON for an object or when no
// non-empty string "path" is present.
func getQueueEntryFromBody(body []byte) *QueueTrackEntry {
	entry := &QueueTrackEntry{}
	if err := json.Unmarshal(body, &entry.original); err != nil {
		return nil
	}

	// stringField yields the value stored under key when it is a string and
	// the empty string (the field's zero value) in every other case.
	stringField := func(key string) string {
		text, _ := entry.original[key].(string)
		return text
	}

	entry.Path = stringField("path")
	entry.Metadata.Title = stringField("title")
	entry.Metadata.Album = stringField("album")
	entry.Metadata.Artist = stringField("artist")
	entry.Metadata.Art = stringField("art")

	if entry.Path == "" {
		return nil
	}
	return entry
}
// httpWriter forwards writes to an http.ResponseWriter until a write fails or
// Close is called; from then on writes are discarded. Write and Close below
// replace the methods of the embedded io.WriteCloser, which is never set in
// this file.
type httpWriter struct {
	io.WriteCloser
	writer http.ResponseWriter
}

// Write passes p to the underlying response writer when one is still
// attached. A failed write detaches the response writer. The error is not
// reported to the caller: Write always returns len(p) and a nil error.
func (h *httpWriter) Write(p []byte) (n int, err error) {
	if target := h.writer; target != nil {
		if _, writeErr := target.Write(p); writeErr != nil {
			h.writer = nil
		}
	}
	return len(p), nil
}

// Close detaches the response writer so later writes are dropped. It always
// returns nil.
func (h *httpWriter) Close() (err error) {
	h.writer = nil
	return nil
}
// resultResponse is the JSON body returned by the mutating API endpoints
// (/skip and /queue/clear).
type resultResponse struct {
	// Success reports whether the requested operation was carried out.
	Success bool `json:"success"`
	// Reason explains a failure; nil when there is nothing to report.
	Reason error `json:"reason"`
}

// MarshalJSON encodes Reason as its error message. Without it, encoding/json
// serializes the error value through its (unexported) fields, so an error
// created with errors.New is emitted as {} and the message is lost. A nil
// Reason is still encoded as null.
func (r resultResponse) MarshalJSON() ([]byte, error) {
	payload := struct {
		Success bool    `json:"success"`
		Reason  *string `json:"reason"`
	}{Success: r.Success}

	if r.Reason != nil {
		message := r.Reason.Error()
		payload.Reason = &message
	}
	return json.Marshal(payload)
}
// main loads the configuration named by the -config flag, queues a first
// track, creates one stream mount per configured stream and then serves two
// HTTP servers: the JSON control API on config.Api.Port and the radio streams
// on config.Radio.Port. It blocks on the wait group afterwards; a server whose
// ListenAndServe returns panics.
//
// API endpoints:
//
//	/listeners     listener information collected from all mounts
//	/queue         the original JSON blobs of the tracked queue entries
//	/skip          POST: removes the first entry of the audio queue
//	/queue/clear   POST: removes every entry of the audio queue but the first
//
// Other API paths answer 404, and non-POST requests to the POST-only
// endpoints answer 405.
func main() {
	configPath := flag.String("config", "config.toml", "Config path")
	flag.Parse()

	var err error
	config, err = GetConfig(*configPath)
	if err != nil {
		log.Panic(err)
	}

	var wg sync.WaitGroup
	var mounts []*StreamMount

	// Queue a first track before the queue's source is handed to the mounts.
	handleQueue()

	sources := SplitAudioSource((audio.NewRealTimeFilter(20)).Process(audioQueue.GetSource()), len(config.Streams))
	for i, s := range config.Streams {
		mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate)
		if mount == nil {
			log.Panicf("could not initialize %s\n", s.MountPath)
		}
		mounts = append(mounts, mount)

		// Pair every Add with a Done so the wait group reflects the mounts
		// that are actually still processing.
		wg.Add(1)
		go func() {
			defer wg.Done()
			mount.Process()
		}()
	}

	// sendJSON writes an already encoded JSON body, or answers 500 when the
	// encoding step failed.
	sendJSON := func(writer http.ResponseWriter, jsonData []byte, encodeErr error) {
		if encodeErr != nil {
			log.Printf("could not encode API response: %v\n", encodeErr)
			writer.WriteHeader(http.StatusInternalServerError)
			return
		}
		// The write error is deliberately ignored: there is nobody left to
		// report it to.
		_, _ = writer.Write(jsonData)
	}

	// requirePost answers 405 and returns false for anything but POST.
	requirePost := func(writer http.ResponseWriter, request *http.Request) bool {
		if request.Method == http.MethodPost {
			return true
		}
		writer.Header().Set("Allow", http.MethodPost)
		writer.WriteHeader(http.StatusMethodNotAllowed)
		return false
	}

	apiHandler := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
		writer.Header().Set("Server", "MeteorLight/api")
		writer.Header().Set("Content-Type", "application/json; charset=utf-8")

		switch request.URL.Path {
		case "/listeners":
			// Start from an empty, non-nil slice so that "no listeners" is
			// encoded as [] (like /queue) instead of null.
			listeners := make([]*ListenerInformation, 0)
			for _, mount := range mounts {
				listeners = append(listeners, mount.GetListeners()...)
			}
			jsonData, err := json.Marshal(listeners)
			sendJSON(writer, jsonData, err)

		case "/queue":
			// Encode while holding the read lock, but release it before
			// writing to the network so a slow client cannot stall writers.
			queueLock.RLock()
			blobs := make([]map[string]interface{}, 0, len(queue))
			for _, e := range queue {
				blobs = append(blobs, e.original)
			}
			jsonData, err := json.Marshal(blobs)
			queueLock.RUnlock()
			sendJSON(writer, jsonData, err)

		case "/skip":
			if !requirePost(writer, request) {
				return
			}
			result := resultResponse{}
			q := audioQueue.GetQueue()
			if len(q) > 0 {
				result.Success = audioQueue.Remove(q[0])
			} else {
				result.Reason = errors.New("queue empty")
			}
			jsonData, err := json.Marshal(result)
			sendJSON(writer, jsonData, err)

		case "/queue/clear":
			if !requirePost(writer, request) {
				return
			}
			result := resultResponse{}
			q := audioQueue.GetQueue()
			if len(q) > 0 {
				// The first entry is left in place; only the rest is removed.
				for _, id := range q[1:] {
					audioQueue.Remove(id)
				}
				result.Success = true
			}
			jsonData, err := json.Marshal(result)
			sendJSON(writer, jsonData, err)

		default:
			writer.WriteHeader(http.StatusNotFound)
		}
	})

	radioHandler := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
		mountName := strings.TrimPrefix(request.URL.Path, "/")
		for _, mount := range mounts {
			if mount.Mount != mountName {
				continue
			}

			writer.Header().Set("Server", "MeteorLight/radio")
			writer.Header().Set("Content-Type", mount.MimeType)
			writer.Header().Set("Accept-Ranges", "none")
			writer.Header().Set("Connection", "keep-alive")
			writer.Header().Set("X-Audiocast-Name", config.Radio.Name)
			writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform")
			writer.Header().Set("X-Content-Type-Options", "nosniff")

			byteWriter := &httpWriter{writer: writer}

			// NOTE(review): httpWriter.Write never reports an error and the
			// request context is not observed here, so a disconnected client
			// is not signalled to the mount through this callback — confirm
			// how StreamMount detects dead listeners.
			writeCallback := func(packet packetizer.Packet) error {
				_, err := byteWriter.Write(packet.GetData())
				return err
			}

			var headers []HeaderEntry
			for k, v := range request.Header {
				for _, s := range v {
					headers = append(headers, HeaderEntry{
						Name:  k,
						Value: s,
					})
				}
			}

			uriPath := request.URL.Path
			if query := request.URL.Query().Encode(); len(query) > 0 {
				uriPath += "?" + query
			}

			// The handler must not return while the listener is registered,
			// so it waits until the listener's Close callback has run.
			var wgClient sync.WaitGroup
			wgClient.Add(1)

			mount.AddListener(&StreamListener{
				Information: ListenerInformation{
					Mount:   mountName,
					Path:    uriPath,
					Headers: headers,
				},
				Start: func(packets []packetizer.Packet) error {
					for _, p := range packets {
						if err := writeCallback(p); err != nil {
							return err
						}
					}
					return nil
				},
				Write: writeCallback,
				Close: func() {
					byteWriter.Close()
					wgClient.Done()
				},
			})

			wgClient.Wait()
			return
		}

		writer.WriteHeader(http.StatusNotFound)
	})

	wg.Add(1)
	go func() {
		defer wg.Done()
		server := http.Server{
			Addr:    fmt.Sprintf(":%d", config.Api.Port),
			Handler: apiHandler,
		}
		if err := server.ListenAndServe(); err != nil {
			log.Panic(err)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		server := http.Server{
			Addr:    fmt.Sprintf(":%d", config.Radio.Port),
			Handler: radioHandler,
		}
		if err := server.ListenAndServe(); err != nil {
			log.Panic(err)
		}
	}()

	wg.Wait()
}