increase buffer on RangeReadSeekCloser, synchronize read + buffer teardowns
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
2bdecb90cf
commit
f47ba3da89
92
http.go
92
http.go
|
@ -6,20 +6,21 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const rangeReaderBufferSize = 4096
|
||||
const rangeReaderBufferSize = 1024 * 16
|
||||
|
||||
type RangeReadSeekCloser struct {
|
||||
uri *url.URL
|
||||
size int64
|
||||
i int64
|
||||
body io.ReadCloser
|
||||
buf []byte
|
||||
ib int64
|
||||
closed bool
|
||||
uri *url.URL
|
||||
size int64
|
||||
readOffset int64
|
||||
body io.ReadCloser
|
||||
bodyLock sync.RWMutex
|
||||
buf []byte
|
||||
bufferOffset int64
|
||||
closed bool
|
||||
}
|
||||
|
||||
func NewRangeReadSeekCloser(uri string) (*RangeReadSeekCloser, error) {
|
||||
|
@ -29,7 +30,6 @@ func NewRangeReadSeekCloser(uri string) (*RangeReadSeekCloser, error) {
|
|||
}
|
||||
r := &RangeReadSeekCloser{
|
||||
uri: parsedUrl,
|
||||
buf: make([]byte, 0, rangeReaderBufferSize),
|
||||
}
|
||||
|
||||
if err = r.getInformation(); err != nil {
|
||||
|
@ -44,12 +44,14 @@ func (r *RangeReadSeekCloser) GetURI() string {
|
|||
}
|
||||
|
||||
func (r *RangeReadSeekCloser) retryConnect() error {
|
||||
r.bodyLock.Lock()
|
||||
defer r.bodyLock.Unlock()
|
||||
if r.closed {
|
||||
return errors.New("already closed")
|
||||
}
|
||||
|
||||
headers := make(http.Header)
|
||||
startOffset := r.i
|
||||
startOffset := r.readOffset
|
||||
|
||||
expectedLength := r.size - startOffset
|
||||
headers.Set("Range", fmt.Sprintf("bytes=%d-", startOffset))
|
||||
|
@ -63,7 +65,6 @@ func (r *RangeReadSeekCloser) retryConnect() error {
|
|||
return err
|
||||
}
|
||||
r.body = response.Body
|
||||
runtime.SetFinalizer(r.body, io.ReadCloser.Close)
|
||||
|
||||
if response.StatusCode != http.StatusPartialContent {
|
||||
r.body.Close()
|
||||
|
@ -88,23 +89,37 @@ func (r *RangeReadSeekCloser) retryConnect() error {
|
|||
}
|
||||
|
||||
func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
||||
r.bodyLock.RLock()
|
||||
defer r.bodyLock.RUnlock()
|
||||
|
||||
for retry := 0; retry < 2; retry++ {
|
||||
if r.i >= r.size {
|
||||
if r.readOffset >= r.size {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if r.i >= r.ib {
|
||||
bufStart := int(r.i - r.ib)
|
||||
bufEnd := bufStart + len(p)
|
||||
if bufEnd <= len(r.buf) {
|
||||
copy(p, r.buf[bufStart:bufEnd])
|
||||
r.i += int64(len(p))
|
||||
return len(p), nil
|
||||
if r.readOffset >= r.bufferOffset {
|
||||
bufStart := r.readOffset - r.bufferOffset
|
||||
|
||||
if bufStart <= int64(len(r.buf)) {
|
||||
bufEnd := bufStart + int64(len(p))
|
||||
if bufEnd >= int64(len(r.buf)) {
|
||||
bufEnd = int64(len(r.buf))
|
||||
}
|
||||
|
||||
if bufEnd-bufStart > 0 {
|
||||
copy(p, r.buf[bufStart:bufEnd])
|
||||
r.readOffset += bufEnd - bufStart
|
||||
return int(bufEnd - bufStart), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if r.body == nil {
|
||||
if err = r.retryConnect(); err != nil {
|
||||
if err = func() error {
|
||||
r.bodyLock.RUnlock()
|
||||
defer r.bodyLock.RLock()
|
||||
return r.retryConnect()
|
||||
}(); err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -113,7 +128,7 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
|||
n, err = r.body.Read(data)
|
||||
if err != nil { //detect actual eof, not a disconnection
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
if r.i+int64(n) != r.size {
|
||||
if r.readOffset+int64(n) != r.size {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
|
@ -121,7 +136,7 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
}
|
||||
r.buf = data[:n]
|
||||
r.ib = r.i
|
||||
r.bufferOffset = r.readOffset
|
||||
|
||||
readBytes := len(p)
|
||||
if n < readBytes {
|
||||
|
@ -129,7 +144,7 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
copy(p[:readBytes], data[:readBytes])
|
||||
r.i += int64(readBytes)
|
||||
r.readOffset += int64(readBytes)
|
||||
return readBytes, nil
|
||||
}
|
||||
|
||||
|
@ -137,37 +152,44 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
|||
}
|
||||
|
||||
func (r *RangeReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
|
||||
r.bodyLock.Lock()
|
||||
defer r.bodyLock.Unlock()
|
||||
|
||||
oldOffset := r.readOffset
|
||||
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
r.i = offset
|
||||
r.readOffset = offset
|
||||
|
||||
case io.SeekCurrent:
|
||||
r.i += offset
|
||||
r.readOffset += offset
|
||||
|
||||
case io.SeekEnd:
|
||||
//todo: maybe without -1?
|
||||
r.i = (r.size - 1) - offset
|
||||
r.readOffset = (r.size - 1) - offset
|
||||
default:
|
||||
return 0, fmt.Errorf("unknown whence %d", whence)
|
||||
}
|
||||
|
||||
if r.i >= r.size {
|
||||
return r.i, io.EOF
|
||||
} else if r.i < 0 {
|
||||
return r.i, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
if r.i < r.ib || r.i >= r.ib+int64(len(r.buf)) {
|
||||
if oldOffset != r.readOffset {
|
||||
if r.body != nil {
|
||||
r.body.Close()
|
||||
}
|
||||
r.body = nil
|
||||
}
|
||||
|
||||
return r.i, nil
|
||||
if r.readOffset >= r.size {
|
||||
return r.readOffset, io.EOF
|
||||
} else if r.readOffset < 0 {
|
||||
return r.readOffset, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
return r.readOffset, nil
|
||||
}
|
||||
|
||||
func (r *RangeReadSeekCloser) Close() error {
|
||||
r.bodyLock.Lock()
|
||||
defer r.bodyLock.Unlock()
|
||||
if r.body != nil {
|
||||
r.closed = true
|
||||
defer func() {
|
||||
|
|
Loading…
Reference in a new issue