package main import ( "context" "encoding/json" "errors" "fmt" "git.gammaspectra.live/P2Pool/consensus/v3/p2pool/sidechain" "git.gammaspectra.live/P2Pool/consensus/v3/types" "git.gammaspectra.live/P2Pool/observer-cmd-utils/index" "git.gammaspectra.live/P2Pool/observer-cmd-utils/utils" "io" "log" "net/url" "nhooyr.io/websocket" "slices" "sync" "sync/atomic" "time" ) type channelEntry struct { ApiEndpoint string Channel string Name string PreviousBlocks foundBlocks PoolInfo map[string]any Consensus *sidechain.Consensus Tip *index.SideBlock Window map[types.Hash]*index.SideBlock ChainLock sync.RWMutex Ws atomic.Pointer[websocket.Conn] } func (c *channelEntry) DesiredPruneDistance() uint64 { //prune after a whole day /*pruneDistance := (3600 * 24) / c.Consensus.TargetBlockTime if pruneDistance < c.Consensus.ChainWindowSize { pruneDistance = c.Consensus.ChainWindowSize }*/ return c.Consensus.ChainWindowSize } func (c *channelEntry) SideBlockUnclesByTemplateId(id types.Hash) index.QueryIterator[index.SideBlock] { var uncles []index.SideBlockUncleEntry if b := c.SideBlockByTemplateId(id); b != nil && !b.IsUncle() { uncles = b.Uncles } return &index.FakeQueryResult[index.SideBlock]{ NextFunction: func() (int, *index.SideBlock) { if len(uncles) == 0 { return 0, nil } if uncle := c.SideBlockByTemplateId(uncles[0].TemplateId); uncle != nil { uncles = uncles[1:] return 0, uncle } return 0, nil }, } } // IterateWindow lock must be taken before this func (c *channelEntry) IterateWindow(addWeightFunc index.SideBlockWindowAddWeightFunc) (bottomHeight uint64) { if c.ApiEndpoint == "" { return } var err error bottomHeight, err = index.BlocksInPPLNSWindow(c.Tip, c.Consensus, c.DifficultyFromHeight, c.SideBlockByTemplateId, c.SideBlockUnclesByTemplateId, addWeightFunc) if err != nil { log.Printf("error iterating window on %s", c.Name) return 0 } return bottomHeight } func (c *channelEntry) SideBlockByTemplateId(id types.Hash) *index.SideBlock { if b, ok := c.Window[id]; ok { return b } else if b = getTypeFromAPI[index.SideBlock](c.ApiEndpoint, "/api/block_by_id/"+id.String()); b != nil { c.Window[b.TemplateId] = b } return nil } func (c *channelEntry) DifficultyFromHeight(height uint64) types.Difficulty { if d := getTypeFromAPI[types.Difficulty](c.ApiEndpoint, fmt.Sprintf("/api/main_difficulty_by_height/%d", height)); d != nil { return *d } return types.ZeroDifficulty } func (c *channelEntry) getPreviousWindow() { if c.ApiEndpoint == "" { return } for _, b := range getSideBlocksFromAPI(c.ApiEndpoint, fmt.Sprintf("/api/side_blocks_in_window?window=%d&noMainStatus", c.DesiredPruneDistance())) { c.Window[b.TemplateId] = b if c.Tip == nil || c.Tip.SideHeight < b.SideHeight { c.Tip = b } } } func (c *channelEntry) getConsensus() { if c.ApiEndpoint == "" { return } basePoolInfo := getPoolInfo(c.ApiEndpoint) consensusData, _ := json.Marshal(basePoolInfo.SideChain.Consensus) consensus, err := sidechain.NewConsensusFromJSON(consensusData) if err != nil { log.Panic(err) } c.Consensus = consensus } func (c *channelEntry) getPreviousBlocks() { if c.ApiEndpoint == "" { return } result := getSliceFromAPI[*index.FoundBlock](c.ApiEndpoint, fmt.Sprintf("/api/found_blocks?limit=%d", numberOfBlockHistoryToKeep)) //sort from oldest to newest slices.SortFunc(result, func(a, b *index.FoundBlock) int { if a.MainBlock.Height < b.MainBlock.Height { return -1 } else if a.MainBlock.Height > b.MainBlock.Height { return 1 } return 0 }) c.PreviousBlocks = result } const readLimit = 8 * 1024 * 1024 func (c *channelEntry) openWebSocket() { if c.ApiEndpoint == "" { c.Ws.Store(nil) return } ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() u, _ := url.Parse(c.ApiEndpoint) if u.Scheme == "https" { conn, _, err := websocket.Dial(ctx, fmt.Sprintf("wss://%s/api/events", u.Host), nil) if err != nil { c.Ws.Store(nil) return } conn.SetReadLimit(readLimit) c.Ws.Store(conn) } else { conn, _, err := websocket.Dial(ctx, fmt.Sprintf("ws://%s/api/events", u.Host), nil) if err != nil { c.Ws.Store(nil) return } conn.SetReadLimit(readLimit) c.Ws.Store(conn) } } func (c *channelEntry) readEvent() (*utils.JSONEvent, error) { if c.ApiEndpoint == "" { return nil, errors.New("nil endpoint") } ws := c.Ws.Load() if ws == nil { return nil, errors.New("nil websocket") } var event utils.JSONEvent ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) defer cancel() messageType, reader, err := ws.Reader(ctx) if messageType != websocket.MessageText { return nil, errors.New("message is not text") } else if err != nil { return nil, err } if buf, err := io.ReadAll(reader); err != nil { return nil, err } else if err = json.Unmarshal(buf, &event); err != nil { return nil, err } return &event, nil } func (c *channelEntry) pruneBlocks() { pruneDistance := c.DesiredPruneDistance() inChainFromTip := make([]types.Hash, 0, pruneDistance*2) for cur := c.Tip; cur != nil && (c.Tip.EffectiveHeight-cur.EffectiveHeight) <= pruneDistance; cur = c.Window[cur.ParentTemplateId] { inChainFromTip = append(inChainFromTip, cur.TemplateId) for _, u := range cur.Uncles { inChainFromTip = append(inChainFromTip, u.TemplateId) } } for k := range c.Window { if !slices.Contains(inChainFromTip, k) { delete(c.Window, k) } } }