Use new go.19 atomic[T]

This commit is contained in:
DataHoarder 2022-08-02 21:24:23 +02:00
parent d919c27fd0
commit 351897ba56
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
2 changed files with 31 additions and 29 deletions

View file

@ -30,13 +30,11 @@ func (l *listener) Accept() (net.Conn, error) {
log.Printf("accepted new connection from %s\n", c.RemoteAddr().String())
tc := &Conn{
Conn: c,
ReadTimeout: l.ReadTimeout,
WriteTimeout: l.WriteTimeout,
ReadThreshold: int32((l.ReadTimeout * 1024) / time.Second),
WriteThreshold: int32((l.WriteTimeout * 1024) / time.Second),
BytesReadFromDeadline: 0,
BytesWrittenFromDeadline: 0,
Conn: c,
ReadTimeout: l.ReadTimeout,
WriteTimeout: l.WriteTimeout,
ReadThreshold: int32((l.ReadTimeout * 1024) / time.Second),
WriteThreshold: int32((l.WriteTimeout * 1024) / time.Second),
}
return tc, nil
}
@ -49,13 +47,13 @@ type Conn struct {
WriteTimeout time.Duration
ReadThreshold int32
WriteThreshold int32
BytesReadFromDeadline int32
BytesWrittenFromDeadline int32
BytesReadFromDeadline atomic.Int32
BytesWrittenFromDeadline atomic.Int32
}
func (c *Conn) Read(b []byte) (n int, err error) {
if atomic.LoadInt32(&c.BytesReadFromDeadline) > c.ReadThreshold {
atomic.StoreInt32(&c.BytesReadFromDeadline, 0)
if c.BytesReadFromDeadline.Load() > c.ReadThreshold {
c.BytesReadFromDeadline.Store(0)
// we set both read and write deadlines here otherwise after the request
// is read writing the response fails with an i/o timeout error
err = c.Conn.SetDeadline(time.Now().Add(c.ReadTimeout))
@ -64,13 +62,13 @@ func (c *Conn) Read(b []byte) (n int, err error) {
}
}
n, err = c.Conn.Read(b)
atomic.AddInt32(&c.BytesReadFromDeadline, int32(n))
c.BytesReadFromDeadline.Add(int32(n))
return
}
func (c *Conn) Write(b []byte) (n int, err error) {
if atomic.LoadInt32(&c.BytesWrittenFromDeadline) > c.WriteThreshold {
atomic.StoreInt32(&c.BytesWrittenFromDeadline, 0)
if c.BytesWrittenFromDeadline.Load() > c.WriteThreshold {
c.BytesWrittenFromDeadline.Store(0)
// we extend the read deadline too, not sure it's necessary,
// but it doesn't hurt
err = c.Conn.SetDeadline(time.Now().Add(c.WriteTimeout))
@ -79,7 +77,7 @@ func (c *Conn) Write(b []byte) (n int, err error) {
}
}
n, err = c.Conn.Write(b)
atomic.AddInt32(&c.BytesWrittenFromDeadline, int32(n))
c.BytesWrittenFromDeadline.Add(int32(n))
return
}

View file

@ -210,7 +210,7 @@ func (p *QueueMetadataPacket) GetData() []byte {
type Queue struct {
NowPlaying chan *QueueTrackEntry
QueueEmpty chan *QueueTrackEntry
Duration time.Duration
duration atomic.Int64
durationError int64
audioQueue *queue.Queue
mounts []*StreamMount
@ -264,6 +264,10 @@ func NewQueue(config *Config) *Queue {
return q
}
func (q *Queue) GetDuration() time.Duration {
return time.Duration(q.duration.Load())
}
func (q *Queue) Wait() {
q.wg.Wait()
close(q.NowPlaying)
@ -280,9 +284,9 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error {
log.Printf("now playing %s\n", e.Path)
q.NowPlaying <- e
for _, mount := range q.mounts {
mount.MetadataQueue.Enqueue(&QueueMetadataPacket{
_ = mount.MetadataQueue.Enqueue(&QueueMetadataPacket{
//TODO: carry sample rate error
sampleNumber: int64(q.Duration * time.Duration(queue.GetSampleRate()) / time.Second),
sampleNumber: (q.duration.Load() * int64(queue.GetSampleRate())) / int64(time.Second),
TrackEntry: e,
})
}
@ -297,7 +301,7 @@ func (q *Queue) AddTrack(entry *QueueTrackEntry, tail bool) error {
removeCallback := func(queue *queue.Queue, entry *queue.QueueEntry) {
//TODO: carry sample rate error
atomic.AddInt64((*int64)(&q.Duration), int64((time.Second*time.Duration(atomic.LoadUint64(&entry.ReadSamples)))/time.Duration(entry.Source.GetSampleRate())))
q.duration.Add(int64((time.Second * time.Duration(entry.ReadSamples.Load())) / time.Duration(entry.Source.GetSampleRate())))
q.Remove(entry.Identifier)
q.HandleQueue()
@ -635,7 +639,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
writeChannel := make(chan []byte, byteSliceChannelBuffer)
requestDone := struct {
Done uint32
Done atomic.Bool
Lock sync.Mutex
Error error
}{}
@ -649,7 +653,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
writer.Header().Set("Content-Type", "application/x-audio-packet-stream")
packetWriteCallback = func(packet packetizer.Packet) error {
if atomic.LoadUint32(&requestDone.Done) == 1 {
if requestDone.Done.Load() {
return requestDone.Error
}
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
@ -661,7 +665,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock()
defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
atomic.StoreUint32(&requestDone.Done, 1)
requestDone.Done.Store(true)
return requestDone.Error
}
@ -678,7 +682,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock()
defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
atomic.StoreUint32(&requestDone.Done, 1)
requestDone.Done.Store(true)
return requestDone.Error
}
@ -697,7 +701,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock()
defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
atomic.StoreUint32(&requestDone.Done, 1)
requestDone.Done.Store(true)
return requestDone.Error
}
@ -776,7 +780,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
var streamStartOffset int64 = -1
packetWriteCallback = func(packet packetizer.Packet) error {
if atomic.LoadUint32(&requestDone.Done) == 1 {
if requestDone.Done.Load() {
return requestDone.Error
}
if metadataPacket, ok := packet.(*QueueMetadataPacket); ok {
@ -826,7 +830,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock()
defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
atomic.StoreUint32(&requestDone.Done, 1)
requestDone.Done.Store(true)
return requestDone.Error
}
writeChannel <- data
@ -835,7 +839,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
} else {
var streamStartOffset int64 = -1
packetWriteCallback = func(packet packetizer.Packet) error {
if atomic.LoadUint32(&requestDone.Done) == 1 {
if requestDone.Done.Load() {
return requestDone.Error
}
if _, ok := packet.(*QueueMetadataPacket); ok {
@ -846,7 +850,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock()
defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
atomic.StoreUint32(&requestDone.Done, 1)
requestDone.Done.Store(true)
return requestDone.Error
}
@ -882,7 +886,7 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req
requestDone.Lock.Lock()
defer requestDone.Lock.Unlock()
requestDone.Error = errors.New("client ran out of buffer")
atomic.StoreUint32(&requestDone.Done, 1)
requestDone.Done.Store(true)
return
}
//try flush