Use streaming HTTP reader implementation
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
08574dbe05
commit
95dccc3e75
129
http.go
129
http.go
|
@ -4,18 +4,19 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
const rangeReaderBufferSize = 1024 * 64
|
const rangeReaderBufferSize = 4096
|
||||||
|
|
||||||
type RangeReadSeekCloser struct {
|
type RangeReadSeekCloser struct {
|
||||||
uri *url.URL
|
uri *url.URL
|
||||||
size int64
|
size int64
|
||||||
i int64
|
i int64
|
||||||
|
body io.ReadCloser
|
||||||
buf []byte
|
buf []byte
|
||||||
ib int64
|
ib int64
|
||||||
}
|
}
|
||||||
|
@ -41,41 +42,13 @@ func (r *RangeReadSeekCloser) GetURI() string {
|
||||||
return r.uri.String()
|
return r.uri.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
func (r *RangeReadSeekCloser) retryConnect() error {
|
||||||
if r.i >= r.size {
|
|
||||||
return 0, io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.i >= r.ib {
|
|
||||||
bufStart := int(r.i - r.ib)
|
|
||||||
bufEnd := bufStart + len(p)
|
|
||||||
if bufEnd <= len(r.buf) {
|
|
||||||
copy(p, r.buf[bufStart:bufEnd])
|
|
||||||
r.i += int64(len(p))
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO: EOF error / limit
|
|
||||||
headers := make(http.Header)
|
headers := make(http.Header)
|
||||||
|
|
||||||
startOffset := r.i
|
startOffset := r.i
|
||||||
endOffset := r.i + int64(len(p)) - 1
|
|
||||||
if endOffset-startOffset+1 < rangeReaderBufferSize {
|
|
||||||
endOffset = startOffset + rangeReaderBufferSize - 1
|
|
||||||
}
|
|
||||||
if endOffset >= (r.size - 1) {
|
|
||||||
endOffset = r.size - 1
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedLength := endOffset - startOffset + 1
|
expectedLength := r.size - startOffset
|
||||||
|
headers.Set("Range", fmt.Sprintf("bytes=%d-", startOffset))
|
||||||
returnLength := int(endOffset - startOffset + 1)
|
|
||||||
if returnLength > len(p) {
|
|
||||||
returnLength = len(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
headers.Set("Range", fmt.Sprintf("bytes=%d-%d", startOffset, endOffset))
|
|
||||||
response, err := http.DefaultClient.Do(&http.Request{
|
response, err := http.DefaultClient.Do(&http.Request{
|
||||||
Method: "GET",
|
Method: "GET",
|
||||||
URL: r.uri,
|
URL: r.uri,
|
||||||
|
@ -83,38 +56,80 @@ func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
defer response.Body.Close()
|
r.body = response.Body
|
||||||
|
runtime.SetFinalizer(r.body, io.ReadCloser.Close)
|
||||||
|
|
||||||
if response.StatusCode != http.StatusPartialContent {
|
if response.StatusCode != http.StatusPartialContent {
|
||||||
return 0, fmt.Errorf("response status code %d != %d", response.StatusCode, http.StatusPartialContent)
|
r.body.Close()
|
||||||
|
r.body = nil
|
||||||
|
return fmt.Errorf("response status code %d != %d", response.StatusCode, http.StatusPartialContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
contentLength, err := strconv.ParseInt(response.Header.Get("content-length"), 10, 0)
|
contentLength, err := strconv.ParseInt(response.Header.Get("content-length"), 10, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.New("server response does not have a valid Content-Length")
|
r.body.Close()
|
||||||
|
r.body = nil
|
||||||
|
return errors.New("server response does not have a valid Content-Length")
|
||||||
}
|
}
|
||||||
|
|
||||||
if contentLength != expectedLength {
|
if contentLength != expectedLength {
|
||||||
return 0, fmt.Errorf("server returned %d bytes, expected %d", contentLength, expectedLength)
|
r.body.Close()
|
||||||
|
r.body = nil
|
||||||
|
return fmt.Errorf("server returned %d bytes, expected %d", contentLength, expectedLength)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := ioutil.ReadAll(response.Body)
|
return nil
|
||||||
if err != nil {
|
}
|
||||||
return 0, err
|
|
||||||
|
func (r *RangeReadSeekCloser) Read(p []byte) (n int, err error) {
|
||||||
|
for retry := 0; retry < 2; retry++ {
|
||||||
|
if r.i >= r.size {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.i >= r.ib {
|
||||||
|
bufStart := int(r.i - r.ib)
|
||||||
|
bufEnd := bufStart + len(p)
|
||||||
|
if bufEnd <= len(r.buf) {
|
||||||
|
copy(p, r.buf[bufStart:bufEnd])
|
||||||
|
r.i += int64(len(p))
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.body == nil {
|
||||||
|
if err = r.retryConnect(); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
data := make([]byte, rangeReaderBufferSize)
|
||||||
|
n, err = r.body.Read(data)
|
||||||
|
if err != nil { //detect actual eof, not a disconnection
|
||||||
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
|
if r.i+int64(n) != r.size {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.buf = data[:n]
|
||||||
|
r.ib = r.i
|
||||||
|
|
||||||
|
readBytes := len(p)
|
||||||
|
if n < readBytes {
|
||||||
|
readBytes = n
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(p[:readBytes], data[:readBytes])
|
||||||
|
r.i += int64(readBytes)
|
||||||
|
return readBytes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(data) != int(expectedLength) {
|
return 0, errors.New("could not retry")
|
||||||
return 0, fmt.Errorf("read %d bytes, expected %d", len(data), expectedLength)
|
|
||||||
}
|
|
||||||
copy(p[:returnLength], data[:returnLength])
|
|
||||||
|
|
||||||
r.buf = data
|
|
||||||
r.ib = r.i
|
|
||||||
|
|
||||||
r.i += int64(returnLength)
|
|
||||||
return returnLength, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RangeReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
|
func (r *RangeReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
@ -138,11 +153,21 @@ func (r *RangeReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
|
||||||
return r.i, io.ErrUnexpectedEOF
|
return r.i, io.ErrUnexpectedEOF
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r.i < r.ib || r.i >= r.ib+int64(len(r.buf)) {
|
||||||
|
r.body.Close()
|
||||||
|
r.body = nil
|
||||||
|
}
|
||||||
|
|
||||||
return r.i, nil
|
return r.i, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RangeReadSeekCloser) Close() error {
|
func (r *RangeReadSeekCloser) Close() error {
|
||||||
//todo close?
|
if r.body != nil {
|
||||||
|
defer func() {
|
||||||
|
r.body = nil
|
||||||
|
}()
|
||||||
|
return r.body.Close()
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
21
queue.go
21
queue.go
|
@ -38,6 +38,7 @@ type QueueTrackEntry struct {
|
||||||
Artist string `json:"artist"`
|
Artist string `json:"artist"`
|
||||||
Art string `json:"art"`
|
Art string `json:"art"`
|
||||||
}
|
}
|
||||||
|
reader io.ReadSeekCloser
|
||||||
source audio.Source
|
source audio.Source
|
||||||
|
|
||||||
original map[string]interface{}
|
original map[string]interface{}
|
||||||
|
@ -48,19 +49,16 @@ func (e *QueueTrackEntry) Load() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var reader io.ReadSeekCloser
|
|
||||||
|
|
||||||
if len(e.Path) > 4 && e.Path[:4] == "http" {
|
if len(e.Path) > 4 && e.Path[:4] == "http" {
|
||||||
s, err := NewRangeReadSeekCloser(e.Path)
|
s, err := NewRangeReadSeekCloser(e.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
reader = s
|
|
||||||
//close at end, TODO check if it runs
|
//close at end, TODO check if it runs
|
||||||
runtime.SetFinalizer(s, (*RangeReadSeekCloser).Close)
|
runtime.SetFinalizer(s, (*RangeReadSeekCloser).Close)
|
||||||
|
|
||||||
reader = s
|
e.reader = s
|
||||||
} else {
|
} else {
|
||||||
f, err := os.Open(e.Path)
|
f, err := os.Open(e.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -69,26 +67,26 @@ func (e *QueueTrackEntry) Load() error {
|
||||||
//close at end, TODO check if it runs
|
//close at end, TODO check if it runs
|
||||||
runtime.SetFinalizer(f, (*os.File).Close)
|
runtime.SetFinalizer(f, (*os.File).Close)
|
||||||
|
|
||||||
reader = f
|
e.reader = f
|
||||||
}
|
}
|
||||||
|
|
||||||
if reader == nil {
|
if e.reader == nil {
|
||||||
return errors.New("could not find stream opener")
|
return errors.New("could not find stream opener")
|
||||||
}
|
}
|
||||||
|
|
||||||
meta, err := tag.ReadFrom(reader)
|
meta, err := tag.ReadFrom(e.reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
if _, err = reader.Seek(0, io.SeekStart); err != nil {
|
if _, err = e.reader.Seek(0, io.SeekStart); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
decoders, err := guess.GetDecoders(reader, e.Path)
|
decoders, err := guess.GetDecoders(e.reader, e.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
source, err := guess.Open(reader, decoders)
|
source, err := guess.Open(e.reader, decoders)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -356,6 +354,9 @@ func (q *Queue) Remove(identifier audio.QueueIdentifier) bool {
|
||||||
q.queue = append(q.queue[:i], q.queue[i+1:]...)
|
q.queue = append(q.queue[:i], q.queue[i+1:]...)
|
||||||
q.mutex.Unlock()
|
q.mutex.Unlock()
|
||||||
q.audioQueue.Remove(identifier)
|
q.audioQueue.Remove(identifier)
|
||||||
|
if e.reader != nil {
|
||||||
|
e.reader.Close()
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue