refactored sidechain testdata initialization, web stream request api

This commit is contained in:
DataHoarder 2023-06-02 14:34:49 +02:00
parent d40172d479
commit 8d779e9cbc
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
4 changed files with 183 additions and 134 deletions

View file

@ -70,6 +70,42 @@ func getSliceFromAPI[T any](method string, cacheTime ...int) []T {
})
}
func getStreamFromAPI[T any](method string, cacheTime ...int) chan T {
result := make(chan T, 1)
go func() {
defer close(result)
uri, _ := url.Parse(os.Getenv("API_URL") + method)
if response, err := http.DefaultClient.Do(&http.Request{
Method: "GET",
URL: uri,
}); err != nil {
return
} else {
defer response.Body.Close()
defer io.ReadAll(response.Body)
if response.StatusCode == http.StatusOK {
decoder := utils.NewJSONDecoder(response.Body)
for decoder.More() {
var item T
if decoder.Decode(&item) != nil {
return
} else {
result <- item
}
}
}
}
}()
return result
}
func getSideBlocksStreamFromAPI(method string) chan *index.SideBlock {
return getStreamFromAPI[*index.SideBlock](method)
}
func getSideBlocksFromAPI(method string, cacheTime ...int) []*index.SideBlock {
return getSliceFromAPI[*index.SideBlock](method, cacheTime...)
}

View file

@ -0,0 +1,103 @@
package sidechain
import (
"context"
mainblock "git.gammaspectra.live/P2Pool/p2pool-observer/monero/block"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/client"
p2pooltypes "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/types"
"git.gammaspectra.live/P2Pool/p2pool-observer/types"
"sync/atomic"
)
type FakeServer struct {
consensus *Consensus
lastHeader atomic.Pointer[mainblock.Header]
}
func (s *FakeServer) Context() context.Context {
return context.Background()
}
func (s *FakeServer) Consensus() *Consensus {
return s.consensus
}
func (s *FakeServer) GetBlob(key []byte) (blob []byte, err error) {
return nil, nil
}
func (s *FakeServer) SetBlob(key, blob []byte) (err error) {
return nil
}
func (s *FakeServer) RemoveBlob(key []byte) (err error) {
return nil
}
func (s *FakeServer) UpdateTip(tip *PoolBlock) {
}
func (s *FakeServer) Broadcast(block *PoolBlock) {
}
func (s *FakeServer) ClientRPC() *client.Client {
return client.GetDefaultClient()
}
func (s *FakeServer) GetChainMainByHeight(height uint64) *ChainMain {
return nil
}
func (s *FakeServer) GetChainMainByHash(hash types.Hash) *ChainMain {
return nil
}
func (s *FakeServer) GetMinimalBlockHeaderByHeight(height uint64) *mainblock.Header {
if h := s.lastHeader.Load(); h != nil && h.Height == height {
return h
}
if h, err := s.ClientRPC().GetBlockHeaderByHeight(height, context.Background()); err != nil {
return nil
} else {
header := &mainblock.Header{
MajorVersion: uint8(h.BlockHeader.MajorVersion),
MinorVersion: uint8(h.BlockHeader.MinorVersion),
Timestamp: uint64(h.BlockHeader.Timestamp),
PreviousId: types.MustHashFromString(h.BlockHeader.PrevHash),
Height: h.BlockHeader.Height,
Nonce: uint32(h.BlockHeader.Nonce),
Reward: h.BlockHeader.Reward,
Difficulty: types.DifficultyFrom64(h.BlockHeader.Difficulty),
Id: types.MustHashFromString(h.BlockHeader.Hash),
}
s.lastHeader.Store(header)
return header
}
}
func (s *FakeServer) GetMinimalBlockHeaderByHash(hash types.Hash) *mainblock.Header {
return nil
}
func (s *FakeServer) GetDifficultyByHeight(height uint64) types.Difficulty {
return s.GetMinimalBlockHeaderByHeight(height).Difficulty
}
func (s *FakeServer) UpdateBlockFound(data *ChainMain, block *PoolBlock) {
}
func (s *FakeServer) SubmitBlock(block *mainblock.Block) {
}
func (s *FakeServer) GetChainMainTip() *ChainMain {
return nil
}
func (s *FakeServer) GetMinerDataTip() *p2pooltypes.MinerData {
return nil
}
func (s *FakeServer) Store(block *PoolBlock) {
}
func (s *FakeServer) ClearCachedBlocks() {
}
func GetFakeTestServer(consensus *Consensus) *FakeServer {
return &FakeServer{
consensus: consensus,
}
}

View file

@ -3,6 +3,7 @@ package sidechain
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero"
@ -16,6 +17,7 @@ import (
"git.gammaspectra.live/P2Pool/p2pool-observer/utils"
"git.gammaspectra.live/P2Pool/sha3"
"golang.org/x/exp/slices"
"io"
"log"
"sync"
"sync/atomic"
@ -1038,3 +1040,39 @@ func (c *SideChain) isLongerChain(block, candidate *PoolBlock) (isLonger, isAlte
return c.server.GetChainMainByHash(h)
})
}
func (c *SideChain) LoadTestData(reader io.Reader, patchedBlocks ...[]byte) error {
var err error
buf := make([]byte, PoolBlockMaxTemplateSize)
for {
buf = buf[:0]
var blockLen uint32
if err = binary.Read(reader, binary.LittleEndian, &blockLen); err == io.EOF {
break
} else if err != nil {
return err
}
if _, err = io.ReadFull(reader, buf[:blockLen]); err != nil {
return err
}
b := &PoolBlock{}
if err = b.UnmarshalBinary(c.Consensus(), c.DerivationCache(), buf[:blockLen]); err != nil {
return err
}
if err = c.AddPoolBlock(b); err != nil {
return err
}
}
for _, buf := range patchedBlocks {
b := &PoolBlock{}
if err = b.UnmarshalBinary(c.Consensus(), c.DerivationCache(), buf); err != nil {
return err
}
if err = c.AddPoolBlock(b); err != nil {
return err
}
}
return nil
}

View file

@ -1,18 +1,12 @@
package sidechain
import (
"context"
"encoding/binary"
mainblock "git.gammaspectra.live/P2Pool/p2pool-observer/monero/block"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/client"
p2pooltypes "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/types"
"git.gammaspectra.live/P2Pool/p2pool-observer/types"
"io"
"log"
"os"
"path"
"runtime"
"sync/atomic"
"testing"
)
@ -31,36 +25,9 @@ func init() {
}
func testSideChain(s *SideChain, t *testing.T, reader io.Reader, sideHeight, mainHeight uint64, patchedBlocks ...[]byte) {
var err error
buf := make([]byte, PoolBlockMaxTemplateSize)
for {
buf = buf[:0]
var blockLen uint32
if err = binary.Read(reader, binary.LittleEndian, &blockLen); err == io.EOF {
break
} else if err != nil {
t.Fatal(err)
}
if _, err = io.ReadFull(reader, buf[:blockLen]); err != nil {
t.Fatal(err)
}
b := &PoolBlock{}
if err = b.UnmarshalBinary(s.Consensus(), s.DerivationCache(), buf[:blockLen]); err != nil {
t.Fatal(err)
}
if err = s.AddPoolBlock(b); err != nil {
t.Fatal(err)
}
}
for _, buf := range patchedBlocks {
b := &PoolBlock{}
if err = b.UnmarshalBinary(s.Consensus(), s.DerivationCache(), buf); err != nil {
t.Fatal(err)
}
if err = s.AddPoolBlock(b); err != nil {
t.Fatal(err)
}
if err := s.LoadTestData(reader, patchedBlocks...); err != nil {
t.Fatal(err)
}
tip := s.GetChainTip()
@ -104,9 +71,7 @@ func testSideChain(s *SideChain, t *testing.T, reader io.Reader, sideHeight, mai
func TestSideChainDefault(t *testing.T) {
s := NewSideChain(&fakeServer{
consensus: ConsensusDefault,
})
s := NewSideChain(GetFakeTestServer(ConsensusDefault))
f, err := os.Open("testdata/sidechain_dump.dat")
if err != nil {
@ -119,9 +84,7 @@ func TestSideChainDefault(t *testing.T) {
func TestSideChainDefaultPreFork(t *testing.T) {
s := NewSideChain(&fakeServer{
consensus: ConsensusDefault,
})
s := NewSideChain(GetFakeTestServer(ConsensusDefault))
f, err := os.Open("testdata/old_sidechain_dump.dat")
if err != nil {
@ -134,9 +97,7 @@ func TestSideChainDefaultPreFork(t *testing.T) {
func TestSideChainMini(t *testing.T) {
s := NewSideChain(&fakeServer{
consensus: ConsensusMini,
})
s := NewSideChain(GetFakeTestServer(ConsensusMini))
f, err := os.Open("testdata/sidechain_dump_mini.dat")
if err != nil {
@ -149,9 +110,7 @@ func TestSideChainMini(t *testing.T) {
func TestSideChainMiniPreFork(t *testing.T) {
s := NewSideChain(&fakeServer{
consensus: ConsensusMini,
})
s := NewSideChain(GetFakeTestServer(ConsensusMini))
f, err := os.Open("testdata/old_sidechain_dump_mini.dat")
if err != nil {
@ -172,90 +131,3 @@ func TestSideChainMiniPreFork(t *testing.T) {
testSideChain(s, t, f, 2424349, 2696040, block2420028, block2420027)
}
type fakeServer struct {
consensus *Consensus
lastHeader atomic.Pointer[mainblock.Header]
}
func (s *fakeServer) Context() context.Context {
return context.Background()
}
func (s *fakeServer) Consensus() *Consensus {
return s.consensus
}
func (s *fakeServer) GetBlob(key []byte) (blob []byte, err error) {
return nil, nil
}
func (s *fakeServer) SetBlob(key, blob []byte) (err error) {
return nil
}
func (s *fakeServer) RemoveBlob(key []byte) (err error) {
return nil
}
func (s *fakeServer) UpdateTip(tip *PoolBlock) {
}
func (s *fakeServer) Broadcast(block *PoolBlock) {
}
func (s *fakeServer) ClientRPC() *client.Client {
return client.GetDefaultClient()
}
func (s *fakeServer) GetChainMainByHeight(height uint64) *ChainMain {
return nil
}
func (s *fakeServer) GetChainMainByHash(hash types.Hash) *ChainMain {
return nil
}
func (s *fakeServer) GetMinimalBlockHeaderByHeight(height uint64) *mainblock.Header {
if h := s.lastHeader.Load(); h != nil && h.Height == height {
return h
}
if h, err := s.ClientRPC().GetBlockHeaderByHeight(height, context.Background()); err != nil {
return nil
} else {
header := &mainblock.Header{
MajorVersion: uint8(h.BlockHeader.MajorVersion),
MinorVersion: uint8(h.BlockHeader.MinorVersion),
Timestamp: uint64(h.BlockHeader.Timestamp),
PreviousId: types.MustHashFromString(h.BlockHeader.PrevHash),
Height: h.BlockHeader.Height,
Nonce: uint32(h.BlockHeader.Nonce),
Reward: h.BlockHeader.Reward,
Difficulty: types.DifficultyFrom64(h.BlockHeader.Difficulty),
Id: types.MustHashFromString(h.BlockHeader.Hash),
}
s.lastHeader.Store(header)
return header
}
}
func (s *fakeServer) GetMinimalBlockHeaderByHash(hash types.Hash) *mainblock.Header {
return nil
}
func (s *fakeServer) GetDifficultyByHeight(height uint64) types.Difficulty {
return s.GetMinimalBlockHeaderByHeight(height).Difficulty
}
func (s *fakeServer) UpdateBlockFound(data *ChainMain, block *PoolBlock) {
}
func (s *fakeServer) SubmitBlock(block *mainblock.Block) {
}
func (s *fakeServer) GetChainMainTip() *ChainMain {
return nil
}
func (s *fakeServer) GetMinerDataTip() *p2pooltypes.MinerData {
return nil
}
func (s *fakeServer) Store(block *PoolBlock) {
}
func (s *fakeServer) ClearCachedBlocks() {
}