MeteorLight/listener/aps1/aps1.go

212 lines
5.4 KiB
Go

package aps1
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
"sync/atomic"
)
// Listener is a stream listener that encodes packets as packetStreamFrame
// values and hands the encoded bytes to a writer channel.
type Listener struct {
// information is the listener's descriptive record; Identifier and
// Information expose it.
information listener.Information
// started is flipped to true by the first call to Start.
started atomic.Bool
// streamStartOffset is initialised to -1 by NewListener; it is not read
// anywhere in this part of the file.
streamStartOffset int64
// streamSamplesBuffer is the number of samples before the start of the last
// initial packet within which Start still replays Discard packets.
streamSamplesBuffer int64
// ctx tracks the state of the request: Write refuses to queue once it is
// done and fails it when the writer channel is about to fill up.
ctx *util.RequestDone
// sliceWriter receives every encoded frame; Close closes it.
sliceWriter chan []byte
// offsetStart is stored from the constructor argument; it is not read
// anywhere in this part of the file.
offsetStart bool
// headerBytes is the payload of the Header frame sent by Start: channel
// count and sample rate as little-endian int64, the MIME type length as a
// little-endian int32, then the MIME type bytes.
headerBytes []byte
}
// NewListener builds a Listener that queues encoded frames on sliceWriter.
//
// The stream header payload is precomputed here: channels and sampleRate as
// little-endian int64 values, the byte length of mimeType as a little-endian
// int32, followed by the mimeType bytes. samplesToBuffer bounds how far back
// Start replays Discard packets, and offsetStart is stored on the listener.
//
// The second return value is a header map announcing the packet stream
// content type (presumably sent as response headers by the caller).
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) {
	var streamHeader bytes.Buffer
	fixedFields := []any{
		int64(channels),
		int64(sampleRate),
		int32(len(mimeType)),
	}
	for _, field := range fixedFields {
		// Writing fixed-size integers into an in-memory buffer cannot fail.
		_ = binary.Write(&streamHeader, binary.LittleEndian, field)
	}
	streamHeader.WriteString(mimeType)

	responseHeaders := map[string]string{
		"x-audio-packet-stream": "1",
		"content-type":          "application/x-audio-packet-stream",
	}

	created := &Listener{
		information:         info,
		streamStartOffset:   -1,
		streamSamplesBuffer: samplesToBuffer,
		ctx:                 ctx,
		sliceWriter:         sliceWriter,
		offsetStart:         offsetStart,
		headerBytes:         streamHeader.Bytes(),
	}
	return created, responseHeaders
}
// Identifier returns the identifier stored in the listener's information
// record.
func (l *Listener) Identifier() string {
	id := l.information.Identifier
	return id
}
// Information returns a pointer to the listener's own information record, so
// changes made through it are visible to the listener.
func (l *Listener) Information() *listener.Information {
	info := &l.information
	return info
}
// HasStarted reports whether Start has already been called on this listener.
func (l *Listener) HasStarted() bool {
	started := l.started.Load()
	return started
}
// Start marks the listener as started, queues the Header frame and replays
// the given initial packets through Write.
//
// Only the first call does any work; later calls return nil immediately.
// Every packet whose KeepMode is not Discard is replayed. Discard packets are
// replayed only when they end at or after the start of the last packet minus
// the configured sample buffer. The first error returned by Write aborts the
// replay and is returned.
func (l *Listener) Start(packets []packetizer.Packet) error {
	if alreadyStarted := l.started.Swap(true); alreadyStarted {
		return nil
	}

	// Category, StartSampleNumber and DurationInSamples stay at zero for the
	// stream header.
	header := packetStreamFrame{
		Type: Header,
		Data: l.headerBytes,
	}
	l.sliceWriter <- header.Encode()

	if len(packets) == 0 {
		return nil
	}

	newest := packets[len(packets)-1]
	cutoff := newest.GetStartSampleNumber() - l.streamSamplesBuffer
	for i := range packets {
		pkt := packets[i]
		// Skip discardable packets that end before the buffered window.
		if pkt.KeepMode() == packetizer.Discard && pkt.GetEndSampleNumber() < cutoff {
			continue
		}
		if err := l.Write(pkt); err != nil {
			return err
		}
	}
	return nil
}
// Write converts packet into one or more frames and queues them on the
// writer channel.
//
// A *metadata.Packet yields a TrackIdentifier frame carrying the track
// identifier as a signed varint, followed by a TrackMetadata frame carrying
// the track metadata as JSON when that metadata can be marshalled. Any other
// packet yields a single data frame whose type mirrors the packet's KeepMode.
//
// The request error is returned when the request is already done, or when the
// writer channel is nearly full (the request is failed first in that case).
// An unrecognised KeepMode yields an "unknown KeepMode" error.
func (l *Listener) Write(packet packetizer.Packet) error {
	if l.ctx.Done() {
		return l.ctx.Error()
	}

	trackPacket, isMetadata := packet.(*metadata.Packet)
	if !isMetadata {
		if l.writerExhausted() {
			return l.ctx.Error()
		}

		var kind PacketStreamType
		switch mode := packet.KeepMode(); mode {
		case packetizer.Discard:
			kind = DataDiscard
		case packetizer.GroupDiscard:
			kind = DataGroupDiscard
		case packetizer.GroupKeep:
			kind = DataGroupKeep
		case packetizer.Keep:
			kind = DataKeep
		case packetizer.KeepLast:
			kind = DataKeepLast
		default:
			return errors.New("unknown KeepMode")
		}

		l.sliceWriter <- encodePacketFrame(packet, kind, packet.GetData())
		return nil
	}

	var scratch [binary.MaxVarintLen64]byte
	used := binary.PutVarint(scratch[:], int64(trackPacket.TrackEntry.Identifier))
	if l.writerExhausted() {
		return l.ctx.Error()
	}
	l.sliceWriter <- encodePacketFrame(packet, TrackIdentifier, scratch[:used])

	encodedMetadata, err := json.Marshal(trackPacket.TrackEntry.Metadata)
	if err != nil {
		// Metadata is best-effort: the identifier frame has already been
		// queued, so a marshalling failure only skips the metadata frame.
		return nil
	}
	if l.writerExhausted() {
		return l.ctx.Error()
	}
	l.sliceWriter <- encodePacketFrame(packet, TrackMetadata, encodedMetadata)
	return nil
}

// writerExhausted reports whether the writer channel has at most one free
// slot left. When it does, the request is failed before true is returned.
func (l *Listener) writerExhausted() bool {
	if len(l.sliceWriter) < cap(l.sliceWriter)-1 {
		return false
	}
	l.ctx.Fail(errors.New("client ran out of writer buffer"))
	return true
}

// encodePacketFrame encodes a frame of the given kind that takes its
// category and sample range from packet and carries payload as its data.
func encodePacketFrame(packet packetizer.Packet, kind PacketStreamType, payload []byte) []byte {
	frame := packetStreamFrame{
		Type:              kind,
		Category:          packet.Category(),
		StartSampleNumber: packet.GetStartSampleNumber(),
		DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
		Data:              payload,
	}
	return frame.Encode()
}
// Close closes the writer channel. Start and Write must not be called
// afterwards, because sending on a closed channel panics.
func (l *Listener) Close() {
	frames := l.sliceWriter
	close(frames)
}
// PacketStreamType identifies the kind of a frame on the packet stream. It is
// the first field of every encoded frame.
type PacketStreamType uint64

// The order of these constants is important: their numeric values are written
// on the wire, so they define the protocol. Do not reorder them or insert new
// values in between.
const (
	// Header is the frame that carries the stream header payload.
	Header = PacketStreamType(iota)
	// DataKeepLast is a data frame for a packet with KeepMode KeepLast.
	DataKeepLast
	// DataKeep is a data frame for a packet with KeepMode Keep.
	DataKeep
	// DataGroupKeep is a data frame for a packet with KeepMode GroupKeep.
	DataGroupKeep
	// DataGroupDiscard is a data frame for a packet with KeepMode GroupDiscard.
	DataGroupDiscard
	// DataDiscard is a data frame for a packet with KeepMode Discard.
	DataDiscard
	// TrackIdentifier is a frame whose payload is a track identifier.
	TrackIdentifier
	// TrackMetadata is a frame whose payload is track metadata.
	TrackMetadata
)

// packetStreamFrame is one frame of the packet stream before encoding.
type packetStreamFrame struct {
	Type              PacketStreamType
	Category          int64
	StartSampleNumber int64
	DurationInSamples int64
	// Data is the frame payload. Its size is not a field of its own: Encode
	// derives it from len(Data) and writes it right before the payload.
	Data []byte
}

// Encode serialises the frame into a freshly allocated byte slice.
//
// The layout is, in order: Type (unsigned varint), Category (converted to
// uint64 and written as an unsigned varint, so negative categories take ten
// bytes), StartSampleNumber (signed varint), DurationInSamples (signed
// varint), len(Data) (unsigned varint), then the Data bytes themselves.
// The returned slice does not share memory with Data.
func (p *packetStreamFrame) Encode() []byte {
	// Five varints of at most MaxVarintLen64 bytes each plus the payload, so
	// the slice is allocated once and never regrown.
	const maxPrefixLen = 5 * binary.MaxVarintLen64
	out := make([]byte, 0, maxPrefixLen+len(p.Data))

	out = binary.AppendUvarint(out, uint64(p.Type))
	out = binary.AppendUvarint(out, uint64(p.Category))
	out = binary.AppendVarint(out, p.StartSampleNumber)
	out = binary.AppendVarint(out, p.DurationInSamples)
	out = binary.AppendUvarint(out, uint64(len(p.Data)))
	return append(out, p.Data...)
}