diff --git a/.gitignore b/.gitignore index e892ae5..3065174 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea .env -docker-compose.override.yml \ No newline at end of file +docker-compose.override.yml +subscriptions.db \ No newline at end of file diff --git a/api.go b/api.go new file mode 100644 index 0000000..1657065 --- /dev/null +++ b/api.go @@ -0,0 +1,117 @@ +package main + +import ( + "encoding/json" + "fmt" + "git.gammaspectra.live/P2Pool/p2pool-observer/index" + "golang.org/x/exp/slices" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +func getFromAPI(host, method string) any { + uri, _ := url.Parse(host + method) + if response, err := http.DefaultClient.Do(&http.Request{ + Method: "GET", + URL: uri, + }); err != nil { + return nil + } else { + defer response.Body.Close() + if response.StatusCode == http.StatusOK { + if strings.Index(response.Header.Get("content-type"), "/json") != -1 { + var result any + decoder := json.NewDecoder(response.Body) + decoder.UseNumber() + err = decoder.Decode(&result) + return result + } else if data, err := io.ReadAll(response.Body); err != nil { + return nil + } else { + return data + } + } else { + return nil + } + } +} + +func getTypeFromAPI[T any](host, method string) *T { + uri, _ := url.Parse(host + method) + if response, err := http.DefaultClient.Do(&http.Request{ + Method: "GET", + URL: uri, + }); err != nil { + return nil + } else { + defer response.Body.Close() + if response.StatusCode == http.StatusOK { + var result T + if data, err := io.ReadAll(response.Body); err != nil { + return nil + } else if json.Unmarshal(data, &result) != nil { + return nil + } else { + return &result + } + } else { + return nil + } + } +} + +func getSideBlocksFromAPI(host, method string) []*index.SideBlock { + return getSliceFromAPI[*index.SideBlock](host, method) +} + +func getSliceFromAPI[T any](host, method string) []T { + uri, _ := url.Parse(host + method) + if response, err := http.DefaultClient.Do(&http.Request{ + Method: "GET", + URL: uri, + }); err != nil { + return nil + } else { + defer response.Body.Close() + if response.StatusCode == http.StatusOK { + var result []T + if data, err := io.ReadAll(response.Body); err != nil { + return nil + } else if json.Unmarshal(data, &result) != nil { + return nil + } else { + return result + } + } else { + return nil + } + } +} + +func getPoolInfo(host string) map[string]any { + var basePoolInfo map[string]any + + var ok bool + for { + d := getFromAPI(host, "/api/pool_info") + basePoolInfo, ok = d.(map[string]any) + if d == nil || !ok || basePoolInfo == nil || len(basePoolInfo) == 0 { + time.Sleep(5) + continue + } + break + } + return basePoolInfo +} + +func getPreviousBlocks(host string) (result foundBlocks) { + result = getSliceFromAPI[*index.FoundBlock](host, fmt.Sprintf("/api/found_blocks?limit=%d", numberOfBlockHistoryToKeep)) + //sort from oldest to newest + slices.SortFunc(result, func(a, b *index.FoundBlock) bool { + return a.MainBlock.Height < b.MainBlock.Height + }) + return result +} diff --git a/bot.go b/bot.go index a425bcb..57927aa 100644 --- a/bot.go +++ b/bot.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "errors" "flag" "fmt" utils2 "git.gammaspectra.live/P2Pool/p2pool-observer/cmd/utils" @@ -18,12 +17,12 @@ import ( "gopkg.in/sorcix/irc.v2" "io" "log" + "math" "net/http" "net/url" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" "os" - "regexp" "strings" "sync" "time" @@ -64,15 +63,6 @@ const FormatItalic = "\x1D" const FormatUnderline = "\x1F" const FormatReset = "\x0F" -var guestUserRegex = regexp.MustCompile("^Guest[0-9]+_*$") - -func isNickAllowed(nick string) error { - if guestUserRegex.MatchString(nick) { - return errors.New("guest user is not allowed") - } - return nil -} - func main() { ircHost := flag.String("irc-host", "irc.libera.chat", "") ircPort := flag.Uint("irc-port", 6697, "") @@ -84,106 +74,15 @@ func main() { botPassword := os.Getenv("BOT_PASSWORD") pleromaCookie := os.Getenv("PLEROMA_COOKIE") + dbPath := flag.String("db", "", "Path of database") + botChannels := flag.String("channels", "", "A list of #CHANNEL,NAME,API_ENDPOINT separated by; for example: #p2pool-main,Main,https://p2pool.observer;#p2pool-mini,Mini,https://mini.p2pool.observer") flag.Parse() - type channelEntry struct { - ApiEndpoint string - Channel string - Name string - PreviousBlocks foundBlocks - LastWindowMainDifficulty struct { - Height uint64 - Difficulty uint64 - } - PoolInfo map[string]any - Consensus *sidechain.Consensus - Window map[types.Hash]*index.SideBlock - - Ws *websocket.Conn - } - - const numberOfBlockHistoryToKeep = 10 - - getPoolInfo := func(host string) map[string]any { - var basePoolInfo map[string]any - - for { - func() { - uri, _ := url.Parse(host + "/api/pool_info") - if response, err := http.DefaultClient.Do(&http.Request{ - Method: "GET", - URL: uri, - }); err != nil { - return - } else { - defer response.Body.Close() - if response.StatusCode == http.StatusOK { - if strings.Index(response.Header.Get("content-type"), "/json") != -1 { - decoder := json.NewDecoder(response.Body) - decoder.UseNumber() - err = decoder.Decode(&basePoolInfo) - return - } - } - return - } - }() - if basePoolInfo == nil || len(basePoolInfo) == 0 { - time.Sleep(5) - continue - } - break - } - return basePoolInfo - } - - getPreviousBlocks := func(host string) (result foundBlocks) { - if host == "" { - return nil - } - response, err := http.DefaultClient.Get(fmt.Sprintf("%s/api/found_blocks?limit=%d", host, numberOfBlockHistoryToKeep)) - if err != nil { - return nil - } - defer response.Body.Close() - if buf, err := io.ReadAll(response.Body); err != nil { - return nil - } else { - if err = json.Unmarshal(buf, &result); err != nil { - return nil - } else { - //sort from oldest to newest - slices.SortFunc(result, func(a, b *index.FoundBlock) bool { - return a.MainBlock.Height < b.MainBlock.Height - }) - return result - } - } - } - - openWebsocket := func(host string) *websocket.Conn { - if host == "" { - return nil - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - u, _ := url.Parse(host) - if u.Scheme == "https" { - c, _, err := websocket.Dial(ctx, fmt.Sprintf("wss://%s/api/events", u.Host), nil) - if err != nil { - return nil - } - return c - } else { - c, _, err := websocket.Dial(ctx, fmt.Sprintf("ws://%s/api/events", u.Host), nil) - if err != nil { - return nil - } - return c - } + db, err := NewDB(*dbPath) + if err != nil { + log.Panic(err) } doPleromaPost := func(e *channelEntry, message string) { @@ -239,6 +138,8 @@ func main() { continue } + log.Printf("Creating entry for %s", split[2]) + basePoolInfo := getPoolInfo(split[2]) consensusData, _ := json.Marshal(basePoolInfo["sidechain"].(map[string]any)["consensus"].(map[string]any)) @@ -247,15 +148,24 @@ func main() { log.Panic(err) } - channelEntries = append(channelEntries, &channelEntry{ + entry := &channelEntry{ ApiEndpoint: split[2], Channel: split[0], Name: split[1], Consensus: consensus, PreviousBlocks: getPreviousBlocks(split[2]), - Ws: openWebsocket(split[2]), Window: make(map[types.Hash]*index.SideBlock), - }) + } + for _, b := range getSideBlocksFromAPI(entry.ApiEndpoint, fmt.Sprintf("/api/side_blocks_in_window?window=%d&noMainStatus", entry.DesiredPruneDistance())) { + entry.Window[b.TemplateId] = b + if entry.Tip == nil || entry.Tip.SideHeight < b.SideHeight { + entry.Tip = b + } + } + //websocket needs to come after + entry.openWebSocket() + + channelEntries = append(channelEntries, entry) } @@ -265,6 +175,7 @@ func main() { var beforeJoiningChannels sync.Once if bot, err := hbot.NewBot(fmt.Sprintf("%s:%d", *ircHost, *ircPort), *botNickName, func(bot *hbot.Bot) { + bot.ThrottleDelay = time.Millisecond * 100 bot.Nick = *botNickName bot.Realname = *botUserName bot.SSL = *ircSsl @@ -311,7 +222,7 @@ func main() { }) { onlineUsersLock.Lock() defer onlineUsersLock.Unlock() - for _, nick := range strings.Split(message.Params[3], " ") { + for _, nick := range strings.Split(strings.ToLower(message.Params[3]), " ") { if !slices.Contains(onlineUsers, nick) { onlineUsers = append(onlineUsers, strings.TrimLeft(nick, "@~%+")) } @@ -332,8 +243,8 @@ func main() { if len(message.Params) != 1 { return false } - from := message.From - to := message.To + from := strings.ToLower(message.From) + to := strings.ToLower(message.To) onlineUsersLock.Lock() defer onlineUsersLock.Unlock() if i := slices.Index(onlineUsers, from); i != -1 { @@ -356,7 +267,7 @@ func main() { if len(message.Params) != 1 { return false } - nick := message.From + nick := strings.ToLower(message.From) channelName := message.To onlineUsersLock.Lock() defer onlineUsersLock.Unlock() @@ -375,7 +286,7 @@ func main() { onlineUsers = append(onlineUsers, nick) } } else if message.Command == irc.QUIT { - nick := message.From + nick := strings.ToLower(message.From) onlineUsersLock.Lock() defer onlineUsersLock.Unlock() if i := slices.Index(onlineUsers, nick); i != -1 { @@ -395,33 +306,80 @@ func main() { // see about irc.ERR_NICKNAMEINUSE or irc.ERR_NICKCOLLISION to recover nick - shareFound := func(e *channelEntry, b *index.SideBlock, target string) { - uHeight := (b.SideHeight << 16) | (uint64(b.MainId[0]) << 8) | uint64(b.MainId[1]) + payoutFound := func(e *channelEntry, b *index.FoundBlock, o *index.MainCoinbaseOutput, sub *Subscription) { + uHeight := (b.SideHeight << 16) | (uint64(b.MainBlock.Id[0]) << 8) | uint64(b.MainBlock.Id[1]) - if b.UncleOf != types.ZeroHash { - bot.Msg(target, fmt.Sprintf( - "%sUNCLE SHARE FOUND%s on %s: Side height %s%d%s, Parent Side height %s%d%s :: %s/s/%s :: Accounted for %d%% of value :: Template Id %s%s%s :: Miner Address %s%s", + payoutIndex := (b.SideHeight << uint64(math.Ceil(math.Log2(float64(e.Consensus.ChainWindowSize*4))))) | uint64(o.Index) + + bot.Msg(sub.Nick, fmt.Sprintf( + "%sPAYOUT%s on %s: %s%s%s XMR%s to %s%s%s:: Main height %s%d%s, Side height %d :: %s/s/%s :: Verify payout %s/p/%s", + FormatColorLightGreen+FormatBold, FormatReset, e.Name, + FormatColorOrange, FormatBold, utils.XMRUnits(o.Value), FormatReset, + FormatItalic, utils.Shorten(o.MinerAddress.ToBase58(), 10), FormatReset, + FormatColorRed, b.MainBlock.Height, FormatReset, + b.SideHeight, + e.ApiEndpoint, utils.EncodeBinaryNumber(uHeight), + e.ApiEndpoint, utils.EncodeBinaryNumber(payoutIndex), + )) + + } + + shareFound := func(e *channelEntry, tip *index.SideBlock, sub *Subscription) { + uHeight := (tip.SideHeight << 16) | (uint64(tip.MainId[0]) << 8) | uint64(tip.MainId[1]) + + shareCount := 0 + uncleCount := 0 + + var yourWeight types.Difficulty + var totalWeight types.Difficulty + + for range index.IterateSideBlocksInPPLNSWindow(tip, e.Consensus, e.DifficultyFromHeight, e.SideBlockByTemplateId, e.SideBlockUnclesByTemplateId, func(b *index.SideBlock, weight types.Difficulty) { + totalWeight = totalWeight.Add(weight) + if tip.Miner == b.Miner { + if b.IsUncle() { + uncleCount++ + } else { + shareCount++ + } + yourWeight = yourWeight.Add(weight) + } + }, func(err error) { + + }) { + + } + + shareRatio := float64(yourWeight.Lo) / float64(totalWeight.Lo) + if shareRatio > notificationPoolShare { //disable spammy notifications + return + } + + if tip.UncleOf != types.ZeroHash { + bot.Msg(sub.Nick, fmt.Sprintf( + "%sUNCLE SHARE FOUND%s on %s: Side height %s%d%s, Parent Side height %s%d%s :: %s/s/%s :: Accounted for %d%% of value :: Template Id %s%s%s :: Window shares %d (+%d uncles) ~%.03f%% :: Miner Address %s%s", FormatColorLightGreen+FormatBold, FormatReset, e.Name, - FormatColorRed, b.SideHeight, FormatReset, - FormatColorRed, b.EffectiveHeight, FormatReset, + FormatColorRed, tip.SideHeight, FormatReset, + FormatColorRed, tip.EffectiveHeight, FormatReset, e.ApiEndpoint, utils.EncodeBinaryNumber(uHeight), 100-e.Consensus.UnclePenalty, - FormatItalic, utils.Shorten(b.TemplateId.String(), 8), FormatReset, - FormatItalic, utils.Shorten(b.MinerAddress.ToBase58(), 10), + FormatItalic, utils.Shorten(tip.TemplateId.String(), 8), FormatReset, + shareCount, uncleCount, shareRatio*100, + FormatItalic, utils.Shorten(tip.MinerAddress.ToBase58(), 10), )) } else { uncleText := "" - if len(b.Uncles) > 0 { - uncleText = fmt.Sprintf(":: Includes %d uncle(s) for extra %d%% of their value ", len(b.Uncles), e.Consensus.UnclePenalty) + if len(tip.Uncles) > 0 { + uncleText = fmt.Sprintf(":: Includes %d uncle(s) for extra %d%% of their value ", len(tip.Uncles), e.Consensus.UnclePenalty) } - bot.Msg(target, fmt.Sprintf( - "%sSHARE FOUND%s on %s: Side height %s%d%s :: %s/s/%s %s:: Template Id %s%s%s :: Miner Address %s%s", + bot.Msg(sub.Nick, fmt.Sprintf( + "%sSHARE FOUND%s on %s: Side height %s%d%s :: %s/s/%s %s:: Template Id %s%s%s :: Window shares %d (+%d uncles) ~%.03f%% :: Miner Address %s%s", FormatColorLightGreen+FormatBold, FormatReset, e.Name, - FormatColorRed, b.SideHeight, FormatReset, + FormatColorRed, tip.SideHeight, FormatReset, e.ApiEndpoint, utils.EncodeBinaryNumber(uHeight), uncleText, - FormatItalic, utils.Shorten(b.TemplateId.String(), 8), FormatReset, - FormatItalic, utils.Shorten(b.MinerAddress.ToBase58(), 10), + FormatItalic, utils.Shorten(tip.TemplateId.String(), 8), FormatReset, + shareCount, uncleCount, shareRatio*100, + FormatItalic, utils.Shorten(tip.MinerAddress.ToBase58(), 10), )) } } @@ -492,10 +450,30 @@ func main() { if len(trimMessage) <= 0 || trimMessage[0] != '.' { return false } - return message.To == bot.Nick + if message.To == bot.Nick { + return true + } + for _, e := range channelEntries { + if message.To == e.Channel { + return true + } + } + return false }, Action: func(bot *hbot.Bot, message *hbot.Message) bool { - bot.Msg(message.Name, "Bot notifications currently out of service") + replyTo := message.To + if replyTo == bot.Nick || replyTo[0] != '#' { + replyTo = message.Name + } + + for _, c := range commands { + if matches := c.Match.FindStringSubmatch(message.Content); len(matches) > 0 { + if c.Handle(db, channelEntries, bot, message, replyTo, matches...) { + return true + } + } + } + bot.Msg(replyTo, "Command not recognized") return true }, }) @@ -510,38 +488,79 @@ func main() { if e.Ws == nil { log.Printf("[WS] WebSocket for %s is disconnected, retrying", e.ApiEndpoint) - for ; e.Ws == nil; e.Ws = openWebsocket(e.ApiEndpoint) { + for ; e.Ws == nil; e.openWebSocket() { time.Sleep(time.Second * 30) log.Printf("[WS] WebSocket for %s is disconnected, retrying again", e.ApiEndpoint) } } for { + var event utils2.JSONEvent - if err := wsjson.Read(context.Background(), e.Ws, &event); err != nil { + if err := func() error { + //there should be at least one share every five minutes, otherwise we are out of sync + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + return wsjson.Read(ctx, e.Ws, &event) + }(); err != nil { e.Ws.Close(websocket.StatusGoingAway, "error on read "+err.Error()) e.Ws = nil break } else { - switch event.Type { - case utils2.JSONEventSideBlock: - //shareFound(e, event.SideBlock, "DataHoarder") - case utils2.JSONEventFoundBlock: - e.PreviousBlocks = append(e.PreviousBlocks, event.FoundBlock) - blockFound(e, event.FoundBlock, e.PreviousBlocks.GetPrevious(event.FoundBlock)) - if len(e.PreviousBlocks) > numberOfBlockHistoryToKeep { - //delete oldest block - e.PreviousBlocks = slices.Delete(e.PreviousBlocks, 0, 1) + func() { + e.ChainLock.Lock() + defer e.ChainLock.Unlock() + + switch event.Type { + case utils2.JSONEventSideBlock: + b := event.SideBlock + e.Window[b.TemplateId] = b + if !b.IsUncle() { + e.Tip = b + } + + onlineUsersLock.RLock() + defer onlineUsersLock.RUnlock() + for _, sub := range db.GetByAddress(b.MinerAddress) { + if slices.Contains(onlineUsers, strings.ToLower(sub.Nick)) { + //do not send notification if user is not online + shareFound(e, b, sub) + } else { + log.Printf("Could not send notification to %s - %s: not online", sub.Nick, sub.Address.ToBase58()) + } + } + e.pruneBlocks() + case utils2.JSONEventFoundBlock: + e.PreviousBlocks = append(e.PreviousBlocks, event.FoundBlock) + blockFound(e, event.FoundBlock, e.PreviousBlocks.GetPrevious(event.FoundBlock)) + + onlineUsersLock.RLock() + defer onlineUsersLock.RUnlock() + for _, o := range event.MainCoinbaseOutputs { + for _, sub := range db.GetByAddress(o.MinerAddress) { + if slices.Contains(onlineUsers, strings.ToLower(sub.Nick)) { + //do not send notification if user is not online + payoutFound(e, event.FoundBlock, &o, sub) + } else { + log.Printf("Could not send notification to %s - %s: not online", sub.Nick, sub.Address.ToBase58()) + } + } + } + + if len(e.PreviousBlocks) > numberOfBlockHistoryToKeep { + //delete oldest block + e.PreviousBlocks = slices.Delete(e.PreviousBlocks, 0, 1) + } + case utils2.JSONEventOrphanedBlock: + if i := slices.IndexFunc(e.PreviousBlocks, func(block *index.FoundBlock) bool { + return event.SideBlock.MainId == block.MainBlock.Id + }); i != -1 { + //only notify if we reported it previously + slices.Delete(e.PreviousBlocks, i, i+1) + blockOrphaned(e, event.SideBlock) + } } - case utils2.JSONEventOrphanedBlock: - if i := slices.IndexFunc(e.PreviousBlocks, func(block *index.FoundBlock) bool { - return event.SideBlock.MainId == block.MainBlock.Id - }); i != -1 { - //only notify if we reported it previously - slices.Delete(e.PreviousBlocks, i, i+1) - blockOrphaned(e, event.SideBlock) - } - } + }() } } }(e) diff --git a/commands.go b/commands.go new file mode 100644 index 0000000..95d6b20 --- /dev/null +++ b/commands.go @@ -0,0 +1,180 @@ +package main + +import ( + "errors" + "fmt" + "git.gammaspectra.live/P2Pool/p2pool-observer/index" + "git.gammaspectra.live/P2Pool/p2pool-observer/monero/address" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" + "git.gammaspectra.live/P2Pool/p2pool-observer/types" + "git.gammaspectra.live/P2Pool/p2pool-observer/utils" + hbot "github.com/whyrusleeping/hellabot" + "regexp" +) + +type command struct { + Match *regexp.Regexp + Handle func(db *DB, entries []*channelEntry, bot *hbot.Bot, message *hbot.Message, replyTo string, matches ...string) bool +} + +var guestUserRegex = regexp.MustCompile("^Guest[0-9]+_*$") + +func isNickAllowed(nick string) error { + if guestUserRegex.MatchString(nick) { + return errors.New("guest user is not allowed") + } + return nil +} + +var commands = []command{ + { + Match: regexp.MustCompile("^\\.(status|shares)[ \\t]*"), + Handle: func(db *DB, entries []*channelEntry, bot *hbot.Bot, message *hbot.Message, replyTo string, matches ...string) bool { + subs := db.GetByNick(message.Name) + if len(subs) == 0 { + bot.Msg(replyTo, "No known subscriptions to your nick.") + return true + } + type result struct { + Endpoint string + ShareCount int + UncleCount int + YourWeight types.Difficulty + TotalWeight types.Difficulty + Tip *index.SideBlock + Address *address.Address + MinerId uint64 + SharesPosition *PositionChart + UnclesPosition *PositionChart + Consensus *sidechain.Consensus + } + + var hasResults bool + var results []*result + for i := range entries { + e := entries[i] + if e.ApiEndpoint != "" && (message.To == bot.Nick || e.Channel == message.To) { + func() { + e.ChainLock.RLock() + defer e.ChainLock.RUnlock() + + var tr []*result + for _, sub := range subs { + var r result + r.Tip = e.Tip + r.Endpoint = e.ApiEndpoint + r.Address = sub.Address + r.Consensus = e.Consensus + + r.SharesPosition = NewPositionChart(30, uint64(e.Tip.WindowDepth)) + r.UnclesPosition = NewPositionChart(30, uint64(e.Tip.WindowDepth)) + tr = append(tr, &r) + } + + for range index.IterateSideBlocksInPPLNSWindow(e.Tip, e.Consensus, e.DifficultyFromHeight, e.SideBlockByTemplateId, e.SideBlockUnclesByTemplateId, func(b *index.SideBlock, weight types.Difficulty) { + for _, r := range tr { + r.TotalWeight = r.TotalWeight.Add(weight) + if b.MinerAddress.Compare(r.Address) == 0 { + r.MinerId = b.Miner + r.YourWeight = r.YourWeight.Add(weight) + if b.IsUncle() { + r.UnclesPosition.Add(int(e.Tip.SideHeight-b.SideHeight), 1) + r.UncleCount++ + } else { + r.SharesPosition.Add(int(e.Tip.SideHeight-b.SideHeight), 1) + r.ShareCount++ + } + } + } + }, func(err error) { + + }) { + + } + + for _, r := range tr { + if r.ShareCount > 0 || r.UncleCount > 0 { + results = append(results, r) + hasResults = true + } + } + + }() + } + } + + if !hasResults { + bot.Msg(replyTo, "You do not currently have any shares within the PPLNS window across the tracked pools.") + } else { + for _, r := range results { + ratio := float64(r.YourWeight.Lo) / float64(r.TotalWeight.Lo) + bot.Msg(replyTo, fmt.Sprintf( + "Your shares %d (+%d uncles) ~%.03f%% %sH/s :: Miner Statistics %s/m/%s :: Shares/Uncles position %s %s", + r.ShareCount, + r.UncleCount, + ratio*100, + utils.SiUnits(ratio*float64(types.DifficultyFrom64(r.Tip.Difficulty).Div64(r.Consensus.TargetBlockTime).Lo), 3), + r.Endpoint, + utils.EncodeBinaryNumber(r.MinerId), + r.SharesPosition.String(), + r.UnclesPosition.String(), + )) + } + } + return true + + }, + }, + + { + Match: regexp.MustCompile("^\\.(sub|subscribe)[ \\t]+(4[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]+)[ \\t]*"), + Handle: func(db *DB, entries []*channelEntry, bot *hbot.Bot, message *hbot.Message, replyTo string, matches ...string) bool { + if err := isNickAllowed(message.Name); err != nil { + bot.Msg(replyTo, fmt.Sprintf("Cannot subscribe: %s", err)) + return true + } + + addr := address.FromBase58(matches[2]) + if addr == nil { + bot.Msg(replyTo, "Cannot subscribe: Invalid Monero address") + return true + } + + if err := db.Store(&Subscription{ + Address: addr, + Nick: message.Name, + }); err == nil { + bot.Msg(replyTo, fmt.Sprintf("Subscribed your nick to shares found by %s%s%s while you are online. You can private message this bot for any commands instead of using public channels.", FormatItalic, utils.Shorten(addr.ToBase58(), 10), FormatReset)) + for _, e := range entries { + func() { + e.ChainLock.RLock() + defer e.ChainLock.RUnlock() + + var yourWeight types.Difficulty + var totalWeight types.Difficulty + + for range index.IterateSideBlocksInPPLNSWindow(e.Tip, e.Consensus, e.DifficultyFromHeight, e.SideBlockByTemplateId, e.SideBlockUnclesByTemplateId, func(b *index.SideBlock, weight types.Difficulty) { + totalWeight = totalWeight.Add(weight) + if b.MinerAddress.Compare(addr) == 0 { + yourWeight = yourWeight.Add(weight) + } + }, func(err error) { + + }) { + + } + + shareRatio := float64(yourWeight.Lo) / float64(totalWeight.Lo) + if shareRatio > notificationPoolShare { //warn about spammy notifications + bot.Msg(replyTo, fmt.Sprintf("You have more than %.01f%% of the %s pool total share weight (%s) with %.03f%%. Share notifications will not be sent above this threshold. Consider using the /api/events interface directly.", notificationPoolShare*100, e.Name, e.ApiEndpoint, shareRatio*100)) + } + }() + } + return true + } else { + bot.Msg(replyTo, fmt.Sprintf("Cannot subscribe: %s", err)) + return true + } + }, + }, +} diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..2bb6e97 --- /dev/null +++ b/constants.go @@ -0,0 +1,5 @@ +package main + +const numberOfBlockHistoryToKeep = 10 + +const notificationPoolShare = 0.2 diff --git a/db.go b/db.go new file mode 100644 index 0000000..b7b06a2 --- /dev/null +++ b/db.go @@ -0,0 +1,129 @@ +package main + +import ( + "encoding/json" + "git.gammaspectra.live/P2Pool/p2pool-observer/monero/address" + bolt "go.etcd.io/bbolt" + "golang.org/x/exp/slices" + "log" + "strings" + "time" +) + +type Subscription struct { + Address *address.Address `json:"address"` + Nick string `json:"nick"` +} + +type DB struct { + db *bolt.DB +} + +var refByNick = []byte("refByNick") +var refByAddr = []byte("refByAddr") + +func NewDB(path string) (*DB, error) { + if db, err := bolt.Open(path, 0666, &bolt.Options{Timeout: time.Second * 5}); err != nil { + return nil, err + } else { + if err = db.Update(func(tx *bolt.Tx) error { + if _, err := tx.CreateBucketIfNotExists(refByNick); err != nil { + return err + } else if _, err := tx.CreateBucketIfNotExists(refByAddr); err != nil { + return err + } + return nil + }); err != nil { + return nil, err + } + return &DB{ + db: db, + }, nil + } +} + +func (db *DB) GetByNick(nick string) (result []*Subscription) { + _ = db.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(refByNick) + if buf := b.Get([]byte(strings.ToLower(nick))); buf != nil { + var addrs []*address.Address + if err := json.Unmarshal(buf, &addrs); err != nil { + return err + } + for _, a := range addrs { + result = append(result, &Subscription{ + Address: a, + Nick: nick, + }) + } + } + return nil + }) + return result +} + +func (db *DB) GetByAddress(a *address.Address) (result []*Subscription) { + _ = db.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(refByAddr) + if buf := b.Get([]byte(a.ToBase58())); buf != nil { + var nicks []string + if err := json.Unmarshal(buf, &nicks); err != nil { + return err + } + for _, nick := range nicks { + result = append(result, &Subscription{ + Address: a, + Nick: nick, + }) + } + } + return nil + }) + return result +} + +func (db *DB) Store(sub *Subscription) error { + + if err := db.db.Update(func(tx *bolt.Tx) error { + + b1 := tx.Bucket(refByAddr) + var nicks []string + if k := b1.Get([]byte(sub.Address.ToBase58())); k != nil { + if err := json.Unmarshal(k, &nicks); err != nil { + return err + } + } + if !slices.Contains(nicks, strings.ToLower(sub.Nick)) { + nicks = append(nicks, strings.ToLower(sub.Nick)) + } + if buf, err := json.Marshal(nicks); err != nil { + return err + } else if err = b1.Put([]byte(sub.Address.ToBase58()), buf); err != nil { + return err + } + + b2 := tx.Bucket(refByNick) + var addresses []*address.Address + if k := b2.Get([]byte(strings.ToLower(sub.Nick))); k != nil { + if err := json.Unmarshal(k, &addresses); err != nil { + return err + } + } + if !slices.ContainsFunc(addresses, func(a *address.Address) bool { + return a.Compare(sub.Address) == 0 + }) { + addresses = append(addresses, sub.Address) + } + if buf, err := json.Marshal(addresses); err != nil { + return err + } else if err = b2.Put([]byte(strings.ToLower(sub.Nick)), buf); err != nil { + return err + } + + return nil + }); err != nil { + log.Printf("[DB] bolt error: %s", err) + return err + } + return nil +} diff --git a/docker-compose.yml b/docker-compose.yml index 2e74580..a59a782 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,10 +20,13 @@ services: - PLEROMA_COOKIE=${PLEROMA_COOKIE} security_opt: - no-new-privileges:true + volumes: + - data:/data:rw networks: - p2pool-observer-bot command: >- /usr/bin/bot + -db /data/subscriptions.db -irc-host "${IRC_HOST}" -irc-port "${IRC_PORT}" -bot-nick "${BOT_NICK}" diff --git a/entry.go b/entry.go new file mode 100644 index 0000000..ff03fbe --- /dev/null +++ b/entry.go @@ -0,0 +1,118 @@ +package main + +import ( + "context" + "fmt" + "git.gammaspectra.live/P2Pool/p2pool-observer/index" + "git.gammaspectra.live/P2Pool/p2pool-observer/p2pool/sidechain" + "git.gammaspectra.live/P2Pool/p2pool-observer/types" + "golang.org/x/exp/slices" + "net/url" + "nhooyr.io/websocket" + "sync" + "time" +) + +type channelEntry struct { + ApiEndpoint string + Channel string + Name string + PreviousBlocks foundBlocks + LastWindowMainDifficulty struct { + Height uint64 + Difficulty uint64 + } + PoolInfo map[string]any + Consensus *sidechain.Consensus + Tip *index.SideBlock + Window map[types.Hash]*index.SideBlock + ChainLock sync.RWMutex + + Ws *websocket.Conn +} + +func (c *channelEntry) DesiredPruneDistance() uint64 { + //prune after a whole day + /*pruneDistance := (3600 * 24) / c.Consensus.TargetBlockTime + if pruneDistance < c.Consensus.ChainWindowSize { + pruneDistance = c.Consensus.ChainWindowSize + }*/ + + return c.Consensus.ChainWindowSize +} + +func (c *channelEntry) SideBlockUnclesByTemplateId(id types.Hash) chan *index.SideBlock { + result := make(chan *index.SideBlock) + go func() { + defer close(result) + if b := c.SideBlockByTemplateId(id); b != nil && !b.IsUncle() { + for _, u := range b.Uncles { + if uncle := c.SideBlockByTemplateId(u.TemplateId); uncle != nil { + result <- uncle + } + } + } + }() + return result +} + +func (c *channelEntry) SideBlockByTemplateId(id types.Hash) *index.SideBlock { + if b, ok := c.Window[id]; ok { + return b + } else if b = getTypeFromAPI[index.SideBlock](c.ApiEndpoint, "/api/block_by_id/"+id.String()); b != nil { + c.Window[b.TemplateId] = b + } + return nil +} + +func (c *channelEntry) DifficultyFromHeight(height uint64) types.Difficulty { + if d := getTypeFromAPI[types.Difficulty](c.ApiEndpoint, fmt.Sprintf("/api/main_difficulty_by_height/%d", height)); d != nil { + return *d + } + return types.ZeroDifficulty +} + +func (c *channelEntry) openWebSocket() { + if c.ApiEndpoint == "" { + c.Ws = nil + return + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + u, _ := url.Parse(c.ApiEndpoint) + if u.Scheme == "https" { + conn, _, err := websocket.Dial(ctx, fmt.Sprintf("wss://%s/api/events", u.Host), nil) + if err != nil { + c.Ws = nil + return + } + c.Ws = conn + } else { + conn, _, err := websocket.Dial(ctx, fmt.Sprintf("ws://%s/api/events", u.Host), nil) + if err != nil { + c.Ws = nil + return + } + c.Ws = conn + } +} + +func (c *channelEntry) pruneBlocks() { + pruneDistance := c.DesiredPruneDistance() + + inChainFromTip := make([]types.Hash, 0, pruneDistance*2) + + for cur := c.Tip; cur != nil && (c.Tip.EffectiveHeight-cur.EffectiveHeight) <= pruneDistance; cur = c.Window[cur.ParentTemplateId] { + inChainFromTip = append(inChainFromTip, cur.TemplateId) + for _, u := range cur.Uncles { + inChainFromTip = append(inChainFromTip, u.TemplateId) + } + } + + for k := range c.Window { + if !slices.Contains(inChainFromTip, k) { + delete(c.Window, k) + } + } +} diff --git a/go.mod b/go.mod index f8301d5..7cec4ad 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module git.gammaspectra.live/P2Pool/p2pool-observer-bot go 1.20 require ( - git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230424175621-2292212ecdc5 + git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230425032551-691a83c37b3a github.com/whyrusleeping/hellabot v0.0.0-20230331073038-70f5dd5c40d9 - golang.org/x/exp v0.0.0-20230424174712-0ee363d48fb1 + go.etcd.io/bbolt v1.3.7 + golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 gopkg.in/inconshreveable/log15.v2 v2.16.0 gopkg.in/sorcix/irc.v2 v2.0.0-20200812151606-3f15758ea8c7 nhooyr.io/websocket v1.8.7 diff --git a/go.sum b/go.sum index b1f4954..ea40a17 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ git.gammaspectra.live/P2Pool/go-randomx v0.0.0-20221027085532-f46adfce03a7 h1:bz git.gammaspectra.live/P2Pool/go-randomx v0.0.0-20221027085532-f46adfce03a7/go.mod h1:3kT0v4AMwT/OdorfH2gRWPwoOrUX/LV03HEeBsaXG1c= git.gammaspectra.live/P2Pool/moneroutil v0.0.0-20221007140323-a2daa2d5fc48 h1:ExrYG0RSrx/I4McPWgUF4B8R2OkblMrMki2ia8vG6Bw= git.gammaspectra.live/P2Pool/moneroutil v0.0.0-20221007140323-a2daa2d5fc48/go.mod h1:XeSC8jK8RXnnzVAmp9e9AQZCDIbML3UoCRkxxGA+lpU= -git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230424175621-2292212ecdc5 h1:ZjBXaKFlqN/gyQ0y7QLMXYjm1U8y924diBk6TrNMQGI= -git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230424175621-2292212ecdc5/go.mod h1:wCBIojZblScPifeE5M+GE1bMumwiDFS5HsMYHIEqTQ8= +git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230425032551-691a83c37b3a h1:eeQsDqeYrj1mCwoC46/ZRhwYQz4obxn2mrupfWQQXzk= +git.gammaspectra.live/P2Pool/p2pool-observer v0.0.0-20230425032551-691a83c37b3a/go.mod h1:wCBIojZblScPifeE5M+GE1bMumwiDFS5HsMYHIEqTQ8= git.gammaspectra.live/P2Pool/randomx-go-bindings v0.0.0-20221027134633-11f5607e6752 h1:4r4KXpFLbixah+OGrBT9ZEflSZoFHD7aVJpXL3ukVIo= git.gammaspectra.live/P2Pool/randomx-go-bindings v0.0.0-20221027134633-11f5607e6752/go.mod h1:KQaYHIxGXNHNMQELC7xGLu8xouwvP/dN7iGk681BXmk= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= @@ -87,10 +87,12 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/whyrusleeping/hellabot v0.0.0-20230331073038-70f5dd5c40d9 h1:y7lb+uda1qXXCfyxbPV707hHWrPL5bfId2bk7n9kvm8= github.com/whyrusleeping/hellabot v0.0.0-20230331073038-70f5dd5c40d9/go.mod h1:g3f61CcN5csyM0R/e0xF2FX8gKiuGHREi3ostG6FWlQ= +go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= +go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= -golang.org/x/exp v0.0.0-20230424174712-0ee363d48fb1 h1:7ewEue0BB5yqldKyRBV5KoDD2uiBiQpTA6DxObvjj/M= -golang.org/x/exp v0.0.0-20230424174712-0ee363d48fb1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o= +golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/position_chart.go b/position_chart.go new file mode 100644 index 0000000..96a5275 --- /dev/null +++ b/position_chart.go @@ -0,0 +1,100 @@ +package main + +import ( + "git.gammaspectra.live/P2Pool/p2pool-observer/utils" + "golang.org/x/exp/slices" +) + +type PositionChart struct { + totalItems uint64 + bucket []uint64 + idle byte +} + +func (p *PositionChart) Add(index int, value uint64) { + if index < 0 || index > int(p.totalItems) { + return + } + if len(p.bucket) == 1 { + p.bucket[0] += value + return + } + i := uint64(index) * uint64(len(p.bucket)-1) / (p.totalItems - 1) + p.bucket[i] += value +} + +func (p *PositionChart) Total() (result uint64) { + for _, e := range p.bucket { + result += e + } + return +} + +func (p *PositionChart) Size() uint64 { + return uint64(len(p.bucket)) +} + +func (p *PositionChart) Resolution() uint64 { + return p.totalItems / uint64(len(p.bucket)) +} + +func (p *PositionChart) SetIdle(idleChar byte) { + p.idle = idleChar +} + +func (p *PositionChart) String() string { + position := make([]byte, 2*2+len(p.bucket)) + position[0], position[1] = '[', '<' + position[len(position)-2], position[len(position)-1] = '<', ']' + for i, e := range utils.ReverseSlice(slices.Clone(p.bucket)) { + if e > 0 { + if e > 9 { + position[2+i] = '+' + } else { + position[2+i] = 0x30 + byte(e) + } + } else { + position[2+i] = p.idle + } + } + + return string(position) +} + +func (p *PositionChart) StringWithSeparator(index int) string { + if index < 0 || index > int(p.totalItems) { + return p.String() + } + separatorIndex := index * (len(p.bucket) - 1) / int(p.totalItems-1) + position := make([]byte, 1+2*2+len(p.bucket)) + position[0], position[1] = '[', '<' + position[2+separatorIndex] = '|' + position[len(position)-2], position[len(position)-1] = '<', ']' + for i, e := range utils.ReverseSlice(slices.Clone(p.bucket)) { + if i >= separatorIndex { + i++ + } + if e > 0 { + if e > 9 { + position[2+i] = '+' + } else { + position[2+i] = 0x30 + byte(e) + } + } else { + position[2+i] = p.idle + } + } + + return string(position) +} + +func NewPositionChart(size uint64, totalItems uint64) *PositionChart { + if size < 1 { + size = 1 + } + return &PositionChart{ + totalItems: totalItems, + bucket: make([]uint64, size), + idle: '.', + } +}