From 4de5127f6e56526e06b07e4cd225ac53512f9ecc Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Wed, 2 Mar 2022 00:31:29 +0100 Subject: [PATCH] Initial commit, WiP --- .gitignore | 2 + LICENSE | 9 + MeteorLight.go | 409 ++++++++++++++++++++++++++++++++++++++++++++ README.md | 20 +++ config.go | 33 ++++ example_config.toml | 83 +++++++++ go.mod | 24 +++ go.sum | 43 +++++ mount.go | 256 +++++++++++++++++++++++++++ utilities.go | 30 ++++ 10 files changed, 909 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 MeteorLight.go create mode 100644 README.md create mode 100644 config.go create mode 100644 example_config.toml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 mount.go create mode 100644 utilities.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f08a487 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/test_config.toml +/.idea \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6303463 --- /dev/null +++ b/LICENSE @@ -0,0 +1,9 @@ +Copyright (c) 2022 MeteorLight Contributors All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/MeteorLight.go b/MeteorLight.go new file mode 100644 index 0000000..70ba439 --- /dev/null +++ b/MeteorLight.go @@ -0,0 +1,409 @@ +package main + +import ( + "encoding/json" + "errors" + "flag" + "fmt" + "git.gammaspectra.live/S.O.N.G/Kirika/audio" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/tta" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "path" + "strings" + "sync" +) + +var audioQueue = audio.NewQueue(44100, 2, 64) +var flacFormat = flac.NewFormat() +var ttaFormat = tta.NewFormat() +var mp3Format = mp3.NewFormat() +var opusFormat = opus.NewFormat() + +type QueueTrackEntry struct { + QueueIdentifier audio.QueueIdentifier + Path string + Metadata struct { + Title string + Album string + Artist string + Art string + } + + original map[string]interface{} +} + +var queue []*QueueTrackEntry +var queueLock sync.RWMutex + +var config *Config + +func addTrack(entry *QueueTrackEntry, tail bool) error { + + f, err := os.Open(entry.Path) + if err != nil { + return err + } + var source audio.Source + switch strings.ToLower(path.Ext(entry.Path)) { + case ".flac": + source, err = flacFormat.Open(f) + case ".tta": + source, err = ttaFormat.Open(f) + case ".mp3": + source, err = mp3Format.Open(f) + case ".ogg", ".opus": + source, err = opusFormat.Open(f) + } + + if err != nil { + f.Close() + return err + } + + if source.Blocks == nil { + f.Close() + return fmt.Errorf("could not find decoder for %s", entry.Path) + } + + if tail { + entry.QueueIdentifier = audioQueue.AddTail(source, func(q *audio.Queue, entry *audio.QueueEntry) { + log.Printf("now playing %s\n", f.Name()) + go handleQueue() + }, func(q *audio.Queue, entry *audio.QueueEntry) { + log.Printf("finished playing %s\n", f.Name()) + f.Close() + go handleQueueRemove(entry.Identifier) + go handleQueue() + }) + } else { + entry.QueueIdentifier = audioQueue.AddHead(source, func(q *audio.Queue, entry *audio.QueueEntry) { + log.Printf("now playing %s\n", f.Name()) + go handleQueue() + }, func(q *audio.Queue, entry *audio.QueueEntry) { + log.Printf("finished playing %s\n", f.Name()) + f.Close() + go handleQueueRemove(entry.Identifier) + go handleQueue() + }) + } + + entry.original["queue_id"] = entry.QueueIdentifier + + return nil +} + +func handleQueueRemove(identifier audio.QueueIdentifier) { + queueLock.Lock() + defer queueLock.Unlock() + for i, q := range queue { + if q.QueueIdentifier == identifier { + queue = append(queue[:i], queue[i+1:]...) + return + } + } + +} + +func handleQueue() { + if len(audioQueue.GetQueue()) <= 0 { //TODO: pre-queue it, or remove existing track + queueLock.Lock() + defer queueLock.Unlock() + + if e := getRandomTrack(); e != nil { + if err := addTrack(e, true); err != nil { + addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error + } + } else { + addTrack(getFallbackTrack(), true) //TODO: how to handle fallback error + } + } + +} + +func getRandomTrack() *QueueTrackEntry { + response, err := http.DefaultClient.Get(config.Queue.RandomSongApi) + if err != nil { + return nil + } + + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil + } + + return getQueueEntryFromBody(body) +} +func getFallbackTrack() *QueueTrackEntry { + m := make(map[string]interface{}) + m["path"] = config.Queue.FallbackPath + return &QueueTrackEntry{ + Path: config.Queue.FallbackPath, + original: m, + } +} + +func getQueueEntryFromBody(body []byte) *QueueTrackEntry { + entry := &QueueTrackEntry{} + err := json.Unmarshal(body, &entry.original) + if err != nil { + return nil + } + + var val interface{} + var strVal string + var ok bool + if val, ok = entry.original["path"]; ok { + if strVal, ok = val.(string); ok { + entry.Path = strVal + } + } + if val, ok = entry.original["title"]; ok { + if strVal, ok = val.(string); ok { + entry.Metadata.Title = strVal + } + } + if val, ok = entry.original["album"]; ok { + if strVal, ok = val.(string); ok { + entry.Metadata.Album = strVal + } + } + if val, ok = entry.original["artist"]; ok { + if strVal, ok = val.(string); ok { + entry.Metadata.Artist = strVal + } + } + if val, ok = entry.original["art"]; ok { + if strVal, ok = val.(string); ok { + entry.Metadata.Art = strVal + } + } + + if len(entry.Path) > 0 { + return entry + } + + return nil +} + +type httpWriter struct { + io.WriteCloser + writer http.ResponseWriter +} + +func (h *httpWriter) Write(p []byte) (n int, err error) { + if h.writer != nil { + _, err = h.writer.Write(p) + if err != nil { + h.writer = nil + } + } + return len(p), nil +} + +func (h *httpWriter) Close() (err error) { + h.writer = nil + return nil +} + +type resultResponse struct { + Success bool `json:"success"` + Reason error `json:"reason"` +} + +func main() { + configPath := flag.String("config", "config.toml", "Config path") + + flag.Parse() + + var err error + + config, err = GetConfig(*configPath) + if err != nil { + log.Panic(err) + } + + var wg sync.WaitGroup + + var mounts []*StreamMount + + handleQueue() + + sources := SplitAudioSource((audio.NewRealTimeFilter(20)).Process(audioQueue.GetSource()), len(config.Streams)) + for i, s := range config.Streams { + mount := NewStreamMount(sources[i], s.MountPath, s.Codec, s.Container, s.Bitrate) + if mount == nil { + log.Panicf("could not initialize %s\n", s.MountPath) + } + mounts = append(mounts, mount) + wg.Add(1) + go mount.Process() + } + + wg.Add(1) + go func() { + defer wg.Done() + + server := http.Server{ + Addr: fmt.Sprintf(":%d", config.Api.Port), + Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Server", "MeteorLight/api") + writer.Header().Set("Content-Type", "application/json; charset=utf-8") + switch request.URL.Path { + case "/listeners": + var listeners []*ListenerInformation + for _, mount := range mounts { + listeners = append(listeners, mount.GetListeners()...) + } + + jsonData, _ := json.Marshal(listeners) + writer.Write(jsonData) + case "/queue": + var blobs = make([]map[string]interface{}, 0, 1) + queueLock.RLock() + defer queueLock.RUnlock() + for _, e := range queue { + blobs = append(blobs, e.original) + } + + jsonData, _ := json.Marshal(blobs) + writer.Write(jsonData) + case "/skip": + if request.Method != "POST" { + return + } + + result := resultResponse{} + + q := audioQueue.GetQueue() + if len(q) > 0 { + result.Success = audioQueue.Remove(q[0]) + } else { + result.Reason = errors.New("queue empty") + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + case "/queue/clear": + if request.Method != "POST" { + return + } + + result := resultResponse{} + + q := audioQueue.GetQueue() + if len(q) > 0 { + for _, id := range q[1:] { + audioQueue.Remove(id) + } + result.Success = true + } + + jsonData, _ := json.Marshal(result) + writer.Write(jsonData) + + } + }), + } + + if err := server.ListenAndServe(); err != nil { + log.Panic(err) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + server := http.Server{ + Addr: fmt.Sprintf(":%d", config.Radio.Port), + Handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + mountName := strings.TrimPrefix(request.URL.Path, "/") + for _, mount := range mounts { + if mount.Mount == mountName { + writer.Header().Set("Server", "MeteorLight/radio") + writer.Header().Set("Content-Type", mount.MimeType) + writer.Header().Set("Accept-Ranges", "none") + writer.Header().Set("Connection", "keep-alive") + writer.Header().Set("X-Audiocast-Name", config.Radio.Name) + writer.Header().Set("Cache-Control", "no-store, max-age=0, no-transform") + writer.Header().Set("X-Content-Type-Options", "nosniff") + + byteWriter := &httpWriter{writer: writer} + + var wgClient sync.WaitGroup + + writeCallback := func(packet packetizer.Packet) error { + /* + select { + case <-request.Context().Done(): + // Client gave up + default: + } + + */ + _, err := byteWriter.Write(packet.GetData()) + return err + } + wgClient.Add(1) + + var headers []HeaderEntry + for k, v := range request.Header { + for _, s := range v { + headers = append(headers, HeaderEntry{ + Name: k, + Value: s, + }) + } + } + + uriPath := request.URL.Path + if len(request.URL.Query().Encode()) > 0 { + uriPath += "?" + request.URL.Query().Encode() + } + + mount.AddListener(&StreamListener{ + Information: ListenerInformation{ + Mount: mountName, + Path: uriPath, + Headers: headers, + }, + Start: func(packets []packetizer.Packet) error { + for _, p := range packets { + if err := writeCallback(p); err != nil { + return err + } + } + return nil + }, + Write: writeCallback, + Close: func() { + byteWriter.Close() + wgClient.Done() + }, + }) + wgClient.Wait() + return + } + } + + writer.WriteHeader(http.StatusNotFound) + return + }), + } + + if err := server.ListenAndServe(); err != nil { + log.Panic(err) + } + }() + + wg.Wait() +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..b8c26d2 --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ +# MeteorLight + +Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible). + +This project is a Work in Progress. TODO: np, API: np queue/head queue/tail, proper handling of audio queue <-> data queue, packetizer buffer + +# Improvements / differences from Kawa +* Does not use libav ([see supported formats/codecs on Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika#codecs-supported)) +* Supports listeners that have more than 16 HTTP headers on their requests. +* Does not restart stream per-track, instead being a continuous stream. +* Normalized channels / sample rates for endpoints. +* Implements ICY metadata (WiP) + +## Dependencies +### Go >= 1.18 + +### [Kirika](https://git.gammaspectra.live/S.O.N.G/Kirika) dependencies +Kirika is a collection of audio utilities for decoding/encoding files and streams. + +Check its native dependencies that must be installed before usage. \ No newline at end of file diff --git a/config.go b/config.go new file mode 100644 index 0000000..11b45f7 --- /dev/null +++ b/config.go @@ -0,0 +1,33 @@ +package main + +import "github.com/BurntSushi/toml" + +type Config struct { + Api struct { + Port int `toml:"port"` + } `toml:"api"` + Queue struct { + RandomSongApi string `toml:"random_song_api"` + NowPlaying string `toml:"np"` + FallbackPath string `toml:"fallback"` + BufferLengthInKiB int `toml:"buffer_len"` + } `toml:"queue"` + Radio struct { + Port int `toml:"port"` + Name string `toml:"name"` + } `toml:"radio"` + Streams []struct { + MountPath string `toml:"mount"` + Container string `toml:"container"` + Bitrate interface{} `toml:"bitrate"` + Codec string `toml:"codec"` + } `toml:"streams"` +} + +func GetConfig(pathName string) (*Config, error) { + config := &Config{} + if _, err := toml.DecodeFile(pathName, config); err != nil { + return nil, err + } + return config, nil +} diff --git a/example_config.toml b/example_config.toml new file mode 100644 index 0000000..e5b80d4 --- /dev/null +++ b/example_config.toml @@ -0,0 +1,83 @@ +# Kawa-compatible configuration file with extensions + +[api] +# +# The HTTP port the Kawa API listens on. Kawa will listen on localhost. +port=4040 + +[queue] +# +# An HTTP GET is sent to this URL when Kawa's queue is empty and it needs a new +# random track to play. The expected response is an arbitrary JSON blob that +# Kawa stores in its queue. At a minimum, it must include the "path" property: +# +# { +# "path": "/path/to/audio/file" +# } +# +# The path is the path to an audio file on the filesystem you want Kawa to play. +random_song_api="http://localhost:8012/api/random" +# +# An HTTP POST is issued to this URL when Kawa starts playing a track. The body +# will be identical to the JSON blob in the queue. +np="http://localhost:8012/api/np" +# +# When no tracks are available for whatever reason (such as external service +# outages), this track will be played. +fallback="/tmp/in.flac" +# Length of buffer to maintain in KiB +buffer_len=4096 + +[radio] +# +# The port to stream actual audio on. Kawa will listen on localhost. +port=8001 +# Name of the stream. +name="my radio" + +# +# A list of streams to make available at [radio.port]/(mount) follows. The +# following properties are available: +# +# mount: the HTTP address to serve the stream from +# container: the container format to use (ogg, flac, aac, or mp3) +# codec: the audio codec to use (opus, vorbis, flac, aac, do not specify for mp3 streams) +# bitrate: the desired bitrate of the stream in Kb/s, if not specified (or 0) an appropriate +# bitrate will be automatically selected based on the container/codec +# MeteorLight extension: bitrate can be a string (for example, v0/v1/v2/v3 on MP3). codec can also be he-aacv2. No vorbis support +[[streams]] +mount="stream128.mp3" +container="mp3" +bitrate=128 + +[[streams]] +mount="stream192.mp3" +container="mp3" +bitrate=192 + +[[streams]] +mount="stream128.aac" +container="aac" +bitrate=128 + +[[streams]] +mount="stream128.opus" +container="ogg" +codec="opus" +bitrate=128 + +[[streams]] +mount="stream192.opus" +container="ogg" +codec="opus" +bitrate=192 + +[[streams]] +mount="stream256.opus" +container="ogg" +codec="opus" +bitrate=256 + +[[streams]] +mount="stream.flac" +container="flac" \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..83d93aa --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module git.gammaspectra.live/S.O.N.G/MeteorLight + +go 1.18 + +require ( + git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f + github.com/BurntSushi/toml v1.0.0 +) + +require ( + git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48 // indirect + git.gammaspectra.live/S.O.N.G/go-pus v0.0.0-20220227175608-6cc027f24dba // indirect + git.gammaspectra.live/S.O.N.G/go-tta v0.2.1-0.20220226150007-096de1072bd6 // indirect + git.gammaspectra.live/S.O.N.G/goflac v0.0.0-20220223152921-827e6c3f729f // indirect + github.com/dh1tw/gosamplerate v0.1.2 // indirect + github.com/edgeware/mp4ff v0.26.1 // indirect + github.com/icza/bitio v1.0.0 // indirect + github.com/klauspost/cpuid v1.3.1 // indirect + github.com/kvark128/minimp3 v0.0.0-20211109174940-101188771a65 // indirect + github.com/mewkiz/flac v1.0.7 // indirect + github.com/mewkiz/pkg v0.0.0-20190919212034-518ade7978e2 // indirect + github.com/sssgun/mp3 v0.0.0-20170810093403-85f2ec632081 // indirect + github.com/viert/go-lame v0.0.0-20201108052322-bb552596b11d // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f11316f --- /dev/null +++ b/go.sum @@ -0,0 +1,43 @@ +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f h1:7WbbseQ4WAodtxOf1m7vT4NDphNIe7ScMkGP5HeYNyk= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220301232239-8254100fb58f/go.mod h1:NYC/3wOINygtTYvAqEtMfgWBeJ/9Gfv0NvDxnWmg+yA= +git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48 h1:MaKiBfXQl0keyfdCi1PxGOKRTiWhIs8PqCal5GhKDi0= +git.gammaspectra.live/S.O.N.G/go-fdkaac v0.0.0-20220228131722-e9cb84c52f48/go.mod h1:pkWt//S9hLVEQaJDPu/cHHPk8vPpo/0+zHy0me4LIP4= +git.gammaspectra.live/S.O.N.G/go-pus v0.0.0-20220227175608-6cc027f24dba h1:JEaxCVgdr3XXAuDCPAx7ttLFZaaHzTEzG+oRnVUtUKU= +git.gammaspectra.live/S.O.N.G/go-pus v0.0.0-20220227175608-6cc027f24dba/go.mod h1:vkoHSHVM9p6vAUmXAik0gvaLcIfiQYrD6bQqVpOulUk= +git.gammaspectra.live/S.O.N.G/go-tta v0.2.1-0.20220226150007-096de1072bd6 h1:ITVVisbHPnUclp3PBkCbXFeBhOCBcOjPdgjJ9wRH3TI= +git.gammaspectra.live/S.O.N.G/go-tta v0.2.1-0.20220226150007-096de1072bd6/go.mod h1:cobkT8u8vq/+ngLy+feKS2M2ZT2HoCec5riA/0Cex3Q= +git.gammaspectra.live/S.O.N.G/goflac v0.0.0-20220223152921-827e6c3f729f h1:4Dkx1l5Ex7pG/Xbs57L4IQd7mBgd6TO5rhP0BKP9PiI= +git.gammaspectra.live/S.O.N.G/goflac v0.0.0-20220223152921-827e6c3f729f/go.mod h1:/po1QgOh3xynbvi4sxdY6Iw8m5WPJfGGmry2boZD8fs= +github.com/BurntSushi/toml v1.0.0 h1:dtDWrepsVPfW9H/4y7dDgFc2MBUSeJhlaDtK13CxFlU= +github.com/BurntSushi/toml v1.0.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/cocoonlife/testify v0.0.0-20160218172820-792cc1faeb64 h1:LjPYdzoFSAJ5Tr/ElL8kzTJghXgpnOjJVbgd1UvZB1o= +github.com/d4l3k/messagediff v1.2.2-0.20190829033028-7e0a312ae40b/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo= +github.com/dh1tw/gosamplerate v0.1.2 h1:oyqtZk67xB9B4l+vIZCZ3F0RYV/z66W58VOah11/ktI= +github.com/dh1tw/gosamplerate v0.1.2/go.mod h1:zooTyHpoR7hE+FLfdE3yjLHb2QA2NpMusNfuaZqEACM= +github.com/edgeware/mp4ff v0.26.1 h1:tH+TIesZZmrA8BN5HuiKWp3sv5NF4N1A2cFxTSCNL8E= +github.com/edgeware/mp4ff v0.26.1/go.mod h1:6VHE5CTkpDseIg775+rh8BfnTvqjMnVbz5EDU4QwSdc= +github.com/go-audio/audio v1.0.0/go.mod h1:6uAu0+H2lHkwdGsAY+j2wHPNPpPoeg5AaEFh9FlA+Zs= +github.com/go-audio/riff v1.0.0/go.mod h1:l3cQwc85y79NQFCRB7TiPoNiaijp6q8Z0Uv38rVG498= +github.com/go-audio/wav v1.0.0/go.mod h1:3yoReyQOsiARkvPl3ERCi8JFjihzG6WhjYpZCf5zAWE= +github.com/go-test/deep v1.0.6 h1:UHSEyLZUwX9Qoi99vVwvewiMC8mM2bf7XEM2nqvzEn8= +github.com/go-test/deep v1.0.6/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= +github.com/icza/bitio v1.0.0 h1:squ/m1SHyFeCA6+6Gyol1AxV9nmPPlJFT8c2vKdj3U8= +github.com/icza/bitio v1.0.0/go.mod h1:0jGnlLAx8MKMr9VGnn/4YrvZiprkvBelsVIbA9Jjr9A= +github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lToqnXgA8Mz1DP11X4zSJ159C3k= +github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA= +github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= +github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= +github.com/kvark128/minimp3 v0.0.0-20211109174940-101188771a65 h1:8qfVQv7MSACDXadEwl1yjUKJ68yC9B7nR4cioEoCfH0= +github.com/kvark128/minimp3 v0.0.0-20211109174940-101188771a65/go.mod h1:hIq9nAqNcwTySvnFhCe1C8xC/STIr2Fe5vJ52zk1jkE= +github.com/mewkiz/flac v1.0.7 h1:uIXEjnuXqdRaZttmSFM5v5Ukp4U6orrZsnYGGR3yow8= +github.com/mewkiz/flac v1.0.7/go.mod h1:yU74UH277dBUpqxPouHSQIar3G1X/QIclVbFahSd1pU= +github.com/mewkiz/pkg v0.0.0-20190919212034-518ade7978e2 h1:EyTNMdePWaoWsRSGQnXiSoQu0r6RS1eA557AwJhlzHU= +github.com/mewkiz/pkg v0.0.0-20190919212034-518ade7978e2/go.mod h1:3E2FUC/qYUfM8+r9zAwpeHJzqRVVMIYnpzD/clwWxyA= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/sssgun/mp3 v0.0.0-20170810093403-85f2ec632081 h1:Qo/HswJzVywl0podyXMD62HIohsj/Ij2oXbD26aUIxM= +github.com/sssgun/mp3 v0.0.0-20170810093403-85f2ec632081/go.mod h1:ExwZtltybPz8zLO8c2lKRfpPk1HAxhrkp038QIBs+yg= +github.com/viert/go-lame v0.0.0-20201108052322-bb552596b11d h1:LptdD7GTUZeklomtW5vZ1AHwBvDBUCZ2Ftpaz7uEI7g= +github.com/viert/go-lame v0.0.0-20201108052322-bb552596b11d/go.mod h1:EqTcYM7y4JlSfeTI47pmNu3EZQuCuLQefsQyg1Imlz8= +golang.org/x/image v0.0.0-20190220214146-31aff87c08e9/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/mount.go b/mount.go new file mode 100644 index 0000000..9000037 --- /dev/null +++ b/mount.go @@ -0,0 +1,256 @@ +package main + +import ( + "git.gammaspectra.live/S.O.N.G/Kirika/audio" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/aac" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus" + "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" + "io" + "log" + "sync" +) + +type HeaderEntry struct { + Name string `json:"name"` + Value string `json:"value"` +} + +type ListenerInformation struct { + Mount string `json:"mount"` + Path string `json:"path"` + Headers []HeaderEntry `json:"headers"` +} + +type StreamListener struct { + Information ListenerInformation + Start func(packets []packetizer.Packet) error + Write func(packet packetizer.Packet) error + Close func() +} +type StreamMount struct { + Mount string + MimeType string + Packetizer packetizer.Packetizer + + listeners []*StreamListener + listenersLock sync.Mutex + keepBuffer []packetizer.Packet +} + +func NewStreamMount(source audio.Source, mount string, codec string, container string, bitrate interface{}) *StreamMount { + var encoderFormat format.Encoder + options := make(map[string]interface{}) + var mimeType string + + reader, writer := io.Pipe() + var packetizerEntry packetizer.Packetizer + + switch codec { + case "opus": + encoderFormat = opus.NewFormat() + if bitrate != nil { + options["bitrate"] = bitrate + } + mimeType = "audio/ogg;codecs=opus" + packetizerEntry = packetizer.NewOggPacketizer(reader) + case "mp3": + encoderFormat = mp3.NewFormat() + if bitrate != nil { + options["bitrate"] = bitrate + } + mimeType = "audio/mpeg;codecs=mp3" + packetizerEntry = packetizer.NewMp3Packetizer(reader) + case "flac": + encoderFormat = flac.NewFormat() + if bitrate != nil { + options["bitdepth"] = bitrate + } + options["compression_level"] = 8 + mimeType = "audio/flac" + packetizerEntry = packetizer.NewFLACPacketizer(reader) + case "aac": + encoderFormat = aac.NewFormat() + if bitrate != nil { + options["bitrate"] = bitrate + } + mimeType = "audio/aac" + packetizerEntry = packetizer.NewAdtsPacketizer(reader) + case "he-aacv2": + encoderFormat = aac.NewFormat() + if bitrate != nil { + options["bitrate"] = bitrate + } + options["mode"] = "hev2" + mimeType = "audio/aac" + packetizerEntry = packetizer.NewAdtsPacketizer(reader) + } + + if encoderFormat == nil { + switch container { + case "ogg": + encoderFormat = opus.NewFormat() + if bitrate != nil { + options["bitrate"] = bitrate + } + mimeType = "audio/ogg;codecs=opus" + packetizerEntry = packetizer.NewOggPacketizer(reader) + case "mp3": + encoderFormat = mp3.NewFormat() + if bitrate != nil { + options["bitrate"] = bitrate + } + mimeType = "audio/mpeg;codecs=mp3" + packetizerEntry = packetizer.NewMp3Packetizer(reader) + case "flac": + encoderFormat = flac.NewFormat() + if bitrate != nil { + options["bitdepth"] = bitrate + } + options["compression_level"] = 8 + mimeType = "audio/flac" + packetizerEntry = packetizer.NewFLACPacketizer(reader) + case "adts", "aac": + encoderFormat = aac.NewFormat() + if bitrate != nil { + options["bitrate"] = bitrate + } + mimeType = "audio/aac" + packetizerEntry = packetizer.NewAdtsPacketizer(reader) + } + } + + if encoderFormat == nil || packetizerEntry == nil { + return nil + } + + if opusFormat, ok := encoderFormat.(opus.Format); ok { + go func() { + defer writer.Close() + if err := opusFormat.Encode(audio.NewResampleFilter(opus.FixedSampleRate, audio.Linear, 0).Process(source), writer, options); err != nil { + log.Panic(err) + } + }() + } else { + go func() { + defer writer.Close() + if err := encoderFormat.Encode(source, writer, options); err != nil { + log.Panic(err) + } + }() + } + + return &StreamMount{ + Mount: mount, + MimeType: mimeType, + Packetizer: packetizerEntry, + } +} + +func (m *StreamMount) removeDiscard() { + for i, p := range m.keepBuffer { + if p.KeepMode() == packetizer.Discard { + m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...) + m.removeDiscard() + break + } + } +} + +func (m *StreamMount) removeKeepLast() { + for i, p := range m.keepBuffer { + if p.KeepMode() == packetizer.KeepLast { + m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...) + m.removeKeepLast() + break + } + } +} + +func (m *StreamMount) removeGroupKeep() { + for i, p := range m.keepBuffer { + if p.KeepMode() == packetizer.GroupKeep { + m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...) + m.removeGroupKeep() + break + } + } +} + +func (m *StreamMount) AddListener(listener *StreamListener) { + m.listenersLock.Lock() + defer m.listenersLock.Unlock() + m.listeners = append(m.listeners, listener) +} + +func (m *StreamMount) GetListeners() (entries []*ListenerInformation) { + m.listenersLock.Lock() + defer m.listenersLock.Unlock() + for _, l := range m.listeners { + entries = append(entries, &l.Information) + } + + return +} + +func (m *StreamMount) Process() { + defer func() { + m.listenersLock.Lock() + for _, l := range m.listeners { + l.Close() + } + m.listeners = m.listeners[:0] + m.listenersLock.Unlock() + }() + var toRemove []int + for { + packet := m.Packetizer.GetPacket() + if packet == nil { + return + } + + //TODO: do this via goroutine messaging? + for i, l := range m.listeners { + if l.Start != nil { + l.Start(m.keepBuffer) + l.Start = nil + } + if l.Write(packet) != nil { + toRemove = append(toRemove, i) + l.Close() + } + } + + if len(toRemove) > 0 { + m.listenersLock.Lock() + //TODO: remove more than one per iteration + for _, i := range toRemove { + m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) + break + } + m.listenersLock.Unlock() + toRemove = toRemove[:0] + } + + m.removeDiscard() //always remove discards + + switch packet.KeepMode() { + case packetizer.KeepLast: + m.removeKeepLast() + fallthrough + case packetizer.Keep: + m.keepBuffer = append(m.keepBuffer, packet) + case packetizer.GroupKeep: + m.keepBuffer = append(m.keepBuffer, packet) + case packetizer.GroupDiscard: + m.removeGroupKeep() + case packetizer.Discard: + m.keepBuffer = append(m.keepBuffer, packet) + + } + + } + +} diff --git a/utilities.go b/utilities.go new file mode 100644 index 0000000..cf715a9 --- /dev/null +++ b/utilities.go @@ -0,0 +1,30 @@ +package main + +import "git.gammaspectra.live/S.O.N.G/Kirika/audio" + +func SplitAudioSource(source audio.Source, split int) (sources []audio.Source) { + for i := 0; i < split; i++ { + sources = append(sources, audio.Source{ + SampleRate: source.SampleRate, + Channels: source.Channels, + Blocks: make(chan []float32), + }) + } + + go func() { + defer func() { + for _, s := range sources { + close(s.Blocks) + } + }() + + for block := range source.Blocks { + //TODO: this might block? + for _, s := range sources { + s.Blocks <- block + } + } + }() + + return +}