ZMQ: Use serial method instead of channels to preserve ordering of events

This commit is contained in:
DataHoarder 2023-05-31 12:25:40 +02:00
parent 879f15515c
commit 2fe65a7a62
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
4 changed files with 55 additions and 111 deletions

View file

@ -38,19 +38,17 @@ func NewClient(endpoint string, topics ...Topic) *Client {
// Stream provides channels where instances of the desired topic object are
// sent to.
type Stream struct {
ErrC chan error
FullChainMainC chan *FullChainMain
FullTxPoolAddC chan *FullTxPoolAdd
FullMinerDataC chan *FullMinerData
MinimalChainMainC chan *MinimalChainMain
MinimalTxPoolAddC chan []TxMempoolData
FullChainMainC func(*FullChainMain)
FullTxPoolAddC func([]FullTxPoolAdd)
FullMinerDataC func(*FullMinerData)
MinimalChainMainC func(*MinimalChainMain)
MinimalTxPoolAddC func([]TxMempoolData)
}
// Listen listens for a list of topics pre-configured for this client (via NewClient).
func (c *Client) Listen(ctx context.Context) (*Stream, error) {
func (c *Client) Listen(ctx context.Context, fullChainMain func(chainMain *FullChainMain), fullTxPoolAdd func(txs []FullTxPoolAdd), fullMinerData func(main *FullMinerData), minimalChainMain func(chainMain *MinimalChainMain), minimalTxPoolAdd func(txs []TxMempoolData)) error {
if err := c.listen(ctx, c.topics...); err != nil {
return nil, fmt.Errorf("listen on '%s': %w", strings.Join(func() (r []string) {
return fmt.Errorf("listen on '%s': %w", strings.Join(func() (r []string) {
for _, s := range c.topics {
r = append(r, string(s))
}
@ -59,30 +57,18 @@ func (c *Client) Listen(ctx context.Context) (*Stream, error) {
}
stream := &Stream{
ErrC: make(chan error),
FullChainMainC: make(chan *FullChainMain),
FullTxPoolAddC: make(chan *FullTxPoolAdd),
FullMinerDataC: make(chan *FullMinerData),
MinimalChainMainC: make(chan *MinimalChainMain),
MinimalTxPoolAddC: make(chan []TxMempoolData),
FullChainMainC: fullChainMain,
FullTxPoolAddC: fullTxPoolAdd,
FullMinerDataC: fullMinerData,
MinimalChainMainC: minimalChainMain,
MinimalTxPoolAddC: minimalTxPoolAdd,
}
go func() {
if err := c.loop(stream); err != nil {
stream.ErrC <- fmt.Errorf("loop: %w", err)
}
if err := c.loop(stream); err != nil {
return fmt.Errorf("loop: %w", err)
}
close(stream.ErrC)
close(stream.FullChainMainC)
close(stream.FullTxPoolAddC)
close(stream.FullMinerDataC)
close(stream.MinimalChainMainC)
close(stream.MinimalTxPoolAddC)
}()
return stream, nil
return nil
}
// Close closes any established connection, if any.
@ -167,21 +153,19 @@ func (c *Client) transmitFullChainMain(stream *Stream, gson []byte) error {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.FullChainMainC <- element
stream.FullChainMainC(element)
}
return nil
}
func (c *Client) transmitFullTxPoolAdd(stream *Stream, gson []byte) error {
var arr []*FullTxPoolAdd
var arr []FullTxPoolAdd
if err := utils.UnmarshalJSON(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.FullTxPoolAddC <- element
}
stream.FullTxPoolAddC(arr)
return nil
}
@ -192,7 +176,7 @@ func (c *Client) transmitFullMinerData(stream *Stream, gson []byte) error {
if err := utils.UnmarshalJSON(gson, element); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
stream.FullMinerDataC <- element
stream.FullMinerDataC(element)
return nil
}
@ -202,7 +186,7 @@ func (c *Client) transmitMinimalChainMain(stream *Stream, gson []byte) error {
if err := utils.UnmarshalJSON(gson, element); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
stream.MinimalChainMainC <- element
stream.MinimalChainMainC(element)
return nil
}
@ -212,7 +196,7 @@ func (c *Client) transmitMinimalTxPoolAdd(stream *Stream, gson []byte) error {
if err := utils.UnmarshalJSON(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
stream.MinimalTxPoolAddC <- arr
stream.MinimalTxPoolAddC(arr)
return nil
}

View file

@ -9,6 +9,7 @@ import (
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/client/zmq"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/randomx"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/transaction"
"git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/mempool"
"git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain"
p2pooltypes "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/types"
"git.gammaspectra.live/P2Pool/p2pool-observer/types"
@ -61,18 +62,10 @@ func NewMainChain(s *sidechain.SideChain, p2pool P2PoolInterface) *MainChain {
func (c *MainChain) Listen() error {
ctx := c.p2pool.Context()
s, err := c.p2pool.ClientZMQ().Listen(ctx)
if err != nil {
return err
}
for {
select {
case err := <-s.ErrC:
return err
case fullChainMain := <-s.FullChainMainC:
err := c.p2pool.ClientZMQ().Listen(ctx,
func(fullChainMain *zmq.FullChainMain) {
if len(fullChainMain.MinerTx.Inputs) < 1 {
continue
return
}
d := &sidechain.ChainMain{
Difficulty: types.ZeroDifficulty,
@ -113,13 +106,14 @@ func (c *MainChain) Listen() error {
}
if len(outputs) != len(fullChainMain.MinerTx.Outputs) {
continue
return
}
extraDataRaw, _ := hex.DecodeString(fullChainMain.MinerTx.Extra)
extraTags := transaction.ExtraTags{}
if err = extraTags.UnmarshalBinary(extraDataRaw); err != nil {
continue
if err := extraTags.UnmarshalBinary(extraDataRaw); err != nil {
//TODO: err
extraTags = nil
}
blockData := &mainblock.Block{
@ -144,7 +138,18 @@ func (c *MainChain) Listen() error {
TransactionParentIndices: nil,
}
c.HandleMainBlock(blockData)
case fullMinerData := <-s.FullMinerDataC:
}, func(txs []zmq.FullTxPoolAdd) {
}, func(fullMinerData *zmq.FullMinerData) {
pool := make(mempool.Mempool, len(fullMinerData.TxBacklog))
for i := range fullMinerData.TxBacklog {
pool[i] = &mempool.MempoolEntry{
Id: fullMinerData.TxBacklog[i].Id,
BlobSize: fullMinerData.TxBacklog[i].BlobSize,
Weight: fullMinerData.TxBacklog[i].Weight,
Fee: fullMinerData.TxBacklog[i].Fee,
}
}
c.HandleMinerData(&p2pooltypes.MinerData{
MajorVersion: fullMinerData.MajorVersion,
Height: fullMinerData.Height,
@ -154,10 +159,18 @@ func (c *MainChain) Listen() error {
MedianWeight: fullMinerData.MedianWeight,
AlreadyGeneratedCoins: fullMinerData.AlreadyGeneratedCoins,
MedianTimestamp: fullMinerData.MedianTimestamp,
TxBacklog: pool,
TimeReceived: time.Now(),
})
}
}, func(chainMain *zmq.MinimalChainMain) {
}, func(txs []zmq.TxMempoolData) {
//TODO
})
if err != nil {
return err
}
return nil
}
func (c *MainChain) getTimestamps(timestamps []uint64) bool {
@ -264,60 +277,6 @@ func (c *MainChain) HandleMainBlock(b *mainblock.Block) {
c.updateTip()
}
// HandleChainMain
// Deprecated
func (c *MainChain) HandleChainMain(mainData *sidechain.ChainMain, extra []byte) {
func() {
c.lock.Lock()
defer c.lock.Unlock()
if h, ok := c.mainchainByHeight[mainData.Height]; ok {
h.Height = mainData.Height
h.Timestamp = mainData.Timestamp
h.Reward = mainData.Reward
mainData.Id = h.Id
mainData.Difficulty = h.Difficulty
c.mainchainByHash[h.Id] = h
} else {
return
}
if mainData.Height > c.highest {
c.highest = mainData.Height
}
log.Printf("[MainChain] new main chain block: height = %d, id = %s, timestamp = %d, reward = %s", mainData.Height, mainData.Id.String(), mainData.Timestamp, utils.XMRUnits(mainData.Reward))
c.updateMedianTimestamp()
}()
c.p2pool.UpdateMainData(mainData)
var tags transaction.ExtraTags
if err := tags.UnmarshalBinary(extra); err != nil {
return
}
extraMergeMiningTag := tags.GetTag(transaction.TxExtraTagMergeMining)
if extraMergeMiningTag == nil {
return
}
sidechainHashData := extraMergeMiningTag.Data
if len(sidechainHashData) != types.HashSize {
return
}
sidechainId := types.HashFromBytes(sidechainHashData)
if block := c.sidechain.GetPoolBlockByTemplateId(sidechainId); block != nil {
c.p2pool.UpdateBlockFound(mainData, block)
} else {
c.sidechain.WatchMainChainBlock(mainData, sidechainId)
}
c.updateTip()
}
func (c *MainChain) GetChainMainByHeight(height uint64) *sidechain.ChainMain {
c.lock.RLock()
defer c.lock.RUnlock()

View file

@ -178,7 +178,7 @@ func NewP2Pool(consensus *sidechain.Consensus, settings map[string]string) (*P2P
return nil, err
}
pool.zmqClient = zmq.NewClient(settings["zmq-url"], zmq.TopicFullChainMain, zmq.TopicFullMinerData)
pool.zmqClient = zmq.NewClient(settings["zmq-url"], zmq.TopicFullChainMain, zmq.TopicFullMinerData, zmq.TopicMinimalTxPoolAdd)
pool.sidechain = sidechain.NewSideChain(pool)

View file

@ -1,6 +1,7 @@
package types
import (
"git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/mempool"
"git.gammaspectra.live/P2Pool/p2pool-observer/types"
"time"
)
@ -14,6 +15,6 @@ type MinerData struct {
MedianWeight uint64 `json:"median_weight"`
AlreadyGeneratedCoins uint64 `json:"already_generated_coins"`
MedianTimestamp uint64 `json:"median_timestamp"`
//TxBacklog any
TimeReceived time.Time `json:"time_received"`
TimeReceived time.Time `json:"time_received"`
TxBacklog mempool.Mempool `json:"tx_backlog"`
}