This repository has been archived on 2024-04-07. You can view files and clone it, but cannot push or open issues or pull requests.
go-monero/cmd/monero/commands/daemon/zmq.go
2022-10-05 09:40:23 +02:00

106 lines
2.3 KiB
Go

package daemon
import (
"context"
"fmt"
"strings"
"github.com/spf13/cobra"
"git.gammaspectra.live/P2Pool/go-monero/cmd/monero/display"
"git.gammaspectra.live/P2Pool/go-monero/pkg/zmq"
)
// zmqCommand carries the flag values of the `zmq` subcommand. Its fields are
// populated by the flags registered in Cmd.
type zmqCommand struct {
	// JSON is bound to the --json flag.
	JSON bool

	// topic is bound to --topic and endpoint to --endpoint; both are handed
	// to the zmq client when the command runs.
	topic, endpoint string
}
// zmqTopics lists, as plain strings, the zmq topics this command advertises.
// It is used to build the --topic help text in Cmd and is returned as the
// candidate list by topicCompletion.
var zmqTopics = []string{
string(zmq.TopicMinimalTxPoolAdd),
string(zmq.TopicFullTxPoolAdd),
string(zmq.TopicMinimalChainMain),
string(zmq.TopicFullChainMain),
}
// Cmd builds the cobra command for `zmq` and binds its flags to the fields of
// the receiver, so the same *zmqCommand must be used when RunE executes.
//
// Registered flags:
//   - --json     (bool, default false)
//   - --topic    (string, marked required, completion via topicCompletion)
//   - --endpoint (string, marked required)
func (c *zmqCommand) Cmd() *cobra.Command {
	command := &cobra.Command{
		Use:   "zmq",
		Short: "listen for zmq notifications",
		RunE:  c.RunE,
	}
	flags := command.Flags()

	flags.BoolVar(&c.JSON, "json", false,
		"whether or not to output the result as json")

	// The help text enumerates every known topic, e.g. "(a,b,c)".
	choices := fmt.Sprintf("(%s)", strings.Join(zmqTopics, ","))
	flags.StringVar(&c.topic, "topic", "json-minimal-txpool_add",
		"zmq topic to subscribe to "+choices)
	// NOTE(review): --topic has a default value but is also marked required,
	// so the default looks unreachable — confirm which one is intended.
	//
	// The returned errors are deliberately discarded; presumably they can only
	// occur for a flag name that was not defined, and "topic" is defined just
	// above.
	_ = command.MarkFlagRequired("topic")
	_ = command.RegisterFlagCompletionFunc("topic", c.topicCompletion)

	flags.StringVar(&c.endpoint, "endpoint", "",
		"zero-mq endpoint to listen for publications")
	// Same reasoning as for "topic": the flag was defined on the line above.
	_ = command.MarkFlagRequired("endpoint")

	return command
}
// topicCompletion is the shell-completion callback registered for the --topic
// flag. It ignores its arguments and always offers the full zmqTopics list.
//
// The directive is ShellCompDirectiveNoFileComp because --topic only accepts
// one of a fixed set of topic names: with the default directive the shell
// would fall back to suggesting file names whenever none of the topics
// matches what the user typed, which is never a useful value for this flag.
func (c *zmqCommand) topicCompletion(
	cmd *cobra.Command, args []string, toComplete string,
) ([]string, cobra.ShellCompDirective) {
	return zmqTopics, cobra.ShellCompDirectiveNoFileComp
}
// RunE is the cobra entry point of the `zmq` subcommand. It creates a zmq
// client for c.endpoint subscribed to c.topic, starts listening and then
// hands the resulting stream to consumeStream, returning once that returns.
//
// Errors from Listen are wrapped with "listen: " and errors from
// consumeStream with "consume stream: ". The cobra command and positional
// arguments are not used.
func (c *zmqCommand) RunE(_ *cobra.Command, _ []string) error {
	runCtx, stop := context.WithCancel(context.Background())
	defer stop()

	subscriber := zmq.NewClient(c.endpoint, zmq.Topic(c.topic))
	// Deferred after stop, so the client is closed before the context is
	// cancelled when this function returns.
	defer subscriber.Close()

	events, listenErr := subscriber.Listen(runCtx)
	if listenErr != nil {
		return fmt.Errorf("listen: %w", listenErr)
	}

	consumeErr := c.consumeStream(runCtx, events)
	if consumeErr != nil {
		return fmt.Errorf("consume stream: %w", consumeErr)
	}

	return nil
}
// consumeStream loops forever, waiting on the stream's channels and printing
// each received notification with display.JSON.
//
// It returns when:
//   - a value arrives on stream.ErrC (that value is returned as-is),
//   - ctx is done (ctx.Err() is returned), or
//   - display.JSON fails (the error is wrapped with "display json: ").
//
// Note that c.JSON is not consulted here: every notification is printed
// through display.JSON regardless of the --json flag.
func (c *zmqCommand) consumeStream(
	ctx context.Context, stream *zmq.Stream,
) error {
	for {
		// Holds whichever notification was received in this iteration,
		// independent of which of the four channels delivered it.
		var payload interface{}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case streamErr := <-stream.ErrC:
			return streamErr
		case payload = <-stream.MinimalTxPoolAddC:
		case payload = <-stream.MinimalChainMainC:
		case payload = <-stream.FullTxPoolAddC:
		case payload = <-stream.FullChainMainC:
		}

		if payload == nil {
			continue
		}

		if err := display.JSON(payload); err != nil {
			return fmt.Errorf("display json: %w", err)
		}
	}
}
func init() {
RootCommand.AddCommand((&zmqCommand{}).Cmd())
}