Update to protocol 1.2, implement BlockNotify

* Bump Software version to 3.0
* Remove old fast sync extension
* SideChain: fix for out of order blocks
* SideChain: shuffle blocks in tests
This commit is contained in:
DataHoarder 2023-07-21 16:20:41 +02:00
parent 35b5b46160
commit b0fe55c8a8
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
7 changed files with 142 additions and 119 deletions

View file

@ -25,6 +25,7 @@ import (
const DefaultBanTime = time.Second * 600
const PeerListResponseMaxPeers = 16
const PeerRequestDelay = 60
const MaxBufferSize = 128 * 1024
@ -232,13 +233,26 @@ func (c *Client) SendUniqueBlockRequest(hash types.Hash) {
}
func (c *Client) SendBlockRequest(id types.Hash) {
if len(c.blockPendingRequests) < 80 {
c.SendBlockRequestWithBound(id, 80)
}
func (c *Client) SendBlockRequestWithBound(id types.Hash, bound int) bool {
if len(c.blockPendingRequests) < bound {
c.blockPendingRequests <- id
c.SendMessage(&ClientMessage{
MessageId: MessageBlockRequest,
Buffer: id[:],
})
return true
}
return false
}
func (c *Client) SendBlockNotify(id types.Hash) {
c.SendMessage(&ClientMessage{
MessageId: MessageBlockNotify,
Buffer: id[:],
})
}
func (c *Client) SendBlockResponse(block *sidechain.PoolBlock) {
@ -263,33 +277,8 @@ func (c *Client) SendBlockResponse(block *sidechain.PoolBlock) {
}
}
func (c *Client) SendInternalFastTemplateHeaderSyncRequest(hash types.Hash) {
buf := make([]byte, 0, binary.MaxVarintLen64*2+types.HashSize)
buf = binary.AppendUvarint(buf, uint64(InternalMessageFastTemplateHeaderSyncRequest))
buf = binary.AppendUvarint(buf, uint64(types.HashSize))
buf = append(buf, hash[:]...)
c.SendMessage(&ClientMessage{
MessageId: MessageInternal,
Buffer: buf,
})
}
func (c *Client) SendInternalFastTemplateHeaderSyncResponse(hashes ...types.Hash) {
buf := make([]byte, 0, binary.MaxVarintLen64*2+8+len(hashes)*types.HashSize)
buf = binary.AppendUvarint(buf, uint64(InternalMessageFastTemplateHeaderSyncResponse))
buf = binary.AppendUvarint(buf, uint64(8+len(hashes)*types.HashSize))
buf = binary.LittleEndian.AppendUint64(buf, uint64(len(hashes)))
for _, h := range hashes {
buf = append(buf, h[:]...)
}
c.SendMessage(&ClientMessage{
MessageId: MessageInternal,
Buffer: buf,
})
}
func (c *Client) SendPeerListRequest() {
c.NextOutgoingPeerListRequestTimestamp.Store(uint64(time.Now().Unix()) + 60 + (unsafeRandom.Uint64() % 61))
c.NextOutgoingPeerListRequestTimestamp.Store(uint64(time.Now().Unix()) + PeerRequestDelay + (unsafeRandom.Uint64() % (PeerRequestDelay + 1)))
c.SendMessage(&ClientMessage{
MessageId: MessagePeerListRequest,
})
@ -483,6 +472,16 @@ func (c *Client) OnConnection() {
}
}
if block != nil && c.VersionInformation.SupportsFeature(p2pooltypes.FeatureBlockNotify) {
c.SendBlockNotify(block.Side.Parent)
for _, uncleId := range block.Side.Uncles {
c.SendBlockNotify(uncleId)
}
if parent := c.Owner.SideChain().GetParent(block); parent != nil {
c.SendBlockNotify(parent.Side.Parent)
}
}
c.SendBlockResponse(block)
case MessageBlockResponse:
block := &sidechain.PoolBlock{
@ -527,6 +526,16 @@ func (c *Client) OnConnection() {
return
}
//Atomic max, not necessary as no external writers exist
topHeight := max(c.BroadcastMaxHeight.Load(), block.Side.Height)
for {
if oldHeight := c.BroadcastMaxHeight.Swap(topHeight); oldHeight <= topHeight {
break
} else {
topHeight = oldHeight
}
}
if time.Now().Unix() >= int64(c.NextOutgoingPeerListRequestTimestamp.Load()) {
c.SendPeerListRequest()
}
@ -786,6 +795,24 @@ func (c *Client) OnConnection() {
}
}
}
case MessageBlockNotify:
c.LastBlockRequestTimestamp.Store(uint64(time.Now().Unix()))
var templateId types.Hash
if err := binary.Read(c, binary.LittleEndian, &templateId); err != nil {
c.Ban(DefaultBanTime, err)
return
}
c.BroadcastedHashes.Push(templateId)
// If we don't know about this block, request it from this peer. The peer can do it to speed up our initial sync, for example.
if c.Owner.SideChain().GetPoolBlockByTemplateId(templateId) == nil {
//TODO: prevent sending duplicate requests
if c.SendBlockRequestWithBound(templateId, 25) {
}
}
case MessageInternal:
internalMessageId, err := binary.ReadUvarint(c)
@ -799,69 +826,10 @@ func (c *Client) OnConnection() {
return
}
reader := io.LimitReader(c, int64(messageSize))
_ = reader
switch InternalMessageId(internalMessageId) {
case InternalMessageFastTemplateHeaderSyncRequest:
c.LastBlockRequestTimestamp.Store(uint64(time.Now().Unix()))
var fromTemplateId types.Hash
if err := binary.Read(reader, binary.LittleEndian, &fromTemplateId); err != nil {
c.Ban(DefaultBanTime, err)
return
}
log.Printf("[P2PClient] Peer %s: received InternalMessageFastTemplateHeaderSyncRequest for %s", c.AddressPort, fromTemplateId)
//TODO: this could just be a sample of like 16 blocks
var blocks, uncles sidechain.UniquePoolBlockSlice
if fromTemplateId == types.ZeroHash {
tip := c.Owner.SideChain().GetChainTip()
if tip != nil {
blocks, uncles = c.Owner.SideChain().GetPoolBlocksFromTip(tip.SideTemplateId(c.Owner.Consensus()))
}
} else {
blocks, uncles = c.Owner.SideChain().GetPoolBlocksFromTip(fromTemplateId)
}
hashes := make([]types.Hash, 0, len(blocks)+len(uncles))
for _, b := range blocks {
hashes = append(hashes, b.SideTemplateId(c.Owner.Consensus()))
}
for _, u := range uncles {
hashes = append(hashes, u.SideTemplateId(c.Owner.Consensus()))
}
if uint64(len(hashes)) > c.Owner.Consensus().ChainWindowSize*3 {
hashes = hashes[:c.Owner.Consensus().ChainWindowSize*3]
}
//shuffle to have random chances of faster sync
unsafeRandom.Shuffle(len(hashes), func(i, j int) {
hashes[i], hashes[j] = hashes[j], hashes[i]
})
maxLen := (MaxBufferSize - 128) / types.HashSize
if len(hashes) > maxLen {
hashes = hashes[:maxLen]
}
c.SendInternalFastTemplateHeaderSyncResponse(hashes...)
case InternalMessageFastTemplateHeaderSyncResponse:
var hashLen uint64
if err = binary.Read(c, binary.LittleEndian, &hashLen); err != nil {
c.Ban(DefaultBanTime, err)
return
}
if hashLen > c.Owner.Consensus().ChainWindowSize*3 {
c.Ban(DefaultBanTime, errors.New("size error"))
return
}
log.Printf("[P2PClient] Peer %s: received InternalMessageFastTemplateHeaderSyncResponse with size %d", c.AddressPort, hashLen)
var hash types.Hash
clients := c.Owner.Clients()
for i := uint64(0); i < hashLen; i++ {
if err := binary.Read(reader, binary.LittleEndian, &hash); err != nil {
c.Ban(DefaultBanTime, err)
return
}
clients = c.SendMissingBlockRequestAtRandom(hash, clients)
}
default:
c.Ban(DefaultBanTime, fmt.Errorf("unknown InternalMessageId %d", internalMessageId))
return
@ -876,13 +844,7 @@ func (c *Client) OnConnection() {
}
func (c *Client) afterInitialProtocolExchange() {
if c.VersionInformation.SupportsFeature(p2pooltypes.InternalFeatureFastTemplateHeaderSync) {
if tip := c.LastKnownTip.Load(); tip != nil {
c.SendInternalFastTemplateHeaderSyncRequest(tip.SideTemplateId(c.Owner.Consensus()))
} else {
c.SendInternalFastTemplateHeaderSyncRequest(types.ZeroHash)
}
}
//TODO: use notify to send fast sync data
}
func (c *Client) sendHandshakeChallenge() {

View file

@ -14,13 +14,10 @@ const (
MessagePeerListResponse
// MessageBlockBroadcastCompact Protocol 1.1
MessageBlockBroadcastCompact
// MessageBlockNotify Protocol 1.2
MessageBlockNotify
MessageInternal = 0xff
)
type InternalMessageId uint64
const (
InternalMessageFastTemplateHeaderSyncRequest = InternalMessageId(iota)
InternalMessageFastTemplateHeaderSyncResponse
)

View file

@ -823,20 +823,32 @@ func (s *Server) Broadcast(block *sidechain.PoolBlock) {
prunedMessage, compactMessage = message, message
}
blockTemplateId := block.SideTemplateId(s.Consensus())
go func() {
for _, c := range s.Clients() {
if c.IsGood() {
if !func() bool {
if !func() (sent bool) {
broadcastedHashes := c.BroadcastedHashes.Slice()
// has peer not broadcasted block parent to us?
if slices.Index(broadcastedHashes, block.Side.Parent) == -1 {
return false
}
// has peer not broadcasted block uncles to us?
for _, uncleHash := range block.Side.Uncles {
if slices.Index(broadcastedHashes, uncleHash) == -1 {
return false
}
}
// has peer broadcasted this block to us?
if slices.Index(broadcastedHashes, blockTemplateId) != -1 &&
c.VersionInformation.SupportsFeature(p2pooltypes.FeatureBlockNotify) {
c.SendBlockNotify(blockTemplateId)
return true
}
if c.VersionInformation.SupportsFeature(p2pooltypes.FeatureCompactBroadcast) {
c.SendMessage(compactMessage)
} else {

View file

@ -19,6 +19,7 @@ import (
"github.com/dolthub/swiss"
"io"
"log"
"math/rand"
"slices"
"sync"
"sync/atomic"
@ -786,20 +787,32 @@ func (c *SideChain) verifyBlock(block *PoolBlock) (verification error, invalid e
}
func (c *SideChain) updateDepths(block *PoolBlock) {
preCalcDepth := c.Consensus().ChainWindowSize + UncleBlockDepth - 1
updateDepth := func(b *PoolBlock, newDepth uint64) {
oldDepth := b.Depth.Load()
if oldDepth < newDepth {
b.Depth.Store(newDepth)
if oldDepth < preCalcDepth && newDepth >= preCalcDepth {
//TODO launchPrecalc
}
}
}
for i := uint64(1); i <= UncleBlockDepth; i++ {
blocksAtHeight, _ := c.blocksByHeight.Get(block.Side.Height + i)
for _, child := range blocksAtHeight {
if child.Side.Parent == block.SideTemplateId(c.Consensus()) {
if i != 1 {
log.Printf("[SideChain] Block %s side height %d is inconsistent with parent's side_height %d", block.SideTemplateId(c.Consensus()), block.Side.Height, child.Side.Height)
log.Printf("[SideChain] Block %s side height %d is inconsistent with child's side_height %d", block.SideTemplateId(c.Consensus()), block.Side.Height, child.Side.Height)
return
} else {
block.Depth.Store(max(block.Depth.Load(), child.Depth.Load()+1))
updateDepth(block, child.Depth.Load()+1)
}
}
if ix := slices.Index(child.Side.Uncles, block.SideTemplateId(c.Consensus())); ix != 1 {
block.Depth.Store(max(block.Depth.Load(), child.Depth.Load()+1))
updateDepth(block, child.Depth.Load()+1)
}
}
}
@ -817,6 +830,31 @@ func (c *SideChain) updateDepths(block *PoolBlock) {
_ = c.verifyLoop(block)
}
for i := uint64(1); i <= UncleBlockDepth; i++ {
for _, child := range c.getPoolBlocksByHeight(block.Side.Height + i) {
oldDepth := child.Depth.Load()
if child.Side.Parent == block.SideTemplateId(c.Consensus()) {
if i != 1 {
log.Printf("[SideChain] Block %s side height %d is inconsistent with child's side_height %d", block.SideTemplateId(c.Consensus()), block.Side.Height, child.Side.Height)
return
} else if blockDepth > 0 {
updateDepth(child, blockDepth-1)
}
}
if slices.Contains(child.Side.Uncles, block.SideTemplateId(c.Consensus())) {
if blockDepth > i {
updateDepth(child, blockDepth-i)
}
}
if child.Depth.Load() > oldDepth {
blocksToUpdate = append(blocksToUpdate, child)
}
}
}
if parent := block.iteratorGetParent(c.getPoolBlockByTemplateId); parent != nil {
if parent.Side.Height+1 != block.Side.Height {
log.Printf("[SideChain] Block %s side height %d is inconsistent with parent's side_height %d", block.SideTemplateId(c.Consensus()), block.Side.Height, parent.Side.Height)
@ -824,7 +862,7 @@ func (c *SideChain) updateDepths(block *PoolBlock) {
}
if parent.Depth.Load() < blockDepth+1 {
parent.Depth.Store(blockDepth + 1)
updateDepth(parent, blockDepth+1)
blocksToUpdate = append(blocksToUpdate, parent)
}
}
@ -840,7 +878,7 @@ func (c *SideChain) updateDepths(block *PoolBlock) {
d := block.Side.Height - uncle.Side.Height
if uncle.Depth.Load() < blockDepth+d {
uncle.Depth.Store(blockDepth + d)
updateDepth(uncle, blockDepth+d)
blocksToUpdate = append(blocksToUpdate, uncle)
}
})
@ -1051,7 +1089,7 @@ func (c *SideChain) GetParent(block *PoolBlock) *PoolBlock {
}
func (c *SideChain) getParent(block *PoolBlock) *PoolBlock {
return c.getPoolBlockByTemplateId(block.Side.Parent)
return block.iteratorGetParent(c.getPoolBlockByTemplateId)
}
func (c *SideChain) GetPoolBlockByTemplateId(id types.Hash) *PoolBlock {
@ -1168,6 +1206,9 @@ func (c *SideChain) isLongerChain(block, candidate *PoolBlock) (isLonger, isAlte
func (c *SideChain) LoadTestData(reader io.Reader, patchedBlocks ...[]byte) error {
var err error
buf := make([]byte, PoolBlockMaxTemplateSize)
blocks := make([]*PoolBlock, 0, c.Consensus().ChainWindowSize*3)
for {
buf = buf[:0]
var blockLen uint32
@ -1183,9 +1224,7 @@ func (c *SideChain) LoadTestData(reader io.Reader, patchedBlocks ...[]byte) erro
if err = b.UnmarshalBinary(c.Consensus(), c.DerivationCache(), buf[:blockLen]); err != nil {
return err
}
if err = c.AddPoolBlock(b); err != nil {
return err
}
blocks = append(blocks, b)
}
for _, buf := range patchedBlocks {
@ -1193,6 +1232,15 @@ func (c *SideChain) LoadTestData(reader io.Reader, patchedBlocks ...[]byte) erro
if err = b.UnmarshalBinary(c.Consensus(), c.DerivationCache(), buf); err != nil {
return err
}
blocks = append(blocks, b)
}
// Shuffle blocks
rand.Shuffle(len(blocks), func(i, j int) {
blocks[i], blocks[j] = blocks[j], blocks[i]
})
for _, b := range blocks {
if err = c.AddPoolBlock(b); err != nil {
return err
}

View file

@ -1,7 +1,6 @@
package sidechain
import (
"encoding/binary"
"fmt"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/block"
@ -319,8 +318,7 @@ func ShuffleShares[T any](shares []T, shareVersion ShareVersion, privateKeySeed
func ShuffleSequence(shareVersion ShareVersion, privateKeySeed types.Hash, items int, swap func(i, j int)) {
n := uint64(items)
if shareVersion > ShareVersion_V1 && n > 1 {
h := crypto.PooledKeccak256(privateKeySeed[:])
seed := binary.LittleEndian.Uint64(h[:])
seed := crypto.PooledKeccak256(privateKeySeed[:]).Uint64()
if seed == 0 {
seed = 1

View file

@ -19,7 +19,7 @@ type ProtocolFeature int
const (
FeatureCompactBroadcast = ProtocolFeature(iota)
InternalFeatureFastTemplateHeaderSync
FeatureBlockNotify
)
type PeerVersionInformation struct {
@ -32,8 +32,8 @@ func (i *PeerVersionInformation) SupportsFeature(feature ProtocolFeature) bool {
switch feature {
case FeatureCompactBroadcast:
return i.Protocol >= ProtocolVersion_1_1
/*case InternalFeatureFastTemplateHeaderSync:
return i.Protocol >= ProtocolVersion_1_1 && i.SoftwareId == SoftwareIdGoObserver && i.SoftwareVersion.Major() == 1 && i.SoftwareVersion >= ((1<<16)|1)*/
case FeatureBlockNotify:
return i.Protocol >= ProtocolVersion_1_2
default:
return false
}
@ -71,6 +71,7 @@ const (
ProtocolVersion_0_0 ProtocolVersion = (0 << 16) | 0
ProtocolVersion_1_0 ProtocolVersion = (1 << 16) | 0
ProtocolVersion_1_1 ProtocolVersion = (1 << 16) | 1
ProtocolVersion_1_2 ProtocolVersion = (1 << 16) | 2
)
type SoftwareVersion SemanticVersion
@ -86,8 +87,8 @@ func (v SoftwareVersion) String() string {
return SemanticVersion(v).String()
}
const SupportedProtocolVersion = ProtocolVersion_1_1
const CurrentSoftwareVersion SoftwareVersion = (2 << 16) | 0
const SupportedProtocolVersion = ProtocolVersion_1_2
const CurrentSoftwareVersion SoftwareVersion = (3 << 16) | 0
const CurrentSoftwareId = SoftwareIdGoObserver
type SoftwareId uint32

View file

@ -2,6 +2,7 @@ package types
import (
"database/sql/driver"
"encoding/binary"
"encoding/hex"
"errors"
"git.gammaspectra.live/P2Pool/p2pool-observer/utils"
@ -95,6 +96,10 @@ func (h Hash) String() string {
return hex.EncodeToString(h[:])
}
func (h Hash) Uint64() uint64 {
return binary.LittleEndian.Uint64(h[:])
}
func (h *Hash) Scan(src any) error {
if src == nil {
return nil