Properly reset websocket on error
This commit is contained in:
parent
b9d7774896
commit
544712c585
11
bot.go
11
bot.go
|
@ -457,7 +457,7 @@ func main() {
|
|||
if e.ApiEndpoint == "" {
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
go func(e *channelEntry) {
|
||||
for range time.NewTicker(time.Second * 30).C {
|
||||
ws := e.Ws.Load()
|
||||
if ws != nil {
|
||||
|
@ -465,14 +465,14 @@ func main() {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
|
||||
defer cancel()
|
||||
if err := ws.Ping(ctx); err != nil {
|
||||
log.Printf("Closing websocket, %s", err)
|
||||
log.Printf("[WS] WebSocket for %s disconnected on ping: %s", e.ApiEndpoint, err)
|
||||
ws.Close(websocket.StatusGoingAway, "error on ping "+err.Error())
|
||||
e.Ws.Store(nil)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}(e)
|
||||
go func(e *channelEntry) {
|
||||
|
||||
for {
|
||||
|
@ -487,10 +487,11 @@ func main() {
|
|||
ws = e.Ws.Load()
|
||||
}
|
||||
|
||||
if event := e.readEvent(); event == nil {
|
||||
if event, err := e.readEvent(); event == nil || err != nil {
|
||||
log.Printf("[WS] WebSocket for %s disconnected: %s", e.ApiEndpoint, err)
|
||||
ws.Close(websocket.StatusGoingAway, "error on reading event")
|
||||
e.Ws.Store(nil)
|
||||
break
|
||||
continue
|
||||
} else {
|
||||
func() {
|
||||
e.ChainLock.Lock()
|
||||
|
|
19
entry.go
19
entry.go
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.gammaspectra.live/P2Pool/p2pool-observer/cmd/utils"
|
||||
"git.gammaspectra.live/P2Pool/p2pool-observer/index"
|
||||
|
@ -155,30 +156,32 @@ func (c *channelEntry) openWebSocket() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *channelEntry) readEvent() *utils.JSONEvent {
|
||||
func (c *channelEntry) readEvent() (*utils.JSONEvent, error) {
|
||||
if c.ApiEndpoint == "" {
|
||||
return nil
|
||||
return nil, errors.New("nil endpoint")
|
||||
}
|
||||
|
||||
ws := c.Ws.Load()
|
||||
if ws == nil {
|
||||
return nil
|
||||
return nil, errors.New("nil websocket")
|
||||
}
|
||||
|
||||
var event utils.JSONEvent
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
|
||||
defer cancel()
|
||||
messageType, reader, err := ws.Reader(ctx)
|
||||
if messageType != websocket.MessageText || err != nil {
|
||||
return nil
|
||||
if messageType != websocket.MessageText {
|
||||
return nil, errors.New("message is not text")
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if buf, err := io.ReadAll(reader); err != nil {
|
||||
return nil
|
||||
return nil, err
|
||||
} else if err = json.Unmarshal(buf, &event); err != nil {
|
||||
return nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &event
|
||||
return &event, nil
|
||||
}
|
||||
|
||||
func (c *channelEntry) pruneBlocks() {
|
||||
|
|
Loading…
Reference in a new issue