917 lines
24 KiB
Go
917 lines
24 KiB
Go
package p2p
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"git.gammaspectra.live/P2Pool/consensus/v4/monero/client"
|
|
"git.gammaspectra.live/P2Pool/consensus/v4/p2pool/mainchain"
|
|
"git.gammaspectra.live/P2Pool/consensus/v4/p2pool/sidechain"
|
|
p2pooltypes "git.gammaspectra.live/P2Pool/consensus/v4/p2pool/types"
|
|
"git.gammaspectra.live/P2Pool/consensus/v4/types"
|
|
"git.gammaspectra.live/P2Pool/consensus/v4/utils"
|
|
unsafeRandom "math/rand/v2"
|
|
"net"
|
|
"net/netip"
|
|
"slices"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unsafe"
|
|
)
|
|
|
|
type P2PoolInterface interface {
|
|
sidechain.ConsensusProvider
|
|
SideChain() *sidechain.SideChain
|
|
MainChain() *mainchain.MainChain
|
|
GetChainMainTip() *sidechain.ChainMain
|
|
GetMinerDataTip() *p2pooltypes.MinerData
|
|
ClientRPC() *client.Client
|
|
}
|
|
|
|
type PeerListEntry struct {
|
|
AddressPort netip.AddrPort
|
|
FailedConnections atomic.Uint32
|
|
LastSeenTimestamp atomic.Uint64
|
|
}
|
|
|
|
type PeerList []*PeerListEntry
|
|
|
|
func (l PeerList) Get(addr netip.Addr) *PeerListEntry {
|
|
if i := slices.IndexFunc(l, func(entry *PeerListEntry) bool {
|
|
return entry.AddressPort.Addr().Compare(addr) == 0
|
|
}); i != -1 {
|
|
return l[i]
|
|
}
|
|
return nil
|
|
}
|
|
func (l PeerList) Delete(addr netip.Addr) PeerList {
|
|
ret := l
|
|
for i := slices.IndexFunc(ret, func(entry *PeerListEntry) bool {
|
|
return entry.AddressPort.Addr().Compare(addr) == 0
|
|
}); i != -1; i = slices.IndexFunc(ret, func(entry *PeerListEntry) bool {
|
|
return entry.AddressPort.Addr().Compare(addr) == 0
|
|
}) {
|
|
ret = slices.Delete(ret, i, i+1)
|
|
}
|
|
return ret
|
|
}
|
|
|
|
type BanEntry struct {
|
|
Expiration uint64
|
|
Error error
|
|
}
|
|
|
|
type Server struct {
|
|
p2pool P2PoolInterface
|
|
|
|
peerId uint64
|
|
versionInformation p2pooltypes.PeerVersionInformation
|
|
|
|
listenAddress netip.AddrPort
|
|
externalListenPort uint16
|
|
|
|
useIPv4 bool
|
|
useIPv6 bool
|
|
|
|
ipv6AddrsLock sync.RWMutex
|
|
ipv6OutgoingAddresses []netip.Addr
|
|
|
|
close atomic.Bool
|
|
listener *net.TCPListener
|
|
|
|
fastestPeer *Client
|
|
|
|
MaxOutgoingPeers uint32
|
|
MaxIncomingPeers uint32
|
|
|
|
NumOutgoingConnections atomic.Int32
|
|
NumIncomingConnections atomic.Int32
|
|
|
|
PendingOutgoingConnections *utils.CircularBuffer[string]
|
|
|
|
peerList PeerList
|
|
peerListLock sync.RWMutex
|
|
moneroPeerList PeerList
|
|
|
|
bansLock sync.RWMutex
|
|
bans map[[16]byte]BanEntry
|
|
|
|
clientsLock sync.RWMutex
|
|
clients []*Client
|
|
|
|
cachedBlocksLock sync.RWMutex
|
|
cachedBlocks map[types.Hash]*sidechain.PoolBlock
|
|
|
|
ctx context.Context
|
|
}
|
|
|
|
func NewServer(p2pool P2PoolInterface, listenAddress string, externalListenPort uint16, maxOutgoingPeers, maxIncomingPeers uint32, useIPv4, useIPv6 bool, ctx context.Context) (*Server, error) {
|
|
peerId := make([]byte, int(unsafe.Sizeof(uint64(0))))
|
|
_, err := rand.Read(peerId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
addrPort, err := netip.ParseAddrPort(listenAddress)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s := &Server{
|
|
p2pool: p2pool,
|
|
listenAddress: addrPort,
|
|
externalListenPort: externalListenPort,
|
|
peerId: binary.LittleEndian.Uint64(peerId),
|
|
MaxOutgoingPeers: min(maxOutgoingPeers, 450),
|
|
MaxIncomingPeers: min(maxIncomingPeers, 450),
|
|
cachedBlocks: make(map[types.Hash]*sidechain.PoolBlock, p2pool.Consensus().ChainWindowSize*3),
|
|
versionInformation: p2pooltypes.PeerVersionInformation{
|
|
SoftwareId: p2pooltypes.CurrentSoftwareId,
|
|
SoftwareVersion: p2pooltypes.CurrentSoftwareVersion,
|
|
Protocol: p2pooltypes.SupportedProtocolVersion,
|
|
},
|
|
useIPv4: useIPv4,
|
|
useIPv6: useIPv6,
|
|
ctx: ctx,
|
|
bans: make(map[[16]byte]BanEntry),
|
|
}
|
|
|
|
s.PendingOutgoingConnections = utils.NewCircularBuffer[string](int(s.MaxOutgoingPeers))
|
|
s.RefreshOutgoingIPv6()
|
|
|
|
return s, nil
|
|
}
|
|
|
|
func (s *Server) RefreshOutgoingIPv6() {
|
|
utils.Logf("P2PServer", "Refreshing outgoing IPv6")
|
|
addrs, _ := utils.GetOutboundIPv6()
|
|
s.ipv6AddrsLock.Lock()
|
|
defer s.ipv6AddrsLock.Unlock()
|
|
s.ipv6OutgoingAddresses = addrs
|
|
for _, a := range addrs {
|
|
utils.Logf("P2PServer", "Outgoing IPv6: %s", a.String())
|
|
}
|
|
}
|
|
|
|
func (s *Server) GetOutgoingIPv6() []netip.Addr {
|
|
s.ipv6AddrsLock.RLock()
|
|
defer s.ipv6AddrsLock.RUnlock()
|
|
return s.ipv6OutgoingAddresses
|
|
}
|
|
|
|
func (s *Server) ListenPort() uint16 {
|
|
return s.listenAddress.Port()
|
|
}
|
|
|
|
func (s *Server) ExternalListenPort() uint16 {
|
|
if s.externalListenPort != 0 {
|
|
return s.externalListenPort
|
|
} else {
|
|
return s.listenAddress.Port()
|
|
}
|
|
}
|
|
|
|
func (s *Server) AddToPeerList(addressPort netip.AddrPort) {
|
|
if addressPort.Addr().IsLoopback() {
|
|
return
|
|
}
|
|
addr := addressPort.Addr().Unmap()
|
|
|
|
if !s.useIPv4 && addr.Is4() {
|
|
return
|
|
} else if !s.useIPv6 && addr.Is6() {
|
|
return
|
|
}
|
|
|
|
s.peerListLock.Lock()
|
|
defer s.peerListLock.Unlock()
|
|
if e := s.peerList.Get(addr); e == nil {
|
|
if ok, _ := s.IsBanned(addr); ok {
|
|
return
|
|
}
|
|
e = &PeerListEntry{
|
|
AddressPort: netip.AddrPortFrom(addr, addressPort.Port()),
|
|
}
|
|
e.LastSeenTimestamp.Store(uint64(time.Now().Unix()))
|
|
s.peerList = append(s.peerList, e)
|
|
} else {
|
|
e.LastSeenTimestamp.Store(uint64(time.Now().Unix()))
|
|
}
|
|
}
|
|
|
|
func (s *Server) UpdateInPeerList(addressPort netip.AddrPort) {
|
|
if addressPort.Addr().IsLoopback() {
|
|
return
|
|
}
|
|
addr := addressPort.Addr().Unmap()
|
|
|
|
if !s.useIPv4 && addr.Is4() {
|
|
return
|
|
} else if !s.useIPv6 && addr.Is6() {
|
|
return
|
|
}
|
|
s.peerListLock.Lock()
|
|
defer s.peerListLock.Unlock()
|
|
if e := s.peerList.Get(addr); e == nil {
|
|
if ok, _ := s.IsBanned(addr); ok {
|
|
return
|
|
}
|
|
e = &PeerListEntry{
|
|
AddressPort: netip.AddrPortFrom(addr, addressPort.Port()),
|
|
}
|
|
e.LastSeenTimestamp.Store(uint64(time.Now().Unix()))
|
|
s.peerList = append(s.peerList, e)
|
|
} else {
|
|
e.FailedConnections.Store(0)
|
|
e.LastSeenTimestamp.Store(uint64(time.Now().Unix()))
|
|
}
|
|
}
|
|
|
|
func (s *Server) PeerList() PeerList {
|
|
s.peerListLock.RLock()
|
|
defer s.peerListLock.RUnlock()
|
|
|
|
return slices.Clone(s.peerList)
|
|
}
|
|
|
|
func (s *Server) RemoveFromPeerList(ip netip.Addr) {
|
|
ip = ip.Unmap()
|
|
s.peerListLock.Lock()
|
|
defer s.peerListLock.Unlock()
|
|
for i := len(s.peerList) - 1; i >= 0; i-- {
|
|
a := s.peerList[i]
|
|
if a.AddressPort.Addr().Compare(ip) == 0 {
|
|
s.peerList = slices.Delete(s.peerList, i, i+1)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) GetFastestClient() *Client {
|
|
var client *Client
|
|
var ping uint64
|
|
for _, c := range s.Clients() {
|
|
p := c.PingDuration.Load()
|
|
if c.IsGood() && p != 0 && (ping == 0 || p < ping) {
|
|
client = c
|
|
ping = p
|
|
}
|
|
}
|
|
|
|
return client
|
|
}
|
|
|
|
func (s *Server) UpdatePeerList() {
|
|
curTime := uint64(time.Now().Unix())
|
|
for _, c := range s.Clients() {
|
|
if c.IsGood() && curTime >= c.NextOutgoingPeerListRequestTimestamp.Load() {
|
|
c.SendPeerListRequest()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) CleanupBanList() {
|
|
s.bansLock.Lock()
|
|
defer s.bansLock.Unlock()
|
|
|
|
currentTime := uint64(time.Now().Unix())
|
|
|
|
for k, b := range s.bans {
|
|
if currentTime >= b.Expiration {
|
|
delete(s.bans, k)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) UpdateClientConnections() {
|
|
|
|
currentTime := uint64(time.Now().Unix())
|
|
lastUpdated := uint64(s.SideChain().LastUpdated().Unix())
|
|
|
|
var hasGoodPeers bool
|
|
var fastestPeer *Client
|
|
|
|
connectedClients := s.Clients()
|
|
|
|
connectedPeers := make([]netip.Addr, 0, len(connectedClients))
|
|
|
|
for _, client := range connectedClients {
|
|
timeout := uint64(10)
|
|
if client.HandshakeComplete.Load() {
|
|
timeout = 300
|
|
}
|
|
|
|
lastAlive := client.LastActiveTimestamp.Load()
|
|
|
|
if currentTime >= (lastAlive + timeout) {
|
|
idleTime := currentTime - lastAlive
|
|
utils.Logf("P2PServer", "peer %s has been idle for %d seconds, disconnecting", client.AddressPort, idleTime)
|
|
client.Close()
|
|
continue
|
|
}
|
|
|
|
if client.HandshakeComplete.Load() && client.LastBroadcastTimestamp.Load() > 0 {
|
|
// - Side chain is at least 15 minutes newer (last_updated >= client->m_lastBroadcastTimestamp + 900)
|
|
// - It's been at least 10 seconds since side chain updated (cur_time >= last_updated + 10)
|
|
// - It's been at least 10 seconds since the last block request (peer is not syncing)
|
|
// - Peer should have sent a broadcast by now
|
|
|
|
if lastUpdated > 0 && (currentTime >= max(lastUpdated, client.LastBlockRequestTimestamp.Load())+10) && (lastUpdated >= client.LastBroadcastTimestamp.Load()+900) {
|
|
dt := lastUpdated - client.LastBroadcastTimestamp.Load()
|
|
client.Ban(DefaultBanTime, fmt.Errorf("not broadcasting blocks (last update %d seconds ago)", dt))
|
|
client.Close()
|
|
continue
|
|
}
|
|
}
|
|
|
|
connectedPeers = append(connectedPeers, client.AddressPort.Addr().Unmap())
|
|
if client.IsGood() {
|
|
hasGoodPeers = true
|
|
if client.PingDuration.Load() >= 0 && (fastestPeer == nil || fastestPeer.PingDuration.Load() > client.PingDuration.Load()) {
|
|
fastestPeer = client
|
|
}
|
|
}
|
|
}
|
|
|
|
deletedPeers := 0
|
|
peerList := s.PeerList()
|
|
peerList2 := slices.Clone(peerList)
|
|
for _, p := range peerList {
|
|
if slices.ContainsFunc(connectedPeers, func(addr netip.Addr) bool {
|
|
return p.AddressPort.Addr().Compare(addr) == 0
|
|
}) {
|
|
p.LastSeenTimestamp.Store(currentTime)
|
|
}
|
|
if (p.LastSeenTimestamp.Load() + 3600) < currentTime {
|
|
peerList2 = peerList2.Delete(p.AddressPort.Addr())
|
|
deletedPeers++
|
|
}
|
|
}
|
|
peerList = peerList2
|
|
|
|
if deletedPeers > 0 {
|
|
func() {
|
|
s.peerListLock.Lock()
|
|
defer s.peerListLock.Unlock()
|
|
s.peerList = slices.Clone(peerList)
|
|
}()
|
|
}
|
|
|
|
N := int(s.MaxOutgoingPeers)
|
|
|
|
// Special case: when we can't find p2pool peers, scan through monerod peers (try 25 peers at a time)
|
|
if !hasGoodPeers && len(s.moneroPeerList) > 0 {
|
|
utils.Logf("P2PServer", "Scanning monerod peers, %d left", len(s.moneroPeerList))
|
|
for i := 0; i < 25 && len(s.moneroPeerList) > 0; i++ {
|
|
peerList = append(peerList, s.moneroPeerList[len(s.moneroPeerList)-1])
|
|
s.moneroPeerList = s.moneroPeerList[:len(s.moneroPeerList)-1]
|
|
}
|
|
N = len(peerList)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
attempts := 0
|
|
|
|
for i := s.NumOutgoingConnections.Load() - s.NumIncomingConnections.Load(); int(i) < N && len(peerList) > 0; {
|
|
k := unsafeRandom.IntN(len(peerList)) % len(peerList)
|
|
peer := peerList[k]
|
|
|
|
if !slices.ContainsFunc(connectedPeers, func(addr netip.Addr) bool {
|
|
return peer.AddressPort.Addr().Compare(addr) == 0
|
|
}) {
|
|
wg.Add(1)
|
|
attempts++
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
if s.NumOutgoingConnections.Load() >= int32(s.MaxOutgoingPeers) {
|
|
return
|
|
}
|
|
if err := s.Connect(peer.AddressPort); err != nil {
|
|
utils.Logf("P2PServer", "Connection to %s rejected (%s)", peer.AddressPort.String(), err.Error())
|
|
}
|
|
}()
|
|
i++
|
|
}
|
|
|
|
peerList = slices.Delete(peerList, k, k+1)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
if attempts == 0 && !hasGoodPeers && len(s.moneroPeerList) == 0 {
|
|
utils.Logf("P2PServer", "No connections to other p2pool nodes, check your monerod/p2pool/network/firewall setup!")
|
|
if moneroPeerList, err := s.p2pool.ClientRPC().GetPeerList(); err == nil {
|
|
s.moneroPeerList = make(PeerList, 0, len(moneroPeerList.WhiteList))
|
|
for _, p := range moneroPeerList.WhiteList {
|
|
addr, err := netip.ParseAddr(p.Host)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
e := &PeerListEntry{
|
|
AddressPort: netip.AddrPortFrom(addr, s.Consensus().DefaultPort()),
|
|
}
|
|
e.LastSeenTimestamp.Store(uint64(p.LastSeen))
|
|
if ok, _ := s.IsBanned(addr); !ok {
|
|
if !s.useIPv4 && addr.Is4() {
|
|
continue
|
|
} else if !s.useIPv6 && addr.Is6() {
|
|
continue
|
|
}
|
|
s.moneroPeerList = append(s.moneroPeerList, e)
|
|
}
|
|
}
|
|
slices.SortFunc(s.moneroPeerList, func(a, b *PeerListEntry) int {
|
|
aValue, bValue := a.LastSeenTimestamp.Load(), b.LastSeenTimestamp.Load()
|
|
if aValue < bValue {
|
|
return -1
|
|
} else if aValue > bValue {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
utils.Logf("P2PServer", "monerod peer list loaded (%d peers)", len(s.moneroPeerList))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) AddCachedBlock(block *sidechain.PoolBlock) {
|
|
s.cachedBlocksLock.Lock()
|
|
defer s.cachedBlocksLock.Unlock()
|
|
|
|
if s.cachedBlocks == nil {
|
|
return
|
|
}
|
|
|
|
s.cachedBlocks[block.SideTemplateId(s.p2pool.Consensus())] = block
|
|
}
|
|
|
|
func (s *Server) ClearCachedBlocks() {
|
|
s.cachedBlocksLock.Lock()
|
|
defer s.cachedBlocksLock.Unlock()
|
|
|
|
s.cachedBlocks = nil
|
|
}
|
|
|
|
func (s *Server) GetCachedBlock(hash types.Hash) *sidechain.PoolBlock {
|
|
s.cachedBlocksLock.RLock()
|
|
defer s.cachedBlocksLock.RUnlock()
|
|
|
|
return s.cachedBlocks[hash]
|
|
}
|
|
|
|
func (s *Server) GetCachedBlocksTip() (tip *sidechain.PoolBlock) {
|
|
s.cachedBlocksLock.RLock()
|
|
defer s.cachedBlocksLock.RUnlock()
|
|
|
|
for _, b := range s.cachedBlocks {
|
|
if tip == nil || tip.Side.Height < b.Side.Height || (tip.Side.Height == b.Side.Height && len(b.Side.Uncles) > len(tip.Side.Uncles)) {
|
|
tip = b
|
|
}
|
|
}
|
|
return tip
|
|
}
|
|
|
|
func (s *Server) DownloadMissingBlocks() {
|
|
clientList := s.Clients()
|
|
|
|
if len(clientList) == 0 {
|
|
return
|
|
}
|
|
|
|
s.cachedBlocksLock.RLock()
|
|
defer s.cachedBlocksLock.RUnlock()
|
|
|
|
for {
|
|
obtained := false
|
|
for _, h := range s.SideChain().GetMissingBlocks() {
|
|
if b, ok := s.cachedBlocks[h]; ok {
|
|
if _, err, _ := s.SideChain().AddPoolBlockExternal(b); err == nil {
|
|
obtained = true
|
|
continue
|
|
}
|
|
}
|
|
|
|
clientList[unsafeRandom.IntN(len(clientList))].SendUniqueBlockRequest(h)
|
|
}
|
|
if !obtained {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) Listen() (err error) {
|
|
var listener net.Listener
|
|
var ok bool
|
|
if listener, err = (&net.ListenConfig{}).Listen(s.ctx, "tcp", s.listenAddress.String()); err != nil {
|
|
return err
|
|
} else if s.listener, ok = listener.(*net.TCPListener); !ok {
|
|
return errors.New("not a tcp listener")
|
|
} else {
|
|
defer s.listener.Close()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for range utils.ContextTick(s.ctx, time.Second*5) {
|
|
s.UpdatePeerList()
|
|
s.UpdateClientConnections()
|
|
}
|
|
}()
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for range utils.ContextTick(s.ctx, time.Second) {
|
|
if s.SideChain().PreCalcFinished() {
|
|
s.ClearCachedBlocks()
|
|
break
|
|
}
|
|
|
|
s.DownloadMissingBlocks()
|
|
}
|
|
// Slow down updates for missing blocks after sync
|
|
for range utils.ContextTick(s.ctx, time.Minute*2) {
|
|
s.DownloadMissingBlocks()
|
|
}
|
|
}()
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for range utils.ContextTick(s.ctx, time.Hour) {
|
|
s.RefreshOutgoingIPv6()
|
|
}
|
|
}()
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for range utils.ContextTick(s.ctx, time.Minute*5) {
|
|
s.CleanupBanList()
|
|
}
|
|
}()
|
|
|
|
for !s.close.Load() {
|
|
if conn, err := s.listener.AcceptTCP(); err != nil {
|
|
utils.Errorf("P2PServer", "Connection accept failed %s", err.Error())
|
|
continue
|
|
} else {
|
|
if err = func() error {
|
|
if uint32(s.NumIncomingConnections.Load()) > s.MaxIncomingPeers {
|
|
return errors.New("incoming connections limit was reached")
|
|
}
|
|
if addrPort, err := netip.ParseAddrPort(conn.RemoteAddr().String()); err != nil {
|
|
return err
|
|
} else if !addrPort.Addr().IsLoopback() {
|
|
if clients := s.GetAddressConnected(addrPort.Addr()); !addrPort.Addr().IsLoopback() && len(clients) != 0 {
|
|
return errors.New("peer is already connected as " + clients[0].AddressPort.String())
|
|
}
|
|
|
|
addr := addrPort.Addr().Unmap()
|
|
|
|
if !s.useIPv4 && addr.Is4() {
|
|
return errors.New("peer is IPv4 but we do not allow it")
|
|
} else if !s.useIPv6 && addr.Is6() {
|
|
return errors.New("peer is IPv6 but we do not allow it")
|
|
}
|
|
|
|
if ok, b := s.IsBanned(addr); ok {
|
|
return fmt.Errorf("peer is banned: %w", b.Error)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}(); err != nil {
|
|
go func() {
|
|
defer conn.Close()
|
|
utils.Errorf("P2PServer", "Connection from %s rejected (%s)", conn.RemoteAddr().String(), err.Error())
|
|
}()
|
|
continue
|
|
}
|
|
|
|
func() {
|
|
utils.Logf("P2PServer", "Incoming connection from %s", conn.RemoteAddr().String())
|
|
|
|
s.clientsLock.Lock()
|
|
defer s.clientsLock.Unlock()
|
|
client := NewClient(s, conn)
|
|
client.IsIncomingConnection = true
|
|
s.clients = append(s.clients, client)
|
|
s.NumIncomingConnections.Add(1)
|
|
go client.OnConnection()
|
|
}()
|
|
}
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) GetAddressConnectedPrefix(prefix netip.Prefix) (result []*Client) {
|
|
s.clientsLock.RLock()
|
|
defer s.clientsLock.RUnlock()
|
|
for _, c := range s.clients {
|
|
if prefix.Contains(c.AddressPort.Addr()) {
|
|
result = append(result, c)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (s *Server) GetAddressConnected(addr netip.Addr) (result []*Client) {
|
|
s.clientsLock.RLock()
|
|
defer s.clientsLock.RUnlock()
|
|
for _, c := range s.clients {
|
|
if c.AddressPort.Addr().Compare(addr) == 0 {
|
|
result = append(result, c)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (s *Server) DirectConnect(addrPort netip.AddrPort) (*Client, error) {
|
|
addr := addrPort.Addr().Unmap()
|
|
if !s.useIPv4 && addr.Is4() {
|
|
return nil, errors.New("peer is IPv4 but we do not allow it")
|
|
} else if !s.useIPv6 && addr.Is6() {
|
|
return nil, errors.New("peer is IPv6 but we do not allow it")
|
|
}
|
|
|
|
if clients := s.GetAddressConnected(addrPort.Addr()); !addrPort.Addr().IsLoopback() && len(clients) != 0 {
|
|
return nil, errors.New("peer is already connected as " + clients[0].AddressPort.String())
|
|
}
|
|
|
|
if !s.PendingOutgoingConnections.PushUnique(addrPort.Addr().String()) {
|
|
return nil, errors.New("peer is already attempting connection")
|
|
}
|
|
|
|
s.NumOutgoingConnections.Add(1)
|
|
|
|
var localAddr net.Addr
|
|
|
|
//select IPv6 outgoing address
|
|
if addr.Is6() {
|
|
addrs := s.GetOutgoingIPv6()
|
|
if len(addrs) > 1 {
|
|
a := addrs[unsafeRandom.IntN(len(addrs))]
|
|
localAddr = &net.TCPAddr{IP: a.AsSlice(), Zone: a.Zone()}
|
|
} else if len(addrs) == 1 {
|
|
localAddr = &net.TCPAddr{IP: addrs[0].AsSlice(), Zone: addrs[0].Zone()}
|
|
}
|
|
}
|
|
|
|
if localAddr != nil {
|
|
utils.Logf("P2PServer", "Outgoing connection to %s using %s", addrPort.String(), localAddr.String())
|
|
} else {
|
|
utils.Logf("P2PServer", "Outgoing connection to %s", addrPort.String())
|
|
}
|
|
|
|
if conn, err := (&net.Dialer{Timeout: time.Second * 5, LocalAddr: localAddr}).DialContext(s.ctx, "tcp", addrPort.String()); err != nil {
|
|
s.NumOutgoingConnections.Add(-1)
|
|
s.PendingOutgoingConnections.Replace(addrPort.Addr().String(), "")
|
|
if p := s.PeerList().Get(addrPort.Addr()); p != nil {
|
|
if p.FailedConnections.Add(1) >= 10 {
|
|
s.RemoveFromPeerList(addrPort.Addr())
|
|
}
|
|
}
|
|
return nil, err
|
|
} else if tcpConn, ok := conn.(*net.TCPConn); !ok {
|
|
s.NumOutgoingConnections.Add(-1)
|
|
s.PendingOutgoingConnections.Replace(addrPort.Addr().String(), "")
|
|
if p := s.PeerList().Get(addrPort.Addr()); p != nil {
|
|
if p.FailedConnections.Add(1) >= 10 {
|
|
s.RemoveFromPeerList(addrPort.Addr())
|
|
}
|
|
}
|
|
return nil, errors.New("not a tcp connection")
|
|
} else {
|
|
s.clientsLock.Lock()
|
|
defer s.clientsLock.Unlock()
|
|
client := NewClient(s, tcpConn)
|
|
s.clients = append(s.clients, client)
|
|
go client.OnConnection()
|
|
return client, nil
|
|
}
|
|
}
|
|
|
|
func (s *Server) Connect(addrPort netip.AddrPort) error {
|
|
if ok, b := s.IsBanned(addrPort.Addr()); ok {
|
|
return fmt.Errorf("peer is banned: %w", b.Error)
|
|
}
|
|
|
|
_, err := s.DirectConnect(addrPort)
|
|
return err
|
|
}
|
|
|
|
func (s *Server) Clients() []*Client {
|
|
s.clientsLock.RLock()
|
|
defer s.clientsLock.RUnlock()
|
|
return slices.Clone(s.clients)
|
|
}
|
|
|
|
func (s *Server) IsBanned(ip netip.Addr) (bool, *BanEntry) {
|
|
if ip.IsLoopback() {
|
|
return false, nil
|
|
}
|
|
ip = ip.Unmap()
|
|
var prefix netip.Prefix
|
|
if ip.Is6() {
|
|
//ban the /64
|
|
prefix, _ = ip.Prefix(64)
|
|
} else if ip.Is4() {
|
|
//ban only a single ip, /32
|
|
prefix, _ = ip.Prefix(32)
|
|
}
|
|
|
|
if !prefix.IsValid() {
|
|
return false, nil
|
|
}
|
|
|
|
k := prefix.Addr().As16()
|
|
|
|
if b, ok := func() (entry BanEntry, ok bool) {
|
|
s.bansLock.RLock()
|
|
defer s.bansLock.RUnlock()
|
|
entry, ok = s.bans[k]
|
|
return entry, ok
|
|
}(); ok == false {
|
|
return false, nil
|
|
} else if uint64(time.Now().Unix()) >= b.Expiration {
|
|
return false, nil
|
|
} else {
|
|
return true, &b
|
|
}
|
|
}
|
|
|
|
func (s *Server) Ban(ip netip.Addr, duration time.Duration, err error) {
|
|
if ok, _ := s.IsBanned(ip); ok {
|
|
return
|
|
}
|
|
|
|
utils.Logf("P2PServer", "Banned %s for %s: %s", ip.String(), duration.String(), err.Error())
|
|
if !ip.IsLoopback() {
|
|
ip = ip.Unmap()
|
|
var prefix netip.Prefix
|
|
if ip.Is6() {
|
|
//ban the /64
|
|
prefix, _ = ip.Prefix(64)
|
|
} else if ip.Is4() {
|
|
//ban only a single ip, /32
|
|
prefix, _ = ip.Prefix(32)
|
|
}
|
|
|
|
if prefix.IsValid() {
|
|
func() {
|
|
s.bansLock.Lock()
|
|
defer s.bansLock.Unlock()
|
|
s.bans[prefix.Addr().As16()] = BanEntry{
|
|
Error: err,
|
|
Expiration: uint64(time.Now().Unix()) + uint64(duration.Seconds()),
|
|
}
|
|
}()
|
|
for _, c := range s.GetAddressConnectedPrefix(prefix) {
|
|
c.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (s *Server) Close() {
|
|
if !s.close.Swap(true) && s.listener != nil {
|
|
s.clientsLock.Lock()
|
|
defer s.clientsLock.Unlock()
|
|
for _, c := range s.clients {
|
|
c.Connection.Close()
|
|
}
|
|
s.listener.Close()
|
|
}
|
|
}
|
|
|
|
func (s *Server) VersionInformation() *p2pooltypes.PeerVersionInformation {
|
|
return &s.versionInformation
|
|
}
|
|
|
|
func (s *Server) PeerId() uint64 {
|
|
return s.peerId
|
|
}
|
|
|
|
func (s *Server) SideChain() *sidechain.SideChain {
|
|
return s.p2pool.SideChain()
|
|
}
|
|
|
|
func (s *Server) MainChain() *mainchain.MainChain {
|
|
return s.p2pool.MainChain()
|
|
}
|
|
|
|
func (s *Server) Broadcast(block *sidechain.PoolBlock) {
|
|
var message, prunedMessage, compactMessage *ClientMessage
|
|
if block != nil {
|
|
// Full block broadcast
|
|
buffer := make([]byte, 4, block.BufferLength()+4)
|
|
blockData, err := block.AppendBinaryFlags(buffer, false, false)
|
|
if err != nil {
|
|
utils.Panicf("[P2PServer] Tried to broadcast block %s at height %d but received error: %s", block.SideTemplateId(s.Consensus()), block.Side.Height, err)
|
|
return
|
|
}
|
|
binary.LittleEndian.PutUint32(blockData, uint32(len(blockData)-4))
|
|
message = &ClientMessage{
|
|
MessageId: MessageBlockBroadcast,
|
|
Buffer: blockData,
|
|
}
|
|
|
|
// Pruned block broadcast
|
|
prunedBuffer := make([]byte, 4, block.BufferLength()+4)
|
|
prunedBlockData, _ := block.AppendBinaryFlags(prunedBuffer, true, false)
|
|
binary.LittleEndian.PutUint32(prunedBlockData, uint32(len(prunedBlockData)-4))
|
|
prunedMessage = &ClientMessage{
|
|
MessageId: MessageBlockBroadcast,
|
|
Buffer: prunedBlockData,
|
|
}
|
|
|
|
// Compact block broadcast
|
|
compactBuffer := make([]byte, 4, block.BufferLength()+4)
|
|
compactBlockData, _ := block.AppendBinaryFlags(compactBuffer, true, true)
|
|
binary.LittleEndian.PutUint32(compactBlockData, uint32(len(compactBlockData)-4))
|
|
if len(compactBlockData) >= len(prunedBlockData) {
|
|
//do not send compact if it ends up larger due to some reason, like parent missing or mismatch in transactions
|
|
compactMessage = prunedMessage
|
|
} else {
|
|
compactMessage = &ClientMessage{
|
|
MessageId: MessageBlockBroadcastCompact,
|
|
Buffer: compactBlockData,
|
|
}
|
|
}
|
|
} else {
|
|
message = &ClientMessage{
|
|
MessageId: MessageBlockBroadcast,
|
|
Buffer: binary.LittleEndian.AppendUint32(nil, 0),
|
|
}
|
|
prunedMessage, compactMessage = message, message
|
|
}
|
|
|
|
if !s.versionInformation.SupportsFeature(p2pooltypes.FeaturePrunedBroadcast) {
|
|
prunedMessage = message
|
|
}
|
|
|
|
if !s.versionInformation.SupportsFeature(p2pooltypes.FeatureCompactBroadcast) {
|
|
compactMessage = prunedMessage
|
|
}
|
|
|
|
supportsNotify := s.versionInformation.SupportsFeature(p2pooltypes.FeatureBlockNotify)
|
|
|
|
blockTemplateId := block.SideTemplateId(s.Consensus())
|
|
|
|
go func() {
|
|
for _, c := range s.Clients() {
|
|
if c.IsGood() {
|
|
if !func() (sent bool) {
|
|
broadcastedHashes := c.BroadcastedHashes.Slice()
|
|
|
|
// has peer not broadcasted block parent to us?
|
|
if slices.Index(broadcastedHashes, block.Side.Parent) == -1 {
|
|
return false
|
|
}
|
|
// has peer not broadcasted block uncles to us?
|
|
for _, uncleHash := range block.Side.Uncles {
|
|
if slices.Index(broadcastedHashes, uncleHash) == -1 {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// has peer broadcasted this block to us?
|
|
if slices.Index(broadcastedHashes, blockTemplateId) != -1 &&
|
|
supportsNotify && c.VersionInformation.SupportsFeature(p2pooltypes.FeatureBlockNotify) {
|
|
c.SendBlockNotify(blockTemplateId)
|
|
return true
|
|
}
|
|
|
|
if c.VersionInformation.SupportsFeature(p2pooltypes.FeatureCompactBroadcast) {
|
|
c.SendMessage(compactMessage)
|
|
return true
|
|
} else if c.VersionInformation.SupportsFeature(p2pooltypes.FeaturePrunedBroadcast) {
|
|
c.SendMessage(prunedMessage)
|
|
return true
|
|
}
|
|
return false
|
|
}() {
|
|
//fallback
|
|
c.SendMessage(message)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (s *Server) HandshakeConsensusId() types.Hash {
|
|
return s.p2pool.Consensus().Id
|
|
}
|
|
|
|
func (s *Server) Consensus() *sidechain.Consensus {
|
|
return s.p2pool.Consensus()
|
|
}
|