consensus/p2pool/mainchain/mainchain.go

445 lines
12 KiB
Go

package mainchain
import (
"context"
"fmt"
mainblock "git.gammaspectra.live/P2Pool/p2pool-observer/monero/block"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/client"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/client/zmq"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/randomx"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/transaction"
"git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain"
"git.gammaspectra.live/P2Pool/p2pool-observer/types"
"git.gammaspectra.live/P2Pool/p2pool-observer/utils"
"golang.org/x/exp/slices"
"log"
"sync"
"sync/atomic"
"time"
)
// Window sizes, counted in main chain blocks.
const (
	// TimestampWindow is the number of most recent main chain block
	// timestamps sampled when computing the median timestamp.
	TimestampWindow = 60

	// BlockHeadersRequired is the number of recent main chain block headers
	// that are downloaded and kept; older entries become eligible for pruning.
	BlockHeadersRequired = 720
)
// MainChain keeps recent Monero main chain block data indexed by height and by
// hash, together with the current tip, the latest miner data and a median
// timestamp derived from the most recent blocks.
type MainChain struct {
// p2pool supplies the RPC and ZMQ clients, the context and block-found notifications.
p2pool P2PoolInterface
// lock protects highest and the two indexes below.
lock sync.RWMutex
// sidechain is queried for pool blocks matching a main chain block's merge mining tag.
sidechain *sidechain.SideChain
// highest is the greatest block height recorded so far.
highest uint64
// mainchainByHeight indexes known main chain block data by height.
mainchainByHeight map[uint64]*sidechain.ChainMain
// mainchainByHash indexes known main chain block data by block id.
mainchainByHash map[types.Hash]*sidechain.ChainMain
// tip is the entry whose id equals the latest miner data's PrevId; it is set by updateTip.
tip atomic.Pointer[sidechain.ChainMain]
// tipMinerData is the most recent miner data stored by HandleMinerData.
tipMinerData atomic.Pointer[MinerData]
// medianTimestamp is refreshed by updateMedianTimestamp; it is 0 when too few blocks are known.
medianTimestamp atomic.Uint64
}
// P2PoolInterface is the set of services MainChain requires from the p2pool
// instance that owns it.
type P2PoolInterface interface {
// ClientRPC returns the RPC client used to request block headers.
ClientRPC() *client.Client
// ClientZMQ returns the ZMQ client that Listen subscribes with.
ClientZMQ() *zmq.Client
// Context returns the context passed to RPC requests and to the ZMQ listener.
Context() context.Context
// Started gates the re-request of missing headers in HandleMinerData.
// NOTE(review): presumably reports whether startup has completed — confirm against the implementation.
Started() bool
// UpdateBlockFound is called when a main chain block's merge mining tag
// matches a side chain block that is already known.
UpdateBlockFound(data *sidechain.ChainMain, block *sidechain.PoolBlock)
}
// NewMainChain returns a MainChain bound to the given side chain and p2pool
// instance. Both the height index and the hash index start out empty.
func NewMainChain(s *sidechain.SideChain, p2pool P2PoolInterface) *MainChain {
	return &MainChain{
		p2pool:            p2pool,
		sidechain:         s,
		mainchainByHash:   make(map[types.Hash]*sidechain.ChainMain),
		mainchainByHeight: make(map[uint64]*sidechain.ChainMain),
	}
}
// Listen subscribes to the ZMQ client and loops over the resulting stream
// until the p2pool context is cancelled or the stream reports an error.
//
// Received chain-main and miner-data messages are currently only logged.
// It returns the subscription error, the stream error, or the context error,
// whichever ends the loop.
func (c *MainChain) Listen() error {
	listenCtx := c.p2pool.Context()

	stream, listenErr := c.p2pool.ClientZMQ().Listen(listenCtx)
	if listenErr != nil {
		return listenErr
	}

	for {
		select {
		case <-listenCtx.Done():
			return listenCtx.Err()
		case streamErr := <-stream.ErrC:
			//TODO: retry connection
			return streamErr
		case chainMain := <-stream.FullChainMainC:
			log.Print(chainMain)
		case minerData := <-stream.FullMinerDataC:
			log.Print(minerData)
		}
	}
}
// getTimestamps copies the timestamps of the blocks at heights highest,
// highest-1, ... into timestamps, filling at most TimestampWindow slots.
//
// timestamps must hold at least TimestampWindow elements, otherwise the
// function panics. It returns false, without touching the slice, when the
// height index holds TimestampWindow entries or fewer. Filling stops at the
// first height that is not in the index; the remaining slots are left as they
// were and the function still returns true.
//
// The caller is expected to hold c.lock.
func (c *MainChain) getTimestamps(timestamps []uint64) bool {
	// Single up-front length check for the whole window.
	_ = timestamps[TimestampWindow-1]

	if len(c.mainchainByHeight) <= TimestampWindow {
		return false
	}

	for slot, height := 0, c.highest; slot < TimestampWindow; slot, height = slot+1, height-1 {
		entry, found := c.mainchainByHeight[height]
		if !found {
			break
		}
		timestamps[slot] = entry.Timestamp
	}

	return true
}
// updateMedianTimestamp recomputes the median timestamp from the most recent
// TimestampWindow blocks and stores it in medianTimestamp. When not enough
// blocks are known, 0 is stored instead.
//
// The caller is expected to hold c.lock, since the height index is read.
func (c *MainChain) updateMedianTimestamp() {
	var window [TimestampWindow]uint64

	if ok := c.getTimestamps(window[:]); !ok {
		c.medianTimestamp.Store(0)
		return
	}

	slices.Sort(window[:])

	const mid = TimestampWindow / 2
	// Shift it +1 block compared to Monero's code because we don't have the latest block yet when we receive new miner data
	median := (window[mid] + window[mid+1]) / 2

	log.Printf("[MainChain] Median timestamp updated to %d", median)
	c.medianTimestamp.Store(median)
}
// HandleMainHeader records a main chain block header in both indexes,
// replacing any entry already stored at that height, raises the highest known
// height if needed, and refreshes the median timestamp.
func (c *MainChain) HandleMainHeader(mainHeader *mainblock.Header) {
	c.lock.Lock()
	defer c.lock.Unlock()

	entry := &sidechain.ChainMain{
		Difficulty: mainHeader.Difficulty,
		Height:     mainHeader.Height,
		Timestamp:  mainHeader.Timestamp,
		Reward:     mainHeader.Reward,
		Id:         mainHeader.Id,
	}

	c.mainchainByHash[entry.Id] = entry
	c.mainchainByHeight[entry.Height] = entry

	if c.highest < entry.Height {
		c.highest = entry.Height
	}

	log.Printf("[MainChain] new main chain block: height = %d, id = %s, timestamp = %d, reward = %s", entry.Height, entry.Id.String(), entry.Timestamp, utils.XMRUnits(entry.Reward))

	c.updateMedianTimestamp()
}
// HandleMainBlock processes a full main chain block.
//
// The block is stored in the indexes only when its height is already known;
// its difficulty is then taken from the existing entry at that height.
// Regardless of that, the coinbase merge mining tag is inspected: when it
// names a side chain block that is already known, p2pool is told via
// UpdateBlockFound; otherwise the side chain is asked to watch for it. The tip
// is refreshed at the end. Blocks without a valid merge mining tag return
// before the tip refresh.
func (c *MainChain) HandleMainBlock(b *mainblock.Block) {
	mainData := &sidechain.ChainMain{
		Difficulty: types.ZeroDifficulty,
		Height:     b.Coinbase.GenHeight,
		Timestamp:  b.Timestamp,
		Reward:     b.Coinbase.TotalReward,
		Id:         b.Id(),
	}

	// Index update, done in a closure so the lock is released before the
	// side chain is consulted.
	func() {
		c.lock.Lock()
		defer c.lock.Unlock()

		known, found := c.mainchainByHeight[mainData.Height]
		if !found {
			return
		}
		mainData.Difficulty = known.Difficulty

		c.mainchainByHash[mainData.Id] = mainData
		c.mainchainByHeight[mainData.Height] = mainData

		if c.highest < mainData.Height {
			c.highest = mainData.Height
		}

		log.Printf("[MainChain] new main chain block: height = %d, id = %s, timestamp = %d, reward = %s", mainData.Height, mainData.Id.String(), mainData.Timestamp, utils.XMRUnits(mainData.Reward))

		c.updateMedianTimestamp()
	}()

	tag := b.Coinbase.Extra.GetTag(transaction.TxExtraTagMergeMining)
	if tag == nil || len(tag.Data) != types.HashSize {
		return
	}

	templateId := types.HashFromBytes(tag.Data)
	if poolBlock := c.sidechain.GetPoolBlockByTemplateId(templateId); poolBlock == nil {
		c.sidechain.WatchMainChainBlock(mainData, templateId)
	} else {
		c.p2pool.UpdateBlockFound(mainData, poolBlock)
	}

	c.updateTip()
}
// HandleChainMain processes main chain block data delivered together with the
// raw coinbase extra bytes.
//
// When the height is already known, the stored entry receives the height,
// timestamp and reward from mainData, while mainData receives the stored id
// and difficulty. The extra bytes are then parsed for a merge mining tag,
// which is handled the same way HandleMainBlock handles it.
//
// Deprecated: marked as deprecated by the original author; HandleMainBlock
// performs the equivalent processing from a full block.
func (c *MainChain) HandleChainMain(mainData *sidechain.ChainMain, extra []byte) {
	// Index update, done in a closure so the lock is released before the
	// side chain is consulted.
	func() {
		c.lock.Lock()
		defer c.lock.Unlock()

		stored, found := c.mainchainByHeight[mainData.Height]
		if !found {
			return
		}

		stored.Height = mainData.Height
		stored.Timestamp = mainData.Timestamp
		stored.Reward = mainData.Reward
		mainData.Id = stored.Id
		mainData.Difficulty = stored.Difficulty
		c.mainchainByHash[stored.Id] = stored

		if c.highest < mainData.Height {
			c.highest = mainData.Height
		}

		log.Printf("[MainChain] new main chain block: height = %d, id = %s, timestamp = %d, reward = %s", mainData.Height, mainData.Id.String(), mainData.Timestamp, utils.XMRUnits(mainData.Reward))

		c.updateMedianTimestamp()
	}()

	var tags transaction.ExtraTags
	if tags.UnmarshalBinary(extra) != nil {
		return
	}

	tag := tags.GetTag(transaction.TxExtraTagMergeMining)
	if tag == nil || len(tag.Data) != types.HashSize {
		return
	}

	templateId := types.HashFromBytes(tag.Data)
	if poolBlock := c.sidechain.GetPoolBlockByTemplateId(templateId); poolBlock == nil {
		c.sidechain.WatchMainChainBlock(mainData, templateId)
	} else {
		c.p2pool.UpdateBlockFound(mainData, poolBlock)
	}

	c.updateTip()
}
// GetChainMainByHeight returns the main chain data stored for the given
// height, or nil when that height is not in the index.
func (c *MainChain) GetChainMainByHeight(height uint64) *sidechain.ChainMain {
	c.lock.RLock()
	entry := c.mainchainByHeight[height]
	c.lock.RUnlock()

	return entry
}
// GetChainMainByHash returns the main chain data stored for the given block
// id, or nil when that id is not in the index.
func (c *MainChain) GetChainMainByHash(hash types.Hash) *sidechain.ChainMain {
	c.lock.RLock()
	entry := c.mainchainByHash[hash]
	c.lock.RUnlock()

	return entry
}
// GetChainMainTip returns the current main chain tip, or nil when no tip has
// been stored yet. The value is read atomically, without taking c.lock.
func (c *MainChain) GetChainMainTip() *sidechain.ChainMain {
	current := c.tip.Load()
	return current
}
// updateTip points the tip at the entry whose id equals the PrevId of the
// latest miner data. It does nothing when no miner data has been received
// or when that id is not in the hash index.
func (c *MainChain) updateTip() {
	latest := c.tipMinerData.Load()
	if latest == nil {
		return
	}

	prev := c.GetChainMainByHash(latest.PrevId)
	if prev == nil {
		return
	}

	c.tip.Store(prev)
}
// Cleanup prunes old main chain data relative to the current tip height.
// It is a no-op when no tip is known yet.
func (c *MainChain) Cleanup() {
	tip := c.GetChainMainTip()
	if tip == nil {
		return
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	c.cleanup(tip.Height)
}
// cleanup removes, from both indexes, every entry more than
// BlockHeadersRequired blocks below height, except the entries at the three
// most recent RandomX seed heights for height.
//
// The caller must already hold c.lock for writing.
func (c *MainChain) cleanup(height uint64) {
	const pruneDistance = BlockHeadersRequired

	currentSeed := randomx.SeedHeight(height)
	// Heights that must survive pruning regardless of age.
	protected := []uint64{currentSeed, currentSeed - randomx.SeedHashEpochBlocks, currentSeed - randomx.SeedHashEpochBlocks*2}

	for h, entry := range c.mainchainByHeight {
		stillRecent := (h + pruneDistance) >= height
		if stillRecent || slices.Contains(protected, h) {
			continue
		}

		delete(c.mainchainByHash, entry.Id)
		delete(c.mainchainByHeight, h)
	}
}
// DownloadBlockHeaders fetches the main chain headers needed to start
// tracking the chain at currentHeight and feeds each one to HandleMainHeader.
//
// It first requests the headers at the previous and current RandomX seed
// heights, then the range from max(0, currentHeight-BlockHeadersRequired) to
// currentHeight-1, and finally refreshes the median timestamp.
//
// It returns the first RPC error encountered, wrapped so that callers can
// still inspect the underlying cause with errors.Is / errors.As.
//
// NOTE(review): currentHeight == 0 makes currentHeight-1 wrap around; callers
// are assumed to pass a positive height.
func (c *MainChain) DownloadBlockHeaders(currentHeight uint64) error {
	seedHeight := randomx.SeedHeight(currentHeight)

	var prevSeedHeight uint64
	if seedHeight > randomx.SeedHashEpochBlocks {
		prevSeedHeight = seedHeight - randomx.SeedHashEpochBlocks
	}

	// First download 2 RandomX seeds
	for _, h := range []uint64{prevSeedHeight, seedHeight} {
		if err := c.getBlockHeader(h); err != nil {
			return err
		}
	}

	var startHeight uint64
	if currentHeight > BlockHeadersRequired {
		startHeight = currentHeight - BlockHeadersRequired
	}

	rangeResult, err := c.p2pool.ClientRPC().GetBlockHeadersRangeResult(startHeight, currentHeight-1, c.p2pool.Context())
	if err != nil {
		return fmt.Errorf("couldn't download block headers range for height %d to %d: %w", startHeight, currentHeight-1, err)
	}

	for _, header := range rangeResult.Headers {
		// The second results are deliberately ignored, as before: the hash
		// strings come straight from the RPC response.
		prevHash, _ := types.HashFromString(header.PrevHash)
		h, _ := types.HashFromString(header.Hash)
		c.HandleMainHeader(&mainblock.Header{
			MajorVersion: uint8(header.MajorVersion),
			MinorVersion: uint8(header.MinorVersion),
			Timestamp:    uint64(header.Timestamp),
			PreviousId:   prevHash,
			Height:       header.Height,
			Nonce:        uint32(header.Nonce),
			Reward:       header.Reward,
			Id:           h,
			Difficulty:   types.DifficultyFrom64(header.Difficulty),
		})
	}
	log.Printf("[MainChain] Downloaded headers for range %d to %d", startHeight, currentHeight-1)

	// updateMedianTimestamp reads the height index, so it must run under the
	// lock like every other call site; a read lock is enough because the
	// result is stored atomically.
	func() {
		c.lock.RLock()
		defer c.lock.RUnlock()
		c.updateMedianTimestamp()
	}()

	return nil
}
// HandleMinerData processes new miner data.
//
// Under the lock it:
//   - stores minerData.Difficulty on the entry at minerData.Height, creating
//     the entry if it does not exist;
//   - stores minerData.PrevId as the id of the entry at minerData.Height-1,
//     creating it if needed (timestamp and reward of an existing entry are
//     reset because they are unknown here), and indexes it by hash;
//   - prunes old data, stamps minerData.TimeReceived, publishes minerData as
//     the latest miner data and refreshes the median timestamp;
//   - when p2pool reports Started, collects every height in the last
//     BlockHeadersRequired blocks that is missing or has zero difficulty.
//
// After releasing the lock the missing headers are requested concurrently.
// The function waits for all of those requests to finish before refreshing
// the tip, so the tip sees the downloaded header data. Request failures are
// logged and otherwise ignored.
func (c *MainChain) HandleMinerData(minerData *MinerData) {
	var missingHeights []uint64

	func() {
		c.lock.Lock()
		defer c.lock.Unlock()

		if existingMainData, ok := c.mainchainByHeight[minerData.Height]; ok {
			existingMainData.Difficulty = minerData.Difficulty
		} else {
			c.mainchainByHeight[minerData.Height] = &sidechain.ChainMain{
				Difficulty: minerData.Difficulty,
				Height:     minerData.Height,
			}
		}

		prevMainData := &sidechain.ChainMain{
			Height: minerData.Height - 1,
			Id:     minerData.PrevId,
		}
		if existingPrevMainData, ok := c.mainchainByHeight[prevMainData.Height]; !ok {
			c.mainchainByHeight[prevMainData.Height] = prevMainData
		} else {
			existingPrevMainData.Id = prevMainData.Id
			// timestamp and reward is unknown here
			existingPrevMainData.Timestamp = 0
			existingPrevMainData.Reward = 0
			prevMainData = existingPrevMainData
		}
		c.mainchainByHash[prevMainData.Id] = prevMainData

		c.cleanup(minerData.Height)

		minerData.TimeReceived = time.Now()
		c.tipMinerData.Store(minerData)

		c.updateMedianTimestamp()

		log.Printf("[MainChain] new miner data: major_version = %d, height = %d, prev_id = %s, seed_hash = %s, difficulty = %s", minerData.MajorVersion, minerData.Height, minerData.PrevId.String(), minerData.SeedHash.String(), minerData.Difficulty.StringNumeric())

		if c.p2pool.Started() {
			for h := minerData.Height; h > 0 && (h+BlockHeadersRequired) > minerData.Height; h-- {
				if d, ok := c.mainchainByHeight[h]; !ok || d.Difficulty.Equals(types.ZeroDifficulty) {
					log.Printf("[MainChain] Main chain data for height = %d is missing, requesting from monerod again", h)
					missingHeights = append(missingHeights, h)
				}
			}
		}
	}()

	var wg sync.WaitGroup
	for _, h := range missingHeights {
		wg.Add(1)
		go func(height uint64) {
			// Signal completion only after the request has finished, so that
			// wg.Wait below really waits for the headers.
			defer wg.Done()
			if err := c.getBlockHeader(height); err != nil {
				log.Printf("[MainChain] %s", err)
			}
		}(h)
	}
	wg.Wait()

	c.updateTip()
}
// getBlockHeader requests the header at the given height over RPC and passes
// it to HandleMainHeader.
//
// It returns a wrapped error when the request fails, so callers can still
// inspect the underlying cause (for example context cancellation) with
// errors.Is / errors.As.
func (c *MainChain) getBlockHeader(height uint64) error {
	header, err := c.p2pool.ClientRPC().GetBlockHeaderByHeight(height, c.p2pool.Context())
	if err != nil {
		return fmt.Errorf("couldn't download block header for height %d: %w", height, err)
	}

	// The second results are deliberately ignored, as before: the hash strings
	// come straight from the RPC response.
	prevHash, _ := types.HashFromString(header.BlockHeader.PrevHash)
	h, _ := types.HashFromString(header.BlockHeader.Hash)

	c.HandleMainHeader(&mainblock.Header{
		MajorVersion: uint8(header.BlockHeader.MajorVersion),
		MinorVersion: uint8(header.BlockHeader.MinorVersion),
		Timestamp:    uint64(header.BlockHeader.Timestamp),
		PreviousId:   prevHash,
		Height:       header.BlockHeader.Height,
		Nonce:        uint32(header.BlockHeader.Nonce),
		Reward:       header.BlockHeader.Reward,
		Id:           h,
		Difficulty:   types.DifficultyFrom64(header.BlockHeader.Difficulty),
	})

	return nil
}
// MinerData is the miner data message consumed by HandleMinerData.
type MinerData struct {
// MajorVersion is logged when miner data is received.
MajorVersion uint8
// Height is the height whose difficulty this message carries; the block at Height-1 has id PrevId.
Height uint64
// PrevId is the id recorded for the main chain block at Height-1.
PrevId types.Hash
// SeedHash is logged when miner data is received.
// NOTE(review): presumably the RandomX seed hash for Height — confirm.
SeedHash types.Hash
// Difficulty is stored on the main chain entry at Height.
Difficulty types.Difficulty
// MedianWeight is not read in this file.
MedianWeight uint64
// AlreadyGeneratedCoins is not read in this file.
AlreadyGeneratedCoins uint64
// MedianTimestamp is not read in this file.
MedianTimestamp uint64
//TxBacklog any
// TimeReceived is set to time.Now() by HandleMinerData.
TimeReceived time.Time
}