MeteorLight/listener/plain/plain.go

99 lines
2.3 KiB
Go

package plain
import (
"errors"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
"sync/atomic"
)
// Listener forwards packet payloads to a single client by pushing byte slices
// onto a channel. It optionally rebases packets against the sample number of
// the first streamed data packet.
type Listener struct {
	// information describes this listener; it is exposed through
	// Information and Identifier.
	information listener.Information

	// ctx is consulted before every write and is failed when the client
	// cannot keep up with the stream.
	ctx *util.RequestDone

	// started flips to true on the first call to Start.
	started atomic.Bool

	// sliceWriter receives the data of every packet that is written and is
	// closed by Close.
	sliceWriter chan []byte

	// offsetStart enables the offset path in Write for packets that
	// implement packetizer.OffsetablePacket.
	offsetStart bool

	// streamStartOffset is the sample number passed to GetDataOffset. It is
	// -1 until Write sees the first offsetable packet whose keep mode is not
	// packetizer.Keep.
	streamStartOffset int64

	// streamSamplesBuffer is subtracted from the start sample number of the
	// last backlog packet in Start to compute the replay cutoff.
	streamSamplesBuffer int64
}
// NewListener builds a Listener that writes packet data to sliceWriter.
//
// samplesToBuffer is stored as the backlog window used by Start, and
// offsetStart enables the offset path in Write. The stream start offset is
// initialised to -1, which Write treats as "not yet determined".
//
// The second return value is always nil in this implementation.
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
	l := new(Listener)
	l.information = info
	l.ctx = ctx
	l.sliceWriter = sliceWriter
	l.offsetStart = offsetStart
	l.streamSamplesBuffer = samplesToBuffer
	l.streamStartOffset = -1

	return l, nil
}
// Identifier returns the Identifier field of the listener's information.
func (l *Listener) Identifier() string {
	id := l.information.Identifier
	return id
}
// Information returns a pointer to the listener's own information record.
// The pointer aliases the listener's internal field; it is not a copy.
func (l *Listener) Information() *listener.Information {
	info := &l.information
	return info
}
// HasStarted reports whether Start has been called on this listener.
func (l *Listener) HasStarted() bool {
	started := l.started.Load()
	return started
}
// Start marks the listener as started and replays the given backlog of
// packets through Write.
//
// Only the first call does any work; later calls return nil immediately.
// A packet is replayed when its keep mode is not packetizer.Discard, or when
// its end sample number is at or after the cutoff. The cutoff is the start
// sample number of the last backlog packet minus the configured sample buffer.
//
// The first error returned by Write aborts the replay and is returned.
func (l *Listener) Start(packets []packetizer.Packet) error {
	if alreadyStarted := l.started.Swap(true); alreadyStarted {
		return nil
	}

	if len(packets) == 0 {
		return nil
	}

	newest := packets[len(packets)-1]
	cutoff := newest.GetStartSampleNumber() - l.streamSamplesBuffer

	for i := range packets {
		pkt := packets[i]

		// Skip only discardable packets that end before the cutoff.
		if pkt.KeepMode() == packetizer.Discard && pkt.GetEndSampleNumber() < cutoff {
			continue
		}

		if err := l.Write(pkt); err != nil {
			return err
		}
	}

	return nil
}
// Write pushes the data of a single packet onto the listener's channel.
//
// It returns the context error if the request is already done. Metadata
// packets are silently dropped. If the channel has fewer than two free slots,
// the context is failed with "client ran out of writer buffer" and that
// error is returned.
//
// When offsetStart is enabled and the packet implements
// packetizer.OffsetablePacket, data is obtained via GetDataOffset using the
// stream start offset. That offset is taken from the start sample number of
// the first offsetable packet whose keep mode is not packetizer.Keep.
// Offsetable packets seen before that point are written unmodified.
func (l *Listener) Write(packet packetizer.Packet) error {
	if l.ctx.Done() {
		return l.ctx.Error()
	}

	if _, isMetadata := packet.(*metadata.Packet); isMetadata {
		return nil
	}

	if queued, limit := len(l.sliceWriter), cap(l.sliceWriter)-1; queued >= limit {
		l.ctx.Fail(errors.New("client ran out of writer buffer"))
		return l.ctx.Error()
	}

	shiftable, canShift := packet.(packetizer.OffsetablePacket)

	switch {
	case !l.offsetStart || !canShift:
		// Offsetting disabled or unsupported by this packet: pass through.
		l.sliceWriter <- packet.GetData()
	case l.streamStartOffset > -1:
		// Start offset already known: rebase against it.
		l.sliceWriter <- shiftable.GetDataOffset(l.streamStartOffset)
	case shiftable.KeepMode() == packetizer.Keep:
		// Offset not known yet and this packet does not define it.
		l.sliceWriter <- packet.GetData()
	default:
		// First non-Keep offsetable packet fixes the start offset.
		l.streamStartOffset = shiftable.GetStartSampleNumber()
		l.sliceWriter <- shiftable.GetDataOffset(l.streamStartOffset)
	}

	return nil
}
// Close closes the channel that Write sends packet data on.
//
// NOTE(review): there is no guard here, so calling Close twice, or calling
// Write after Close, panics per Go channel semantics. Callers must ensure
// Close is invoked exactly once and only after the last Write.
func (l *Listener) Close() {
	out := l.sliceWriter
	close(out)
}