diff --git a/api/api.go b/api/api.go index 1916e85..6ac0cbc 100644 --- a/api/api.go +++ b/api/api.go @@ -182,7 +182,7 @@ func (a *API) listen() { type queueResultResponse struct { resultResponse - QueueId queue.QueueIdentifier `json:"queue_id"` + QueueId queue.Identifier `json:"queue_id"` } a.wg.Add(1) @@ -261,7 +261,7 @@ func (a *API) listen() { if e, err := a.getQueueEntryFromBody(body); e != nil { if err = a.queue.AddTrack(e, false); err == nil { result.Success = true - result.QueueId = e.QueueIdentifier + result.QueueId = e.Identifier } else { resultErr := err.Error() result.Reason = &resultErr @@ -281,7 +281,7 @@ func (a *API) listen() { } else if request.Method == "DELETE" { result := resultResponse{} if head := a.queue.GetHead(); head != nil { - result.Success = a.queue.Remove(head.QueueIdentifier) + result.Success = a.queue.Remove(head.Identifier) } jsonData, _ := json.Marshal(result) @@ -297,7 +297,7 @@ func (a *API) listen() { if e, err := a.getQueueEntryFromBody(body); e != nil { if err = a.queue.AddTrack(e, true); err == nil { result.Success = true - result.QueueId = e.QueueIdentifier + result.QueueId = e.Identifier } else { resultErr := err.Error() result.Reason = &resultErr @@ -317,7 +317,7 @@ func (a *API) listen() { } else if request.Method == "DELETE" { result := resultResponse{} if head := a.queue.GetTail(); head != nil { - result.Success = a.queue.Remove(head.QueueIdentifier) + result.Success = a.queue.Remove(head.Identifier) } jsonData, _ := json.Marshal(result) @@ -334,7 +334,7 @@ func (a *API) listen() { result := resultResponse{} for _, e := range a.queue.GetQueue() { - a.queue.Remove(e.QueueIdentifier) + a.queue.Remove(e.Identifier) } result.Success = true @@ -350,7 +350,7 @@ func (a *API) listen() { if i, err := strconv.ParseInt(pathSegments[2], 10, 0); err == nil { result := resultResponse{} - result.Success = a.queue.Remove(queue.QueueIdentifier(i)) + result.Success = a.queue.Remove(queue.Identifier(i)) jsonData, _ := json.Marshal(result) writer.Write(jsonData) diff --git a/go.mod b/go.mod index 7e1c339..fe8e730 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module git.gammaspectra.live/S.O.N.G/MeteorLight go 1.19 require ( - git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220803133829-bf39ddac3ac7 + git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220905145717-5b3948f68ccc github.com/BurntSushi/toml v1.2.0 github.com/dhowden/tag v0.0.0-20220618230019-adf36e896086 github.com/enriquebris/goconcurrentqueue v0.6.3 diff --git a/go.sum b/go.sum index 1a3a8da..2e67c40 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220803133829-bf39ddac3ac7 h1:580azkVpYctqZcgT3RGRP7rQJfURXHfD3F4/WzHHKpU= -git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220803133829-bf39ddac3ac7/go.mod h1:t8jY8NqFY2Wwzcyj486u7ox4zyfXnlnoQBgbOzBQdl0= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220905145717-5b3948f68ccc h1:V3NtzTkIoTWa2U8Pv8ypFap+3B1Axr2TUja7lhYwfMM= +git.gammaspectra.live/S.O.N.G/Kirika v0.0.0-20220905145717-5b3948f68ccc/go.mod h1:IALWR0qk0+O4sxb7vlW9HZssHL4YbtI4ju6UvBDOxOw= git.gammaspectra.live/S.O.N.G/flacgo v0.0.0-20220726151057-28f458bc5391 h1:us3yKKsnMe0FZVHRSFZCw113ddiNrZgKf5M5PNr3SQ4= git.gammaspectra.live/S.O.N.G/flacgo v0.0.0-20220726151057-28f458bc5391/go.mod h1:ZVHB/7Vrs9xxK1j98+SJ5TRYBc7Q9dIUaNJHEmysZcI= git.gammaspectra.live/S.O.N.G/go-alac v0.0.0-20220421115623-d0b3bfe57e0f h1:CxN7zlk5FdAieyRKQSbwBGBsvQ2cDF8JVCODZpzcRkA= diff --git a/listener/aps1/aps1.go b/listener/aps1/aps1.go index ff37714..ccdef33 100644 --- a/listener/aps1/aps1.go +++ b/listener/aps1/aps1.go @@ -93,7 +93,7 @@ func (l *Listener) Write(packet packetizer.Packet) error { if metadataPacket, ok := packet.(*metadata.Packet); ok { queueInfoBuf := make([]byte, binary.MaxVarintLen64) - n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.QueueIdentifier)) + n := binary.PutVarint(queueInfoBuf, int64(metadataPacket.TrackEntry.Identifier)) if len(l.sliceWriter) >= (cap(l.sliceWriter) - 1) { l.ctx.Fail(errors.New("client ran out of writer buffer")) diff --git a/queue/queue.go b/queue/queue.go index 699c94c..d60d033 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -100,7 +100,7 @@ func (q *Queue) AddTrack(entry *track.Entry, tail bool) error { return err } - startCallback := func(queue *queue.Queue, queueEntry *queue.QueueEntry) { + startCallback := func(queue *queue.Queue, queueEntry *queue.Entry) { if e := q.Get(queueEntry.Identifier); e != nil { //is this needed? log.Printf("now playing \"%s\": %s - %s (%s)\n", e.Path, e.Metadata.Title, e.Metadata.Artist, e.Metadata.Album) q.NowPlaying <- e @@ -116,11 +116,11 @@ func (q *Queue) AddTrack(entry *track.Entry, tail bool) error { } } - endCallback := func(queue *queue.Queue, entry *queue.QueueEntry) { + endCallback := func(queue *queue.Queue, entry *queue.Entry) { } - removeCallback := func(queue *queue.Queue, entry *queue.QueueEntry) { + removeCallback := func(queue *queue.Queue, entry *queue.Entry) { //TODO: carry sample rate error q.duration.Add(int64((time.Second * time.Duration(entry.ReadSamples.Load())) / time.Duration(entry.Source.GetSampleRate()))) @@ -145,12 +145,12 @@ func (q *Queue) AddTrack(entry *track.Entry, tail bool) error { } if tail { - entry.QueueIdentifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback) + entry.Identifier = q.audioQueue.AddTail(source, startCallback, endCallback, removeCallback) } else { - entry.QueueIdentifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback) + entry.Identifier = q.audioQueue.AddHead(source, startCallback, endCallback, removeCallback) } - entry.Original["queue_id"] = entry.QueueIdentifier + entry.Original["queue_id"] = entry.Identifier if tail || len(q.queue) == 0 { q.queue = append(q.queue, entry) @@ -186,11 +186,11 @@ func (q *Queue) GetQueue() (result []*track.Entry) { return } -func (q *Queue) Get(identifier queue.QueueIdentifier) *track.Entry { +func (q *Queue) Get(identifier queue.Identifier) *track.Entry { q.mutex.RLock() defer q.mutex.RUnlock() for _, e := range q.queue { - if e.QueueIdentifier == identifier { + if e.Identifier == identifier { return e } } @@ -238,19 +238,26 @@ func (q *Queue) GetTail() *track.Entry { return nil } -func (q *Queue) Remove(identifier queue.QueueIdentifier) bool { - q.mutex.Lock() - for i, e := range q.queue { - if e.QueueIdentifier == identifier { - q.queue = append(q.queue[:i], q.queue[i+1:]...) - q.mutex.Unlock() - q.audioQueue.Remove(identifier) - e.Close() - return true +func (q *Queue) Remove(identifier queue.Identifier) bool { + var entry *track.Entry + func() { + defer q.audioQueue.Remove(identifier) + q.mutex.Lock() + defer q.mutex.Unlock() + + for i, e := range q.queue { + if e.Identifier == identifier { + q.queue = append(q.queue[:i], q.queue[i+1:]...) + entry = e + return + } } + }() + + if entry != nil { + _ = entry.Close() + return true } - q.mutex.Unlock() - q.audioQueue.Remove(identifier) return false } @@ -493,30 +500,29 @@ func (q *Queue) HandleRadioRequest(writer http.ResponseWriter, request *http.Req log.Printf("adding %s client to stream %s (%s, %s, agent \"%s\", buffer %.2f seconds)\n", listenerIdentifier, mount.Mount, request.RemoteAddr, request.Proto, request.Header.Get("user-agent"), float64(sampleBufferLimit)/float64(mount.SampleRate)) mount.AddListener(mountListener) - var wgClient sync.WaitGroup + ctx := request.Context() + var flusher http.Flusher + if httpFlusher, ok := writer.(http.Flusher); ok { + flusher = httpFlusher + } - wgClient.Add(1) - go func() { - defer wgClient.Done() - var flusher http.Flusher - if httpFlusher, ok := writer.(http.Flusher); ok { - flusher = httpFlusher - } - - for byteSlice := range writeChannel { + for !requestDone.Done() { + select { + case byteSlice := <-writeChannel: if _, err := writer.Write(byteSlice); err != nil { requestDone.Fail(err) - return + break } //try flush if flusher != nil { flusher.Flush() } + case <-ctx.Done(): + requestDone.Fail(ctx.Err()) + break } - }() - - wgClient.Wait() + } log.Printf("removing %s client from stream %s\n", listenerIdentifier, mount.Mount) return diff --git a/queue/track/track.go b/queue/track/track.go index fc7fdf2..2f5c322 100644 --- a/queue/track/track.go +++ b/queue/track/track.go @@ -17,9 +17,9 @@ import ( ) type Entry struct { - QueueIdentifier queue.QueueIdentifier - Path string - Metadata struct { + Identifier queue.Identifier + Path string + Metadata struct { Title interface{} `json:"title"` Album interface{} `json:"album"` Artist interface{} `json:"artist"`