Add frame server / frame server pool, allow blind frame jumps on y4m

This commit is contained in:
DataHoarder 2022-11-13 13:41:21 +01:00
parent cbb9a797c0
commit 0f98ce1c4f
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
4 changed files with 270 additions and 2 deletions

View file

@ -21,6 +21,8 @@ type Decoder struct {
frameSize int
frameStartOffset int64
frameCounter int
bufPool sync.Pool
@ -73,6 +75,10 @@ func (s *Decoder) Properties() frame.StreamProperties {
}
func (s *Decoder) SeekToFrame(frameNumber int) (err error) {
if s.frameCounter == frameNumber {
return nil
}
if seeker, ok := s.r.(io.Seeker); ok {
if frameNumber >= 0 && len(s.frameSeekTable) > frameNumber && s.frameSeekTable[frameNumber] != 0 {
if _, err = seeker.Seek(s.frameSeekTable[frameNumber], io.SeekStart); err != nil {
@ -82,6 +88,27 @@ func (s *Decoder) SeekToFrame(frameNumber int) (err error) {
s.frameCounter = frameNumber
return nil
} else if frameNumber >= 0 && len(s.frameSeekTable) > 0 {
//attempt blind seek from last decoded frame
framesToSeekFromLast := frameNumber + 1 - len(s.frameSeekTable)
if _, err = seeker.Seek(s.frameSeekTable[len(s.frameSeekTable)-1]+int64(5+1+s.frameSize)*int64(framesToSeekFromLast), io.SeekStart); err != nil {
return err
}
s.frameCounter = frameNumber
return nil
} else if frameNumber >= 0 && s.frameStartOffset != 0 {
//attempt full blind seek from start
if _, err = seeker.Seek(s.frameStartOffset+int64(5+1+s.frameSize)*int64(frameNumber), io.SeekStart); err != nil {
return err
}
s.frameCounter = frameNumber
return nil
} else {
return errors.New("frameNumber out of range")
}
@ -224,10 +251,11 @@ func (s *Decoder) readFrameHeader() (parameters map[Parameter][]string, err erro
data = append(data, buf[0])
}
parameters = make(map[Parameter][]string)
for _, v := range strings.Split(string(data), " ") {
if len(v) > 1 {
if parameters == nil {
parameters = make(map[Parameter][]string)
}
if slice, ok := parameters[Parameter(v[0])]; ok {
parameters[Parameter(v[0])] = append(slice, v[1:])
} else {
@ -310,6 +338,12 @@ func (s *Decoder) parseParameters() (err error) {
s.frameSize, err = s.properties.ColorFormat.FrameSize(s.properties.Width, s.properties.Height)
if seeker, ok := s.r.(io.Seeker); ok {
if index, err := seeker.Seek(0, io.SeekCurrent); err == nil {
s.frameStartOffset = index
}
}
return err
}

View file

@ -95,6 +95,9 @@ func (s *Stream) Copy(n int) []*Stream {
// Slice produces a slice of the stream, and locks the input
func (s *Stream) Slice(start, end int) *Stream {
if end < start {
return nil
}
if !s.Lock() {
return nil
}

View file

@ -0,0 +1,140 @@
package frameserver
import (
"git.gammaspectra.live/S.O.N.G/Ignite/decoder/y4m"
"git.gammaspectra.live/S.O.N.G/Ignite/frame"
"io"
"sync"
)
type FrameServer struct {
readerLock sync.Mutex
decoder *y4m.Decoder
}
func New(reader io.ReadSeeker, cachedFrameSeekTable []int64) *FrameServer {
s := &FrameServer{}
var err error
settings := make(map[string]any)
settings["seek_table"] = cachedFrameSeekTable
if s.decoder, err = y4m.NewDecoder(reader, settings); err != nil {
return nil
}
return s
}
func (s *FrameServer) GetFrameSeekTable() []int64 {
s.readerLock.Lock()
defer s.readerLock.Unlock()
return s.decoder.GetFrameSeekTable()
}
// GetAllFrames gets all frames from start to end. Locks on every iteration. See GetAllFramesLock
func (s *FrameServer) GetAllFrames() *frame.Stream {
frameNumber := 0
stream, channel := frame.NewStream(s.decoder.Properties())
go func() {
defer close(channel)
for {
if f, err := s.GetFrame(frameNumber); err != nil {
return
} else {
channel <- f
}
frameNumber++
}
}()
return stream
}
// GetAllFramesLock gets all frames from start to end. Locks on every iteration. See GetAllFramesLock
func (s *FrameServer) GetAllFramesLock() *frame.Stream {
s.readerLock.Lock()
defer s.readerLock.Unlock()
frameNumber := 0
if err := s.decoder.SeekToFrame(frameNumber); err != nil {
return nil
} else {
stream, channel := frame.NewStream(s.decoder.Properties())
go func() {
defer close(channel)
for {
if f, err := s.decoder.Decode(); err != nil {
return
} else {
channel <- f
}
frameNumber++
}
}()
return stream
}
}
// GetFrames gets a range of frames, like a slice [0, 1] will return just one frame.Frame. Locks on every iteration. See GetFramesLock
func (s *FrameServer) GetFrames(startFrameNumber, endFrameNumber int) *frame.Stream {
if endFrameNumber < startFrameNumber {
return nil
}
frameNumber := startFrameNumber
stream, channel := frame.NewStream(s.decoder.Properties())
go func() {
defer close(channel)
for frameNumber < endFrameNumber {
if f, err := s.GetFrame(frameNumber); err != nil {
return
} else {
channel <- f
}
frameNumber++
}
}()
return stream
}
// GetFramesLock gets a range of frames, but locks during all of them. See GetFrames
func (s *FrameServer) GetFramesLock(startFrameNumber, endFrameNumber int) *frame.Stream {
if endFrameNumber < startFrameNumber {
return nil
}
s.readerLock.Lock()
defer s.readerLock.Unlock()
frameNumber := startFrameNumber
//todo: spawn new frame.Stream or just use GetFrame()?
if err := s.decoder.SeekToFrame(frameNumber); err != nil {
return nil
} else {
stream, channel := frame.NewStream(s.decoder.Properties())
go func() {
defer close(channel)
for frameNumber < endFrameNumber {
if f, err := s.decoder.Decode(); err != nil {
return
} else {
channel <- f
}
frameNumber++
}
}()
return stream
}
}
func (s *FrameServer) GetFrame(frameNumber int) (f frame.Frame, err error) {
s.readerLock.Lock()
defer s.readerLock.Unlock()
if err = s.decoder.SeekToFrame(frameNumber); err != nil {
return nil, err
} else {
return s.decoder.Decode()
}
}

View file

@ -0,0 +1,91 @@
package frameserver
import (
"git.gammaspectra.live/S.O.N.G/Ignite/frame"
"os"
"runtime"
"sync"
)
// Pool automatically grows and shrinks as-needed based on how many frame requests come in.
type Pool struct {
cachedFrameSeekTable []int64
seekTableLock sync.RWMutex
sourcePath string
pool sync.Pool
}
func NewPool(sourcePath string, cachedFrameSeekTable []int64) *Pool {
p := &Pool{
sourcePath: sourcePath,
cachedFrameSeekTable: cachedFrameSeekTable,
}
p.pool.New = func() any {
if f, err := os.Open(p.sourcePath); err != nil {
return nil
} else {
p.seekTableLock.RLock()
defer p.seekTableLock.RUnlock()
if s := New(f, p.cachedFrameSeekTable); s != nil {
runtime.SetFinalizer(s, func(s *FrameServer) {
f.Close()
})
return s
} else {
return nil
}
}
}
return p
}
func (p *Pool) getServer() *FrameServer {
if a := p.pool.Get(); a == nil {
return nil
} else {
return a.(*FrameServer)
}
}
func (p *Pool) GetFrameSeekTable() []int64 {
p.seekTableLock.RLock()
defer p.seekTableLock.RUnlock()
return p.cachedFrameSeekTable
}
func (p *Pool) putServer(s *FrameServer) {
if func() bool {
p.seekTableLock.RLock()
defer p.seekTableLock.RUnlock()
return len(p.cachedFrameSeekTable) < len(s.GetFrameSeekTable())
}() {
//cache longer table
p.seekTableLock.Lock()
defer p.seekTableLock.Unlock()
//avoid races overwriting things
if len(p.cachedFrameSeekTable) < len(s.GetFrameSeekTable()) {
p.cachedFrameSeekTable = s.GetFrameSeekTable()
}
}
p.pool.Put(s)
}
func (p *Pool) GetAllFrames() *frame.Stream {
if s := p.getServer(); s == nil {
return nil
} else {
defer p.putServer(s)
return s.GetAllFramesLock()
}
}
func (p *Pool) GetFrames(startFrameNumber, endFrameNumber int) *frame.Stream {
if s := p.getServer(); s == nil {
return nil
} else {
defer p.putServer(s)
return s.GetFramesLock(startFrameNumber, endFrameNumber)
}
}