diff --git a/listener.go b/listener.go index ec11e21..9f21cd9 100644 --- a/listener.go +++ b/listener.go @@ -30,13 +30,11 @@ func (l *listener) Accept() (net.Conn, error) { log.Printf("accepted new connection from %s\n", c.RemoteAddr().String()) tc := &Conn{ - Conn: c, - ReadTimeout: l.ReadTimeout, - WriteTimeout: l.WriteTimeout, - ReadThreshold: int32((l.ReadTimeout * 1024) / time.Second), - WriteThreshold: int32((l.WriteTimeout * 1024) / time.Second), - BytesReadFromDeadline: 0, - BytesWrittenFromDeadline: 0, + Conn: c, + ReadTimeout: l.ReadTimeout, + WriteTimeout: l.WriteTimeout, + ReadThreshold: int32((l.ReadTimeout * 1024) / time.Second), + WriteThreshold: int32((l.WriteTimeout * 1024) / time.Second), } return tc, nil } @@ -49,13 +47,13 @@ type Conn struct { WriteTimeout time.Duration ReadThreshold int32 WriteThreshold int32 - BytesReadFromDeadline int32 - BytesWrittenFromDeadline int32 + BytesReadFromDeadline atomic.Int32 + BytesWrittenFromDeadline atomic.Int32 } func (c *Conn) Read(b []byte) (n int, err error) { - if atomic.LoadInt32(&c.BytesReadFromDeadline) > c.ReadThreshold { - atomic.StoreInt32(&c.BytesReadFromDeadline, 0) + if c.BytesReadFromDeadline.Load() > c.ReadThreshold { + c.BytesReadFromDeadline.Store(0) // we set both read and write deadlines here otherwise after the request // is read writing the response fails with an i/o timeout error err = c.Conn.SetDeadline(time.Now().Add(c.ReadTimeout)) @@ -64,13 +62,13 @@ func (c *Conn) Read(b []byte) (n int, err error) { } } n, err = c.Conn.Read(b) - atomic.AddInt32(&c.BytesReadFromDeadline, int32(n)) + c.BytesReadFromDeadline.Add(int32(n)) return } func (c *Conn) Write(b []byte) (n int, err error) { - if atomic.LoadInt32(&c.BytesWrittenFromDeadline) > c.WriteThreshold { - atomic.StoreInt32(&c.BytesWrittenFromDeadline, 0) + if c.BytesWrittenFromDeadline.Load() > c.WriteThreshold { + c.BytesWrittenFromDeadline.Store(0) // we extend the read deadline too, not sure it's necessary, // but it doesn't hurt err = c.Conn.SetDeadline(time.Now().Add(c.WriteTimeout)) @@ -79,7 +77,7 @@ func (c *Conn) Write(b []byte) (n int, err error) { } } n, err = c.Conn.Write(b) - atomic.AddInt32(&c.BytesWrittenFromDeadline, int32(n)) + c.BytesWrittenFromDeadline.Add(int32(n)) return } diff --git a/queue.go b/queue.go index 5d5243d..8b17837 100644 --- a/queue.go +++ b/queue.go @@ -210,7 +210,7 @@ func (p *QueueMetadataPacket) GetData() []byte { type Queue struct { NowPlaying chan *QueueTrackEntry QueueEmpty chan *QueueTrackEntry - Duration time.Duration + duration atomic.Int64 durationError int64 audioQueue *queue.Queue mounts []*StreamMount @@ -264,6 +264,10 @@ func NewQueue(config *Config) *Queue { return q } +func (q *Queue) GetDuration() time.Duration { + return time.Duration(q.duration.Load()) +} + func (q *Queue) Wait() { q.wg.Wait() close(q.NowPlaying) @@ -280,9 +284,9 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { log.Printf("now playing %s\n", e.Path) q.NowPlaying <- e for _, mount := range q.mounts { - mount.MetadataQueue.Enqueue(&QueueMetadataPacket{ + _ = mount.MetadataQueue.Enqueue(&QueueMetadataPacket{ //TODO: carry sample rate error - sampleNumber: int64(q.Duration * time.Duration(queue.GetSampleRate()) / time.Second), + sampleNumber: (q.duration.Load() * int64(queue.GetSampleRate())) / int64(time.Second), TrackEntry: e, }) } @@ -297,7 +301,7 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error { removeCallback := func(queue *queue.Queue, entry *queue.QueueEntry) { //TODO: carry sample rate error - atomic.AddInt64((*int64)(&q.Duration), int64((time.Second*time.Duration(atomic.LoadUint64(&entry.ReadSamples)))/time.Duration(entry.Source.GetSampleRate()))) + q.duration.Add(int64((time.Second * time.Duration(entry.ReadSamples.Load())) / time.Duration(entry.Source.GetSampleRate()))) q.Remove(entry.Identifier) q.HandleQueue() @@ -635,7 +639,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writeChannel := make(chan []byte, byteSliceChannelBuffer) requestDone := struct { - Done uint32 + Done atomic.Bool Lock sync.Mutex Error error }{} @@ -649,7 +653,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req writer.Header().Set("Content-Type", "application/x-audio-packet-stream") packetWriteCallback = func(packet packetizer.Packet) error { - if atomic.LoadUint32(&requestDone.Done) == 1 { + if requestDone.Done.Load() { return requestDone.Error } if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { @@ -661,7 +665,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - atomic.StoreUint32(&requestDone.Done, 1) + requestDone.Done.Store(true) return requestDone.Error } @@ -678,7 +682,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - atomic.StoreUint32(&requestDone.Done, 1) + requestDone.Done.Store(true) return requestDone.Error } @@ -697,7 +701,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - atomic.StoreUint32(&requestDone.Done, 1) + requestDone.Done.Store(true) return requestDone.Error } @@ -776,7 +780,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req var streamStartOffset int64 = -1 packetWriteCallback = func(packet packetizer.Packet) error { - if atomic.LoadUint32(&requestDone.Done) == 1 { + if requestDone.Done.Load() { return requestDone.Error } if metadataPacket, ok := packet.(*QueueMetadataPacket); ok { @@ -826,7 +830,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - atomic.StoreUint32(&requestDone.Done, 1) + requestDone.Done.Store(true) return requestDone.Error } writeChannel <- data @@ -835,7 +839,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req } else { var streamStartOffset int64 = -1 packetWriteCallback = func(packet packetizer.Packet) error { - if atomic.LoadUint32(&requestDone.Done) == 1 { + if requestDone.Done.Load() { return requestDone.Error } if _, ok := packet.(*QueueMetadataPacket); ok { @@ -846,7 +850,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - atomic.StoreUint32(&requestDone.Done, 1) + requestDone.Done.Store(true) return requestDone.Error } @@ -882,7 +886,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req requestDone.Lock.Lock() defer requestDone.Lock.Unlock() requestDone.Error = errors.New("client ran out of buffer") - atomic.StoreUint32(&requestDone.Done, 1) + requestDone.Done.Store(true) return } //try flush