diff --git a/decoder/y4m/y4m.go b/decoder/y4m/y4m.go index 54d6417..99a6997 100644 --- a/decoder/y4m/y4m.go +++ b/decoder/y4m/y4m.go @@ -21,6 +21,8 @@ type Decoder struct { frameSize int + frameStartOffset int64 + frameCounter int bufPool sync.Pool @@ -73,6 +75,10 @@ func (s *Decoder) Properties() frame.StreamProperties { } func (s *Decoder) SeekToFrame(frameNumber int) (err error) { + if s.frameCounter == frameNumber { + return nil + } + if seeker, ok := s.r.(io.Seeker); ok { if frameNumber >= 0 && len(s.frameSeekTable) > frameNumber && s.frameSeekTable[frameNumber] != 0 { if _, err = seeker.Seek(s.frameSeekTable[frameNumber], io.SeekStart); err != nil { @@ -82,6 +88,27 @@ func (s *Decoder) SeekToFrame(frameNumber int) (err error) { s.frameCounter = frameNumber return nil + } else if frameNumber >= 0 && len(s.frameSeekTable) > 0 { + //attempt blind seek from last decoded frame + framesToSeekFromLast := frameNumber + 1 - len(s.frameSeekTable) + + if _, err = seeker.Seek(s.frameSeekTable[len(s.frameSeekTable)-1]+int64(5+1+s.frameSize)*int64(framesToSeekFromLast), io.SeekStart); err != nil { + return err + } + + s.frameCounter = frameNumber + + return nil + } else if frameNumber >= 0 && s.frameStartOffset != 0 { + //attempt full blind seek from start + if _, err = seeker.Seek(s.frameStartOffset+int64(5+1+s.frameSize)*int64(frameNumber), io.SeekStart); err != nil { + return err + } + + s.frameCounter = frameNumber + + return nil + } else { return errors.New("frameNumber out of range") } @@ -224,10 +251,11 @@ func (s *Decoder) readFrameHeader() (parameters map[Parameter][]string, err erro data = append(data, buf[0]) } - parameters = make(map[Parameter][]string) - for _, v := range strings.Split(string(data), " ") { if len(v) > 1 { + if parameters == nil { + parameters = make(map[Parameter][]string) + } if slice, ok := parameters[Parameter(v[0])]; ok { parameters[Parameter(v[0])] = append(slice, v[1:]) } else { @@ -310,6 +338,12 @@ func (s *Decoder) parseParameters() (err error) { s.frameSize, err = s.properties.ColorFormat.FrameSize(s.properties.Width, s.properties.Height) + if seeker, ok := s.r.(io.Seeker); ok { + if index, err := seeker.Seek(0, io.SeekCurrent); err == nil { + s.frameStartOffset = index + } + } + return err } diff --git a/frame/stream.go b/frame/stream.go index c7178ea..c192f44 100644 --- a/frame/stream.go +++ b/frame/stream.go @@ -95,6 +95,9 @@ func (s *Stream) Copy(n int) []*Stream { // Slice produces a slice of the stream, and locks the input func (s *Stream) Slice(start, end int) *Stream { + if end < start { + return nil + } if !s.Lock() { return nil } diff --git a/utilities/frameserver/frameserver.go b/utilities/frameserver/frameserver.go new file mode 100644 index 0000000..0e999fb --- /dev/null +++ b/utilities/frameserver/frameserver.go @@ -0,0 +1,140 @@ +package frameserver + +import ( + "git.gammaspectra.live/S.O.N.G/Ignite/decoder/y4m" + "git.gammaspectra.live/S.O.N.G/Ignite/frame" + "io" + "sync" +) + +type FrameServer struct { + readerLock sync.Mutex + decoder *y4m.Decoder +} + +func New(reader io.ReadSeeker, cachedFrameSeekTable []int64) *FrameServer { + s := &FrameServer{} + + var err error + settings := make(map[string]any) + settings["seek_table"] = cachedFrameSeekTable + if s.decoder, err = y4m.NewDecoder(reader, settings); err != nil { + return nil + } + + return s +} + +func (s *FrameServer) GetFrameSeekTable() []int64 { + s.readerLock.Lock() + defer s.readerLock.Unlock() + return s.decoder.GetFrameSeekTable() +} + +// GetAllFrames gets all frames from start to end. Locks on every iteration. See GetAllFramesLock +func (s *FrameServer) GetAllFrames() *frame.Stream { + frameNumber := 0 + + stream, channel := frame.NewStream(s.decoder.Properties()) + go func() { + defer close(channel) + for { + if f, err := s.GetFrame(frameNumber); err != nil { + return + } else { + channel <- f + } + frameNumber++ + } + }() + return stream +} + +// GetAllFramesLock gets all frames from start to end. Locks on every iteration. See GetAllFramesLock +func (s *FrameServer) GetAllFramesLock() *frame.Stream { + s.readerLock.Lock() + defer s.readerLock.Unlock() + + frameNumber := 0 + + if err := s.decoder.SeekToFrame(frameNumber); err != nil { + return nil + } else { + stream, channel := frame.NewStream(s.decoder.Properties()) + go func() { + defer close(channel) + for { + if f, err := s.decoder.Decode(); err != nil { + return + } else { + channel <- f + } + frameNumber++ + } + }() + return stream + } +} + +// GetFrames gets a range of frames, like a slice [0, 1] will return just one frame.Frame. Locks on every iteration. See GetFramesLock +func (s *FrameServer) GetFrames(startFrameNumber, endFrameNumber int) *frame.Stream { + if endFrameNumber < startFrameNumber { + return nil + } + frameNumber := startFrameNumber + + stream, channel := frame.NewStream(s.decoder.Properties()) + go func() { + defer close(channel) + for frameNumber < endFrameNumber { + if f, err := s.GetFrame(frameNumber); err != nil { + return + } else { + channel <- f + } + frameNumber++ + } + }() + return stream +} + +// GetFramesLock gets a range of frames, but locks during all of them. See GetFrames +func (s *FrameServer) GetFramesLock(startFrameNumber, endFrameNumber int) *frame.Stream { + if endFrameNumber < startFrameNumber { + return nil + } + s.readerLock.Lock() + defer s.readerLock.Unlock() + + frameNumber := startFrameNumber + + //todo: spawn new frame.Stream or just use GetFrame()? + + if err := s.decoder.SeekToFrame(frameNumber); err != nil { + return nil + } else { + stream, channel := frame.NewStream(s.decoder.Properties()) + go func() { + defer close(channel) + for frameNumber < endFrameNumber { + if f, err := s.decoder.Decode(); err != nil { + return + } else { + channel <- f + } + frameNumber++ + } + }() + return stream + } +} + +func (s *FrameServer) GetFrame(frameNumber int) (f frame.Frame, err error) { + s.readerLock.Lock() + defer s.readerLock.Unlock() + if err = s.decoder.SeekToFrame(frameNumber); err != nil { + return nil, err + } else { + return s.decoder.Decode() + } +} diff --git a/utilities/frameserver/pool.go b/utilities/frameserver/pool.go new file mode 100644 index 0000000..abf474b --- /dev/null +++ b/utilities/frameserver/pool.go @@ -0,0 +1,91 @@ +package frameserver + +import ( + "git.gammaspectra.live/S.O.N.G/Ignite/frame" + "os" + "runtime" + "sync" +) + +// Pool automatically grows and shrinks as-needed based on how many frame requests come in. +type Pool struct { + cachedFrameSeekTable []int64 + seekTableLock sync.RWMutex + sourcePath string + pool sync.Pool +} + +func NewPool(sourcePath string, cachedFrameSeekTable []int64) *Pool { + p := &Pool{ + sourcePath: sourcePath, + cachedFrameSeekTable: cachedFrameSeekTable, + } + p.pool.New = func() any { + if f, err := os.Open(p.sourcePath); err != nil { + return nil + } else { + p.seekTableLock.RLock() + defer p.seekTableLock.RUnlock() + if s := New(f, p.cachedFrameSeekTable); s != nil { + runtime.SetFinalizer(s, func(s *FrameServer) { + f.Close() + }) + return s + } else { + return nil + } + } + } + + return p +} + +func (p *Pool) getServer() *FrameServer { + if a := p.pool.Get(); a == nil { + return nil + } else { + return a.(*FrameServer) + } +} + +func (p *Pool) GetFrameSeekTable() []int64 { + p.seekTableLock.RLock() + defer p.seekTableLock.RUnlock() + return p.cachedFrameSeekTable +} + +func (p *Pool) putServer(s *FrameServer) { + if func() bool { + p.seekTableLock.RLock() + defer p.seekTableLock.RUnlock() + + return len(p.cachedFrameSeekTable) < len(s.GetFrameSeekTable()) + }() { + //cache longer table + p.seekTableLock.Lock() + defer p.seekTableLock.Unlock() + //avoid races overwriting things + if len(p.cachedFrameSeekTable) < len(s.GetFrameSeekTable()) { + p.cachedFrameSeekTable = s.GetFrameSeekTable() + } + } + p.pool.Put(s) +} + +func (p *Pool) GetAllFrames() *frame.Stream { + if s := p.getServer(); s == nil { + return nil + } else { + defer p.putServer(s) + return s.GetAllFramesLock() + } +} + +func (p *Pool) GetFrames(startFrameNumber, endFrameNumber int) *frame.Stream { + if s := p.getServer(); s == nil { + return nil + } else { + defer p.putServer(s) + return s.GetFramesLock(startFrameNumber, endFrameNumber) + } +}