Implemented precise metadata and timing information packet stream
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
DataHoarder 2022-04-20 13:20:22 +02:00
parent f395d6746e
commit 2650039581
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
2 changed files with 144 additions and 4 deletions

View file

@ -16,9 +16,7 @@ Radio streamer ([kawa](https://github.com/Luminarys/kawa) drop-in compatible).
* Supports extra encoder bitrate control settings (CBR, VBR, auto, etc.)
* Can read and apply ReplayGain tags, or normalize audio loudness.
* Can have audio sources over HTTP(s) URLs on `path` property, and supports seeking.
# Future improvements
* Implement precise timing information side-channel
* Precise metadata and timing information packet stream, trigger via `x-audio-packet-stream: 1` HTTP header.
## Dependencies
### Go >= 1.18

144
queue.go
View file

@ -1,6 +1,9 @@
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio"
@ -386,6 +389,50 @@ func (q *Queue) GetListeners() (listeners []*ListenerInformation) {
return
}
type packetStreamType uint64
const (
Header = packetStreamType(iota)
DataKeepLast
DataKeep
DataGroupKeep
DataGroupDiscard
DataDiscard
TrackIdentifier
TrackMetadata
)
type packetStreamFrame struct {
Type packetStreamType
StartSampleNumber int64
DurationInSamples int64
//automatically filled based on Data
//Size uint64
Data []byte
}
func (p *packetStreamFrame) Encode() []byte {
buf := new(bytes.Buffer)
varBuf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(varBuf, uint64(p.Type))
buf.Write(varBuf[:n])
n = binary.PutVarint(varBuf, p.StartSampleNumber)
buf.Write(varBuf[:n])
n = binary.PutVarint(varBuf, p.DurationInSamples)
buf.Write(varBuf[:n])
n = binary.PutUvarint(varBuf, uint64(len(p.Data)))
buf.Write(varBuf[:n])
buf.Write(p.Data)
return buf.Bytes()
}
func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Request) {
for _, mount := range q.mounts {
if strings.HasSuffix(request.URL.Path, mount.Mount) {
@ -453,7 +500,102 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
var requestDone error
var wgClient sync.WaitGroup
if numberValue, err := strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
//set X-Audio-Packet-Stream for strictly timed packets and metadata
if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 {
//version 1
packetWriteCallback = func(packet packetizer.Packet) error {
if requestDone != nil {
return requestDone
}
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
queueInfoBuf := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.QueueIdentifier))
if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer")
log.Printf("failed to write data to client: %s\n", requestDone)
return requestDone
}
writeChannel <- (&packetStreamFrame{
Type: TrackIdentifier,
StartSampleNumber: packet.GetStartSampleNumber(),
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
Data: queueInfoBuf[:n],
}).Encode()
if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil {
if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer")
log.Printf("failed to write data to client: %s\n", requestDone)
return requestDone
}
writeChannel <- (&packetStreamFrame{
Type: TrackMetadata,
StartSampleNumber: packet.GetStartSampleNumber(),
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
Data: metadataBytes,
}).Encode()
}
return nil
}
if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer")
log.Printf("failed to write data to client: %s\n", requestDone)
return requestDone
}
//TODO: category
var frameType packetStreamType
switch packet.KeepMode() {
case packetizer.KeepLast:
frameType = DataKeepLast
case packetizer.Keep:
frameType = DataKeep
case packetizer.GroupKeep:
frameType = DataGroupKeep
case packetizer.GroupDiscard:
frameType = DataGroupDiscard
case packetizer.Discard:
frameType = DataDiscard
default:
return errors.New("unknown KeepMode")
}
writeChannel <- (&packetStreamFrame{
Type: frameType,
StartSampleNumber: packet.GetStartSampleNumber(),
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
Data: packet.GetData(),
}).Encode()
return nil
}
type headerData struct {
Channels int64
SampleRate int64
MimeType string
}
headerBytes := new(bytes.Buffer)
binary.Write(headerBytes, binary.LittleEndian, int64(2))
binary.Write(headerBytes, binary.LittleEndian, int64(mount.SampleRate))
binary.Write(headerBytes, binary.LittleEndian, int32(len(mount.MimeType)))
headerBytes.Write([]byte(mount.MimeType))
writeChannel <- (&packetStreamFrame{
Type: Header,
StartSampleNumber: 0,
DurationInSamples: 0,
Data: headerBytes.Bytes(),
}).Encode()
} else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
metadataToSend := make(map[string]string)
const icyInterval = 8192 //weird clients might not support other numbers than this
icyCounter := 0