observer-bot/entry.go

221 lines
5.5 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"git.gammaspectra.live/P2Pool/consensus/v3/p2pool/sidechain"
"git.gammaspectra.live/P2Pool/consensus/v3/types"
"git.gammaspectra.live/P2Pool/observer-cmd-utils/index"
"git.gammaspectra.live/P2Pool/observer-cmd-utils/utils"
"io"
"log"
"net/url"
"nhooyr.io/websocket"
"slices"
"sync"
"sync/atomic"
"time"
)
// channelEntry holds the per-channel state: the API endpoint it talks to, a
// local cache of side blocks, and the websocket used to receive events.
type channelEntry struct {
// ApiEndpoint is the base URL of the API. When empty, the methods that
// would contact the API return early without doing anything.
ApiEndpoint string
// Channel is a channel identifier; how it is used is not visible in this
// part of the file.
Channel string
// Name identifies this entry in log messages.
Name string
// PreviousBlocks holds the found blocks loaded by getPreviousBlocks, sorted
// by ascending main block height.
PreviousBlocks foundBlocks
// PoolInfo is not read or written in this part of the file.
// NOTE(review): presumably the decoded pool info response — confirm.
PoolInfo map[string]any
// Consensus is the sidechain consensus built by getConsensus.
Consensus *sidechain.Consensus
// Tip is the block with the greatest SideHeight seen by getPreviousWindow;
// window iteration and pruning start from it.
Tip *index.SideBlock
// Window caches side blocks keyed by template id.
Window map[types.Hash]*index.SideBlock
// ChainLock is not taken by any method in this part of the file.
// NOTE(review): presumably guards Tip and Window — confirm against callers.
ChainLock sync.RWMutex
// Ws is the current websocket connection, or nil when none is open.
Ws atomic.Pointer[websocket.Conn]
}
// DesiredPruneDistance returns the distance used both as the window size
// requested by getPreviousWindow and as the maximum effective-height distance
// from the tip that pruneBlocks keeps. It is currently the consensus chain
// window size.
//
// A variant that pruned after a whole day is kept below for reference but is
// disabled:
//
//	pruneDistance := (3600 * 24) / c.Consensus.TargetBlockTime
//	if pruneDistance < c.Consensus.ChainWindowSize {
//		pruneDistance = c.Consensus.ChainWindowSize
//	}
func (c *channelEntry) DesiredPruneDistance() uint64 {
	windowSize := c.Consensus.ChainWindowSize
	return windowSize
}
// SideBlockUnclesByTemplateId returns an iterator over the uncle blocks of the
// block with the given template id.
//
// The iterator is empty when the block cannot be resolved or is itself an
// uncle. Each step resolves the next uncle entry through
// SideBlockByTemplateId; iteration ends at the first uncle that cannot be
// resolved, or once all entries have been returned. The index returned
// alongside each block is always 0.
func (c *channelEntry) SideBlockUnclesByTemplateId(id types.Hash) index.QueryIterator[index.SideBlock] {
	var pending []index.SideBlockUncleEntry
	parent := c.SideBlockByTemplateId(id)
	if parent != nil && !parent.IsUncle() {
		pending = parent.Uncles
	}

	next := func() (int, *index.SideBlock) {
		if len(pending) == 0 {
			return 0, nil
		}
		uncle := c.SideBlockByTemplateId(pending[0].TemplateId)
		if uncle == nil {
			// Unresolvable uncle: stop here and leave the entry in place.
			return 0, nil
		}
		pending = pending[1:]
		return 0, uncle
	}

	return &index.FakeQueryResult[index.SideBlock]{NextFunction: next}
}
// IterateWindow runs index.BlocksInPPLNSWindow starting at c.Tip, passing
// addWeightFunc through, and returns the bottom height it reports.
//
// It returns 0 when no API endpoint is configured or when the iteration
// fails; a failure is logged with the entry name.
//
// The lock must be taken by the caller before calling this.
// NOTE(review): presumably ChainLock — confirm against callers.
func (c *channelEntry) IterateWindow(addWeightFunc index.SideBlockWindowAddWeightFunc) (bottomHeight uint64) {
	if c.ApiEndpoint == "" {
		return 0
	}

	height, err := index.BlocksInPPLNSWindow(
		c.Tip,
		c.Consensus,
		c.DifficultyFromHeight,
		c.SideBlockByTemplateId,
		c.SideBlockUnclesByTemplateId,
		addWeightFunc,
	)
	if err != nil {
		log.Printf("error iterating window on %s", c.Name)
		return 0
	}
	return height
}
// SideBlockByTemplateId returns the side block with the given template id, or
// nil when it cannot be found.
//
// The local window cache is consulted first. On a miss the block is requested
// from the API at /api/block_by_id/<id>; a block obtained that way is stored
// in c.Window under its own TemplateId and returned.
//
// NOTE(review): a cache miss writes to c.Window, so callers presumably need to
// hold the lock for writing — confirm against callers.
func (c *channelEntry) SideBlockByTemplateId(id types.Hash) *index.SideBlock {
	if b, ok := c.Window[id]; ok {
		return b
	}

	b := getTypeFromAPI[index.SideBlock](c.ApiEndpoint, "/api/block_by_id/"+id.String())
	if b == nil {
		return nil
	}
	c.Window[b.TemplateId] = b
	// Return the freshly cached block; previously this path fell through to
	// "return nil" and the first lookup of an uncached block always failed.
	return b
}
// DifficultyFromHeight requests /api/main_difficulty_by_height/<height> from
// the API and returns the decoded difficulty. It returns types.ZeroDifficulty
// when the API yields no value.
func (c *channelEntry) DifficultyFromHeight(height uint64) types.Difficulty {
	path := fmt.Sprintf("/api/main_difficulty_by_height/%d", height)
	difficulty := getTypeFromAPI[types.Difficulty](c.ApiEndpoint, path)
	if difficulty == nil {
		return types.ZeroDifficulty
	}
	return *difficulty
}
// getPreviousWindow loads the side blocks of the current window from the API
// into c.Window, keyed by template id, and moves c.Tip to the block with the
// greatest SideHeight seen so far. The requested window size is
// DesiredPruneDistance. It does nothing when no API endpoint is configured.
func (c *channelEntry) getPreviousWindow() {
	if c.ApiEndpoint == "" {
		return
	}

	query := fmt.Sprintf("/api/side_blocks_in_window?window=%d&noMainStatus", c.DesiredPruneDistance())
	for _, block := range getSideBlocksFromAPI(c.ApiEndpoint, query) {
		c.Window[block.TemplateId] = block

		// Keep the existing tip unless this block is strictly higher.
		if c.Tip != nil && c.Tip.SideHeight >= block.SideHeight {
			continue
		}
		c.Tip = block
	}
}
// getConsensus fetches the pool info from the API and sets c.Consensus from
// the sidechain consensus section of that response. It does nothing when no
// API endpoint is configured.
//
// The consensus section is re-encoded to JSON because
// sidechain.NewConsensusFromJSON takes raw JSON. A failure to encode or to
// build the consensus panics via log.Panic.
func (c *channelEntry) getConsensus() {
	if c.ApiEndpoint == "" {
		return
	}

	basePoolInfo := getPoolInfo(c.ApiEndpoint)

	// Report an encoding failure directly instead of letting it surface as an
	// unrelated parse error in NewConsensusFromJSON.
	consensusData, err := json.Marshal(basePoolInfo.SideChain.Consensus)
	if err != nil {
		log.Panic(err)
	}

	consensus, err := sidechain.NewConsensusFromJSON(consensusData)
	if err != nil {
		log.Panic(err)
	}
	c.Consensus = consensus
}
// getPreviousBlocks loads up to numberOfBlockHistoryToKeep found blocks from
// the API and stores them in c.PreviousBlocks ordered from oldest to newest,
// that is, by ascending main block height. It does nothing when no API
// endpoint is configured.
func (c *channelEntry) getPreviousBlocks() {
	if c.ApiEndpoint == "" {
		return
	}

	path := fmt.Sprintf("/api/found_blocks?limit=%d", numberOfBlockHistoryToKeep)
	blocks := getSliceFromAPI[*index.FoundBlock](c.ApiEndpoint, path)

	// Three-way comparison on the main block height.
	byMainHeight := func(a, b *index.FoundBlock) int {
		switch {
		case a.MainBlock.Height < b.MainBlock.Height:
			return -1
		case a.MainBlock.Height > b.MainBlock.Height:
			return 1
		default:
			return 0
		}
	}
	slices.SortFunc(blocks, byMainHeight)

	c.PreviousBlocks = blocks
}
// readLimit is the value (8 MiB, in bytes) that openWebSocket passes to
// SetReadLimit on every websocket connection it opens.
const readLimit = 8 * 1024 * 1024
// openWebSocket dials the API's /api/events websocket and stores the
// connection in c.Ws.
//
// The websocket scheme follows the endpoint scheme: "wss" for an "https"
// endpoint, "ws" for anything else. The dial is bounded by a 30 second
// timeout and the new connection gets readLimit as its read limit.
//
// On any failure (no endpoint configured, unparsable endpoint, dial error)
// c.Ws is set to nil, which readEvent reports as "nil websocket".
//
// NOTE(review): the previously stored connection is replaced without being
// closed here; presumably the caller closes it — confirm.
func (c *channelEntry) openWebSocket() {
	if c.ApiEndpoint == "" {
		c.Ws.Store(nil)
		return
	}

	// url.Parse returns a nil URL on error, so the error must be checked
	// before u is dereferenced.
	u, err := url.Parse(c.ApiEndpoint)
	if err != nil {
		log.Printf("invalid API endpoint %q on %s: %v", c.ApiEndpoint, c.Name, err)
		c.Ws.Store(nil)
		return
	}

	scheme := "ws"
	if u.Scheme == "https" {
		scheme = "wss"
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	conn, _, err := websocket.Dial(ctx, fmt.Sprintf("%s://%s/api/events", scheme, u.Host), nil)
	if err != nil {
		c.Ws.Store(nil)
		return
	}
	conn.SetReadLimit(readLimit)
	c.Ws.Store(conn)
}
// readEvent waits up to five minutes for the next websocket message and
// decodes it as a utils.JSONEvent.
//
// It returns an error when no API endpoint is configured ("nil endpoint"),
// when no websocket is open ("nil websocket"), when reading from the
// connection fails, when the message is not a text message ("message is not
// text"), or when the payload is not valid JSON for the event type.
func (c *channelEntry) readEvent() (*utils.JSONEvent, error) {
	if c.ApiEndpoint == "" {
		return nil, errors.New("nil endpoint")
	}

	ws := c.Ws.Load()
	if ws == nil {
		return nil, errors.New("nil websocket")
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	// The error must be checked before the message type: on failure the
	// message type is not meaningful, and testing it first would hide the real
	// cause (timeout, closed connection) behind "message is not text".
	messageType, reader, err := ws.Reader(ctx)
	if err != nil {
		return nil, err
	}
	if messageType != websocket.MessageText {
		return nil, errors.New("message is not text")
	}

	buf, err := io.ReadAll(reader)
	if err != nil {
		return nil, err
	}

	var event utils.JSONEvent
	if err = json.Unmarshal(buf, &event); err != nil {
		return nil, err
	}
	return &event, nil
}
// pruneBlocks removes from c.Window every block that is not reachable from
// c.Tip within DesiredPruneDistance.
//
// Starting at the tip it follows ParentTemplateId links through c.Window for
// as long as the effective-height distance from the tip is at most the prune
// distance, keeping each visited block and every template id listed in its
// Uncles. The walk ends early at the first parent missing from the window.
// When c.Tip is nil nothing is kept, so the window is emptied.
//
// NOTE(review): c.Window is modified; presumably the caller must hold
// ChainLock for writing — confirm against callers.
func (c *channelEntry) pruneBlocks() {
	pruneDistance := c.DesiredPruneDistance()

	// A set gives constant-time membership tests in the sweep below, instead
	// of scanning a slice once per window entry.
	keep := make(map[types.Hash]struct{}, pruneDistance*2)
	for cur := c.Tip; cur != nil && (c.Tip.EffectiveHeight-cur.EffectiveHeight) <= pruneDistance; cur = c.Window[cur.ParentTemplateId] {
		keep[cur.TemplateId] = struct{}{}
		for _, u := range cur.Uncles {
			keep[u.TemplateId] = struct{}{}
		}
	}

	// Deleting entries while ranging over a map is safe in Go.
	for k := range c.Window {
		if _, ok := keep[k]; !ok {
			delete(c.Window, k)
		}
	}
}