Adjust read/write buffers for streamer
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
DataHoarder 2022-03-07 17:33:37 +01:00
parent 5eda50e5b7
commit 07109d4ec0
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
3 changed files with 19 additions and 3 deletions

View file

@ -34,7 +34,7 @@ func main() {
server.SetKeepAlivesEnabled(false) server.SetKeepAlivesEnabled(false)
//setup a timeout to prevent slow clients blocking. See https://github.com/golang/go/issues/16100 //setup a timeout to prevent slow clients blocking. See https://github.com/golang/go/issues/16100
timeoutListener, err := newListener("tcp", server.Addr, time.Second*15, time.Second*30) timeoutListener, err := newListener("tcp", server.Addr, time.Second*15, time.Second*15)
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"log"
"net" "net"
"sync/atomic" "sync/atomic"
"time" "time"
@ -17,6 +18,17 @@ func (l *listener) Accept() (net.Conn, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if tcpConnection, ok := c.(*net.TCPConn); ok {
tcpConnection.SetReadBuffer(1024 * 64)
tcpConnection.SetWriteBuffer(1024 * 64)
tcpConnection.SetKeepAlive(true)
tcpConnection.SetKeepAlivePeriod(l.WriteTimeout / 4)
tcpConnection.SetNoDelay(true)
tcpConnection.SetLinger(0)
}
log.Printf("accepted new connection from %s\n", c.RemoteAddr().String())
tc := &Conn{ tc := &Conn{
Conn: c, Conn: c,
ReadTimeout: l.ReadTimeout, ReadTimeout: l.ReadTimeout,

View file

@ -508,7 +508,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} }
} }
if len(writeChannel) >= byteSliceChannelBuffer-1 { if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer") requestDone = errors.New("client ran out of buffer")
return requestDone return requestDone
} }
@ -524,8 +524,9 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
return nil return nil
} }
if len(writeChannel) >= byteSliceChannelBuffer-1 { if len(writeChannel) >= (byteSliceChannelBuffer - 1) {
requestDone = errors.New("client ran out of buffer") requestDone = errors.New("client ran out of buffer")
log.Printf("failed to write data to client: %s\n", requestDone)
return requestDone return requestDone
} }
writeChannel <- packet.GetData() writeChannel <- packet.GetData()
@ -543,6 +544,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
for byteSlice := range writeChannel { for byteSlice := range writeChannel {
if _, requestDone = writer.Write(byteSlice); requestDone != nil { if _, requestDone = writer.Write(byteSlice); requestDone != nil {
log.Printf("failed to write data to client: %s\n", requestDone)
break break
} }
//try flush //try flush
@ -607,6 +609,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
Headers: headers, Headers: headers,
}, },
Start: func(packets []packetizer.Packet) error { Start: func(packets []packetizer.Packet) error {
log.Printf("adding %s client to stream %s\n", request.RemoteAddr, mount.Mount)
if len(packets) > 0 { if len(packets) > 0 {
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - sampleBufferLimit
for _, p := range packets { for _, p := range packets {
@ -621,6 +624,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
}, },
Write: packetWriteCallback, Write: packetWriteCallback,
Close: func() { Close: func() {
log.Printf("removing %s client from stream %s\n", request.RemoteAddr, mount.Mount)
defer wgClient.Done() defer wgClient.Done()
close(writeChannel) close(writeChannel)
}, },