diff --git a/cli/encode-server/encode.go b/cli/encode-server/encode.go index 709b33a..af23f8a 100644 --- a/cli/encode-server/encode.go +++ b/cli/encode-server/encode.go @@ -111,6 +111,7 @@ func encodeFromReader(reader io.ReadCloser, job *Job, w http.ResponseWriter) { w.Header().Set("x-decoder-error", "") return } + f.Return() job.Status.Processed.Add(1) //log.Printf("[job %s] %d", job.Id, job.Status.Read.Load()) } diff --git a/decoder/libdav1d/libdav1d.go b/decoder/libdav1d/libdav1d.go index 28ad75f..d298a3f 100644 --- a/decoder/libdav1d/libdav1d.go +++ b/decoder/libdav1d/libdav1d.go @@ -39,7 +39,7 @@ type Decoder struct { closer sync.Once - bufPool sync.Pool + pool *frame.Pool } var dav1dVersion = fmt.Sprintf("dav1d %s", C.GoString(C.dav1d_version())) @@ -174,7 +174,7 @@ const ( planeV = 2 ) -func (d *Decoder) pictureToFrame() (frame.Frame, error) { +func (d *Decoder) pictureToFrame() (f frame.Frame, err error) { properties := d.properties.FrameProperties() @@ -225,66 +225,37 @@ func (d *Decoder) pictureToFrame() (frame.Frame, error) { defer C.dav1d_picture_unref(&d.picture) + if d.pool == nil || properties != d.pool.Properties() { + //initialize pool to known size + d.pool, err = frame.NewPool(properties) + if err != nil { + return nil, err + } + } + + f = d.pool.Get(int64(d.picture.m.timestamp)) + + var yData, uData, vData []byte + if bitDepth > 8 { //16-bit - - yData := unsafe.Slice((*byte)(d.picture.data[planeY]), properties.Height*properties.Width*2) - var uData, vData []byte - + yData = unsafe.Slice((*byte)(d.picture.data[planeY]), properties.Height*properties.Width*2) if d.picture.p.layout != C.DAV1D_PIXEL_LAYOUT_I400 { uData = unsafe.Slice((*byte)(d.picture.data[planeU]), chromaHeight*chromaWidth*2) vData = unsafe.Slice((*byte)(d.picture.data[planeV]), chromaHeight*chromaWidth*2) } - - n := len(yData) + len(uData) + len(vData) - - var buf []byte - if b := d.bufPool.Get(); b != nil && len(b.([]byte)) == n { - buf = b.([]byte) - } else { - buf = make([]byte, n) - } - - copy(buf, yData) - copy(buf[len(yData):], uData) - copy(buf[len(yData)+len(uData):], vData) - - if f, err := frame.NewUint16FrameFromBytes(properties, int64(d.picture.m.timestamp), buf); err != nil { - return nil, err - } else { - d.bufPool.Put(buf) - return f, nil - } } else { - yData := unsafe.Slice((*byte)(d.picture.data[planeY]), properties.Height*properties.Width) - var uData, vData []byte - + yData = unsafe.Slice((*byte)(d.picture.data[planeY]), properties.Height*properties.Width) if d.picture.p.layout != C.DAV1D_PIXEL_LAYOUT_I400 { uData = unsafe.Slice((*byte)(d.picture.data[planeU]), chromaHeight*chromaWidth) vData = unsafe.Slice((*byte)(d.picture.data[planeV]), chromaHeight*chromaWidth) } - - n := len(yData) + len(uData) + len(vData) - - var buf []byte - if b := d.bufPool.Get(); b != nil && len(b.([]byte)) == n { - buf = b.([]byte) - } else { - buf = make([]byte, n) - } - - copy(buf, yData) - copy(buf[len(yData):], uData) - copy(buf[len(yData)+len(uData):], vData) - - if f, err := frame.NewUint8FrameFromBytes(properties, int64(d.picture.m.timestamp), buf); err != nil { - return nil, err - } else { - runtime.SetFinalizer(f, func(frameUint8 *frame.FrameUint8) { - d.bufPool.Put(buf) - }) - return f, nil - } } + + copy(f.GetLuma(), yData) + copy(f.GetCb(), uData) + copy(f.GetCr(), vData) + + return f, nil } func (d *Decoder) DecodeStream() *frame.Stream { diff --git a/decoder/libdav1d/libdav1d_test.go b/decoder/libdav1d/libdav1d_test.go index ea6aa4f..f241ab4 100644 --- a/decoder/libdav1d/libdav1d_test.go +++ b/decoder/libdav1d/libdav1d_test.go @@ -35,6 +35,7 @@ func testDecode(sample testdata.TestSample, t *testing.T) { if decoded%50 == 0 { t.Logf("%d/%d", decoded, sample.Frames) } + decodedFrame.Return() } if decoded != sample.Frames { diff --git a/decoder/y4m/y4m.go b/decoder/y4m/y4m.go index 1f21068..1edc4e8 100644 --- a/decoder/y4m/y4m.go +++ b/decoder/y4m/y4m.go @@ -8,10 +8,8 @@ import ( "git.gammaspectra.live/S.O.N.G/Ignite/utilities" "github.com/ulikunitz/xz" "io" - "runtime" "strconv" "strings" - "sync" ) type Decoder struct { @@ -28,7 +26,7 @@ type Decoder struct { frameCounter int timecodes utilities.Timecodes - bufPool sync.Pool + pool *frame.Pool } type Parameter byte @@ -59,19 +57,16 @@ func NewDecoder(reader io.Reader, settings map[string]any) (*Decoder, error) { r: reader, parameters: make(map[Parameter][]string), } + var err error - if err := s.readHeader(); err != nil { + if err = s.readHeader(); err != nil { return nil, err } - if err := s.parseParameters(); err != nil { + if err = s.parseParameters(); err != nil { return nil, err } - s.bufPool.New = func() any { - return make([]byte, s.frameSize) - } - if t, ok := settings["seek_table"]; ok { if table, ok := t.([]int64); ok { s.frameSeekTable = table @@ -96,6 +91,11 @@ func NewDecoder(reader io.Reader, settings map[string]any) (*Decoder, error) { s.properties.VFR = s.IsVFR() + s.pool, err = frame.NewPool(s.properties.FrameProperties()) + if err != nil { + return nil, err + } + return s, nil } @@ -204,34 +204,13 @@ func (s *Decoder) GetFrame() (parameters map[Parameter][]string, frameObject fra s.frameSeekTable[s.frameCounter] = index } - var buf []byte - if buf, err = s.readFrameData(); err != nil { - s.bufPool.Put(buf) - return nil, nil, err - } - pts := s.FramePTS(s.frameCounter) if pts == -1 { return nil, nil, fmt.Errorf("frame %d PTS could not be calculated", s.frameCounter) } - if s.properties.ColorSpace.BitDepth > 8 { - //it's copied below - defer s.bufPool.Put(buf) - if frameObject, err = frame.NewUint16FrameFromBytes(s.properties.FrameProperties(), pts, buf); err != nil { - return nil, nil, err - } - } else { - if f8, err := frame.NewUint8FrameFromBytes(s.properties.FrameProperties(), pts, buf); err != nil { - s.bufPool.Put(buf) - return nil, nil, err - } else { - frameObject = f8 - runtime.SetFinalizer(f8, func(f *frame.FrameUint8) { - //return buffer to pool once top frame is not in use - s.bufPool.Put(buf) - }) - } + if frameObject, err = s.readFrameData(pts); err != nil { + return nil, nil, err } s.frameCounter++ @@ -316,11 +295,10 @@ func (s *Decoder) readFrameHeader() (parameters map[Parameter][]string, err erro return parameters, nil } -func (s *Decoder) readFrameData() (buf []byte, err error) { - //TODO: reuse buffers, maybe channel? - buf = s.bufPool.Get().([]byte) - _, err = io.ReadFull(s.r, buf) - return buf, err +func (s *Decoder) readFrameData(pts int64) (f frame.Frame, err error) { + f = s.pool.Get(pts) + _, err = io.ReadFull(s.r, f.GetJoint()) + return f, err } func (s *Decoder) parseParameters() (err error) { diff --git a/decoder/y4m/y4m_test.go b/decoder/y4m/y4m_test.go index 2b7f3ea..523fc78 100644 --- a/decoder/y4m/y4m_test.go +++ b/decoder/y4m/y4m_test.go @@ -1,8 +1,10 @@ package y4m import ( + "errors" "git.gammaspectra.live/S.O.N.G/Ignite/frame" "git.gammaspectra.live/S.O.N.G/Ignite/testdata" + "io" "testing" ) @@ -29,6 +31,9 @@ func testDecode(sample testdata.TestSample, t *testing.T) { } else { defer y4m.Close() decoded := 0 + var lastPts int64 + + mod := sample.Frames / 5 var frameProperties frame.Properties for decodedFrame := range y4m.DecodeStream().Channel() { @@ -37,15 +42,26 @@ func testDecode(sample testdata.TestSample, t *testing.T) { } //ingest decoded++ - if decoded%50 == 0 { + if decoded%mod == 0 { t.Logf("%d/%d", decoded, sample.Frames) } + lastPts = decodedFrame.PTS() + decodedFrame.Return() + } + if decoded%mod != 0 { + t.Logf("%d/%d", decoded, sample.Frames) } if decoded != sample.Frames { t.Fatalf("expected %d frames, got %d", sample.Frames, decoded) } + duration := y4m.Properties().TimeBase().PTSToDuration(lastPts) + + if y4m.Properties().TimeBase().PTSToDuration(lastPts) != sample.Duration() { + t.Fatalf("expected %s duration, got %s", sample.Duration(), duration) + } + if frameProperties.Width != sample.Width { t.Fatalf("expected %d width, got %d", sample.Width, frameProperties.Width) } @@ -79,3 +95,94 @@ func TestDecode_YUV420_2160p60_10bit(t *testing.T) { func TestDecode_YUV420_360p24_8bit_xz(t *testing.T) { testDecode(testdata.Y4M_Big_Buck_Bunny_360p24_YUV420_8bit, t) } + +func testBench(sample testdata.TestSample, b *testing.B) { + b.ReportAllocs() + reader, err := sample.Open(b) + if err != nil { + b.Fatal(err) + } + defer func() { + reader.Close() + }() + + var y4m *Decoder + + init := func() { + if y4m != nil { + y4m.Close() + } + + if rsc, ok := reader.(io.ReadSeekCloser); ok { + _, err = rsc.Seek(0, io.SeekStart) + if err != nil { + b.Fatal(err) + } + } else { + reader.Close() + reader, err = sample.Open(b) + if err != nil { + b.Fatal(err) + } + } + + switch sample.Type { + case "y4m": + y4m, err = NewDecoder(reader, nil) + case "y4m.xz": + y4m, err = NewXZCompressedDecoder(reader, nil) + default: + b.Fatal("unsupported sample type") + } + if err != nil { + b.Fatal(err) + } + } + + var bytesDecoded, framesDecoded int + + init() + b.ResetTimer() + for i := 0; i < b.N; i++ { + decodedFrame, err := y4m.Decode() + if err != nil { + b.StopTimer() + if errors.Is(err, io.EOF) { + //reset to start + init() + b.StartTimer() + continue + } else { + b.Fatal(err) + } + } + framesDecoded++ + bytesDecoded += len(decodedFrame.GetJoint()) + b.SetBytes(int64(len(decodedFrame.GetJoint()))) + decodedFrame.Return() + } + b.StopTimer() + + y4m.Close() + b.ReportMetric(float64(framesDecoded)/b.Elapsed().Seconds(), "frames/s") +} + +func BenchmarkDecode_YUV420_720p24_8bit(b *testing.B) { + testBench(testdata.Y4M_Sintel_Trailer_720p24_YUV420_8bit, b) +} + +func BenchmarkDecode_YUV444_720p50_8bit(b *testing.B) { + testBench(testdata.Y4M_Ducks_Take_Off_720p50_YUV444_8bit, b) +} + +func BenchmarkDecode_YUV422_720p50_8bit(b *testing.B) { + testBench(testdata.Y4M_Ducks_Take_Off_720p50_YUV422_8bit, b) +} + +func BenchmarkDecode_YUV420_2160p60_10bit(b *testing.B) { + testBench(testdata.Y4M_Netflix_FoodMarket_2160p60_YUV420_10bit, b) +} + +func BenchmarkDecode_YUV420_360p24_8bit_xz(b *testing.B) { + testBench(testdata.Y4M_Big_Buck_Bunny_360p24_YUV420_8bit, b) +} diff --git a/frame/frame.go b/frame/frame.go index e8afa4e..fe5ba63 100644 --- a/frame/frame.go +++ b/frame/frame.go @@ -19,6 +19,16 @@ type Frame interface { // Get8 get a pixel sample in 8-bit depth Get8(x, y int) (Y uint8, Cb uint8, Cr uint8) + + GetLuma() []byte + GetCb() []byte + GetCr() []byte + // GetJoint Gets Luma+Cb+Cr slice. Do not keep references to this slice, copy instead. + // This is unsafe + GetJoint() []byte + + // Return Finishes using this frame and marks it for reuse + Return() } type TypedFrame[T AllowedFrameTypes] interface { @@ -33,6 +43,12 @@ type TypedFrame[T AllowedFrameTypes] interface { GetNativeCb() []T // GetNativeCr also known as V. Do not keep references to this slice, copy instead. GetNativeCr() []T + // GetNativeJoint Gets Luma+Cb+Cr slice. Do not keep references to this slice, copy instead. + GetNativeJoint() []T + + FillNativeLuma([]T) + FillNativeCb([]T) + FillNativeCr([]T) } type Properties struct { diff --git a/frame/frame_uint16.go b/frame/frame_uint16.go index 1385417..afc7254 100644 --- a/frame/frame_uint16.go +++ b/frame/frame_uint16.go @@ -8,12 +8,15 @@ import ( type FrameUint16 struct { properties Properties + ret func(f Frame) Pts int64 Y []uint16 Cb []uint16 Cr []uint16 } +// NewUint16FrameFromBytes +// Deprecated func NewUint16FrameFromBytes(properties Properties, pts int64, data []byte) (*FrameUint16, error) { if frameLength, _ := properties.ColorSpace.FrameSize(properties.Width, properties.Height); frameLength != len(data) { return nil, errors.New("wrong length of data") @@ -74,6 +77,43 @@ func (i *FrameUint16) GetNative(x, y int) (Y uint16, Cb uint16, Cr uint16) { return } +func (i *FrameUint16) FillNativeLuma(buf []uint16) { + copy(i.Y, buf) +} + +func (i *FrameUint16) FillNativeCb(buf []uint16) { + copy(i.Cb, buf) +} + +func (i *FrameUint16) FillNativeCr(buf []uint16) { + copy(i.Cr, buf) +} + +func (i *FrameUint16) GetNativeJoint() []uint16 { + // Component slices are allocated as a single buffer + return i.Y[:len(i.Y)+len(i.Cb)+len(i.Cr)] +} + +func (i *FrameUint16) GetJoint() []byte { + buf := i.GetNativeJoint() + return unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(buf))), len(buf)*2) +} + +func (i *FrameUint16) GetLuma() []byte { + buf := i.GetNativeLuma() + return unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(buf))), len(buf)*2) +} + +func (i *FrameUint16) GetCb() []byte { + buf := i.GetNativeCb() + return unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(buf))), len(buf)*2) +} + +func (i *FrameUint16) GetCr() []byte { + buf := i.GetNativeCr() + return unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(buf))), len(buf)*2) +} + func (i *FrameUint16) GetNativeLuma() []uint16 { return i.Y } @@ -85,3 +125,9 @@ func (i *FrameUint16) GetNativeCb() []uint16 { func (i *FrameUint16) GetNativeCr() []uint16 { return i.Cr } + +func (i *FrameUint16) Return() { + if i.ret != nil { + i.ret(i) + } +} diff --git a/frame/frame_uint8.go b/frame/frame_uint8.go index dfd490b..ae6bbe3 100644 --- a/frame/frame_uint8.go +++ b/frame/frame_uint8.go @@ -6,12 +6,15 @@ import ( type FrameUint8 struct { properties Properties + ret func(f Frame) Pts int64 Y []uint8 Cb []uint8 Cr []uint8 } +// NewUint8FrameFromBytes +// Deprecated func NewUint8FrameFromBytes(properties Properties, pts int64, data []byte) (*FrameUint8, error) { if frameLength, _ := properties.ColorSpace.FrameSize(properties.Width, properties.Height); frameLength != len(data) { return nil, errors.New("wrong length of data") @@ -66,6 +69,18 @@ func (i *FrameUint8) GetNative(x, y int) (Y uint8, Cb uint8, Cr uint8) { return } +func (i *FrameUint8) FillNativeLuma(buf []uint8) { + copy(i.Y, buf) +} + +func (i *FrameUint8) FillNativeCb(buf []uint8) { + copy(i.Cb, buf) +} + +func (i *FrameUint8) FillNativeCr(buf []uint8) { + copy(i.Cr, buf) +} + func (i *FrameUint8) GetNativeLuma() []uint8 { return i.Y } @@ -77,3 +92,30 @@ func (i *FrameUint8) GetNativeCb() []uint8 { func (i *FrameUint8) GetNativeCr() []uint8 { return i.Cr } + +func (i *FrameUint8) GetNativeJoint() []uint8 { + // Component slices are allocated as a single buffer + return i.Y[:len(i.Y)+len(i.Cb)+len(i.Cr)] +} + +func (i *FrameUint8) GetJoint() []byte { + return i.GetNativeJoint() +} + +func (i *FrameUint8) GetLuma() []byte { + return i.GetNativeLuma() +} + +func (i *FrameUint8) GetCb() []byte { + return i.GetNativeCb() +} + +func (i *FrameUint8) GetCr() []byte { + return i.GetNativeCr() +} + +func (i *FrameUint8) Return() { + if i.ret != nil { + i.ret(i) + } +} diff --git a/frame/pool.go b/frame/pool.go new file mode 100644 index 0000000..3ac62bf --- /dev/null +++ b/frame/pool.go @@ -0,0 +1,100 @@ +package frame + +import ( + "errors" + "sync" +) + +type Pool struct { + p sync.Pool + properties Properties + frameSize int +} + +func NewPool(properties Properties) (*Pool, error) { + p := &Pool{ + properties: properties, + } + + if properties.ColorSpace.BitDepth > 16 || properties.ColorSpace.BitDepth <= 0 { + return nil, errors.New("unsupported bit depth") + } + + frameSize, err := properties.ColorSpace.FrameSize(properties.Width, properties.Height) + if err != nil { + return nil, err + } + + p.frameSize = frameSize + + iY := properties.ColorSpace.ChromaSampling.PlaneLumaSamples(properties.Width, properties.Height) + iCb := properties.ColorSpace.ChromaSampling.PlaneCbSamples(properties.Width, properties.Height) + iCr := properties.ColorSpace.ChromaSampling.PlaneCrSamples(properties.Width, properties.Height) + + // 16-bit frame + if properties.ColorSpace.BitDepth > 8 { + p.p.New = func() any { + buf := make([]uint16, p.frameSize/2) + return &FrameUint16{ + ret: p.Put, + properties: properties, + Y: buf[:iY], + Cb: buf[iY : iY+iCb], + Cr: buf[iY+iCb : iY+iCb+iCr], + } + } + } else { + p.p.New = func() any { + buf := make([]uint8, p.frameSize) + return &FrameUint8{ + ret: p.Put, + properties: properties, + Y: buf[:iY], + Cb: buf[iY : iY+iCb], + Cr: buf[iY+iCb : iY+iCb+iCr], + } + } + } + + return p, nil +} + +func (p *Pool) Properties() Properties { + return p.properties +} + +func (p *Pool) Get(pts int64) Frame { + switch tf := p.p.Get().(type) { + case *FrameUint16: + tf.Pts = pts + return tf + case *FrameUint8: + tf.Pts = pts + return tf + default: + panic("unsupported type") + } +} + +func (p *Pool) Put(f Frame) { + switch tf := f.(type) { + case *FrameUint16: + if tf.properties != p.properties { + panic("unsupported properties") + } + if (len(tf.Y)+len(tf.Cb)+len(tf.Cr))*2 != p.frameSize { + panic("unsupported data size") + } + p.p.Put(tf) + case *FrameUint8: + if tf.properties != p.properties { + panic("unsupported properties") + } + if (len(tf.Y) + len(tf.Cb) + len(tf.Cr)) != p.frameSize { + panic("unsupported data size") + } + p.p.Put(tf) + default: + panic("unsupported type") + } +} diff --git a/testdata/testdata.go b/testdata/testdata.go index 1bbaeb0..bc69409 100644 --- a/testdata/testdata.go +++ b/testdata/testdata.go @@ -2,12 +2,13 @@ package testdata import ( "git.gammaspectra.live/S.O.N.G/Ignite/color" + "git.gammaspectra.live/S.O.N.G/Ignite/utilities" "io" "net/http" "os" "path" "runtime" - "testing" + "time" ) func init() { @@ -26,11 +27,16 @@ type TestSample struct { Type string Width, Height int Frames int + TimeBase utilities.Ratio ColorSpace color.Space SkipNotFound bool } -func (sample *TestSample) Open(t *testing.T) (io.ReadCloser, error) { +func (sample *TestSample) Duration() time.Duration { + return sample.TimeBase.PTSToDuration(int64(sample.Frames - 1)) +} + +func (sample *TestSample) Open(t TestRunner) (io.ReadCloser, error) { var reader io.ReadCloser var err error @@ -54,6 +60,11 @@ func (sample *TestSample) Open(t *testing.T) (io.ReadCloser, error) { return reader, nil } +type TestRunner interface { + Skip(args ...any) + Fatal(args ...any) +} + var ( Y4M_Sintel_Trailer_720p24_YUV420_8bit = TestSample{ Path: "testdata/sintel_trailer_2k_720p24.y4m", @@ -63,6 +74,7 @@ var ( Width: 1280, Height: 720, Frames: 1253, + TimeBase: utilities.NewRatio(1, 24), ColorSpace: color.MustColorFormatFromString("420jpeg"), } Y4M_Big_Buck_Bunny_360p24_YUV420_8bit = TestSample{ @@ -72,6 +84,7 @@ var ( Width: 640, Height: 360, Frames: 14315, + TimeBase: utilities.NewRatio(1, 24), ColorSpace: color.MustColorFormatFromString("420jpeg"), SkipNotFound: true, } @@ -83,6 +96,7 @@ var ( Width: 1280, Height: 720, Frames: 500, + TimeBase: utilities.NewRatio(1, 50), ColorSpace: color.MustColorFormatFromString("444p8"), } Y4M_Ducks_Take_Off_720p50_YUV422_8bit = TestSample{ @@ -93,6 +107,7 @@ var ( Width: 1280, Height: 720, Frames: 500, + TimeBase: utilities.NewRatio(1, 50), ColorSpace: color.MustColorFormatFromString("422p8"), } Y4M_Netflix_FoodMarket_2160p60_YUV420_10bit = TestSample{ @@ -102,6 +117,7 @@ var ( Width: 4096, Height: 2160, Frames: 600, + TimeBase: utilities.NewRatio(1, 60), ColorSpace: color.MustColorFormatFromString("420p10"), SkipNotFound: true, } @@ -113,6 +129,7 @@ var ( Width: 1280, Height: 720, Frames: 1253, + TimeBase: utilities.NewRatio(1, 24), ColorSpace: color.MustColorFormatFromString("420jpeg"), } AV1_Netflix_Sol_Levante_2160p24_YUV444_12bit_Lossy = TestSample{ @@ -122,6 +139,7 @@ var ( Width: 3840, Height: 2160, Frames: 6313, + TimeBase: utilities.NewRatio(1, 24), ColorSpace: color.MustColorFormatFromString("444p12"), } ) diff --git a/utilities/ratio.go b/utilities/ratio.go index f8839c4..a7cd840 100644 --- a/utilities/ratio.go +++ b/utilities/ratio.go @@ -4,6 +4,7 @@ import ( "fmt" "gopkg.in/yaml.v3" "math" + "time" ) type Ratio struct { @@ -15,6 +16,10 @@ func (r Ratio) Float64() float64 { return float64(r.Numerator) / float64(r.Denominator) } +func (r Ratio) PTSToDuration(pts int64) time.Duration { + return (time.Duration(pts) * time.Second * time.Duration(r.Numerator)) / time.Duration(r.Denominator) +} + func (r *Ratio) UnmarshalJSON(buf []byte) error { _, err := fmt.Sscanf(string(buf), "\"%d:%d\"", &r.Numerator, &r.Denominator) return err