From a46d182aa87358ef1b9fe33c396f40e2b147522a Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Fri, 10 Mar 2023 16:45:22 +0100 Subject: [PATCH] Save/load blocks from p2pool.cache --- .gitignore | 4 +- cmd/p2pool/p2pool.go | 7 +- p2pool/api/api.go | 2 + p2pool/cache/cache.go | 21 ++++++ p2pool/cache/legacy/legacy.go | 116 ++++++++++++++++++++++++++++++++++ p2pool/p2p/client.go | 28 +++++--- p2pool/p2p/server.go | 48 +++++++++++++- p2pool/p2pool.go | 25 ++++++++ p2pool/sidechain/blobcache.go | 2 + p2pool/sidechain/sidechain.go | 2 + 10 files changed, 241 insertions(+), 14 deletions(-) create mode 100644 p2pool/cache/cache.go create mode 100644 p2pool/cache/legacy/legacy.go diff --git a/.gitignore b/.gitignore index 723ef36..004bb06 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -.idea \ No newline at end of file +.idea +p2pool.cache +p2pool_peers.txt \ No newline at end of file diff --git a/cmd/p2pool/p2pool.go b/cmd/p2pool/p2pool.go index 398066c..230943e 100644 --- a/cmd/p2pool/p2pool.go +++ b/cmd/p2pool/p2pool.go @@ -24,7 +24,6 @@ func main() { moneroRpcPort := flag.Uint("rpc-port", 18081, "monerod RPC API port number") moneroZmqPort := flag.Uint("zmq-port", 18083, "monerod ZMQ pub port number") p2pListen := flag.String("p2p", fmt.Sprintf("0.0.0.0:%d", currentConsensus.DefaultPort()), "IP:port for p2p server to listen on.") - //TODO: zmq addPeers := flag.String("addpeers", "", "Comma-separated list of IP:port of other p2pool nodes to connect to") peerList := flag.String("peer-list", "p2pool_peers.txt", "Either a path or an URL to obtain peer lists from. If it is a path, new peers will be saved to this path") consensusConfigFile := flag.String("config", "", "Name of the p2pool config file") @@ -34,6 +33,8 @@ func main() { inPeers := flag.Uint64("in-peers", 10, "Maximum number of incoming connections for p2p server (any value between 10 and 450)") p2pExternalPort := flag.Uint64("p2p-external-port", 0, "Port number that your router uses for mapping to your local p2p port. Use it if you are behind a NAT and still want to accept incoming connections") + noCache := flag.Bool("no-cache", false, "Disable p2pool.cache") + flag.Parse() client.SetDefaultClientSettings(fmt.Sprintf("http://%s:%d", *moneroHost, *moneroRpcPort)) @@ -73,6 +74,10 @@ func main() { settings["in-peers"] = strconv.FormatUint(*inPeers, 10) settings["external-port"] = strconv.FormatUint(*p2pExternalPort, 10) + if !*noCache { + settings["cache"] = "p2pool.cache" + } + if p2pool, err := p2pool2.NewP2Pool(currentConsensus, settings); err != nil { log.Fatalf("Could not start p2pool: %s", err) } else { diff --git a/p2pool/api/api.go b/p2pool/api/api.go index eeb4825..8071c42 100644 --- a/p2pool/api/api.go +++ b/p2pool/api/api.go @@ -14,6 +14,8 @@ import ( "strconv" ) +// Api +// Deprecated type Api struct { db *database.Database path string diff --git a/p2pool/cache/cache.go b/p2pool/cache/cache.go new file mode 100644 index 0000000..8128c61 --- /dev/null +++ b/p2pool/cache/cache.go @@ -0,0 +1,21 @@ +package cache + +import ( + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/p2p" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" + "git.gammaspectra.live/P2Pool/p2pool-observer/types" +) + +type Cache interface { + Store(block *sidechain.PoolBlock) +} + +type HeapCache interface { + Cache + LoadAll(s *p2p.Server) +} + +type AddressableCache interface { + Remove(hash types.Hash) + Load(hash types.Hash) *sidechain.PoolBlock +} diff --git a/p2pool/cache/legacy/legacy.go b/p2pool/cache/legacy/legacy.go new file mode 100644 index 0000000..552ecb1 --- /dev/null +++ b/p2pool/cache/legacy/legacy.go @@ -0,0 +1,116 @@ +package legacy + +import ( + "encoding/binary" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/p2p" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" + "io" + "log" + "os" + "sync" + "sync/atomic" + "time" +) + +const blockSize = 96 * 1024 +const numBlocks = 4608 +const cacheSize = blockSize * numBlocks + +type Cache struct { + f *os.File + flushRunning atomic.Bool + storeIndex atomic.Uint32 + loadingStarted sync.Once + loadingInProgress atomic.Bool +} + +func NewCache(path string) (*Cache, error) { + if f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666); err != nil { + return nil, err + } else { + if _, err = f.Seek(cacheSize-1, io.SeekStart); err != nil { + _ = f.Close() + return nil, err + } + //create sparse file + if _, err = f.Write([]byte{0}); err != nil { + _ = f.Close() + return nil, err + } + + return &Cache{ + f: f, + }, nil + } +} + +func (c *Cache) Store(block *sidechain.PoolBlock) { + if c.loadingInProgress.Load() { + return + } + if blob, err := block.MarshalBinary(); err != nil { + return + } else { + if (len(blob) + 4) > blockSize { + //block too big + return + } + storeIndex := (c.storeIndex.Add(1) % numBlocks) * blockSize + _, _ = c.f.WriteAt(binary.LittleEndian.AppendUint32(nil, uint32(len(blob))), int64(storeIndex)) + _, _ = c.f.WriteAt(blob, int64(storeIndex)+4) + } +} + +func (c *Cache) LoadAll(s *p2p.Server) { + c.loadingStarted.Do(func() { + c.loadingInProgress.Store(true) + defer c.loadingInProgress.Store(false) + log.Print("[Cache] Loading cached blocks") + + var blobLen [4]byte + buf := make([]byte, 0, blockSize) + + var blocksLoaded int + for i := 0; i < numBlocks; i++ { + storeIndex := (c.storeIndex.Add(1) % numBlocks) * blockSize + + if _, err := c.f.ReadAt(blobLen[:], int64(storeIndex)); err != nil { + return + } + blobLength := binary.LittleEndian.Uint32(blobLen[:]) + if (blobLength + 4) > blockSize { + //block too big + continue + } + if _, err := c.f.ReadAt(buf[:blobLength], int64(storeIndex)+4); err != nil { + continue + } + + block := &sidechain.PoolBlock{ + NetworkType: s.Consensus().NetworkType, + LocalTimestamp: uint64(time.Now().Unix()), + } + + if err := block.UnmarshalBinary(buf[:blobLength]); err != nil { + continue + } + + s.AddCachedBlock(block) + + blocksLoaded++ + } + + log.Printf("[Cache] Loaded %d cached blocks", blocksLoaded) + }) +} + +func (c *Cache) Close() { + _ = c.f.Close() +} + +func (c *Cache) Flush() { + if !c.flushRunning.Swap(true) { + defer c.flushRunning.Store(false) + _ = c.f.Sync() + } +} diff --git a/p2pool/p2p/client.go b/p2pool/p2p/client.go index f6013a1..ddf5964 100644 --- a/p2pool/p2p/client.go +++ b/p2pool/p2p/client.go @@ -128,6 +128,16 @@ func (c *Client) SendMissingBlockRequest(hash types.Hash) { return } + if b := c.Owner.GetCachedBlock(hash); b != nil { + log.Printf("[P2PClient] Using cached block for id = %s", hash.String()) + if missingBlocks, err := c.Owner.SideChain().AddPoolBlockExternal(b); err == nil { + for _, id := range missingBlocks { + c.SendMissingBlockRequest(id) + } + return + } + } + // do not re-request hashes that have been requested if !c.RequestedHashes.PushUnique(hash) { return @@ -194,7 +204,7 @@ func (c *Client) SendPeerListRequest() { MessageId: MessagePeerListRequest, }) c.LastPeerListRequestTimestamp.Store(uint64(time.Now().UnixMicro())) - log.Printf("[P2PClient] Sending PEER_LIST_REQUEST to %s", c.AddressPort.String()) + //log.Printf("[P2PClient] Sending PEER_LIST_REQUEST to %s", c.AddressPort.String()) } func (c *Client) SendPeerListResponse(list []netip.AddrPort) { @@ -394,15 +404,6 @@ func (c *Client) OnConnection() { isChainTipBlockRequest := false if c.chainTipBlockRequest.Swap(false) { isChainTipBlockRequest = true - if expectedBlockId == types.ZeroHash { - peerHeight := block.Main.Coinbase.GenHeight - ourHeight := c.Owner.MainChain().GetMinerDataTip().Height - - if (peerHeight + 2) < ourHeight { - c.Ban(DefaultBanTime, fmt.Errorf("mining on top of a stale block (mainchain peer height %d, expected >= %d)", peerHeight, ourHeight)) - return - } - } //TODO: stale block log.Printf("[P2PClient] Peer %s tip is at id = %s, height = %d, main height = %d", c.AddressPort.String(), types.HashFromBytes(block.CoinbaseExtra(sidechain.SideTemplateId)), block.Side.Height, block.Main.Coinbase.GenHeight) @@ -411,6 +412,13 @@ func (c *Client) OnConnection() { c.Ban(DefaultBanTime, fmt.Errorf("expected block id = %s, got %s", expectedBlockId, types.ZeroHash.String())) return } + peerHeight := block.Main.Coinbase.GenHeight + ourHeight := c.Owner.MainChain().GetMinerDataTip().Height + + if (peerHeight + 2) < ourHeight { + c.Ban(DefaultBanTime, fmt.Errorf("mining on top of a stale block (mainchain peer height %d, expected >= %d)", peerHeight, ourHeight)) + return + } c.SendPeerListRequest() } diff --git a/p2pool/p2p/server.go b/p2pool/p2p/server.go index 131319f..e43b11b 100644 --- a/p2pool/p2p/server.go +++ b/p2pool/p2p/server.go @@ -8,6 +8,7 @@ import ( "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/mainchain" "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" p2pooltypes "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/types" + "git.gammaspectra.live/P2Pool/p2pool-observer/types" "git.gammaspectra.live/P2Pool/p2pool-observer/utils" "golang.org/x/exp/slices" "log" @@ -54,6 +55,9 @@ type Server struct { clientsLock sync.RWMutex clients []*Client + cachedBlocksLock sync.RWMutex + cachedBlocks map[types.Hash]*sidechain.PoolBlock + ctx context.Context } @@ -76,6 +80,7 @@ func NewServer(p2pool P2PoolInterface, listenAddress string, externalListenPort peerId: binary.LittleEndian.Uint64(peerId), MaxOutgoingPeers: utils.Min(utils.Max(maxOutgoingPeers, 10), 450), MaxIncomingPeers: utils.Min(utils.Max(maxIncomingPeers, 10), 450), + cachedBlocks: make(map[types.Hash]*sidechain.PoolBlock, p2pool.Consensus().ChainWindowSize*3), versionInformation: PeerVersionInformation{SoftwareId: SoftwareIdGoObserver, SoftwareVersion: CurrentSoftwareVersion, Protocol: SupportedProtocolVersion}, ctx: ctx, } @@ -142,7 +147,7 @@ func (s *Server) updatePeerList() { } func (s *Server) updateClientConnections() { - log.Printf("clients %d len %d", s.NumOutgoingConnections.Load(), len(s.Clients())) + //log.Printf("clients %d len %d", s.NumOutgoingConnections.Load(), len(s.Clients())) currentPeers := uint32(s.NumOutgoingConnections.Load()) peerList := s.PeerList() for currentPeers < s.MaxOutgoingPeers && len(peerList) > 0 { @@ -161,13 +166,48 @@ func (s *Server) updateClientConnections() { } } +func (s *Server) AddCachedBlock(block *sidechain.PoolBlock) { + s.cachedBlocksLock.Lock() + defer s.cachedBlocksLock.Unlock() + + if s.cachedBlocks == nil { + return + } + + s.cachedBlocks[block.SideTemplateId(s.p2pool.Consensus())] = block +} + +func (s *Server) ClearCachedBlocks() { + s.cachedBlocksLock.Lock() + defer s.cachedBlocksLock.Unlock() + + s.cachedBlocks = nil +} + +func (s *Server) GetCachedBlock(hash types.Hash) *sidechain.PoolBlock { + s.cachedBlocksLock.RLock() + defer s.cachedBlocksLock.RUnlock() + + return s.cachedBlocks[hash] +} + func (s *Server) DownloadMissingBlocks() { clientList := s.Clients() if len(clientList) == 0 { return } + + s.cachedBlocksLock.RLock() + defer s.cachedBlocksLock.RUnlock() + for _, h := range s.SideChain().GetMissingBlocks() { + if b, ok := s.cachedBlocks[h]; ok { + if _, err := s.SideChain().AddPoolBlockExternal(b); err == nil { + continue + } + } + clientList[unsafeRandom.Intn(len(clientList))].SendUniqueBlockRequest(h) } } @@ -198,11 +238,15 @@ func (s *Server) Listen() (err error) { wg.Add(1) go func() { defer wg.Done() - for range time.Tick(time.Second * 60) { + for range time.Tick(time.Second) { if s.close.Load() { return } + if s.SideChain().PreCalcFinished() { + s.ClearCachedBlocks() + } + s.DownloadMissingBlocks() } }() diff --git a/p2pool/p2pool.go b/p2pool/p2pool.go index 59f7a47..05ed11a 100644 --- a/p2pool/p2pool.go +++ b/p2pool/p2pool.go @@ -9,6 +9,8 @@ import ( "git.gammaspectra.live/P2Pool/p2pool-observer/monero/client" "git.gammaspectra.live/P2Pool/p2pool-observer/monero/client/zmq" "git.gammaspectra.live/P2Pool/p2pool-observer/monero/transaction" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/cache" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/cache/legacy" "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/mainchain" "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/p2p" "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" @@ -24,6 +26,7 @@ type P2Pool struct { consensus *sidechain.Consensus sidechain *sidechain.SideChain mainchain *mainchain.MainChain + cache cache.HeapCache server *p2p.Server ctx context.Context @@ -76,6 +79,12 @@ func NewP2Pool(consensus *sidechain.Consensus, settings map[string]string) (*P2P return nil, errors.New("could not create MainChain") } + if cachePath, ok := settings["cache"]; ok { + if pool.cache, err = legacy.NewCache(cachePath); err != nil { + return nil, fmt.Errorf("could not create cache: %w", err) + } + } + if addr, ok := settings["listen"]; ok { listenAddress = addr } @@ -265,6 +274,12 @@ func (p *P2Pool) Run() (err error) { } }() + //TODO: move peer list loading here + + if p.cache != nil { + p.cache.LoadAll(p.Server()) + } + p.started.Store(true) if err = p.Server().Listen(); err != nil { @@ -381,6 +396,16 @@ func (p *P2Pool) SubmitBlock(b *block.Block) { }() } +func (p *P2Pool) Store(block *sidechain.PoolBlock) { + if p.cache != nil { + p.cache.Store(block) + } +} + +func (p *P2Pool) ClearCachedBlocks() { + p.server.ClearCachedBlocks() +} + func (p *P2Pool) Started() bool { return p.started.Load() } diff --git a/p2pool/sidechain/blobcache.go b/p2pool/sidechain/blobcache.go index 45568a0..51fa223 100644 --- a/p2pool/sidechain/blobcache.go +++ b/p2pool/sidechain/blobcache.go @@ -38,6 +38,8 @@ func (c *SideChain) compressedBlockId(block *PoolBlock) []byte { func (c *SideChain) saveBlock(block *PoolBlock) { go func() { + c.server.Store(block) + //TODO: make this a worker with a queue? if !block.Verified.Load() || block.Invalid.Load() { diff --git a/p2pool/sidechain/sidechain.go b/p2pool/sidechain/sidechain.go index b4c66a9..c8d35c8 100644 --- a/p2pool/sidechain/sidechain.go +++ b/p2pool/sidechain/sidechain.go @@ -44,6 +44,8 @@ type P2PoolInterface interface { SubmitBlock(block *mainblock.Block) GetChainMainTip() *ChainMain GetMinerDataTip() *p2pooltypes.MinerData + Store(block *PoolBlock) + ClearCachedBlocks() } type ChainMain struct {