Compare commits

...

3 commits

Author SHA1 Message Date
DataHoarder f8eb6de4c1
Error on listener Start failure
All checks were successful
continuous-integration/drone/push Build is passing
2022-10-02 15:38:51 +02:00
DataHoarder dfc2be9bf2
Cleanup how listeners write data back to client 2022-10-02 15:19:51 +02:00
DataHoarder 0e5c9c69c7
Remove custom tcp listener, MeteorLight is expected to sit under reverse proxies 2022-10-02 14:34:13 +02:00
8 changed files with 115 additions and 223 deletions

View file

@ -6,7 +6,6 @@ import (
"git.gammaspectra.live/S.O.N.G/MeteorLight/api" "git.gammaspectra.live/S.O.N.G/MeteorLight/api"
"git.gammaspectra.live/S.O.N.G/MeteorLight/config" "git.gammaspectra.live/S.O.N.G/MeteorLight/config"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue"
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
"log" "log"
"net/http" "net/http"
"sync" "sync"
@ -32,18 +31,12 @@ func main() {
defer wg.Done() defer wg.Done()
server := http.Server{ server := http.Server{
Addr: fmt.Sprintf("%s:%d", conf.Radio.Host, conf.Radio.Port), Addr: fmt.Sprintf("%s:%d", conf.Radio.Host, conf.Radio.Port),
Handler: http.HandlerFunc(queueInstance.HandleRadioRequest), Handler: http.HandlerFunc(queueInstance.HandleRadioRequest),
} ReadTimeout: time.Second * 5,
server.SetKeepAlivesEnabled(false)
//setup a timeout to prevent slow clients blocking. See https://github.com/golang/go/issues/16100
timeoutListener, err := util.NewTimeoutListener("tcp", server.Addr, time.Second*15, time.Second*15)
if err != nil {
log.Panic(err)
} }
if err = server.Serve(timeoutListener); err != nil { if err = server.ListenAndServe(); err != nil {
log.Panic(err) log.Panic(err)
} }
}() }()

View file

@ -8,7 +8,6 @@ import (
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
"sync/atomic" "sync/atomic"
) )
@ -18,13 +17,13 @@ type Listener struct {
streamStartOffset int64 streamStartOffset int64
streamSamplesBuffer int64 streamSamplesBuffer int64
ctx *util.RequestDone writer listener.WriterFunc
sliceWriter chan []byte waiter chan struct{}
offsetStart bool offsetStart bool
headerBytes []byte headerBytes []byte
} }
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) { func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool, channels, sampleRate int, mimeType string) (*Listener, map[string]string) {
headers := make(map[string]string) headers := make(map[string]string)
headers["x-audio-packet-stream"] = "1" headers["x-audio-packet-stream"] = "1"
headers["content-type"] = "application/x-audio-packet-stream" headers["content-type"] = "application/x-audio-packet-stream"
@ -38,15 +37,19 @@ func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter c
return &Listener{ return &Listener{
information: info, information: info,
ctx: ctx, writer: writer,
sliceWriter: sliceWriter,
offsetStart: offsetStart, offsetStart: offsetStart,
streamSamplesBuffer: samplesToBuffer, streamSamplesBuffer: samplesToBuffer,
streamStartOffset: -1, streamStartOffset: -1,
headerBytes: headerBytes.Bytes(), headerBytes: headerBytes.Bytes(),
waiter: make(chan struct{}),
}, headers }, headers
} }
func (l *Listener) Wait() {
<-l.waiter
}
func (l *Listener) Identifier() string { func (l *Listener) Identifier() string {
return l.information.Identifier return l.information.Identifier
} }
@ -64,13 +67,15 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
return nil return nil
} }
l.sliceWriter <- (&packetStreamFrame{ if err := l.writer((&packetStreamFrame{
Type: Header, Type: Header,
Category: 0, Category: 0,
StartSampleNumber: 0, StartSampleNumber: 0,
DurationInSamples: 0, DurationInSamples: 0,
Data: l.headerBytes, Data: l.headerBytes,
}).Encode() }).Encode()); err != nil {
return err
}
if len(packets) > 0 { if len(packets) > 0 {
sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - l.streamSamplesBuffer sampleBufferMin := packets[len(packets)-1].GetStartSampleNumber() - l.streamSamplesBuffer
@ -86,50 +91,36 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
} }
func (l *Listener) Write(packet packetizer.Packet) error { func (l *Listener) Write(packet packetizer.Packet) error {
if l.ctx.Done() {
return l.ctx.Error()
}
if metadataPacket, ok := packet.(*metadata.Packet); ok { if metadataPacket, ok := packet.(*metadata.Packet); ok {
queueInfoBuf := make([]byte, binary.MaxVarintLen64) queueInfoBuf := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.Identifier)) n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.Identifier))
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { if err := l.writer((&packetStreamFrame{
l.ctx.Fail(errors.New("client ran out of writer buffer"))
return l.ctx.Error()
}
l.sliceWriter <- (&packetStreamFrame{
Type: TrackIdentifier, Type: TrackIdentifier,
Category: packet.Category(), Category: packet.Category(),
StartSampleNumber: packet.GetStartSampleNumber(), StartSampleNumber: packet.GetStartSampleNumber(),
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(), DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
Data: queueInfoBuf[:n], Data: queueInfoBuf[:n],
}).Encode() }).Encode()); err != nil {
return err
}
if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil { if metadataBytes, err := json.Marshal(metadataPacket.TrackEntry.Metadata); err == nil {
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
l.ctx.Fail(errors.New("client ran out of writer buffer"))
return l.ctx.Error()
}
l.sliceWriter <- (&packetStreamFrame{ if err := l.writer((&packetStreamFrame{
Type: TrackMetadata, Type: TrackMetadata,
Category: packet.Category(), Category: packet.Category(),
StartSampleNumber: packet.GetStartSampleNumber(), StartSampleNumber: packet.GetStartSampleNumber(),
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(), DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
Data: metadataBytes, Data: metadataBytes,
}).Encode() }).Encode()); err != nil {
return err
}
} }
return nil return nil
} }
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
l.ctx.Fail(errors.New("client ran out of writer buffer"))
return l.ctx.Error()
}
var frameType PacketStreamType var frameType PacketStreamType
switch packet.KeepMode() { switch packet.KeepMode() {
case packetizer.KeepLast: case packetizer.KeepLast:
@ -146,19 +137,18 @@ func (l *Listener) Write(packet packetizer.Packet) error {
return errors.New("unknown KeepMode") return errors.New("unknown KeepMode")
} }
l.sliceWriter <- (&packetStreamFrame{ return l.writer((&packetStreamFrame{
Type: frameType, Type: frameType,
Category: packet.Category(), Category: packet.Category(),
StartSampleNumber: packet.GetStartSampleNumber(), StartSampleNumber: packet.GetStartSampleNumber(),
DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(), DurationInSamples: packet.GetEndSampleNumber() - packet.GetStartSampleNumber(),
Data: packet.GetData(), Data: packet.GetData(),
}).Encode() }).Encode())
return nil
} }
func (l *Listener) Close() { func (l *Listener) Close() {
close(l.sliceWriter) l.writer(nil)
close(l.waiter)
} }
type PacketStreamType uint64 type PacketStreamType uint64

View file

@ -1,12 +1,10 @@
package icy package icy
import ( import (
"errors"
"fmt" "fmt"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
) )
@ -22,28 +20,32 @@ type Listener struct {
counter int counter int
ctx *util.RequestDone writer listener.WriterFunc
sliceWriter chan []byte waiter chan struct{}
offsetStart bool offsetStart bool
pendingMetadata map[string]string pendingMetadata map[string]string
} }
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) { func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
headers := make(map[string]string) headers := make(map[string]string)
headers["icy-metaint"] = strconv.Itoa(icyInterval) headers["icy-metaint"] = strconv.Itoa(icyInterval)
return &Listener{ return &Listener{
information: info, information: info,
ctx: ctx, writer: writer,
sliceWriter: sliceWriter,
offsetStart: offsetStart, offsetStart: offsetStart,
streamSamplesBuffer: samplesToBuffer, streamSamplesBuffer: samplesToBuffer,
streamStartOffset: -1, streamStartOffset: -1,
pendingMetadata: make(map[string]string), pendingMetadata: make(map[string]string),
waiter: make(chan struct{}),
}, headers }, headers
} }
func (l *Listener) Wait() {
<-l.waiter
}
func (l *Listener) Identifier() string { func (l *Listener) Identifier() string {
return l.information.Identifier return l.information.Identifier
} }
@ -101,9 +103,6 @@ func (l *Listener) writeIcy() []byte {
} }
func (l *Listener) Write(packet packetizer.Packet) error { func (l *Listener) Write(packet packetizer.Packet) error {
if l.ctx.Done() {
return l.ctx.Error()
}
if metadataPacket, ok := packet.(*metadata.Packet); ok { if metadataPacket, ok := packet.(*metadata.Packet); ok {
if len(metadataPacket.TrackEntry.Artist()) > 0 { if len(metadataPacket.TrackEntry.Artist()) > 0 {
@ -148,15 +147,10 @@ func (l *Listener) Write(packet packetizer.Packet) error {
} }
} }
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { return l.writer(data)
l.ctx.Fail(errors.New("client ran out of writer buffer"))
return l.ctx.Error()
}
l.sliceWriter <- data
return nil
} }
func (l *Listener) Close() { func (l *Listener) Close() {
close(l.sliceWriter) l.writer(nil)
close(l.waiter)
} }

View file

@ -22,14 +22,7 @@ type Listener interface {
Start(packets []packetizer.Packet) error Start(packets []packetizer.Packet) error
Write(packet packetizer.Packet) error Write(packet packetizer.Packet) error
Close() Close()
Wait()
} }
/* type WriterFunc func(data []byte) error
type Listener struct {
Information ListenerInformation
Start func(packets []packetizer.Packet) error
Write func(packet packetizer.Packet) error
Close func()
}
*/

View file

@ -1,11 +1,9 @@
package plain package plain
import ( import (
"errors"
"git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer" "git.gammaspectra.live/S.O.N.G/Kirika/audio/packetizer"
"git.gammaspectra.live/S.O.N.G/MeteorLight/listener" "git.gammaspectra.live/S.O.N.G/MeteorLight/listener"
"git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata" "git.gammaspectra.live/S.O.N.G/MeteorLight/queue/metadata"
"git.gammaspectra.live/S.O.N.G/MeteorLight/util"
"sync/atomic" "sync/atomic"
) )
@ -15,22 +13,26 @@ type Listener struct {
streamStartOffset int64 streamStartOffset int64
streamSamplesBuffer int64 streamSamplesBuffer int64
ctx *util.RequestDone writer listener.WriterFunc
sliceWriter chan []byte waiter chan struct{}
offsetStart bool offsetStart bool
} }
func NewListener(info listener.Information, ctx *util.RequestDone, sliceWriter chan []byte, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) { func NewListener(info listener.Information, writer listener.WriterFunc, samplesToBuffer int64, offsetStart bool) (*Listener, map[string]string) {
return &Listener{ return &Listener{
information: info, information: info,
ctx: ctx, writer: writer,
sliceWriter: sliceWriter,
offsetStart: offsetStart, offsetStart: offsetStart,
streamSamplesBuffer: samplesToBuffer, streamSamplesBuffer: samplesToBuffer,
streamStartOffset: -1, streamStartOffset: -1,
waiter: make(chan struct{}),
}, nil }, nil
} }
func (l *Listener) Wait() {
<-l.waiter
}
func (l *Listener) Identifier() string { func (l *Listener) Identifier() string {
return l.information.Identifier return l.information.Identifier
} }
@ -62,37 +64,27 @@ func (l *Listener) Start(packets []packetizer.Packet) error {
} }
func (l *Listener) Write(packet packetizer.Packet) error { func (l *Listener) Write(packet packetizer.Packet) error {
if l.ctx.Done() {
return l.ctx.Error()
}
if _, ok := packet.(*metadata.Packet); ok { if _, ok := packet.(*metadata.Packet); ok {
return nil return nil
} }
if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) {
l.ctx.Fail(errors.New("client ran out of writer buffer"))
return l.ctx.Error()
}
if offsetable, ok := packet.(packetizer.OffsetablePacket); l.offsetStart && ok { if offsetable, ok := packet.(packetizer.OffsetablePacket); l.offsetStart && ok {
if l.streamStartOffset <= -1 { if l.streamStartOffset <= -1 {
if offsetable.KeepMode() != packetizer.Keep { if offsetable.KeepMode() != packetizer.Keep {
l.streamStartOffset = offsetable.GetStartSampleNumber() l.streamStartOffset = offsetable.GetStartSampleNumber()
l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset) return l.writer(offsetable.GetDataOffset(l.streamStartOffset))
} else { } else {
l.sliceWriter <- packet.GetData() return l.writer(packet.GetData())
} }
} else { } else {
l.sliceWriter <- offsetable.GetDataOffset(l.streamStartOffset) return l.writer(offsetable.GetDataOffset(l.streamStartOffset))
} }
} else { } else {
l.sliceWriter <- packet.GetData() return l.writer(packet.GetData())
} }
return nil
} }
func (l *Listener) Close() { func (l *Listener) Close() {
close(l.sliceWriter) l.writer(nil)
close(l.waiter)
} }

View file

@ -409,10 +409,6 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
writer.Header().Set("x-audiocast-public", "1") writer.Header().Set("x-audiocast-public", "1")
} }
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
const byteSliceChannelBuffer = 1024 * 16
writeChannel := make(chan []byte, byteSliceChannelBuffer)
requestDone := util.RequestDone{} requestDone := util.RequestDone{}
var headers []listener.HeaderEntry var headers []listener.HeaderEntry
@ -476,14 +472,61 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
var mountListener listener.Listener var mountListener listener.Listener
var extraHeaders map[string]string var extraHeaders map[string]string
ctx := request.Context()
//buffer a bit, drop channels when buffer grows to not lock others. They will get disconnected elsewhere
const byteSliceChannelBuffer = 1024 * 16
writeChannel := make(chan []byte, byteSliceChannelBuffer)
go func() {
var flusher http.Flusher
if httpFlusher, ok := writer.(http.Flusher); ok {
flusher = httpFlusher
}
for !requestDone.Done() {
select {
case byteSlice := <-writeChannel:
if _, err := writer.Write(byteSlice); err != nil {
requestDone.Fail(err)
break
}
//try flush
if flusher != nil {
flusher.Flush()
}
case <-ctx.Done():
requestDone.Fail(ctx.Err())
break
}
}
}()
funcWriter := func(byteSlice []byte) error {
if requestDone.Done() {
return requestDone.Error()
}
if len(byteSlice) > 0 {
if len(writeChannel) >= (cap(writeChannel) - 1) {
requestDone.Fail(errors.New("client ran out of writer buffer"))
return requestDone.Error()
}
writeChannel <- byteSlice
}
return nil
}
//set X-Audio-Packet-Stream for strictly timed packets and metadata //set X-Audio-Packet-Stream for strictly timed packets and metadata
if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 { if numberValue, err := strconv.Atoi(request.Header.Get("x-audio-packet-stream")); err == nil && numberValue == 1 {
//version 1 //version 1
mountListener, extraHeaders = aps1.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType) mountListener, extraHeaders = aps1.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart, mount.Channels, mount.SampleRate, mount.MimeType)
} else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 { } else if numberValue, err = strconv.Atoi(request.Header.Get("icy-metadata")); err == nil && numberValue >= 1 {
mountListener, extraHeaders = icy.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart) mountListener, extraHeaders = icy.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
} else { } else {
mountListener, extraHeaders = plain.NewListener(listenerInformation, &requestDone, writeChannel, sampleBufferLimit, mount.OffsetStart) mountListener, extraHeaders = plain.NewListener(listenerInformation, funcWriter, sampleBufferLimit, mount.OffsetStart)
} }
if mountListener == nil { if mountListener == nil {
@ -500,29 +543,9 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate)) log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate))
mount.AddListener(mountListener) mount.AddListener(mountListener)
ctx := request.Context() mountListener.Wait()
var flusher http.Flusher
if httpFlusher, ok := writer.(http.Flusher); ok {
flusher = httpFlusher
}
for !requestDone.Done() { requestDone.Complete()
select {
case byteSlice := <-writeChannel:
if _, err := writer.Write(byteSlice); err != nil {
requestDone.Fail(err)
break
}
//try flush
if flusher != nil {
flusher.Flush()
}
case <-ctx.Done():
requestDone.Fail(ctx.Err())
break
}
}
log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount) log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount)
return return

View file

@ -272,8 +272,11 @@ func (m *Mount) handlePacket(packet packetizer.Packet) {
var err error var err error
for _, l := range m.listeners { for _, l := range m.listeners {
if !l.HasStarted() { if !l.HasStarted() {
//TODO: handle error too? if err = l.Start(m.keepBuffer); err != nil {
l.Start(m.keepBuffer) log.Printf("failed to write data to %s client: %s\n", l.Identifier(), err)
toRemove = append(toRemove, l.Identifier())
continue
}
} }
if err = l.Write(packet); err != nil { if err = l.Write(packet); err != nil {
log.Printf("failed to write data to %s client: %s\n", l.Identifier(), err) log.Printf("failed to write data to %s client: %s\n", l.Identifier(), err)

View file

@ -1,96 +0,0 @@
package util
import (
"log"
"net"
"sync/atomic"
"time"
)
type listener struct {
net.Listener
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func (l *listener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
if tcpConnection, ok := c.(*net.TCPConn); ok {
tcpConnection.SetReadBuffer(1024 * 64)
tcpConnection.SetWriteBuffer(1024 * 64)
tcpConnection.SetKeepAlive(true)
tcpConnection.SetKeepAlivePeriod(l.WriteTimeout / 4)
tcpConnection.SetNoDelay(true)
tcpConnection.SetLinger(0)
}
log.Printf("accepted new connection from %s\n", c.RemoteAddr().String())
tc := &Conn{
Conn: c,
ReadTimeout: l.ReadTimeout,
WriteTimeout: l.WriteTimeout,
ReadThreshold: int32((l.ReadTimeout * 1024) / time.Second),
WriteThreshold: int32((l.WriteTimeout * 1024) / time.Second),
}
return tc, nil
}
// Conn wraps a net.Conn, and sets a deadline for every read
// and write operation.
type Conn struct {
net.Conn
ReadTimeout time.Duration
WriteTimeout time.Duration
ReadThreshold int32
WriteThreshold int32
BytesReadFromDeadline atomic.Int32
BytesWrittenFromDeadline atomic.Int32
}
func (c *Conn) Read(b []byte) (n int, err error) {
if c.BytesReadFromDeadline.Load() > c.ReadThreshold {
c.BytesReadFromDeadline.Store(0)
// we set both read and write deadlines here otherwise after the request
// is read writing the response fails with an i/o timeout error
err = c.Conn.SetDeadline(time.Now().Add(c.ReadTimeout))
if err != nil {
return 0, err
}
}
n, err = c.Conn.Read(b)
c.BytesReadFromDeadline.Add(int32(n))
return
}
func (c *Conn) Write(b []byte) (n int, err error) {
if c.BytesWrittenFromDeadline.Load() > c.WriteThreshold {
c.BytesWrittenFromDeadline.Store(0)
// we extend the read deadline too, not sure it's necessary,
// but it doesn't hurt
err = c.Conn.SetDeadline(time.Now().Add(c.WriteTimeout))
if err != nil {
return
}
}
n, err = c.Conn.Write(b)
c.BytesWrittenFromDeadline.Add(int32(n))
return
}
func NewTimeoutListener(network, addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) {
l, err := net.Listen(network, addr)
if err != nil {
return nil, err
}
tl := &listener{
Listener: l,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
}
return tl, nil
}