MeteorLight/stream/mount.go

354 lines
9.7 KiB
Go

package stream
import (
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/filter"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/aac"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/flac"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/mp3"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/opus"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/format/vorbis"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"git.gammaspectra.live/S.O.N.G/MeteorLight/config"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
"golang.org/x/exp/slices"
"io"
"log"
"sync"
)
type Mount struct {
Mount string
MimeType string
FormatDescription string
Packetizer packetizer.Packetizer
Options map[string]interface{}
SampleRate int
Channels int
OffsetStart bool
metadataQueue chan *metadata.Packet
listeners []listener.Listener
listenersLock sync.RWMutex
keepBuffer []packetizer.Packet
}
func NewStreamMount(source audio.Source, config *config.StreamConfig) *Mount {
var encoderFormat format.Encoder
options := make(map[string]interface{})
var mimeType string
reader, writer := io.Pipe()
var packetizerEntry packetizer.Packetizer
bitrate := config.GetOption("bitrate", nil)
sampleRate := config.GetIntOption("samplerate", source.GetSampleRate())
channels := config.GetIntOption("channels", source.GetChannels())
switch config.Codec {
case "vorbis":
encoderFormat = vorbis.NewFormat()
if bitrate != nil {
options["bitrate"] = bitrate
}
mimeType = "audio/ogg;codecs=vorbis"
packetizerEntry = packetizer.NewOggPacketizer(reader)
case "opus":
encoderFormat = opus.NewFormat()
if bitrate != nil {
options["bitrate"] = bitrate
}
options["complexity"] = config.GetIntOption("complexity", 10)
//Lower delay than just one packet per second default
options["delay"] = 300
sampleRate = opus.FixedSampleRate
mimeType = "audio/ogg;codecs=opus"
packetizerEntry = packetizer.NewOggPacketizer(reader)
case "mp3":
encoderFormat = mp3.NewFormat()
if bitrate != nil {
options["bitrate"] = bitrate
}
mimeType = "audio/mpeg;codecs=mp3"
packetizerEntry = packetizer.NewMp3Packetizer(reader)
case "flac":
encoderFormat = flac.NewFormat()
options["bitdepth"] = config.GetIntOption("bitdepth", 16)
options["compression_level"] = config.GetIntOption("compression_level", 8)
options["block_size"] = config.GetIntOption("block_size", 0)
mimeType = "audio/flac"
if config.Container == "ogg" {
options["ogg"] = true
mimeType = "audio/ogg;codecs=flac"
packetizerEntry = packetizer.NewOggPacketizer(reader)
} else {
packetizerEntry = packetizer.NewFLACPacketizer(reader)
}
case "aac":
encoderFormat = aac.NewFormat()
if bitrate != nil {
options["bitrate"] = bitrate
}
options["mode"] = config.GetStringOption("mode", "lc")
options["afterburner"] = config.GetBoolOption("afterburner", true)
mimeType = "audio/aac;codecs=mp4a.40.2"
if options["mode"] == "he" || options["mode"] == "hev1" || options["bitrate"] == "vbr2" {
mimeType = "audio/aac;codecs=mp4a.40.5"
} else if options["mode"] == "hev2" || options["bitrate"] == "vbr1" {
mimeType = "audio/aac;codecs=mp4a.40.29"
}
packetizerEntry = packetizer.NewAdtsPacketizer(reader)
}
if encoderFormat == nil {
switch config.Container {
case "ogg":
encoderFormat = opus.NewFormat()
if bitrate != nil {
options["bitrate"] = bitrate
}
options["complexity"] = config.GetIntOption("complexity", 10)
//Lower delay than just one packet per second default
options["delay"] = 300
sampleRate = opus.FixedSampleRate
mimeType = "audio/ogg;codecs=opus"
packetizerEntry = packetizer.NewOggPacketizer(reader)
case "mp3":
encoderFormat = mp3.NewFormat()
if bitrate != nil {
options["bitrate"] = bitrate
}
mimeType = "audio/mpeg;codecs=mp3"
packetizerEntry = packetizer.NewMp3Packetizer(reader)
case "flac":
encoderFormat = flac.NewFormat()
options["bitdepth"] = config.GetIntOption("bitdepth", 16)
options["compression_level"] = config.GetIntOption("compression_level", 8)
options["block_size"] = config.GetIntOption("block_size", 0)
mimeType = "audio/flac"
packetizerEntry = packetizer.NewFLACPacketizer(reader)
case "adts", "aac":
encoderFormat = aac.NewFormat()
if bitrate != nil {
options["bitrate"] = bitrate
}
options["mode"] = config.GetStringOption("mode", "lc")
options["afterburner"] = config.GetBoolOption("afterburner", true)
mimeType = "audio/aac;codecs=mp4a.40.2"
if options["mode"] == "he" || options["mode"] == "hev1" || options["bitrate"] == "vbr2" {
mimeType = "audio/aac;codecs=mp4a.40.5"
} else if options["mode"] == "hev2" || options["bitrate"] == "vbr1" {
mimeType = "audio/aac;codecs=mp4a.40.29"
}
packetizerEntry = packetizer.NewAdtsPacketizer(reader)
}
}
if encoderFormat == nil || packetizerEntry == nil {
return nil
}
go func() {
defer writer.Close()
if channels != source.GetChannels() {
if channels == 1 {
source = filter.MonoFilter{}.Process(source)
} else if channels == 2 {
source = filter.StereoFilter{}.Process(source)
}
}
if sampleRate != source.GetSampleRate() {
source = filter.NewResampleFilter(sampleRate, filter.QualityFastest, 0).Process(source)
}
if err := encoderFormat.Encode(source, writer, options); err != nil {
log.Panic(err)
}
}()
return &Mount{
Mount: config.MountPath,
MimeType: mimeType,
FormatDescription: encoderFormat.EncoderDescription(),
Packetizer: packetizerEntry,
SampleRate: sampleRate,
OffsetStart: config.GetBoolOption("offset_start", true),
Channels: channels,
Options: options,
metadataQueue: make(chan *metadata.Packet, 1),
}
}
func (m *Mount) removeDiscard(sampleNumber int64) {
for i, p := range m.keepBuffer {
if p.KeepMode() == packetizer.Discard && p.GetEndSampleNumber() <= sampleNumber {
m.keepBuffer = slices.Delete(m.keepBuffer, i, i+1)
m.removeDiscard(sampleNumber)
break
} else if p.GetEndSampleNumber() > sampleNumber {
//they are placed in order
break
}
}
}
func (m *Mount) removeKeepLast(category int64) {
for i, p := range m.keepBuffer {
if p.Category() == category && p.KeepMode() == packetizer.KeepLast {
m.keepBuffer = slices.Delete(m.keepBuffer, i, i+1)
m.removeKeepLast(category)
break
}
}
}
func (m *Mount) removeGroupKeep(category int64) {
for i, p := range m.keepBuffer {
if p.Category() == category && p.KeepMode() == packetizer.GroupKeep {
m.keepBuffer = slices.Delete(m.keepBuffer, i, i+1)
m.removeGroupKeep(category)
break
}
}
}
func (m *Mount) AddListener(listener listener.Listener) {
m.listenersLock.Lock()
defer m.listenersLock.Unlock()
m.listeners = append(m.listeners, listener)
}
func (m *Mount) RemoveListener(identifier string, direct ...bool) bool {
if (len(direct) > 0 && direct[0]) || func() bool {
m.listenersLock.RLock()
defer m.listenersLock.RUnlock()
for _, l := range m.listeners {
if l.Identifier() == identifier {
return true
}
}
return false
}() {
m.listenersLock.Lock()
defer m.listenersLock.Unlock()
for i := range m.listeners {
l := m.listeners[i]
if l.Identifier() == identifier {
m.listeners = slices.Delete(m.listeners, i, i+1)
l.Close()
return true
}
}
}
return false
}
func (m *Mount) GetListeners() (entries []*listener.Information) {
m.listenersLock.RLock()
defer m.listenersLock.RUnlock()
for _, l := range m.listeners {
entries = append(entries, l.Information())
}
return
}
func (m *Mount) handlePacket(packet packetizer.Packet) {
var toRemove []string
//TODO: do this via goroutine messaging?
func() {
m.listenersLock.RLock()
defer m.listenersLock.RUnlock()
var err error
for _, l := range m.listeners {
if !l.HasStarted() {
if err = l.Start(m.keepBuffer); err != nil {
log.Printf("failed to write data to %s client: %s\n", l.Identifier(), err)
toRemove = append(toRemove, l.Identifier())
continue
}
}
if err = l.Write(packet); err != nil {
log.Printf("failed to write data to %s client: %s\n", l.Identifier(), err)
toRemove = append(toRemove, l.Identifier())
}
}
}()
for _, id := range toRemove {
m.RemoveListener(id, true)
}
sampleLimit := packet.GetEndSampleNumber() - int64(config.MaxBufferSize*m.SampleRate)
m.removeDiscard(sampleLimit) //always remove discards
switch packet.KeepMode() {
case packetizer.KeepLast:
m.removeKeepLast(packet.Category())
fallthrough
case packetizer.Keep:
m.keepBuffer = append(m.keepBuffer, packet)
case packetizer.GroupKeep:
m.keepBuffer = append(m.keepBuffer, packet)
case packetizer.GroupDiscard:
m.removeGroupKeep(packet.Category())
case packetizer.Discard:
m.keepBuffer = append(m.keepBuffer, packet)
}
}
func (m *Mount) QueueMetadata(meta *metadata.Packet) {
if meta != nil {
m.metadataQueue <- meta
}
}
func (m *Mount) Process(group *sync.WaitGroup) {
defer group.Done()
defer func() {
//Teardown all listeners
m.listenersLock.Lock()
for _, l := range m.listeners {
l.Close()
}
m.listeners = m.listeners[:0]
m.listenersLock.Unlock()
}()
var metadataPacket *metadata.Packet
for {
packet := m.Packetizer.GetPacket()
if packet == nil {
return
}
if metadataPacket == nil && len(m.metadataQueue) > 0 {
metadataPacket = <-m.metadataQueue
}
if metadataPacket != nil && packet.GetEndSampleNumber() > metadataPacket.GetStartSampleNumber() {
m.handlePacket(metadataPacket)
metadataPacket = nil
}
m.handlePacket(packet)
}
}