DataHoarder
05a8ed380a
All checks were successful
continuous-integration/drone/push Build is passing
359 lines
9.8 KiB
Go
359 lines
9.8 KiB
Go
package main
|
|
|
|
import (
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/aac"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/vorbis"
|
|
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
|
"github.com/enriquebris/goconcurrentqueue"
|
|
"io"
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
type HeaderEntry struct {
|
|
Name string `json:"name"`
|
|
Value string `json:"value"`
|
|
}
|
|
|
|
type ListenerInformation struct {
|
|
Identifier string `json:"identifier"`
|
|
Mount string `json:"mount"`
|
|
Path string `json:"path"`
|
|
Headers []HeaderEntry `json:"headers"`
|
|
}
|
|
|
|
type StreamListener struct {
|
|
Information ListenerInformation
|
|
Start func(packets []packetizer.Packet) error
|
|
Write func(packet packetizer.Packet) error
|
|
Close func()
|
|
}
|
|
type StreamMount struct {
|
|
Mount string
|
|
MimeType string
|
|
FormatDescription string
|
|
Packetizer packetizer.Packetizer
|
|
Options map[string]interface{}
|
|
SampleRate int
|
|
Channels int
|
|
OffsetStart bool
|
|
MetadataQueue *goconcurrentqueue.FIFO
|
|
listeners []*StreamListener
|
|
listenersLock sync.RWMutex
|
|
keepBuffer []packetizer.Packet
|
|
}
|
|
|
|
func NewStreamMount(source audio.Source, config *StreamConfig) *StreamMount {
|
|
var encoderFormat format.Encoder
|
|
options := make(map[string]interface{})
|
|
var mimeType string
|
|
|
|
reader, writer := io.Pipe()
|
|
var packetizerEntry packetizer.Packetizer
|
|
|
|
bitrate := config.GetOption("bitrate", nil)
|
|
|
|
sampleRate := config.GetIntOption("samplerate", source.GetSampleRate())
|
|
|
|
channels := config.GetIntOption("channels", source.GetChannels())
|
|
|
|
switch config.Codec {
|
|
case "vorbis":
|
|
encoderFormat = vorbis.NewFormat()
|
|
if bitrate != nil {
|
|
options["bitrate"] = bitrate
|
|
}
|
|
mimeType = "audio/ogg;codecs=vorbis"
|
|
packetizerEntry = packetizer.NewOggPacketizer(reader)
|
|
case "opus":
|
|
encoderFormat = opus.NewFormat()
|
|
if bitrate != nil {
|
|
options["bitrate"] = bitrate
|
|
}
|
|
options["complexity"] = config.GetIntOption("complexity", 10)
|
|
//Lower delay than just one packet per second default
|
|
options["delay"] = 300
|
|
sampleRate = opus.FixedSampleRate
|
|
mimeType = "audio/ogg;codecs=opus"
|
|
packetizerEntry = packetizer.NewOggPacketizer(reader)
|
|
case "mp3":
|
|
encoderFormat = mp3.NewFormat()
|
|
if bitrate != nil {
|
|
options["bitrate"] = bitrate
|
|
}
|
|
mimeType = "audio/mpeg;codecs=mp3"
|
|
packetizerEntry = packetizer.NewMp3Packetizer(reader)
|
|
case "flac":
|
|
encoderFormat = flac.NewFormat()
|
|
|
|
options["bitdepth"] = config.GetIntOption("bitdepth", 16)
|
|
options["compression_level"] = config.GetIntOption("compression_level", 8)
|
|
options["block_size"] = config.GetIntOption("block_size", 0)
|
|
|
|
mimeType = "audio/flac"
|
|
if config.Container == "ogg" {
|
|
options["ogg"] = true
|
|
mimeType = "audio/ogg;codecs=flac"
|
|
packetizerEntry = packetizer.NewOggPacketizer(reader)
|
|
} else {
|
|
packetizerEntry = packetizer.NewFLACPacketizer(reader)
|
|
}
|
|
case "aac":
|
|
encoderFormat = aac.NewFormat()
|
|
if bitrate != nil {
|
|
options["bitrate"] = bitrate
|
|
}
|
|
options["mode"] = config.GetStringOption("mode", "lc")
|
|
options["afterburner"] = config.GetBoolOption("afterburner", true)
|
|
mimeType = "audio/aac;codecs=mp4a.40.2"
|
|
if options["mode"] == "he" || options["mode"] == "hev1" || options["bitrate"] == "vbr2" {
|
|
mimeType = "audio/aac;codecs=mp4a.40.5"
|
|
} else if options["mode"] == "hev2" || options["bitrate"] == "vbr1" {
|
|
mimeType = "audio/aac;codecs=mp4a.40.29"
|
|
}
|
|
packetizerEntry = packetizer.NewAdtsPacketizer(reader)
|
|
}
|
|
|
|
if encoderFormat == nil {
|
|
switch config.Container {
|
|
case "ogg":
|
|
encoderFormat = opus.NewFormat()
|
|
if bitrate != nil {
|
|
options["bitrate"] = bitrate
|
|
}
|
|
|
|
options["complexity"] = config.GetIntOption("complexity", 10)
|
|
//Lower delay than just one packet per second default
|
|
options["delay"] = 300
|
|
sampleRate = opus.FixedSampleRate
|
|
mimeType = "audio/ogg;codecs=opus"
|
|
packetizerEntry = packetizer.NewOggPacketizer(reader)
|
|
case "mp3":
|
|
encoderFormat = mp3.NewFormat()
|
|
if bitrate != nil {
|
|
options["bitrate"] = bitrate
|
|
}
|
|
mimeType = "audio/mpeg;codecs=mp3"
|
|
packetizerEntry = packetizer.NewMp3Packetizer(reader)
|
|
case "flac":
|
|
encoderFormat = flac.NewFormat()
|
|
|
|
options["bitdepth"] = config.GetIntOption("bitdepth", 16)
|
|
options["compression_level"] = config.GetIntOption("compression_level", 8)
|
|
options["block_size"] = config.GetIntOption("block_size", 0)
|
|
|
|
mimeType = "audio/flac"
|
|
packetizerEntry = packetizer.NewFLACPacketizer(reader)
|
|
case "adts", "aac":
|
|
encoderFormat = aac.NewFormat()
|
|
if bitrate != nil {
|
|
options["bitrate"] = bitrate
|
|
}
|
|
options["mode"] = config.GetStringOption("mode", "lc")
|
|
options["afterburner"] = config.GetBoolOption("afterburner", true)
|
|
mimeType = "audio/aac;codecs=mp4a.40.2"
|
|
if options["mode"] == "he" || options["mode"] == "hev1" || options["bitrate"] == "vbr2" {
|
|
mimeType = "audio/aac;codecs=mp4a.40.5"
|
|
} else if options["mode"] == "hev2" || options["bitrate"] == "vbr1" {
|
|
mimeType = "audio/aac;codecs=mp4a.40.29"
|
|
}
|
|
packetizerEntry = packetizer.NewAdtsPacketizer(reader)
|
|
}
|
|
}
|
|
|
|
if encoderFormat == nil || packetizerEntry == nil {
|
|
return nil
|
|
}
|
|
|
|
go func() {
|
|
defer writer.Close()
|
|
|
|
if channels != source.GetChannels() {
|
|
if channels == 1 {
|
|
source = filter.MonoFilter{}.Process(source)
|
|
} else if channels == 2 {
|
|
source = filter.StereoFilter{}.Process(source)
|
|
}
|
|
}
|
|
|
|
if sampleRate != source.GetSampleRate() {
|
|
source = filter.NewResampleFilter(sampleRate, filter.QualityFastest, 0).Process(source)
|
|
}
|
|
|
|
if err := encoderFormat.Encode(source, writer, options); err != nil {
|
|
log.Panic(err)
|
|
}
|
|
}()
|
|
|
|
return &StreamMount{
|
|
Mount: config.MountPath,
|
|
MimeType: mimeType,
|
|
FormatDescription: encoderFormat.EncoderDescription(),
|
|
Packetizer: packetizerEntry,
|
|
SampleRate: sampleRate,
|
|
OffsetStart: config.GetBoolOption("offset_start", true),
|
|
Channels: channels,
|
|
Options: options,
|
|
MetadataQueue: goconcurrentqueue.NewFIFO(),
|
|
}
|
|
}
|
|
|
|
func (m *StreamMount) removeDiscard(sampleNumber int64) {
|
|
for i, p := range m.keepBuffer {
|
|
if p.KeepMode() == packetizer.Discard && p.GetEndSampleNumber() <= sampleNumber {
|
|
m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...)
|
|
m.removeDiscard(sampleNumber)
|
|
break
|
|
} else if p.GetEndSampleNumber() > sampleNumber {
|
|
//they are placed in order
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *StreamMount) removeKeepLast(category int64) {
|
|
for i, p := range m.keepBuffer {
|
|
if p.Category() == category && p.KeepMode() == packetizer.KeepLast {
|
|
m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...)
|
|
m.removeKeepLast(category)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *StreamMount) removeGroupKeep(category int64) {
|
|
for i, p := range m.keepBuffer {
|
|
if p.Category() == category && p.KeepMode() == packetizer.GroupKeep {
|
|
m.keepBuffer = append(m.keepBuffer[:i], m.keepBuffer[i+1:]...)
|
|
m.removeGroupKeep(category)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *StreamMount) AddListener(listener *StreamListener) {
|
|
m.listenersLock.Lock()
|
|
defer m.listenersLock.Unlock()
|
|
m.listeners = append(m.listeners, listener)
|
|
}
|
|
|
|
func (m *StreamMount) RemoveListener(identifier string, direct ...bool) bool {
|
|
if (len(direct) > 0 && direct[0]) || func() bool {
|
|
m.listenersLock.RLock()
|
|
defer m.listenersLock.RUnlock()
|
|
for _, l := range m.listeners {
|
|
if l.Information.Identifier == identifier {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}() {
|
|
m.listenersLock.Lock()
|
|
defer m.listenersLock.Unlock()
|
|
for i := range m.listeners {
|
|
l := m.listeners[i]
|
|
if l.Information.Identifier == identifier {
|
|
m.listeners = append(m.listeners[:i], m.listeners[i+1:]...)
|
|
l.Close()
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *StreamMount) GetListeners() (entries []*ListenerInformation) {
|
|
m.listenersLock.RLock()
|
|
defer m.listenersLock.RUnlock()
|
|
for _, l := range m.listeners {
|
|
entries = append(entries, &l.Information)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (m *StreamMount) handlePacket(packet packetizer.Packet) {
|
|
var toRemove []string
|
|
|
|
//TODO: do this via goroutine messaging?
|
|
func() {
|
|
m.listenersLock.RLock()
|
|
defer m.listenersLock.RUnlock()
|
|
var err error
|
|
for _, l := range m.listeners {
|
|
if l.Start != nil {
|
|
l.Start(m.keepBuffer)
|
|
l.Start = nil
|
|
}
|
|
if err = l.Write(packet); err != nil {
|
|
log.Printf("failed to write data to %s client: %s\n", l.Information.Identifier, err)
|
|
toRemove = append(toRemove, l.Information.Identifier)
|
|
}
|
|
}
|
|
|
|
}()
|
|
|
|
for _, id := range toRemove {
|
|
m.RemoveListener(id, true)
|
|
}
|
|
|
|
sampleLimit := packet.GetEndSampleNumber() - int64(maxBufferSize*m.SampleRate)
|
|
|
|
m.removeDiscard(sampleLimit) //always remove discards
|
|
|
|
switch packet.KeepMode() {
|
|
case packetizer.KeepLast:
|
|
m.removeKeepLast(packet.Category())
|
|
fallthrough
|
|
case packetizer.Keep:
|
|
m.keepBuffer = append(m.keepBuffer, packet)
|
|
case packetizer.GroupKeep:
|
|
m.keepBuffer = append(m.keepBuffer, packet)
|
|
case packetizer.GroupDiscard:
|
|
m.removeGroupKeep(packet.Category())
|
|
case packetizer.Discard:
|
|
m.keepBuffer = append(m.keepBuffer, packet)
|
|
|
|
}
|
|
}
|
|
|
|
func (m *StreamMount) Process(group *sync.WaitGroup) {
|
|
defer group.Done()
|
|
defer func() {
|
|
//Teardown all listeners
|
|
m.listenersLock.Lock()
|
|
for _, l := range m.listeners {
|
|
l.Close()
|
|
}
|
|
m.listeners = m.listeners[:0]
|
|
m.listenersLock.Unlock()
|
|
}()
|
|
|
|
for {
|
|
packet := m.Packetizer.GetPacket()
|
|
if packet == nil {
|
|
return
|
|
}
|
|
|
|
if item, err := m.MetadataQueue.Get(0); err == nil {
|
|
if metadataPacket, ok := item.(*QueueMetadataPacket); ok {
|
|
if packet.GetEndSampleNumber() > metadataPacket.GetStartSampleNumber() {
|
|
m.MetadataQueue.Dequeue()
|
|
m.handlePacket(metadataPacket)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
m.handlePacket(packet)
|
|
|
|
}
|
|
|
|
}
|