package plain import ( "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" "sync/atomic" ) type Listener struct { information listener.Information started atomic.Bool streamStartOffset int64 streamSamplesBuffer int64 writer listener.WriterFunc waiter chan struct{} offsetStart bool } func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) { return &Listener{ information: info, writer: writer, offsetStart: offsetStart, streamSamplesBuffer: samplesToBuffer, streamStartOffset: -1, waiter: make(chan struct{}), }, nil } func (l *Listener) Wait() { <-l.waiter } func (l *Listener) Identifier() string { return l.information.Identifier } func (l *Listener) Information() *listener.Information { return &l.information } func (l *Listener) HasStarted() bool { return l.started.Load() } func (l *Listener) Start(packets []packetizer.Packet) error { if l.started.Swap(true) { return nil } if len(packets) > 0 { sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - l.streamSamplesBuffer for _, p := range packets { if p.KeepMode() != packetizer.Discard || p.GetEndSampleNumber() >= sampleBufferMin { if err := l.Write(p); err != nil { return err } } } } return nil } func (l *Listener) Write(packet packetizer.Packet) error { if _, ok := packet.(*metadata.Packet); ok { return nil } if offsetable, ok := packet.(packetizer.OffsetablePacket); l.offsetStart && ok { if l.streamStartOffset <= -1 { if offsetable.KeepMode() != packetizer.Keep { l.streamStartOffset = offsetable.GetStartSampleNumber() return l.writer(offsetable.GetDataOffset(l.streamStartOffset)) } else { return l.writer(packet.GetData()) } } else { return l.writer(offsetable.GetDataOffset(l.streamStartOffset)) } } else { return l.writer(packet.GetData()) } } func (l *Listener) Close() { _ = l.writer(nil) close(l.waiter) }