From 95dccc3e75b1049e222e884f0d0c68d4f1be0e1f Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Sun, 6 Mar 2022 20:19:36 +0100 Subject: [PATCH] Use streaming HTTP reader implementation --- http.go | 129 +++++++++++++++++++++++++++++++++---------------------- queue.go | 21 ++++----- 2 files changed, 88 insertions(+), 62 deletions(-) diff --git a/http.go b/http.go index aca4a6c..7e5a6e1 100644 --- a/http.go +++ b/http.go @@ -4,18 +4,19 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "net/url" + "runtime" "strconv" ) -const rangeReaderBufferSize = 1024 * 64 +const rangeReaderBufferSize = 4096 type RangeReadSeekCloser struct { uri *url.URL size int64 i int64 + body io.ReadCloser buf []byte ib int64 } @@ -41,41 +42,13 @@ func (r *RangeReadSeekCloser) GetURI() string { return r.uri.String() } -func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { - if r.i >= r.size { - return 0, io.EOF - } +func (r *RangeReadSeekCloser) retryConnect() error { - if r.i >= r.ib { - bufStart := int(r.i - r.ib) - bufEnd := bufStart + len(p) - if bufEnd <= len(r.buf) { - copy(p, r.buf[bufStart:bufEnd]) - r.i += int64(len(p)) - return len(p), nil - } - } - - //TODO: EOF error / limit headers := make(http.Header) - startOffset := r.i - endOffset := r.i + int64(len(p)) - 1 - if endOffset-startOffset+1 < rangeReaderBufferSize { - endOffset = startOffset + rangeReaderBufferSize - 1 - } - if endOffset >= (r.size - 1) { - endOffset = r.size - 1 - } - expectedLength := endOffset - startOffset + 1 - - returnLength := int(endOffset - startOffset + 1) - if returnLength > len(p) { - returnLength = len(p) - } - - headers.Set("Range", fmt.Sprintf("bytes=%d-%d", startOffset, endOffset)) + expectedLength := r.size - startOffset + headers.Set("Range", fmt.Sprintf("bytes=%d-", startOffset)) response, err := http.DefaultClient.Do(&http.Request{ Method: "GET", URL: r.uri, @@ -83,38 +56,80 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { }) if err != nil { - return 0, err + return err } - defer response.Body.Close() + r.body = response.Body + runtime.SetFinalizer(r.body, io.ReadCloser.Close) if response.StatusCode != http.StatusPartialContent { - return 0, fmt.Errorf("response status code %d != %d", response.StatusCode, http.StatusPartialContent) + r.body.Close() + r.body = nil + return fmt.Errorf("response status code %d != %d", response.StatusCode, http.StatusPartialContent) } contentLength, err := strconv.ParseInt(response.Header.Get("content-length"), 10, 0) if err != nil { - return 0, errors.New("server response does not have a valid Content-Length") + r.body.Close() + r.body = nil + return errors.New("server response does not have a valid Content-Length") } if contentLength != expectedLength { - return 0, fmt.Errorf("server returned %d bytes, expected %d", contentLength, expectedLength) + r.body.Close() + r.body = nil + return fmt.Errorf("server returned %d bytes, expected %d", contentLength, expectedLength) } - data, err := ioutil.ReadAll(response.Body) - if err != nil { - return 0, err + return nil +} + +func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) { + for retry := 0; retry < 2; retry++ { + if r.i >= r.size { + return 0, io.EOF + } + + if r.i >= r.ib { + bufStart := int(r.i - r.ib) + bufEnd := bufStart + len(p) + if bufEnd <= len(r.buf) { + copy(p, r.buf[bufStart:bufEnd]) + r.i += int64(len(p)) + return len(p), nil + } + } + + if r.body == nil { + if err = r.retryConnect(); err != nil { + continue + } + } + + data := make([]byte, rangeReaderBufferSize) + n, err = r.body.Read(data) + if err != nil { //detect actual eof, not a disconnection + if err == io.EOF || err == io.ErrUnexpectedEOF { + if r.i+int64(n) != r.size { + continue + } + } else { + continue + } + } + r.buf = data[:n] + r.ib = r.i + + readBytes := len(p) + if n < readBytes { + readBytes = n + } + + copy(p[:readBytes], data[:readBytes]) + r.i += int64(readBytes) + return readBytes, nil } - if len(data) != int(expectedLength) { - return 0, fmt.Errorf("read %d bytes, expected %d", len(data), expectedLength) - } - copy(p[:returnLength], data[:returnLength]) - - r.buf = data - r.ib = r.i - - r.i += int64(returnLength) - return returnLength, nil + return 0, errors.New("could not retry") } func (r *RangeReadSeekCloser) Seek(offset int64, whence int) (int64, error) { @@ -138,11 +153,21 @@ func (r *RangeReadSeekCloser) Seek(offset int64, whence int) (int64, error) { return r.i, io.ErrUnexpectedEOF } + if r.i < r.ib || r.i >= r.ib+int64(len(r.buf)) { + r.body.Close() + r.body = nil + } + return r.i, nil } func (r *RangeReadSeekCloser) Close() error { - //todo close? + if r.body != nil { + defer func() { + r.body = nil + }() + return r.body.Close() + } return nil } diff --git a/queue.go b/queue.go index 05146e3..e876515 100644 --- a/queue.go +++ b/queue.go @@ -38,6 +38,7 @@ type QueueTrackEntry struct { Artist string `json:"artist"` Art string `json:"art"` } + reader io.ReadSeekCloser source audio.Source original map[string]interface{} @@ -48,19 +49,16 @@ func (e *QueueTrackEntry) Load() error { return nil } - var reader io.ReadSeekCloser - if len(e.Path) > 4 && e.Path[:4] == "http" { s, err := NewRangeReadSeekCloser(e.Path) if err != nil { return err } - reader = s //close at end, TODO check if it runs runtime.SetFinalizer(s, (*RangeReadSeekCloser).Close) - reader = s + e.reader = s } else { f, err := os.Open(e.Path) if err != nil { @@ -69,26 +67,26 @@ func (e *QueueTrackEntry) Load() error { //close at end, TODO check if it runs runtime.SetFinalizer(f, (*os.File).Close) - reader = f + e.reader = f } - if reader == nil { + if e.reader == nil { return errors.New("could not find stream opener") } - meta, err := tag.ReadFrom(reader) + meta, err := tag.ReadFrom(e.reader) if err != nil { err = nil } - if _, err = reader.Seek(0, io.SeekStart); err != nil { + if _, err = e.reader.Seek(0, io.SeekStart); err != nil { return err } - decoders, err := guess.GetDecoders(reader, e.Path) + decoders, err := guess.GetDecoders(e.reader, e.Path) if err != nil { return err } - source, err := guess.Open(reader, decoders) + source, err := guess.Open(e.reader, decoders) if err != nil { return err } @@ -356,6 +354,9 @@ func (q *Queue) Remove(identifier audio.QueueIdentifier) bool { q.queue = append(q.queue[:i], q.queue[i+1:]...) q.mutex.Unlock() q.audioQueue.Remove(identifier) + if e.reader != nil { + e.reader.Close() + } return true } }