From 3cde3800deb692a5a6d39d6e0f2be832795825bb Mon Sep 17 00:00:00 2001 From: WeebDataHoarder <57538841+WeebDataHoarder@users.noreply.github.com> Date: Mon, 8 Apr 2024 13:59:11 +0200 Subject: [PATCH] Allow stratum mempool selection to include high fee transactions or time since last reception --- monero/client/tx.go | 4 +-- monero/client/zmq/types.go | 18 +++++------ p2pool/mempool/mempool.go | 18 ++++++----- p2pool/stratum/mempool.go | 63 ++++++++++++++++++++++++++++++++++++++ p2pool/stratum/server.go | 53 ++++++++++++++++++++++++-------- 5 files changed, 124 insertions(+), 32 deletions(-) create mode 100644 p2pool/stratum/mempool.go diff --git a/monero/client/tx.go b/monero/client/tx.go index dd73b7c..250e501 100644 --- a/monero/client/tx.go +++ b/monero/client/tx.go @@ -25,7 +25,7 @@ func isRctBulletproofPlus(t int) bool { } // NewEntryFromRPCData TODO -func NewEntryFromRPCData(id types.Hash, buf []byte, json *daemon.TransactionJSON) *mempool.MempoolEntry { +func NewEntryFromRPCData(id types.Hash, buf []byte, json *daemon.TransactionJSON) *mempool.Entry { isBulletproof := isRctBulletproof(json.RctSignatures.Type) isBulletproofPlus := isRctBulletproofPlus(json.RctSignatures.Type) @@ -96,7 +96,7 @@ func NewEntryFromRPCData(id types.Hash, buf []byte, json *daemon.TransactionJSON weight = uint64(len(buf)) + bpClawback } - return &mempool.MempoolEntry{ + return &mempool.Entry{ Id: id, BlobSize: uint64(len(buf)), Weight: weight, diff --git a/monero/client/zmq/types.go b/monero/client/zmq/types.go index 783b79c..ad1b734 100644 --- a/monero/client/zmq/types.go +++ b/monero/client/zmq/types.go @@ -108,13 +108,13 @@ type FullTxPoolAdd struct { } type FullMinerData struct { - MajorVersion uint8 `json:"major_version"` - Height uint64 `json:"height"` - PrevId types.Hash `json:"prev_id"` - SeedHash types.Hash `json:"seed_hash"` - Difficulty types.Difficulty `json:"difficulty"` - MedianWeight uint64 `json:"median_weight"` - AlreadyGeneratedCoins uint64 `json:"already_generated_coins"` - MedianTimestamp uint64 `json:"median_timestamp"` - TxBacklog []*mempool.MempoolEntry `json:"tx_backlog"` + MajorVersion uint8 `json:"major_version"` + Height uint64 `json:"height"` + PrevId types.Hash `json:"prev_id"` + SeedHash types.Hash `json:"seed_hash"` + Difficulty types.Difficulty `json:"difficulty"` + MedianWeight uint64 `json:"median_weight"` + AlreadyGeneratedCoins uint64 `json:"already_generated_coins"` + MedianTimestamp uint64 `json:"median_timestamp"` + TxBacklog []*mempool.Entry `json:"tx_backlog"` } diff --git a/p2pool/mempool/mempool.go b/p2pool/mempool/mempool.go index e7ada66..589936c 100644 --- a/p2pool/mempool/mempool.go +++ b/p2pool/mempool/mempool.go @@ -7,21 +7,23 @@ import ( "math" "math/bits" "slices" + "time" ) -type MempoolEntry struct { - Id types.Hash `json:"id"` - BlobSize uint64 `json:"blob_size"` - Weight uint64 `json:"weight"` - Fee uint64 `json:"fee"` +type Entry struct { + Id types.Hash `json:"id"` + BlobSize uint64 `json:"blob_size"` + Weight uint64 `json:"weight"` + Fee uint64 `json:"fee"` + TimeReceived time.Time `json:"-"` } -type Mempool []*MempoolEntry +type Mempool []*Entry func (m Mempool) Sort() { // Sort all transactions by fee per byte (highest to lowest) - slices.SortFunc(m, func(a, b *MempoolEntry) int { + slices.SortFunc(m, func(a, b *Entry) int { return a.Compare(b) }) } @@ -137,7 +139,7 @@ func (m Mempool) PerfectSum(targetFee uint64) chan Mempool { } // Compare returns -1 if self is preferred over o, 0 if equal, 1 if o is preferred over self -func (t *MempoolEntry) Compare(o *MempoolEntry) int { +func (t *Entry) Compare(o *Entry) int { a := t.Fee * o.Weight b := o.Fee * t.Weight diff --git a/p2pool/stratum/mempool.go b/p2pool/stratum/mempool.go new file mode 100644 index 0000000..fcaad9d --- /dev/null +++ b/p2pool/stratum/mempool.go @@ -0,0 +1,63 @@ +package stratum + +import ( + "git.gammaspectra.live/P2Pool/consensus/v3/p2pool/mempool" + "git.gammaspectra.live/P2Pool/consensus/v3/types" + "github.com/dolthub/swiss" + "time" +) + +type MiningMempool swiss.Map[types.Hash, *mempool.Entry] + +func (m *MiningMempool) m() *swiss.Map[types.Hash, *mempool.Entry] { + return (*swiss.Map[types.Hash, *mempool.Entry])(m) +} + +// Add Inserts a transaction into the mempool. +func (m *MiningMempool) Add(tx *mempool.Entry) (added bool) { + mm := m.m() + if !mm.Has(tx.Id) { + if tx.TimeReceived.IsZero() { + tx.TimeReceived = time.Now() + } + mm.Put(tx.Id, tx) + added = true + } + + return added +} + +func (m *MiningMempool) Swap(pool mempool.Mempool) { + currentTime := time.Now() + + mm := m.m() + for _, tx := range pool { + if v, ok := mm.Get(tx.Id); ok { + //tx is already here, use previous seen time + tx.TimeReceived = v.TimeReceived + } else { + tx.TimeReceived = currentTime + } + } + + mm.Clear() + + for _, tx := range pool { + mm.Put(tx.Id, tx) + } +} + +func (m *MiningMempool) Select(highFee uint64, receivedSince time.Duration) (pool mempool.Mempool) { + pool = make(mempool.Mempool, 0, m.m().Count()) + + currentTime := time.Now() + + m.m().Iter(func(_ types.Hash, tx *mempool.Entry) (stop bool) { + if currentTime.Sub(tx.TimeReceived) > receivedSince || tx.Fee >= highFee { + pool = append(pool, tx) + } + return false + }) + + return pool +} diff --git a/p2pool/stratum/server.go b/p2pool/stratum/server.go index 760e350..3080c81 100644 --- a/p2pool/stratum/server.go +++ b/p2pool/stratum/server.go @@ -14,6 +14,7 @@ import ( p2pooltypes "git.gammaspectra.live/P2Pool/consensus/v3/p2pool/types" "git.gammaspectra.live/P2Pool/consensus/v3/types" "git.gammaspectra.live/P2Pool/consensus/v3/utils" + "github.com/dolthub/swiss" gojson "github.com/goccy/go-json" fasthex "github.com/tmthrgd/go-hex" "math" @@ -25,6 +26,10 @@ import ( "time" ) +// HighFeeValue 0.006 XMR +const HighFeeValue uint64 = 6000000000 +const TimeInMempool = time.Second * 5 + type ephemeralPubKeyCacheKey [crypto.PublicKeySize*2 + 8]byte type ephemeralPubKeyCacheEntry struct { @@ -65,7 +70,7 @@ type Server struct { lock sync.RWMutex sidechain *sidechain.SideChain - mempool mempool.Mempool + mempool *MiningMempool lastMempoolRefresh time.Time preAllocatedDifficultyData []sidechain.DifficultyData @@ -117,6 +122,7 @@ func NewServer(s *sidechain.SideChain, submitFunc func(block *sidechain.PoolBloc preAllocatedSharesPool: sidechain.NewPreAllocatedSharesPool(s.Consensus().ChainWindowSize * 2), preAllocatedBuffer: make([]byte, 0, sidechain.PoolBlockMaxTemplateSize), miners: make(map[address.PackedAddress]*MinerTrackingEntry), + mempool: (*MiningMempool)(swiss.NewMap[types.Hash, *mempool.Entry](512)), // buffer 4 at a time for non-blocking source incomingChanges: make(chan func() bool, 4), } @@ -258,11 +264,14 @@ func (s *Server) fillNewTemplateData(currentDifficulty types.Difficulty) error { return errors.New("could not find reserved share index") } + // Only choose transactions that were received 5 or more seconds ago, or high fee (>= 0.006 XMR) transactions + selectedMempool := s.mempool.Select(HighFeeValue, TimeInMempool) + //TODO: limit max Monero block size baseReward := block.GetBaseReward(s.minerData.AlreadyGeneratedCoins) - totalWeight, totalFees := s.mempool.WeightAndFees() + totalWeight, totalFees := selectedMempool.WeightAndFees() maxReward := baseReward + totalFees @@ -276,22 +285,27 @@ func (s *Server) fillNewTemplateData(currentDifficulty types.Difficulty) error { } coinbaseTransactionWeight := uint64(tx.BufferLength()) - var selectedMempool mempool.Mempool + var pickedMempool mempool.Mempool if totalWeight+coinbaseTransactionWeight <= s.minerData.MedianWeight { // if a block doesn't get into the penalty zone, just pick all transactions - selectedMempool = s.mempool + pickedMempool = selectedMempool } else { - selectedMempool = s.mempool.Pick(baseReward, coinbaseTransactionWeight, s.minerData.MedianWeight) + pickedMempool = selectedMempool.Pick(baseReward, coinbaseTransactionWeight, s.minerData.MedianWeight) } - s.newTemplateData.Transactions = make([]types.Hash, len(selectedMempool)) + //shuffle transactions + unsafeRandom.Shuffle(len(pickedMempool), func(i, j int) { + pickedMempool[i], pickedMempool[j] = pickedMempool[j], pickedMempool[i] + }) - for i, entry := range selectedMempool { + s.newTemplateData.Transactions = make([]types.Hash, len(pickedMempool)) + + for i, entry := range pickedMempool { s.newTemplateData.Transactions[i] = entry.Id } - finalReward := mempool.GetBlockReward(baseReward, s.minerData.MedianWeight, selectedMempool.Fees(), coinbaseTransactionWeight+selectedMempool.Weight()) + finalReward := mempool.GetBlockReward(baseReward, s.minerData.MedianWeight, pickedMempool.Fees(), coinbaseTransactionWeight+pickedMempool.Weight()) if finalReward < baseReward { return errors.New("final reward < base reward, should never happen") @@ -671,14 +685,27 @@ func (s *Server) createCoinbaseTransaction(txType uint8, shares sidechain.Shares func (s *Server) HandleMempoolData(data mempool.Mempool) { s.incomingChanges <- func() bool { + timeReceived := time.Now() + s.lock.Lock() defer s.lock.Unlock() - s.mempool = append(s.mempool, data...) + var highFeeReceived bool + for _, tx := range data { + //prevent a lot of calls if not needed + if utils.GlobalLogLevel&utils.LogLevelDebug > 0 { + utils.Debugf("Stratum", "new tx id = %s, size = %d, weight = %d, fee = %s", tx.Id, tx.BlobSize, tx.Weight, utils.XMRUnits(tx.Fee)) + } - // Refresh if 10 seconds have passed between templates and new transactions arrived - if time.Now().Sub(s.lastMempoolRefresh) >= time.Second*10 { - s.lastMempoolRefresh = time.Now() + if s.mempool.Add(tx) && tx.Fee >= HighFeeValue { + highFeeReceived = true + utils.Noticef("Stratum", "high fee tx received: %s, %s", tx.Id, utils.XMRUnits(tx.Fee)) + } + } + + // Refresh if 10 seconds have passed between templates and new transactions arrived, or a high fee was received + if highFeeReceived || timeReceived.Sub(s.lastMempoolRefresh) >= time.Second*10 { + s.lastMempoolRefresh = timeReceived if err := s.fillNewTemplateData(types.ZeroDifficulty); err != nil { utils.Errorf("Stratum", "Error building new template data: %s", err) return false @@ -696,7 +723,7 @@ func (s *Server) HandleMinerData(minerData *p2pooltypes.MinerData) { if s.minerData == nil || s.minerData.Height <= minerData.Height { s.minerData = minerData - s.mempool = minerData.TxBacklog + s.mempool.Swap(minerData.TxBacklog) s.lastMempoolRefresh = time.Now() if err := s.fillNewTemplateData(types.ZeroDifficulty); err != nil { utils.Errorf("Stratum", "Error building new template data: %s", err)