diff --git a/http.go b/http.go index 0b55b6f..b281fc8 100644 --- a/http.go +++ b/http.go @@ -6,20 +6,21 @@ import ( "io" "net/http" "net/url" - "runtime" "strconv" + "sync" ) -const rangeReaderBufferSize = 4096 +const rangeReaderBufferSize = 1024 * 16 type RangeReadSeekCloser struct { - uri *url.URL - size int64 - i int64 - body io.ReadCloser - buf []byte - ib int64 - closed bool + uri *url.URL + size int64 + readOffset int64 + body io.ReadCloser + bodyLock sync.RWMutex + buf []byte + bufferOffset int64 + closed bool } func NewRangeReadSeekCloser(uri string) (*RangeReadSeekCloser, error) { @@ -29,7 +30,6 @@ func NewRangeReadSeekCloser(uri string) (*RangeReadSeekCloser, error) { } r := &RangeReadSeekCloser{ uri: parsedUrl, - buf: make([]byte, 0, rangeReaderBufferSize), } if err = r.getInformation(); err != nil { @@ -44,12 +44,14 @@ func (r *RangeReadSeekCloser) GetURI() string { } func (r *RangeReadSeekCloser) retryConnect() error { + r.bodyLock.Lock() + defer r.bodyLock.Unlock() if r.closed { return errors.New("already closed") } headers := make(http.Header) - startOffset := r.i + startOffset := r.readOffset expectedLength := r.size - startOffset headers.Set("Range", fmt.Sprintf("bytes=%d-", startOffset)) @@ -63,7 +65,6 @@ func (r *RangeReadSeekCloser) retryConnect() error { return err } r.body = response.Body - runtime.SetFinalizer(r.body, io.ReadCloser.Close) if response.StatusCode != http.StatusPartialContent { r.body.Close() @@ -88,23 +89,37 @@ func (r *RangeReadSeekCloser) retryConnect() error { } func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { + r.bodyLock.RLock() + defer r.bodyLock.RUnlock() + for retry := 0; retry < 2; retry++ { - if r.i >= r.size { + if r.readOffset >= r.size { return 0, io.EOF } - if r.i >= r.ib { - bufStart := int(r.i - r.ib) - bufEnd := bufStart + len(p) - if bufEnd <= len(r.buf) { - copy(p, r.buf[bufStart:bufEnd]) - r.i += int64(len(p)) - return len(p), nil + if r.readOffset >= r.bufferOffset { + bufStart := r.readOffset - r.bufferOffset + + if bufStart <= int64(len(r.buf)) { + bufEnd := bufStart + int64(len(p)) + if bufEnd >= int64(len(r.buf)) { + bufEnd = int64(len(r.buf)) + } + + if bufEnd-bufStart > 0 { + copy(p, r.buf[bufStart:bufEnd]) + r.readOffset += bufEnd - bufStart + return int(bufEnd - bufStart), nil + } } } if r.body == nil { - if err = r.retryConnect(); err != nil { + if err = func() error { + r.bodyLock.RUnlock() + defer r.bodyLock.RLock() + return r.retryConnect() + }(); err != nil { continue } } @@ -113,7 +128,7 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { n, err = r.body.Read(data) if err != nil { //detect actual eof, not a disconnection if err == io.EOF || err == io.ErrUnexpectedEOF { - if r.i+int64(n) != r.size { + if r.readOffset+int64(n) != r.size { continue } } else { @@ -121,7 +136,7 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { } } r.buf = data[:n] - r.ib = r.i + r.bufferOffset = r.readOffset readBytes := len(p) if n < readBytes { @@ -129,7 +144,7 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { } copy(p[:readBytes], data[:readBytes]) - r.i += int64(readBytes) + r.readOffset += int64(readBytes) return readBytes, nil } @@ -137,37 +152,44 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { } func (r *RangeReadSeekCloser) Seek(offset int64, whence int) (int64, error) { + r.bodyLock.Lock() + defer r.bodyLock.Unlock() + + oldOffset := r.readOffset + switch whence { case io.SeekStart: - r.i = offset + r.readOffset = offset case io.SeekCurrent: - r.i += offset + r.readOffset += offset case io.SeekEnd: //todo: maybe without -1? - r.i = (r.size - 1) - offset + r.readOffset = (r.size - 1) - offset default: return 0, fmt.Errorf("unknown whence %d", whence) } - if r.i >= r.size { - return r.i, io.EOF - } else if r.i < 0 { - return r.i, io.ErrUnexpectedEOF - } - - if r.i < r.ib || r.i >= r.ib+int64(len(r.buf)) { + if oldOffset != r.readOffset { if r.body != nil { r.body.Close() } r.body = nil } - return r.i, nil + if r.readOffset >= r.size { + return r.readOffset, io.EOF + } else if r.readOffset < 0 { + return r.readOffset, io.ErrUnexpectedEOF + } + + return r.readOffset, nil } func (r *RangeReadSeekCloser) Close() error { + r.bodyLock.Lock() + defer r.bodyLock.Unlock() if r.body != nil { r.closed = true defer func() {