diff --git a/cmd/archivetoarchive/archivetoarchive.go b/cmd/archivetoarchive/archivetoarchive.go new file mode 100644 index 0000000..0c83c6d --- /dev/null +++ b/cmd/archivetoarchive/archivetoarchive.go @@ -0,0 +1,98 @@ +package main + +import ( + "context" + "flag" + "fmt" + "git.gammaspectra.live/P2Pool/p2pool-observer/monero/client" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/cache/archive" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" + "git.gammaspectra.live/P2Pool/p2pool-observer/types" + "github.com/floatdrop/lru" + "log" + "math" + "os" +) + +func main() { + inputConsensus := flag.String("consensus", "config.json", "Input config.json consensus file") + inputArchive := flag.String("input", "", "Input path for archive database") + outputArchive := flag.String("output", "", "Output path for archive database") + moneroHost := flag.String("host", "127.0.0.1", "IP address of your Monero node") + moneroRpcPort := flag.Uint("rpc-port", 18081, "monerod RPC API port number") + + flag.Parse() + + client.SetDefaultClientSettings(fmt.Sprintf("http://%s:%d", *moneroHost, *moneroRpcPort)) + + cf, err := os.ReadFile(*inputConsensus) + + consensus, err := sidechain.NewConsensusFromJSON(cf) + if err != nil { + log.Panic(err) + } + + difficultyCache := lru.New[uint64, types.Difficulty](1024) + + getDifficultyByHeight := func(height uint64) types.Difficulty { + if v := difficultyCache.Get(height); v == nil { + if r, err := client.GetDefaultClient().GetBlockHeaderByHeight(height, context.Background()); err == nil { + d := types.DifficultyFrom64(r.BlockHeader.Difficulty) + difficultyCache.Set(height, d) + return d + } + return types.ZeroDifficulty + } else { + return *v + } + } + + inputCache, err := archive.NewCache(*inputArchive, consensus, getDifficultyByHeight) + if err != nil { + log.Panic(err) + } + defer inputCache.Close() + + outputCache, err := archive.NewCache(*outputArchive, consensus, getDifficultyByHeight) + if err != nil { + log.Panic(err) + } + defer outputCache.Close() + + derivationCache := sidechain.NewDerivationCache() + + blockCache := lru.New[types.Hash, *sidechain.PoolBlock](int(consensus.ChainWindowSize * 2)) + + getByTemplateId := func(h types.Hash) *sidechain.PoolBlock { + if v := blockCache.Get(h); v == nil { + if bs := inputCache.LoadByTemplateId(h); len(bs) != 0 { + blockCache.Set(h, bs[0]) + return bs[0] + } else if bs = outputCache.LoadByTemplateId(h); len(bs) != 0 { + blockCache.Set(h, bs[0]) + return bs[0] + } else { + return nil + } + } else { + return *v + } + } + + preAllocatedShares := make(sidechain.Shares, consensus.ChainWindowSize*2) + for i := range preAllocatedShares { + preAllocatedShares[i] = &sidechain.Share{} + } + for blocksAtHeight := range inputCache.ScanHeights(0, math.MaxUint64) { + for i, b := range blocksAtHeight { + if _, err := b.PreProcessBlock(consensus, derivationCache, preAllocatedShares, getDifficultyByHeight, getByTemplateId); err != nil { + log.Printf("error processing block %s at %d, %s", types.HashFromBytes(b.CoinbaseExtra(sidechain.SideTemplateId)), b.Side.Height, err) + } else { + outputCache.Store(b) + if i == 0 { + blockCache.Set(types.HashFromBytes(b.CoinbaseExtra(sidechain.SideTemplateId)), b) + } + } + } + } +} diff --git a/cmd/cachetoarchive/cachetoarchive.go b/cmd/cachetoarchive/cachetoarchive.go index e7d890a..e5eada3 100644 --- a/cmd/cachetoarchive/cachetoarchive.go +++ b/cmd/cachetoarchive/cachetoarchive.go @@ -1,11 +1,14 @@ package main import ( + "context" "flag" + "git.gammaspectra.live/P2Pool/p2pool-observer/monero/client" "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/cache/archive" "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/cache/legacy" "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" "git.gammaspectra.live/P2Pool/p2pool-observer/types" + "github.com/floatdrop/lru" "log" "math" "os" @@ -44,7 +47,22 @@ func main() { } defer cache.Close() - archiveCache, err := archive.NewCache(*outputArchive, consensus) + difficultyCache := lru.New[uint64, types.Difficulty](1024) + + getDifficultyByHeight := func(height uint64) types.Difficulty { + if v := difficultyCache.Get(height); v == nil { + if r, err := client.GetDefaultClient().GetBlockHeaderByHeight(height, context.Background()); err == nil { + d := types.DifficultyFrom64(r.BlockHeader.Difficulty) + difficultyCache.Set(height, d) + return d + } + return types.ZeroDifficulty + } else { + return *v + } + } + + archiveCache, err := archive.NewCache(*outputArchive, consensus, getDifficultyByHeight) if err != nil { log.Panic(err) } diff --git a/cmd/legacytoarchive/legacytoarchive.go b/cmd/legacytoarchive/legacytoarchive.go index 07c4004..124b131 100644 --- a/cmd/legacytoarchive/legacytoarchive.go +++ b/cmd/legacytoarchive/legacytoarchive.go @@ -26,7 +26,9 @@ func main() { log.Panic(err) } - archiveCache, err := archive.NewCache(*outputArchive, consensus) + archiveCache, err := archive.NewCache(*outputArchive, consensus, func(height uint64) types.Difficulty { + return types.ZeroDifficulty + }) if err != nil { log.Panic(err) } @@ -36,6 +38,8 @@ func main() { totalStored := 0 + derivationCache := sidechain.NewDerivationCache() + loadBlock := func(id types.Hash) *sidechain.PoolBlock { n := id.String() fPath := path.Join(*inputFolder, "blocks", n[:1], n) @@ -45,7 +49,7 @@ func main() { if hexBuf, err := hex.DecodeString(string(buf)); err != nil { log.Panic(err) } else { - if block, err := sidechain.NewShareFromExportedBytes(hexBuf, consensus.NetworkType); err != nil { + if block, err := sidechain.NewShareFromExportedBytes(hexBuf, consensus.NetworkType, derivationCache); err != nil { log.Printf("error decoding block %s, %s", id.String(), err) } else { return block @@ -55,14 +59,17 @@ func main() { return nil } - var storeBlock func(k types.Hash, b *sidechain.PoolBlock) - storeBlock = func(k types.Hash, b *sidechain.PoolBlock) { + var storeBlock func(k types.Hash, b *sidechain.PoolBlock, depth uint64) + storeBlock = func(k types.Hash, b *sidechain.PoolBlock, depth uint64) { if b == nil || processed[k] { return } + if depth >= consensus.ChainWindowSize*4*30 { //avoid infinite memory growth + return + } if parent := loadBlock(b.Side.Parent); parent != nil { b.FillTransactionParentIndices(parent) - storeBlock(b.Side.Parent, parent) + storeBlock(b.Side.Parent, parent, depth+1) } b.Depth.Store(math.MaxUint64) archiveCache.Store(b) @@ -73,14 +80,17 @@ func main() { for i := 0; i <= 0xf; i++ { n := hex.EncodeToString([]byte{byte(i)}) dPath := path.Join(*inputFolder, "blocks", n[1:]) + log.Printf("Reading directory %s", dPath) if dir, err := os.ReadDir(dPath); err != nil { log.Panic(err) } else { for _, e := range dir { h, _ := hex.DecodeString(e.Name()) id := types.HashFromBytes(h) - - storeBlock(id, loadBlock(id)) + bb := loadBlock(id) + if bb != nil && !archiveCache.ExistsByMainId(bb.MainId()) { + storeBlock(id, bb, 0) + } } } } @@ -100,7 +110,7 @@ func main() { if hexBuf, err := hex.DecodeString(string(buf)); err != nil { log.Panic(err) } else { - if block, err := sidechain.NewShareFromExportedBytes(hexBuf, consensus.NetworkType); err != nil { + if block, err := sidechain.NewShareFromExportedBytes(hexBuf, consensus.NetworkType, derivationCache); err != nil { log.Panic(err) } else { block.Depth.Store(math.MaxUint64) diff --git a/monero/address/address.go b/monero/address/address.go index ec0c883..ea29132 100644 --- a/monero/address/address.go +++ b/monero/address/address.go @@ -93,6 +93,15 @@ func FromRawAddress(network uint8, spend, view crypto.PublicKey) *Address { } func (a *Address) ToBase58() string { + if a.checksum == nil { + var nice [69]byte + nice[0] = a.Network + copy(nice[1:], a.SpendPub.AsSlice()) + copy(nice[1+crypto.PublicKeySize:], a.ViewPub.AsSlice()) + sum := moneroutil.GetChecksum(nice[:65]) + //this race is ok + a.checksum = sum[:] + } return moneroutil.EncodeMoneroBase58([]byte{a.Network}, a.SpendPub.AsSlice(), a.ViewPub.AsSlice(), a.checksum[:]) } diff --git a/p2pool/cache/archive/archive.go b/p2pool/cache/archive/archive.go index b8f160d..c474c3c 100644 --- a/p2pool/cache/archive/archive.go +++ b/p2pool/cache/archive/archive.go @@ -80,12 +80,16 @@ func (r multiRecord) Bytes() []byte { func (c *Cache) Store(block *sidechain.PoolBlock) { sideId := block.SideTemplateId(c.consensus) + if bytes.Compare(sideId[:], block.CoinbaseExtra(sidechain.SideTemplateId)) != 0 { + //wrong calculated template id + return + } mainId := block.MainId() var sideHeight, mainHeight [8]byte binary.BigEndian.PutUint64(sideHeight[:], block.Side.Height) binary.BigEndian.PutUint64(mainHeight[:], block.Main.Coinbase.GenHeight) - if c.existsByMainId(mainId) { + if c.ExistsByMainId(mainId) { return } @@ -104,6 +108,11 @@ func (c *Cache) Store(block *sidechain.PoolBlock) { if c.existsBySideChainHeightRange(block.Side.Height-c.consensus.ChainWindowSize-1, block.Side.Height-1) { storePruned = true } + + //fallback for parent-less blocks + if len(c.LoadByTemplateId(block.Side.Parent)) == 0 { + storePruned, storeCompact = false, false + } } else if block.Depth.Load() < c.consensus.ChainWindowSize { storePruned = true } @@ -163,7 +172,7 @@ func (c *Cache) RemoveByTemplateId(id types.Hash) { //TODO } -func (c *Cache) existsByMainId(id types.Hash) (result bool) { +func (c *Cache) ExistsByMainId(id types.Hash) (result bool) { _ = c.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(blocksByMainId) if b.Get(id[:]) != nil { @@ -260,6 +269,48 @@ func (c *Cache) LoadByTemplateId(id types.Hash) (result []*sidechain.PoolBlock) return result } +func (c *Cache) ScanHeights(startHeight, endHeight uint64) chan []*sidechain.PoolBlock { + result := make(chan []*sidechain.PoolBlock) + go func() { + defer close(result) + _ = c.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(refBySideHeight) + var startHeightBytes, endHeightBytes [8]byte + + cursor := b.Cursor() + binary.BigEndian.PutUint64(startHeightBytes[:], startHeight) + binary.BigEndian.PutUint64(endHeightBytes[:], endHeight) + + k, v := cursor.Seek(startHeightBytes[:]) + for { + if k == nil { + return nil + } + r := multiRecordFromBytes(v) + blocks := make([]*sidechain.PoolBlock, 0, len(r)) + for _, h := range r { + if e := c.loadByMainId(tx, h); e != nil { + if bl := c.decodeBlock(e); bl != nil { + blocks = append(blocks, bl) + } else { + return fmt.Errorf("could not decode block %s", h.String()) + } + } else { + return fmt.Errorf("could not find block %s", h.String()) + } + } + result <- blocks + if bytes.Compare(k, endHeightBytes[:]) >= 0 { + break + } + k, v = cursor.Next() + } + return nil + }) + }() + return result +} + func (c *Cache) existsBySideChainHeightRange(startHeight, endHeight uint64) (result bool) { _ = c.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(refBySideHeight) diff --git a/p2pool/sidechain/poolblock.go b/p2pool/sidechain/poolblock.go index af906a4..3e3f70c 100644 --- a/p2pool/sidechain/poolblock.go +++ b/p2pool/sidechain/poolblock.go @@ -91,7 +91,7 @@ type PoolBlock struct { // NewShareFromExportedBytes TODO deprecate this in favor of standard serialized shares // Deprecated -func NewShareFromExportedBytes(buf []byte, networkType NetworkType) (*PoolBlock, error) { +func NewShareFromExportedBytes(buf []byte, networkType NetworkType, cacheInterface DerivationCacheInterface) (*PoolBlock, error) { b := &PoolBlock{ NetworkType: networkType, } @@ -193,7 +193,7 @@ func NewShareFromExportedBytes(buf []byte, networkType NetworkType) (*PoolBlock, return nil, err } - b.FillPrivateKeys(&NilDerivationCache{}) //TODO + b.FillPrivateKeys(cacheInterface) b.cache.templateId = types.HashFromBytes(b.CoinbaseExtra(SideTemplateId)) diff --git a/p2pool/sidechain/poolblock_test.go b/p2pool/sidechain/poolblock_test.go index 1cd6815..5b9f70c 100644 --- a/p2pool/sidechain/poolblock_test.go +++ b/p2pool/sidechain/poolblock_test.go @@ -32,7 +32,7 @@ func TestPoolBlockDecode(t *testing.T) { t.Fatal(err) } - block, err := NewShareFromExportedBytes(contents, NetworkMainnet) + block, err := NewShareFromExportedBytes(contents, NetworkMainnet, &NilDerivationCache{}) if err != nil { t.Fatal(err) } diff --git a/p2pool/sidechain/sidedata.go b/p2pool/sidechain/sidedata.go index 0e06059..867d037 100644 --- a/p2pool/sidechain/sidedata.go +++ b/p2pool/sidechain/sidedata.go @@ -3,6 +3,7 @@ package sidechain import ( "bytes" "encoding/binary" + "fmt" "git.gammaspectra.live/P2Pool/p2pool-observer/monero/crypto" p2pooltypes "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/types" "git.gammaspectra.live/P2Pool/p2pool-observer/types" @@ -125,16 +126,16 @@ func (b *SideData) FromReader(reader readerAndByteReader, version ShareVersion) } if version > ShareVersion_V1 { if err = binary.Read(reader, binary.LittleEndian, &b.ExtraBuffer.SoftwareId); err != nil { - return err + return fmt.Errorf("within extra buffer: %w", err) } if err = binary.Read(reader, binary.LittleEndian, &b.ExtraBuffer.SoftwareVersion); err != nil { - return err + return fmt.Errorf("within extra buffer: %w", err) } if err = binary.Read(reader, binary.LittleEndian, &b.ExtraBuffer.RandomNumber); err != nil { - return err + return fmt.Errorf("within extra buffer: %w", err) } if err = binary.Read(reader, binary.LittleEndian, &b.ExtraBuffer.SideChainExtraNonce); err != nil { - return err + return fmt.Errorf("within extra buffer: %w", err) } } diff --git a/p2pool/sidechain/utils.go b/p2pool/sidechain/utils.go index 52b3c61..23673cc 100644 --- a/p2pool/sidechain/utils.go +++ b/p2pool/sidechain/utils.go @@ -16,7 +16,9 @@ import ( "sync/atomic" ) +type GetByMainIdFunc func(h types.Hash) *PoolBlock type GetByTemplateIdFunc func(h types.Hash) *PoolBlock +type GetBySideHeightIdFunc func(height uint64) UniquePoolBlockSlice func CalculateOutputs(block *PoolBlock, consensus *Consensus, difficultyByHeight block.GetDifficultyByHeightFunc, getByTemplateId GetByTemplateIdFunc, derivationCache DerivationCacheInterface, preAllocatedShares Shares) (outputs transaction.Outputs, bottomHeight uint64) { tmpShares, bottomHeight := GetShares(block, consensus, difficultyByHeight, getByTemplateId, preAllocatedShares)