Added subscription service

This commit is contained in:
DataHoarder 2023-04-25 08:34:17 +02:00
parent 840d0c8f3a
commit cafa2e5559
Signed by: DataHoarder
SSH key fingerprint: SHA256:OLTRf6Fl87G52SiR7sWLGNzlJt4WOX+tfI2yxo0z7xk
11 changed files with 835 additions and 160 deletions

3
.gitignore vendored
View file

@ -1,3 +1,4 @@
.idea
.env
docker-compose.override.yml
docker-compose.override.yml
subscriptions.db

117
api.go Normal file
View file

@ -0,0 +1,117 @@
package main
import (
"encoding/json"
"fmt"
"git.gammaspectra.live/P2Pool/p2pool-observer/index"
"golang.org/x/exp/slices"
"io"
"net/http"
"net/url"
"strings"
"time"
)
func getFromAPI(host, method string) any {
uri, _ := url.Parse(host + method)
if response, err := http.DefaultClient.Do(&http.Request{
Method: "GET",
URL: uri,
}); err != nil {
return nil
} else {
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
if strings.Index(response.Header.Get("content-type"), "/json") != -1 {
var result any
decoder := json.NewDecoder(response.Body)
decoder.UseNumber()
err = decoder.Decode(&result)
return result
} else if data, err := io.ReadAll(response.Body); err != nil {
return nil
} else {
return data
}
} else {
return nil
}
}
}
func getTypeFromAPI[T any](host, method string) *T {
uri, _ := url.Parse(host + method)
if response, err := http.DefaultClient.Do(&http.Request{
Method: "GET",
URL: uri,
}); err != nil {
return nil
} else {
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
var result T
if data, err := io.ReadAll(response.Body); err != nil {
return nil
} else if json.Unmarshal(data, &result) != nil {
return nil
} else {
return &result
}
} else {
return nil
}
}
}
func getSideBlocksFromAPI(host, method string) []*index.SideBlock {
return getSliceFromAPI[*index.SideBlock](host, method)
}
func getSliceFromAPI[T any](host, method string) []T {
uri, _ := url.Parse(host + method)
if response, err := http.DefaultClient.Do(&http.Request{
Method: "GET",
URL: uri,
}); err != nil {
return nil
} else {
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
var result []T
if data, err := io.ReadAll(response.Body); err != nil {
return nil
} else if json.Unmarshal(data, &result) != nil {
return nil
} else {
return result
}
} else {
return nil
}
}
}
func getPoolInfo(host string) map[string]any {
var basePoolInfo map[string]any
var ok bool
for {
d := getFromAPI(host, "/api/pool_info")
basePoolInfo, ok = d.(map[string]any)
if d == nil || !ok || basePoolInfo == nil || len(basePoolInfo) == 0 {
time.Sleep(5)
continue
}
break
}
return basePoolInfo
}
func getPreviousBlocks(host string) (result foundBlocks) {
result = getSliceFromAPI[*index.FoundBlock](host, fmt.Sprintf("/api/found_blocks?limit=%d", numberOfBlockHistoryToKeep))
//sort from oldest to newest
slices.SortFunc(result, func(a, b *index.FoundBlock) bool {
return a.MainBlock.Height < b.MainBlock.Height
})
return result
}

325
bot.go
View file

@ -3,7 +3,6 @@ package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
utils2 "git.gammaspectra.live/P2Pool/p2pool-observer/cmd/utils"
@ -18,12 +17,12 @@ import (
"gopkg.in/sorcix/irc.v2"
"io"
"log"
"math"
"net/http"
"net/url"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"os"
"regexp"
"strings"
"sync"
"time"
@ -64,15 +63,6 @@ const FormatItalic = "\x1D"
const FormatUnderline = "\x1F"
const FormatReset = "\x0F"
var guestUserRegex = regexp.MustCompile("^Guest[0-9]+_*$")
func isNickAllowed(nick string) error {
if guestUserRegex.MatchString(nick) {
return errors.New("guest user is not allowed")
}
return nil
}
func main() {
ircHost := flag.String("irc-host", "irc.libera.chat", "")
ircPort := flag.Uint("irc-port", 6697, "")
@ -84,106 +74,15 @@ func main() {
botPassword := os.Getenv("BOT_PASSWORD")
pleromaCookie := os.Getenv("PLEROMA_COOKIE")
dbPath := flag.String("db", "", "Path of database")
botChannels := flag.String("channels", "", "A list of #CHANNEL,NAME,API_ENDPOINT separated by; for example: #p2pool-main,Main,https://p2pool.observer;#p2pool-mini,Mini,https://mini.p2pool.observer")
flag.Parse()
type channelEntry struct {
ApiEndpoint string
Channel string
Name string
PreviousBlocks foundBlocks
LastWindowMainDifficulty struct {
Height uint64
Difficulty uint64
}
PoolInfo map[string]any
Consensus *sidechain.Consensus
Window map[types.Hash]*index.SideBlock
Ws *websocket.Conn
}
const numberOfBlockHistoryToKeep = 10
getPoolInfo := func(host string) map[string]any {
var basePoolInfo map[string]any
for {
func() {
uri, _ := url.Parse(host + "/api/pool_info")
if response, err := http.DefaultClient.Do(&http.Request{
Method: "GET",
URL: uri,
}); err != nil {
return
} else {
defer response.Body.Close()
if response.StatusCode == http.StatusOK {
if strings.Index(response.Header.Get("content-type"), "/json") != -1 {
decoder := json.NewDecoder(response.Body)
decoder.UseNumber()
err = decoder.Decode(&basePoolInfo)
return
}
}
return
}
}()
if basePoolInfo == nil || len(basePoolInfo) == 0 {
time.Sleep(5)
continue
}
break
}
return basePoolInfo
}
getPreviousBlocks := func(host string) (result foundBlocks) {
if host == "" {
return nil
}
response, err := http.DefaultClient.Get(fmt.Sprintf("%s/api/found_blocks?limit=%d", host, numberOfBlockHistoryToKeep))
if err != nil {
return nil
}
defer response.Body.Close()
if buf, err := io.ReadAll(response.Body); err != nil {
return nil
} else {
if err = json.Unmarshal(buf, &result); err != nil {
return nil
} else {
//sort from oldest to newest
slices.SortFunc(result, func(a, b *index.FoundBlock) bool {
return a.MainBlock.Height < b.MainBlock.Height
})
return result
}
}
}
openWebsocket := func(host string) *websocket.Conn {
if host == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
u, _ := url.Parse(host)
if u.Scheme == "https" {
c, _, err := websocket.Dial(ctx, fmt.Sprintf("wss://%s/api/events", u.Host), nil)
if err != nil {
return nil
}
return c
} else {
c, _, err := websocket.Dial(ctx, fmt.Sprintf("ws://%s/api/events", u.Host), nil)
if err != nil {
return nil
}
return c
}
db, err := NewDB(*dbPath)
if err != nil {
log.Panic(err)
}
doPleromaPost := func(e *channelEntry, message string) {
@ -239,6 +138,8 @@ func main() {
continue
}
log.Printf("Creating entry for %s", split[2])
basePoolInfo := getPoolInfo(split[2])
consensusData, _ := json.Marshal(basePoolInfo["sidechain"].(map[string]any)["consensus"].(map[string]any))
@ -247,15 +148,24 @@ func main() {
log.Panic(err)
}
channelEntries = append(channelEntries, &channelEntry{
entry := &channelEntry{
ApiEndpoint: split[2],
Channel: split[0],
Name: split[1],
Consensus: consensus,
PreviousBlocks: getPreviousBlocks(split[2]),
Ws: openWebsocket(split[2]),
Window: make(map[types.Hash]*index.SideBlock),
})
}
for _, b := range getSideBlocksFromAPI(entry.ApiEndpoint, fmt.Sprintf("/api/side_blocks_in_window?window=%d&noMainStatus", entry.DesiredPruneDistance())) {
entry.Window[b.TemplateId] = b
if entry.Tip == nil || entry.Tip.SideHeight < b.SideHeight {
entry.Tip = b
}
}
//websocket needs to come after
entry.openWebSocket()
channelEntries = append(channelEntries, entry)
}
@ -265,6 +175,7 @@ func main() {
var beforeJoiningChannels sync.Once
if bot, err := hbot.NewBot(fmt.Sprintf("%s:%d", *ircHost, *ircPort), *botNickName, func(bot *hbot.Bot) {
bot.ThrottleDelay = time.Millisecond * 100
bot.Nick = *botNickName
bot.Realname = *botUserName
bot.SSL = *ircSsl
@ -311,7 +222,7 @@ func main() {
}) {
onlineUsersLock.Lock()
defer onlineUsersLock.Unlock()
for _, nick := range strings.Split(message.Params[3], " ") {
for _, nick := range strings.Split(strings.ToLower(message.Params[3]), " ") {
if !slices.Contains(onlineUsers, nick) {
onlineUsers = append(onlineUsers, strings.TrimLeft(nick, "@~%+"))
}
@ -332,8 +243,8 @@ func main() {
if len(message.Params) != 1 {
return false
}
from := message.From
to := message.To
from := strings.ToLower(message.From)
to := strings.ToLower(message.To)
onlineUsersLock.Lock()
defer onlineUsersLock.Unlock()
if i := slices.Index(onlineUsers, from); i != -1 {
@ -356,7 +267,7 @@ func main() {
if len(message.Params) != 1 {
return false
}
nick := message.From
nick := strings.ToLower(message.From)
channelName := message.To
onlineUsersLock.Lock()
defer onlineUsersLock.Unlock()
@ -375,7 +286,7 @@ func main() {
onlineUsers = append(onlineUsers, nick)
}
} else if message.Command == irc.QUIT {
nick := message.From
nick := strings.ToLower(message.From)
onlineUsersLock.Lock()
defer onlineUsersLock.Unlock()
if i := slices.Index(onlineUsers, nick); i != -1 {
@ -395,33 +306,80 @@ func main() {
// see about irc.ERR_NICKNAMEINUSE or irc.ERR_NICKCOLLISION to recover nick
shareFound := func(e *channelEntry, b *index.SideBlock, target string) {
uHeight := (b.SideHeight << 16) | (uint64(b.MainId[0]) << 8) | uint64(b.MainId[1])
payoutFound := func(e *channelEntry, b *index.FoundBlock, o *index.MainCoinbaseOutput, sub *Subscription) {
uHeight := (b.SideHeight << 16) | (uint64(b.MainBlock.Id[0]) << 8) | uint64(b.MainBlock.Id[1])
if b.UncleOf != types.ZeroHash {
bot.Msg(target, fmt.Sprintf(
"%sUNCLE SHARE FOUND%s on %s: Side height %s%d%s, Parent Side height %s%d%s :: %s/s/%s :: Accounted for %d%% of value :: Template Id %s%s%s :: Miner Address %s%s",
payoutIndex := (b.SideHeight << uint64(math.Ceil(math.Log2(float64(e.Consensus.ChainWindowSize*4))))) | uint64(o.Index)
bot.Msg(sub.Nick, fmt.Sprintf(
"%sPAYOUT%s on %s: %s%s%s XMR%s to %s%s%s:: Main height %s%d%s, Side height %d :: %s/s/%s :: Verify payout %s/p/%s",
FormatColorLightGreen+FormatBold, FormatReset, e.Name,
FormatColorOrange, FormatBold, utils.XMRUnits(o.Value), FormatReset,
FormatItalic, utils.Shorten(o.MinerAddress.ToBase58(), 10), FormatReset,
FormatColorRed, b.MainBlock.Height, FormatReset,
b.SideHeight,
e.ApiEndpoint, utils.EncodeBinaryNumber(uHeight),
e.ApiEndpoint, utils.EncodeBinaryNumber(payoutIndex),
))
}
shareFound := func(e *channelEntry, tip *index.SideBlock, sub *Subscription) {
uHeight := (tip.SideHeight << 16) | (uint64(tip.MainId[0]) << 8) | uint64(tip.MainId[1])
shareCount := 0
uncleCount := 0
var yourWeight types.Difficulty
var totalWeight types.Difficulty
for range index.IterateSideBlocksInPPLNSWindow(tip, e.Consensus, e.DifficultyFromHeight, e.SideBlockByTemplateId, e.SideBlockUnclesByTemplateId, func(b *index.SideBlock, weight types.Difficulty) {
totalWeight = totalWeight.Add(weight)
if tip.Miner == b.Miner {
if b.IsUncle() {
uncleCount++
} else {
shareCount++
}
yourWeight = yourWeight.Add(weight)
}
}, func(err error) {
}) {
}
shareRatio := float64(yourWeight.Lo) / float64(totalWeight.Lo)
if shareRatio > notificationPoolShare { //disable spammy notifications
return
}
if tip.UncleOf != types.ZeroHash {
bot.Msg(sub.Nick, fmt.Sprintf(
"%sUNCLE SHARE FOUND%s on %s: Side height %s%d%s, Parent Side height %s%d%s :: %s/s/%s :: Accounted for %d%% of value :: Template Id %s%s%s :: Window shares %d (+%d uncles) ~%.03f%% :: Miner Address %s%s",
FormatColorLightGreen+FormatBold, FormatReset, e.Name,
FormatColorRed, b.SideHeight, FormatReset,
FormatColorRed, b.EffectiveHeight, FormatReset,
FormatColorRed, tip.SideHeight, FormatReset,
FormatColorRed, tip.EffectiveHeight, FormatReset,
e.ApiEndpoint, utils.EncodeBinaryNumber(uHeight),
100-e.Consensus.UnclePenalty,
FormatItalic, utils.Shorten(b.TemplateId.String(), 8), FormatReset,
FormatItalic, utils.Shorten(b.MinerAddress.ToBase58(), 10),
FormatItalic, utils.Shorten(tip.TemplateId.String(), 8), FormatReset,
shareCount, uncleCount, shareRatio*100,
FormatItalic, utils.Shorten(tip.MinerAddress.ToBase58(), 10),
))
} else {
uncleText := ""
if len(b.Uncles) > 0 {
uncleText = fmt.Sprintf(":: Includes %d uncle(s) for extra %d%% of their value ", len(b.Uncles), e.Consensus.UnclePenalty)
if len(tip.Uncles) > 0 {
uncleText = fmt.Sprintf(":: Includes %d uncle(s) for extra %d%% of their value ", len(tip.Uncles), e.Consensus.UnclePenalty)
}
bot.Msg(target, fmt.Sprintf(
"%sSHARE FOUND%s on %s: Side height %s%d%s :: %s/s/%s %s:: Template Id %s%s%s :: Miner Address %s%s",
bot.Msg(sub.Nick, fmt.Sprintf(
"%sSHARE FOUND%s on %s: Side height %s%d%s :: %s/s/%s %s:: Template Id %s%s%s :: Window shares %d (+%d uncles) ~%.03f%% :: Miner Address %s%s",
FormatColorLightGreen+FormatBold, FormatReset, e.Name,
FormatColorRed, b.SideHeight, FormatReset,
FormatColorRed, tip.SideHeight, FormatReset,
e.ApiEndpoint, utils.EncodeBinaryNumber(uHeight),
uncleText,
FormatItalic, utils.Shorten(b.TemplateId.String(), 8), FormatReset,
FormatItalic, utils.Shorten(b.MinerAddress.ToBase58(), 10),
FormatItalic, utils.Shorten(tip.TemplateId.String(), 8), FormatReset,
shareCount, uncleCount, shareRatio*100,
FormatItalic, utils.Shorten(tip.MinerAddress.ToBase58(), 10),
))
}
}
@ -492,10 +450,30 @@ func main() {
if len(trimMessage) <= 0 || trimMessage[0] != '.' {
return false
}
return message.To == bot.Nick
if message.To == bot.Nick {
return true
}
for _, e := range channelEntries {
if message.To == e.Channel {
return true
}
}
return false
},
Action: func(bot *hbot.Bot, message *hbot.Message) bool {
bot.Msg(message.Name, "Bot notifications currently out of service")
replyTo := message.To
if replyTo == bot.Nick || replyTo[0] != '#' {
replyTo = message.Name
}
for _, c := range commands {
if matches := c.Match.FindStringSubmatch(message.Content); len(matches) > 0 {
if c.Handle(db, channelEntries, bot, message, replyTo, matches...) {
return true
}
}
}
bot.Msg(replyTo, "Command not recognized")
return true
},
})
@ -510,38 +488,79 @@ func main() {
if e.Ws == nil {
log.Printf("[WS] WebSocket for %s is disconnected, retrying", e.ApiEndpoint)
for ; e.Ws == nil; e.Ws = openWebsocket(e.ApiEndpoint) {
for ; e.Ws == nil; e.openWebSocket() {
time.Sleep(time.Second * 30)
log.Printf("[WS] WebSocket for %s is disconnected, retrying again", e.ApiEndpoint)
}
}
for {
var event utils2.JSONEvent
if err := wsjson.Read(context.Background(), e.Ws, &event); err != nil {
if err := func() error {
//there should be at least one share every five minutes, otherwise we are out of sync
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return wsjson.Read(ctx, e.Ws, &event)
}(); err != nil {
e.Ws.Close(websocket.StatusGoingAway, "error on read "+err.Error())
e.Ws = nil
break
} else {
switch event.Type {
case utils2.JSONEventSideBlock:
//shareFound(e, event.SideBlock, "DataHoarder")
case utils2.JSONEventFoundBlock:
e.PreviousBlocks = append(e.PreviousBlocks, event.FoundBlock)
blockFound(e, event.FoundBlock, e.PreviousBlocks.GetPrevious(event.FoundBlock))
if len(e.PreviousBlocks) > numberOfBlockHistoryToKeep {
//delete oldest block
e.PreviousBlocks = slices.Delete(e.PreviousBlocks, 0, 1)
func() {
e.ChainLock.Lock()
defer e.ChainLock.Unlock()
switch event.Type {
case utils2.JSONEventSideBlock:
b := event.SideBlock
e.Window[b.TemplateId] = b
if !b.IsUncle() {
e.Tip = b
}
onlineUsersLock.RLock()
defer onlineUsersLock.RUnlock()
for _, sub := range db.GetByAddress(b.MinerAddress) {
if slices.Contains(onlineUsers, strings.ToLower(sub.Nick)) {
//do not send notification if user is not online
shareFound(e, b, sub)
} else {
log.Printf("Could not send notification to %s - %s: not online", sub.Nick, sub.Address.ToBase58())
}
}
e.pruneBlocks()
case utils2.JSONEventFoundBlock:
e.PreviousBlocks = append(e.PreviousBlocks, event.FoundBlock)
blockFound(e, event.FoundBlock, e.PreviousBlocks.GetPrevious(event.FoundBlock))
onlineUsersLock.RLock()
defer onlineUsersLock.RUnlock()
for _, o := range event.MainCoinbaseOutputs {
for _, sub := range db.GetByAddress(o.MinerAddress) {
if slices.Contains(onlineUsers, strings.ToLower(sub.Nick)) {
//do not send notification if user is not online
payoutFound(e, event.FoundBlock, &o, sub)
} else {
log.Printf("Could not send notification to %s - %s: not online", sub.Nick, sub.Address.ToBase58())
}
}
}
if len(e.PreviousBlocks) > numberOfBlockHistoryToKeep {
//delete oldest block
e.PreviousBlocks = slices.Delete(e.PreviousBlocks, 0, 1)
}
case utils2.JSONEventOrphanedBlock:
if i := slices.IndexFunc(e.PreviousBlocks, func(block *index.FoundBlock) bool {
return event.SideBlock.MainId == block.MainBlock.Id
}); i != -1 {
//only notify if we reported it previously
slices.Delete(e.PreviousBlocks, i, i+1)
blockOrphaned(e, event.SideBlock)
}
}
case utils2.JSONEventOrphanedBlock:
if i := slices.IndexFunc(e.PreviousBlocks, func(block *index.FoundBlock) bool {
return event.SideBlock.MainId == block.MainBlock.Id
}); i != -1 {
//only notify if we reported it previously
slices.Delete(e.PreviousBlocks, i, i+1)
blockOrphaned(e, event.SideBlock)
}
}
}()
}
}
}(e)

180
commands.go Normal file
View file

@ -0,0 +1,180 @@
package main
import (
"errors"
"fmt"
"git.gammaspectra.live/P2Pool/p2pool-observer/index"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/address"
"git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain"
"git.gammaspectra.live/P2Pool/p2pool-observer/types"
"git.gammaspectra.live/P2Pool/p2pool-observer/utils"
hbot "github.com/whyrusleeping/hellabot"
"regexp"
)
type command struct {
Match *regexp.Regexp
Handle func(db *DB, entries []*channelEntry, bot *hbot.Bot, message *hbot.Message, replyTo string, matches ...string) bool
}
var guestUserRegex = regexp.MustCompile("^Guest[0-9]+_*$")
func isNickAllowed(nick string) error {
if guestUserRegex.MatchString(nick) {
return errors.New("guest user is not allowed")
}
return nil
}
var commands = []command{
{
Match: regexp.MustCompile("^\\.(status|shares)[ \\t]*"),
Handle: func(db *DB, entries []*channelEntry, bot *hbot.Bot, message *hbot.Message, replyTo string, matches ...string) bool {
subs := db.GetByNick(message.Name)
if len(subs) == 0 {
bot.Msg(replyTo, "No known subscriptions to your nick.")
return true
}
type result struct {
Endpoint string
ShareCount int
UncleCount int
YourWeight types.Difficulty
TotalWeight types.Difficulty
Tip *index.SideBlock
Address *address.Address
MinerId uint64
SharesPosition *PositionChart
UnclesPosition *PositionChart
Consensus *sidechain.Consensus
}
var hasResults bool
var results []*result
for i := range entries {
e := entries[i]
if e.ApiEndpoint != "" && (message.To == bot.Nick || e.Channel == message.To) {
func() {
e.ChainLock.RLock()
defer e.ChainLock.RUnlock()
var tr []*result
for _, sub := range subs {
var r result
r.Tip = e.Tip
r.Endpoint = e.ApiEndpoint
r.Address = sub.Address
r.Consensus = e.Consensus
r.SharesPosition = NewPositionChart(30, uint64(e.Tip.WindowDepth))
r.UnclesPosition = NewPositionChart(30, uint64(e.Tip.WindowDepth))
tr = append(tr, &r)
}
for range index.IterateSideBlocksInPPLNSWindow(e.Tip, e.Consensus, e.DifficultyFromHeight, e.SideBlockByTemplateId, e.SideBlockUnclesByTemplateId, func(b *index.SideBlock, weight types.Difficulty) {
for _, r := range tr {
r.TotalWeight = r.TotalWeight.Add(weight)
if b.MinerAddress.Compare(r.Address) == 0 {
r.MinerId = b.Miner
r.YourWeight = r.YourWeight.Add(weight)
if b.IsUncle() {
r.UnclesPosition.Add(int(e.Tip.SideHeight-b.SideHeight), 1)
r.UncleCount++
} else {
r.SharesPosition.Add(int(e.Tip.SideHeight-b.SideHeight), 1)
r.ShareCount++
}
}
}
}, func(err error) {
}) {
}
for _, r := range tr {
if r.ShareCount > 0 || r.UncleCount > 0 {
results = append(results, r)
hasResults = true
}
}
}()
}
}
if !hasResults {
bot.Msg(replyTo, "You do not currently have any shares within the PPLNS window across the tracked pools.")
} else {
for _, r := range results {
ratio := float64(r.YourWeight.Lo) / float64(r.TotalWeight.Lo)
bot.Msg(replyTo, fmt.Sprintf(
"Your shares %d (+%d uncles) ~%.03f%% %sH/s :: Miner Statistics %s/m/%s :: Shares/Uncles position %s %s",
r.ShareCount,
r.UncleCount,
ratio*100,
utils.SiUnits(ratio*float64(types.DifficultyFrom64(r.Tip.Difficulty).Div64(r.Consensus.TargetBlockTime).Lo), 3),
r.Endpoint,
utils.EncodeBinaryNumber(r.MinerId),
r.SharesPosition.String(),
r.UnclesPosition.String(),
))
}
}
return true
},
},
{
Match: regexp.MustCompile("^\\.(sub|subscribe)[ \\t]+(4[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]+)[ \\t]*"),
Handle: func(db *DB, entries []*channelEntry, bot *hbot.Bot, message *hbot.Message, replyTo string, matches ...string) bool {
if err := isNickAllowed(message.Name); err != nil {
bot.Msg(replyTo, fmt.Sprintf("Cannot subscribe: %s", err))
return true
}
addr := address.FromBase58(matches[2])
if addr == nil {
bot.Msg(replyTo, "Cannot subscribe: Invalid Monero address")
return true
}
if err := db.Store(&Subscription{
Address: addr,
Nick: message.Name,
}); err == nil {
bot.Msg(replyTo, fmt.Sprintf("Subscribed your nick to shares found by %s%s%s while you are online. You can private message this bot for any commands instead of using public channels.", FormatItalic, utils.Shorten(addr.ToBase58(), 10), FormatReset))
for _, e := range entries {
func() {
e.ChainLock.RLock()
defer e.ChainLock.RUnlock()
var yourWeight types.Difficulty
var totalWeight types.Difficulty
for range index.IterateSideBlocksInPPLNSWindow(e.Tip, e.Consensus, e.DifficultyFromHeight, e.SideBlockByTemplateId, e.SideBlockUnclesByTemplateId, func(b *index.SideBlock, weight types.Difficulty) {
totalWeight = totalWeight.Add(weight)
if b.MinerAddress.Compare(addr) == 0 {
yourWeight = yourWeight.Add(weight)
}
}, func(err error) {
}) {
}
shareRatio := float64(yourWeight.Lo) / float64(totalWeight.Lo)
if shareRatio > notificationPoolShare { //warn about spammy notifications
bot.Msg(replyTo, fmt.Sprintf("You have more than %.01f%% of the %s pool total share weight (%s) with %.03f%%. Share notifications will not be sent above this threshold. Consider using the /api/events interface directly.", notificationPoolShare*100, e.Name, e.ApiEndpoint, shareRatio*100))
}
}()
}
return true
} else {
bot.Msg(replyTo, fmt.Sprintf("Cannot subscribe: %s", err))
return true
}
},
},
}

5
constants.go Normal file
View file

@ -0,0 +1,5 @@
package main
const numberOfBlockHistoryToKeep = 10
const notificationPoolShare = 0.2

129
db.go Normal file
View file

@ -0,0 +1,129 @@
package main
import (
"encoding/json"
"git.gammaspectra.live/P2Pool/p2pool-observer/monero/address"
bolt "go.etcd.io/bbolt"
"golang.org/x/exp/slices"
"log"
"strings"
"time"
)
type Subscription struct {
Address *address.Address `json:"address"`
Nick string `json:"nick"`
}
type DB struct {
db *bolt.DB
}
var refByNick = []byte("refByNick")
var refByAddr = []byte("refByAddr")
func NewDB(path string) (*DB, error) {
if db, err := bolt.Open(path, 0666, &bolt.Options{Timeout: time.Second * 5}); err != nil {
return nil, err
} else {
if err = db.Update(func(tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(refByNick); err != nil {
return err
} else if _, err := tx.CreateBucketIfNotExists(refByAddr); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return &DB{
db: db,
}, nil
}
}
func (db *DB) GetByNick(nick string) (result []*Subscription) {
_ = db.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(refByNick)
if buf := b.Get([]byte(strings.ToLower(nick))); buf != nil {
var addrs []*address.Address
if err := json.Unmarshal(buf, &addrs); err != nil {
return err
}
for _, a := range addrs {
result = append(result, &Subscription{
Address: a,
Nick: nick,
})
}
}
return nil
})
return result
}
func (db *DB) GetByAddress(a *address.Address) (result []*Subscription) {
_ = db.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(refByAddr)
if buf := b.Get([]byte(a.ToBase58())); buf != nil {
var nicks []string
if err := json.Unmarshal(buf, &nicks); err != nil {
return err
}
for _, nick := range nicks {
result = append(result, &Subscription{
Address: a,
Nick: nick,
})
}
}
return nil
})
return result
}
func (db *DB) Store(sub *Subscription) error {
if err := db.db.Update(func(tx *bolt.Tx) error {
b1 := tx.Bucket(refByAddr)
var nicks []string
if k := b1.Get([]byte(sub.Address.ToBase58())); k != nil {
if err := json.Unmarshal(k, &nicks); err != nil {
return err
}
}
if !slices.Contains(nicks, strings.ToLower(sub.Nick)) {
nicks = append(nicks, strings.ToLower(sub.Nick))
}
if buf, err := json.Marshal(nicks); err != nil {
return err
} else if err = b1.Put([]byte(sub.Address.ToBase58()), buf); err != nil {
return err
}
b2 := tx.Bucket(refByNick)
var addresses []*address.Address
if k := b2.Get([]byte(strings.ToLower(sub.Nick))); k != nil {
if err := json.Unmarshal(k, &addresses); err != nil {
return err
}
}
if !slices.ContainsFunc(addresses, func(a *address.Address) bool {
return a.Compare(sub.Address) == 0
}) {
addresses = append(addresses, sub.Address)
}
if buf, err := json.Marshal(addresses); err != nil {
return err
} else if err = b2.Put([]byte(strings.ToLower(sub.Nick)), buf); err != nil {
return err
}
return nil
}); err != nil {
log.Printf("[DB] bolt error: %s", err)
return err
}
return nil
}

View file

@ -20,10 +20,13 @@ services:
- PLEROMA_COOKIE=${PLEROMA_COOKIE}
security_opt:
- no-new-privileges:true
volumes:
- data:/data:rw
networks:
- p2pool-observer-bot
command: >-
/usr/bin/bot
-db /data/subscriptions.db
-irc-host "${IRC_HOST}"
-irc-port "${IRC_PORT}"
-bot-nick "${BOT_NICK}"

118
entry.go Normal file
View file

@ -0,0 +1,118 @@
package main
import (
"context"
"fmt"
"git.gammaspectra.live/P2Pool/p2pool-observer/index"
"git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain"
"git.gammaspectra.live/P2Pool/p2pool-observer/types"
"golang.org/x/exp/slices"
"net/url"
"nhooyr.io/websocket"
"sync"
"time"
)
type channelEntry struct {
ApiEndpoint string
Channel string
Name string
PreviousBlocks foundBlocks
LastWindowMainDifficulty struct {
Height uint64
Difficulty uint64
}
PoolInfo map[string]any
Consensus *sidechain.Consensus
Tip *index.SideBlock
Window map[types.Hash]*index.SideBlock
ChainLock sync.RWMutex
Ws *websocket.Conn
}
func (c *channelEntry) DesiredPruneDistance() uint64 {
//prune after a whole day
/*pruneDistance := (3600 * 24) / c.Consensus.TargetBlockTime
if pruneDistance < c.Consensus.ChainWindowSize {
pruneDistance = c.Consensus.ChainWindowSize
}*/
return c.Consensus.ChainWindowSize
}
func (c *channelEntry) SideBlockUnclesByTemplateId(id types.Hash) chan *index.SideBlock {
result := make(chan *index.SideBlock)
go func() {
defer close(result)
if b := c.SideBlockByTemplateId(id); b != nil && !b.IsUncle() {
for _, u := range b.Uncles {
if uncle := c.SideBlockByTemplateId(u.TemplateId); uncle != nil {
result <- uncle
}
}
}
}()
return result
}
func (c *channelEntry) SideBlockByTemplateId(id types.Hash) *index.SideBlock {
if b, ok := c.Window[id]; ok {
return b
} else if b = getTypeFromAPI[index.SideBlock](c.ApiEndpoint, "/api/block_by_id/"+id.String()); b != nil {
c.Window[b.TemplateId] = b
}
return nil
}
func (c *channelEntry) DifficultyFromHeight(height uint64) types.Difficulty {
if d := getTypeFromAPI[types.Difficulty](c.ApiEndpoint, fmt.Sprintf("/api/main_difficulty_by_height/%d", height)); d != nil {
return *d
}
return types.ZeroDifficulty
}
func (c *channelEntry) openWebSocket() {
if c.ApiEndpoint == "" {
c.Ws = nil
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
u, _ := url.Parse(c.ApiEndpoint)
if u.Scheme == "https" {
conn, _, err := websocket.Dial(ctx, fmt.Sprintf("wss://%s/api/events", u.Host), nil)
if err != nil {
c.Ws = nil
return
}
c.Ws = conn
} else {
conn, _, err := websocket.Dial(ctx, fmt.Sprintf("ws://%s/api/events", u.Host), nil)
if err != nil {
c.Ws = nil
return
}
c.Ws = conn
}
}
func (c *channelEntry) pruneBlocks() {
pruneDistance := c.DesiredPruneDistance()
inChainFromTip := make([]types.Hash, 0, pruneDistance*2)
for cur := c.Tip; cur != nil && (c.Tip.EffectiveHeight-cur.EffectiveHeight) <= pruneDistance; cur = c.Window[cur.ParentTemplateId] {
inChainFromTip = append(inChainFromTip, cur.TemplateId)
for _, u := range cur.Uncles {
inChainFromTip = append(inChainFromTip, u.TemplateId)
}
}
for k := range c.Window {
if !slices.Contains(inChainFromTip, k) {
delete(c.Window, k)
}
}
}

5
go.mod
View file

@ -3,9 +3,10 @@ module git.gammaspectra.live/P2Pool/p2pool-observer-bot
go 1.20
require (
git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230424175621-2292212ecdc5
git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230425032551-691a83c37b3a
github.com/whyrusleeping/hellabot v0.0.0-20230331073038-70f5dd5c40d9
golang.org/x/exp v0.0.0-20230424174712-0ee363d48fb1
go.etcd.io/bbolt v1.3.7
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
gopkg.in/inconshreveable/log15.v2 v2.16.0
gopkg.in/sorcix/irc.v2 v2.0.0-20200812151606-3f15758ea8c7
nhooyr.io/websocket v1.8.7

10
go.sum
View file

@ -6,8 +6,8 @@ git.gammaspectra.live/P2Pool/go-randomx v0.0.0-20221027085532-f46adfce03a7 h1:bz
git.gammaspectra.live/P2Pool/go-randomx v0.0.0-20221027085532-f46adfce03a7/go.mod h1:3kT0v4AMwT/OdorfH2gRWPwoOrUX/LV03HEeBsaXG1c=
git.gammaspectra.live/P2Pool/moneroutil v0.0.0-20221007140323-a2daa2d5fc48 h1:ExrYG0RSrx/I4McPWgUF4B8R2OkblMrMki2ia8vG6Bw=
git.gammaspectra.live/P2Pool/moneroutil v0.0.0-20221007140323-a2daa2d5fc48/go.mod h1:XeSC8jK8RXnnzVAmp9e9AQZCDIbML3UoCRkxxGA+lpU=
git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230424175621-2292212ecdc5 h1:ZjBXaKFlqN/gyQ0y7QLMXYjm1U8y924diBk6TrNMQGI=
git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230424175621-2292212ecdc5/go.mod h1:wCBIojZblScPifeE5M+GE1bMumwiDFS5HsMYHIEqTQ8=
git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230425032551-691a83c37b3a h1:eeQsDqeYrj1mCwoC46/ZRhwYQz4obxn2mrupfWQQXzk=
git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230425032551-691a83c37b3a/go.mod h1:wCBIojZblScPifeE5M+GE1bMumwiDFS5HsMYHIEqTQ8=
git.gammaspectra.live/P2Pool/randomx-go-bindings v0.0.0-20221027134633-11f5607e6752 h1:4r4KXpFLbixah+OGrBT9ZEflSZoFHD7aVJpXL3ukVIo=
git.gammaspectra.live/P2Pool/randomx-go-bindings v0.0.0-20221027134633-11f5607e6752/go.mod h1:KQaYHIxGXNHNMQELC7xGLu8xouwvP/dN7iGk681BXmk=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
@ -87,10 +87,12 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/whyrusleeping/hellabot v0.0.0-20230331073038-70f5dd5c40d9 h1:y7lb+uda1qXXCfyxbPV707hHWrPL5bfId2bk7n9kvm8=
github.com/whyrusleeping/hellabot v0.0.0-20230331073038-70f5dd5c40d9/go.mod h1:g3f61CcN5csyM0R/e0xF2FX8gKiuGHREi3ostG6FWlQ=
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/exp v0.0.0-20230424174712-0ee363d48fb1 h1:7ewEue0BB5yqldKyRBV5KoDD2uiBiQpTA6DxObvjj/M=
golang.org/x/exp v0.0.0-20230424174712-0ee363d48fb1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

100
position_chart.go Normal file
View file

@ -0,0 +1,100 @@
package main
import (
"git.gammaspectra.live/P2Pool/p2pool-observer/utils"
"golang.org/x/exp/slices"
)
type PositionChart struct {
totalItems uint64
bucket []uint64
idle byte
}
func (p *PositionChart) Add(index int, value uint64) {
if index < 0 || index > int(p.totalItems) {
return
}
if len(p.bucket) == 1 {
p.bucket[0] += value
return
}
i := uint64(index) * uint64(len(p.bucket)-1) / (p.totalItems - 1)
p.bucket[i] += value
}
func (p *PositionChart) Total() (result uint64) {
for _, e := range p.bucket {
result += e
}
return
}
func (p *PositionChart) Size() uint64 {
return uint64(len(p.bucket))
}
func (p *PositionChart) Resolution() uint64 {
return p.totalItems / uint64(len(p.bucket))
}
func (p *PositionChart) SetIdle(idleChar byte) {
p.idle = idleChar
}
func (p *PositionChart) String() string {
position := make([]byte, 2*2+len(p.bucket))
position[0], position[1] = '[', '<'
position[len(position)-2], position[len(position)-1] = '<', ']'
for i, e := range utils.ReverseSlice(slices.Clone(p.bucket)) {
if e > 0 {
if e > 9 {
position[2+i] = '+'
} else {
position[2+i] = 0x30 + byte(e)
}
} else {
position[2+i] = p.idle
}
}
return string(position)
}
func (p *PositionChart) StringWithSeparator(index int) string {
if index < 0 || index > int(p.totalItems) {
return p.String()
}
separatorIndex := index * (len(p.bucket) - 1) / int(p.totalItems-1)
position := make([]byte, 1+2*2+len(p.bucket))
position[0], position[1] = '[', '<'
position[2+separatorIndex] = '|'
position[len(position)-2], position[len(position)-1] = '<', ']'
for i, e := range utils.ReverseSlice(slices.Clone(p.bucket)) {
if i >= separatorIndex {
i++
}
if e > 0 {
if e > 9 {
position[2+i] = '+'
} else {
position[2+i] = 0x30 + byte(e)
}
} else {
position[2+i] = p.idle
}
}
return string(position)
}
func NewPositionChart(size uint64, totalItems uint64) *PositionChart {
if size < 1 {
size = 1
}
return &PositionChart{
totalItems: totalItems,
bucket: make([]uint64, size),
idle: '.',
}
}