Compare commits
3 commits
6ea83ecce7
...
f8eb6de4c1
Author | SHA1 | Date | |
---|---|---|---|
DataHoarder | f8eb6de4c1 | ||
DataHoarder | dfc2be9bf2 | ||
DataHoarder | 0e5c9c69c7 |
|
@ -6,7 +6,6 @@ import (
|
|||
"git.gammaspectra.live/S.O.N.G/MeteorLight/api"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/config"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
@ -32,18 +31,12 @@ func main() {
|
|||
defer wg.Done()
|
||||
|
||||
server := http.Server{
|
||||
Addr: fmt.Sprintf("%s:%d", conf.Radio.Host, conf.Radio.Port),
|
||||
Handler: http.HandlerFunc(queueInstance.HandleRadioRequest),
|
||||
}
|
||||
server.SetKeepAlivesEnabled(false)
|
||||
|
||||
//setup a timeout to prevent slow clients blocking. See https://github.com/golang/go/issues/16100
|
||||
timeoutListener, err := util.NewTimeoutListener("tcp", server.Addr, time.Second*15, time.Second*15)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
Addr: fmt.Sprintf("%s:%d", conf.Radio.Host, conf.Radio.Port),
|
||||
Handler: http.HandlerFunc(queueInstance.HandleRadioRequest),
|
||||
ReadTimeout: time.Second * 5,
|
||||
}
|
||||
|
||||
if err = server.Serve(timeoutListener); err != nil {
|
||||
if err = server.ListenAndServe(); err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
|
@ -18,13 +17,13 @@ type Listener struct {
|
|||
streamStartOffset int64
|
||||
streamSamplesBuffer int64
|
||||
|
||||
ctx *util.RequestDone
|
||||
sliceWriter chan []byte
|
||||
writer listener.WriterFunc
|
||||
waiter chan struct{}
|
||||
offsetStart bool
|
||||
headerBytes []byte
|
||||
}
|
||||
|
||||
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) {
|
||||
func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) {
|
||||
headers := make(map[string]string)
|
||||
headers["x-audio-packet-stream"] = "1"
|
||||
headers["content-type"] = "application/x-audio-packet-stream"
|
||||
|
@ -38,15 +37,19 @@ func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter c
|
|||
|
||||
return &Listener{
|
||||
information: info,
|
||||
ctx: ctx,
|
||||
sliceWriter: sliceWriter,
|
||||
writer: writer,
|
||||
offsetStart: offsetStart,
|
||||
streamSamplesBuffer: samplesToBuffer,
|
||||
streamStartOffset: -1,
|
||||
headerBytes: headerBytes.Bytes(),
|
||||
waiter: make(chan struct{}),
|
||||
}, headers
|
||||
}
|
||||
|
||||
func (l *Listener) Wait() {
|
||||
<-l.waiter
|
||||
}
|
||||
|
||||
func (l *Listener) Identifier() string {
|
||||
return l.information.Identifier
|
||||
}
|
||||
|
@ -64,13 +67,15 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
l.sliceWriter <- (&packetStreamFrame{
|
||||
if err := l.writer((&packetStreamFrame{
|
||||
Type: Header,
|
||||
Category: 0,
|
||||
StartSampleNumber: 0,
|
||||
DurationInSamples: 0,
|
||||
Data: l.headerBytes,
|
||||
}).Encode()
|
||||
}).Encode()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(packets) > 0 {
|
||||
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - l.streamSamplesBuffer
|
||||
|
@ -86,50 +91,36 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
|
|||
}
|
||||
|
||||
func (l *Listener) Write(packet packetizer.Packet) error {
|
||||
if l.ctx.Done() {
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
if metadataPacket, ok := packet.(*metadata.Packet); ok {
|
||||
|
||||
queueInfoBuf := make([]byte, binary.MaxVarintLen64)
|
||||
n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.Identifier))
|
||||
|
||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
l.sliceWriter <- (&packetStreamFrame{
|
||||
if err := l.writer((&packetStreamFrame{
|
||||
Type: TrackIdentifier,
|
||||
Category: packet.Category(),
|
||||
StartSampleNumber: packet.GetStartSampleNumber(),
|
||||
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
||||
Data: queueInfoBuf[:n],
|
||||
}).Encode()
|
||||
}).Encode()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil {
|
||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
l.sliceWriter <- (&packetStreamFrame{
|
||||
if err := l.writer((&packetStreamFrame{
|
||||
Type: TrackMetadata,
|
||||
Category: packet.Category(),
|
||||
StartSampleNumber: packet.GetStartSampleNumber(),
|
||||
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
||||
Data: metadataBytes,
|
||||
}).Encode()
|
||||
}).Encode()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
var frameType PacketStreamType
|
||||
switch packet.KeepMode() {
|
||||
case packetizer.KeepLast:
|
||||
|
@ -146,19 +137,18 @@ func (l *Listener) Write(packet packetizer.Packet) error {
|
|||
return errors.New("unknown KeepMode")
|
||||
}
|
||||
|
||||
l.sliceWriter <- (&packetStreamFrame{
|
||||
return l.writer((&packetStreamFrame{
|
||||
Type: frameType,
|
||||
Category: packet.Category(),
|
||||
StartSampleNumber: packet.GetStartSampleNumber(),
|
||||
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
|
||||
Data: packet.GetData(),
|
||||
}).Encode()
|
||||
|
||||
return nil
|
||||
}).Encode())
|
||||
}
|
||||
|
||||
func (l *Listener) Close() {
|
||||
close(l.sliceWriter)
|
||||
l.writer(nil)
|
||||
close(l.waiter)
|
||||
}
|
||||
|
||||
type PacketStreamType uint64
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
package icy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
@ -22,28 +20,32 @@ type Listener struct {
|
|||
|
||||
counter int
|
||||
|
||||
ctx *util.RequestDone
|
||||
sliceWriter chan []byte
|
||||
writer listener.WriterFunc
|
||||
waiter chan struct{}
|
||||
offsetStart bool
|
||||
|
||||
pendingMetadata map[string]string
|
||||
}
|
||||
|
||||
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
||||
func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
||||
headers := make(map[string]string)
|
||||
headers["icy-metaint"] = strconv.Itoa(icyInterval)
|
||||
|
||||
return &Listener{
|
||||
information: info,
|
||||
ctx: ctx,
|
||||
sliceWriter: sliceWriter,
|
||||
writer: writer,
|
||||
offsetStart: offsetStart,
|
||||
streamSamplesBuffer: samplesToBuffer,
|
||||
streamStartOffset: -1,
|
||||
pendingMetadata: make(map[string]string),
|
||||
waiter: make(chan struct{}),
|
||||
}, headers
|
||||
}
|
||||
|
||||
func (l *Listener) Wait() {
|
||||
<-l.waiter
|
||||
}
|
||||
|
||||
func (l *Listener) Identifier() string {
|
||||
return l.information.Identifier
|
||||
}
|
||||
|
@ -101,9 +103,6 @@ func (l *Listener) writeIcy() []byte {
|
|||
}
|
||||
|
||||
func (l *Listener) Write(packet packetizer.Packet) error {
|
||||
if l.ctx.Done() {
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
if metadataPacket, ok := packet.(*metadata.Packet); ok {
|
||||
if len(metadataPacket.TrackEntry.Artist()) > 0 {
|
||||
|
@ -148,15 +147,10 @@ func (l *Listener) Write(packet packetizer.Packet) error {
|
|||
}
|
||||
}
|
||||
|
||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
l.sliceWriter <- data
|
||||
return nil
|
||||
return l.writer(data)
|
||||
}
|
||||
|
||||
func (l *Listener) Close() {
|
||||
close(l.sliceWriter)
|
||||
l.writer(nil)
|
||||
close(l.waiter)
|
||||
}
|
||||
|
|
|
@ -22,14 +22,7 @@ type Listener interface {
|
|||
Start(packets []packetizer.Packet) error
|
||||
Write(packet packetizer.Packet) error
|
||||
Close()
|
||||
Wait()
|
||||
}
|
||||
|
||||
/*
|
||||
type Listener struct {
|
||||
Information ListenerInformation
|
||||
Start func(packets []packetizer.Packet) error
|
||||
Write func(packet packetizer.Packet) error
|
||||
Close func()
|
||||
}
|
||||
|
||||
*/
|
||||
type WriterFunc func(data []byte) error
|
||||
|
|
|
@ -1,11 +1,9 @@
|
|||
package plain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
|
||||
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
|
@ -15,22 +13,26 @@ type Listener struct {
|
|||
streamStartOffset int64
|
||||
streamSamplesBuffer int64
|
||||
|
||||
ctx *util.RequestDone
|
||||
sliceWriter chan []byte
|
||||
writer listener.WriterFunc
|
||||
waiter chan struct{}
|
||||
offsetStart bool
|
||||
}
|
||||
|
||||
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
||||
func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
|
||||
return &Listener{
|
||||
information: info,
|
||||
ctx: ctx,
|
||||
sliceWriter: sliceWriter,
|
||||
writer: writer,
|
||||
offsetStart: offsetStart,
|
||||
streamSamplesBuffer: samplesToBuffer,
|
||||
streamStartOffset: -1,
|
||||
waiter: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (l *Listener) Wait() {
|
||||
<-l.waiter
|
||||
}
|
||||
|
||||
func (l *Listener) Identifier() string {
|
||||
return l.information.Identifier
|
||||
}
|
||||
|
@ -62,37 +64,27 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
|
|||
}
|
||||
|
||||
func (l *Listener) Write(packet packetizer.Packet) error {
|
||||
if l.ctx.Done() {
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
if _, ok := packet.(*metadata.Packet); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
|
||||
l.ctx.Fail(errors.New("client ran out of writer buffer"))
|
||||
return l.ctx.Error()
|
||||
}
|
||||
|
||||
if offsetable, ok := packet.(packetizer.OffsetablePacket); l.offsetStart && ok {
|
||||
if l.streamStartOffset <= -1 {
|
||||
if offsetable.KeepMode() != packetizer.Keep {
|
||||
l.streamStartOffset = offsetable.GetStartSampleNumber()
|
||||
l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset)
|
||||
return l.writer(offsetable.GetDataOffset(l.streamStartOffset))
|
||||
} else {
|
||||
l.sliceWriter <- packet.GetData()
|
||||
return l.writer(packet.GetData())
|
||||
}
|
||||
} else {
|
||||
l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset)
|
||||
return l.writer(offsetable.GetDataOffset(l.streamStartOffset))
|
||||
}
|
||||
} else {
|
||||
l.sliceWriter <- packet.GetData()
|
||||
return l.writer(packet.GetData())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Listener) Close() {
|
||||
close(l.sliceWriter)
|
||||
l.writer(nil)
|
||||
close(l.waiter)
|
||||
}
|
||||
|
|
|
@ -409,10 +409,6 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
|
|||
writer.Header().Set("x-audiocast-public", "1")
|
||||
}
|
||||
|
||||
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
|
||||
const byteSliceChannelBuffer = 1024 * 16
|
||||
writeChannel := make(chan []byte, byteSliceChannelBuffer)
|
||||
|
||||
requestDone := util.RequestDone{}
|
||||
|
||||
var headers []listener.HeaderEntry
|
||||
|
@ -476,14 +472,61 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
|
|||
var mountListener listener.Listener
|
||||
var extraHeaders map[string]string
|
||||
|
||||
ctx := request.Context()
|
||||
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
|
||||
const byteSliceChannelBuffer = 1024 * 16
|
||||
writeChannel := make(chan []byte, byteSliceChannelBuffer)
|
||||
|
||||
go func() {
|
||||
var flusher http.Flusher
|
||||
if httpFlusher, ok := writer.(http.Flusher); ok {
|
||||
flusher = httpFlusher
|
||||
}
|
||||
|
||||
for !requestDone.Done() {
|
||||
select {
|
||||
case byteSlice := <-writeChannel:
|
||||
if _, err := writer.Write(byteSlice); err != nil {
|
||||
requestDone.Fail(err)
|
||||
break
|
||||
}
|
||||
//try flush
|
||||
if flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
case <-ctx.Done():
|
||||
requestDone.Fail(ctx.Err())
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
funcWriter := func(byteSlice []byte) error {
|
||||
if requestDone.Done() {
|
||||
return requestDone.Error()
|
||||
}
|
||||
|
||||
if len(byteSlice) > 0 {
|
||||
if len(writeChannel) >= (cap(writeChannel) - 1) {
|
||||
requestDone.Fail(errors.New("client ran out of writer buffer"))
|
||||
return requestDone.Error()
|
||||
}
|
||||
|
||||
writeChannel <- byteSlice
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//set X-Audio-Packet-Stream for strictly timed packets and metadata
|
||||
if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 {
|
||||
//version 1
|
||||
mountListener, extraHeaders = aps1.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType)
|
||||
mountListener, extraHeaders = aps1.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType)
|
||||
} else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
|
||||
mountListener, extraHeaders = icy.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart)
|
||||
mountListener, extraHeaders = icy.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
|
||||
} else {
|
||||
mountListener, extraHeaders = plain.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart)
|
||||
mountListener, extraHeaders = plain.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
|
||||
}
|
||||
|
||||
if mountListener == nil {
|
||||
|
@ -500,29 +543,9 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
|
|||
log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate))
|
||||
mount.AddListener(mountListener)
|
||||
|
||||
ctx := request.Context()
|
||||
var flusher http.Flusher
|
||||
if httpFlusher, ok := writer.(http.Flusher); ok {
|
||||
flusher = httpFlusher
|
||||
}
|
||||
mountListener.Wait()
|
||||
|
||||
for !requestDone.Done() {
|
||||
select {
|
||||
case byteSlice := <-writeChannel:
|
||||
if _, err := writer.Write(byteSlice); err != nil {
|
||||
requestDone.Fail(err)
|
||||
break
|
||||
}
|
||||
//try flush
|
||||
if flusher != nil {
|
||||
flusher.Flush()
|
||||
}
|
||||
case <-ctx.Done():
|
||||
requestDone.Fail(ctx.Err())
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
requestDone.Complete()
|
||||
|
||||
log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount)
|
||||
return
|
||||
|
|
|
@ -272,8 +272,11 @@ func (m *Mount) handlePacket(packet packetizer.Packet) {
|
|||
var err error
|
||||
for _, l := range m.listeners {
|
||||
if !l.HasStarted() {
|
||||
//TODO: handle error too?
|
||||
l.Start(m.keepBuffer)
|
||||
if err = l.Start(m.keepBuffer); err != nil {
|
||||
log.Printf("failed to write data to %s client: %s\n", l.Identifier(), err)
|
||||
toRemove = append(toRemove, l.Identifier())
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err = l.Write(packet); err != nil {
|
||||
log.Printf("failed to write data to %s client: %s\n", l.Identifier(), err)
|
||||
|
|
|
@ -1,96 +0,0 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type listener struct {
|
||||
net.Listener
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
}
|
||||
|
||||
func (l *listener) Accept() (net.Conn, error) {
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tcpConnection, ok := c.(*net.TCPConn); ok {
|
||||
tcpConnection.SetReadBuffer(1024 * 64)
|
||||
tcpConnection.SetWriteBuffer(1024 * 64)
|
||||
tcpConnection.SetKeepAlive(true)
|
||||
tcpConnection.SetKeepAlivePeriod(l.WriteTimeout / 4)
|
||||
tcpConnection.SetNoDelay(true)
|
||||
tcpConnection.SetLinger(0)
|
||||
}
|
||||
|
||||
log.Printf("accepted new connection from %s\n", c.RemoteAddr().String())
|
||||
tc := &Conn{
|
||||
Conn: c,
|
||||
ReadTimeout: l.ReadTimeout,
|
||||
WriteTimeout: l.WriteTimeout,
|
||||
ReadThreshold: int32((l.ReadTimeout * 1024) / time.Second),
|
||||
WriteThreshold: int32((l.WriteTimeout * 1024) / time.Second),
|
||||
}
|
||||
return tc, nil
|
||||
}
|
||||
|
||||
// Conn wraps a net.Conn, and sets a deadline for every read
|
||||
// and write operation.
|
||||
type Conn struct {
|
||||
net.Conn
|
||||
ReadTimeout time.Duration
|
||||
WriteTimeout time.Duration
|
||||
ReadThreshold int32
|
||||
WriteThreshold int32
|
||||
BytesReadFromDeadline atomic.Int32
|
||||
BytesWrittenFromDeadline atomic.Int32
|
||||
}
|
||||
|
||||
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||
if c.BytesReadFromDeadline.Load() > c.ReadThreshold {
|
||||
c.BytesReadFromDeadline.Store(0)
|
||||
// we set both read and write deadlines here otherwise after the request
|
||||
// is read writing the response fails with an i/o timeout error
|
||||
err = c.Conn.SetDeadline(time.Now().Add(c.ReadTimeout))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
n, err = c.Conn.Read(b)
|
||||
c.BytesReadFromDeadline.Add(int32(n))
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) Write(b []byte) (n int, err error) {
|
||||
if c.BytesWrittenFromDeadline.Load() > c.WriteThreshold {
|
||||
c.BytesWrittenFromDeadline.Store(0)
|
||||
// we extend the read deadline too, not sure it's necessary,
|
||||
// but it doesn't hurt
|
||||
err = c.Conn.SetDeadline(time.Now().Add(c.WriteTimeout))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
n, err = c.Conn.Write(b)
|
||||
c.BytesWrittenFromDeadline.Add(int32(n))
|
||||
return
|
||||
}
|
||||
|
||||
func NewTimeoutListener(network, addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) {
|
||||
l, err := net.Listen(network, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tl := &listener{
|
||||
Listener: l,
|
||||
ReadTimeout: readTimeout,
|
||||
WriteTimeout: writeTimeout,
|
||||
}
|
||||
return tl, nil
|
||||
}
|
Loading…
Reference in a new issue