add zmq support

here i add `pkg/zmq`, a package aimed at providing one with the ability
of subscribing to zmq topics that the monero daemon published messages
to.

as exaplined under [1], there are four topics that one can subscribe to:

	-  json-minimal-txpool_add
	-  json-full-txpool_add
	-  json-minimal-chain_main
	-  json-full-chain_main

in the implementation provided here, one goes about listening to these
by:

	1. creating a client aiming at a topic
	2. telling the client to listen
	3. consuming typed objects from a "stream" object

e.g.:

	client := zmq.NewClient(endpoint, zmq.TopicMinimalTxPoolAdd)
	defer client.Close()

	stream, _ := client.Listen(ctx)
	for {
		select {
		case err := <-stream.ErrC:
			panic(err)
		case tx := <-stream.MinimalTxPoolAddC:
			fmt.Println(tx)
		}
	}

CLI users can also make use of it via `monero daemon zmq`:

	$ monero daemon zmq \
		--topic json-minimal-chain_main  \
		--endpoint tcp://127.0.0.1:18085

[1]: https://github.com/monero-project/monero/blob/master/docs/ZMQ.md

Signed-off-by: Ciro S. Costa <utxobr@protonmail.com>
This commit is contained in:
Ciro S. Costa 2021-07-27 07:10:25 -04:00
parent 2eb47cf90f
commit ac58bd1fd1
8 changed files with 525 additions and 8 deletions

View file

@ -3,10 +3,9 @@
[![GoDoc](https://img.shields.io/static/v1?label=godoc&message=reference&color=blue)](https://pkg.go.dev/github.com/cirocosta/go-monero)
A Go library (and CLI) for interacting with Monero's daemon via RPC or the P2P
network, free of CGO, either on clearnet or not.
Support for `monero-wallet-rpc` coming soon.
A multi-platform [Go] library (and command line interface) for interacting with
[Monero] servers either on clearnet or not (see [Tor support]), supporting for
daemon and wallet RPC, p2p commands and ZeroMQ.
## Quick start
@ -23,9 +22,7 @@ $ GO111MODULE=on go get github.com/cirocosta/go-monero/cmd/monero
```
or fetching the binary for your distribution from the [releases page]. See
[INSTALL.md](./INSTALL.md) for details and examples.
[releases page]: https://github.com/cirocosta/go-monero/releases
[INSTALL.md] for details and examples.
### Example
@ -103,6 +100,7 @@ Available Commands:
get-block full block information by either block height or hash
get-block-count look up how many blocks are in the longest chain known to the node
get-block-header retrieve block(s) header(s) by hash
get-block-headers-range retrieve a range of block headers
get-block-template generate a block template for mining a new block
get-coinbase-tx-sum compute the coinbase amount and the fees amount for n last blocks starting at particular height
get-connections information about incoming and outgoing connections.
@ -119,17 +117,22 @@ Available Commands:
get-transaction-pool-stats statistics about the transaction pool
get-version version of the monero daemon
hardfork-info information regarding hard fork voting and readiness.
mining-status information about this daemon's mining activity
on-get-block-hash find out block's hash by height
relay-tx relay a list of transaction ids
rpc-access-tracking statistics about rpc access
set-bans ban another nodes
start-mining start mining on the daemon
stop-mining stop mining on the daemon
sync-info daemon's chain synchronization info
zmq listen for zmq notifications
Flags:
-a, --address string full address of the monero node to reach out to (default "http://localhost:18081")
-a, --address string full address of the monero node to reach out to [MONERO_ADDRESS] (default "http://localhost:18081")
-h, --help help for daemon
-p, --password string password to supply for rpc auth
--request-timeout duration max wait time until considering the request a failure (default 1m0s)
--shorten-addresses whether addresses should be shortened when displaying pretty results (default true)
--tls-ca-cert string certificate authority to load
--tls-client-cert string tls client certificate to use when connecting
--tls-client-key string tls client key to use when connecting
@ -305,3 +308,11 @@ Big thanks to the Monero community and other projects around cryptonote:
![xmr address](./assets/donate.png)
891B5keCnwXN14hA9FoAzGFtaWmcuLjTDT5aRTp65juBLkbNpEhLNfgcBn6aWdGuBqBnSThqMPsGRjWVQadCrhoAT6CnSL3
[CGO]: https://pkg.go.dev/cmd/cgo
[Go]: https://go.dev
[INSTALL.md]: ./INSTALL.md
[Monero]: https://getmonero.org/
[releases page]: https://github.com/cirocosta/go-monero/releases
[Tor support]: #tor-support

View file

@ -0,0 +1,105 @@
package daemon
import (
"context"
"fmt"
"strings"
"github.com/spf13/cobra"
"github.com/cirocosta/go-monero/cmd/monero/display"
"github.com/cirocosta/go-monero/pkg/zmq"
)
type zmqCommand struct {
JSON bool
topic string
endpoint string
}
var zmqTopics = []string{
string(zmq.TopicMinimalTxPoolAdd),
string(zmq.TopicFullTxPoolAdd),
string(zmq.TopicMinimalChainMain),
string(zmq.TopicFullChainMain),
}
func (c *zmqCommand) Cmd() *cobra.Command {
var topicChoicesTxt = fmt.Sprintf("(%s)", strings.Join(zmqTopics, ","))
cmd := &cobra.Command{
Use: "zmq",
Short: "listen for zmq notifications",
RunE: c.RunE,
}
cmd.Flags().BoolVar(&c.JSON, "json",
false, "whether or not to output the result as json")
cmd.Flags().StringVar(&c.topic, "topic",
"json-minimal-txpool_add", "zmq topic to subscribe to "+
topicChoicesTxt)
_ = cmd.MarkFlagRequired("topic")
_ = cmd.RegisterFlagCompletionFunc("topic", c.topicCompletion)
cmd.Flags().StringVar(&c.endpoint, "endpoint",
"", "zero-mq endpoint to listen for publications")
_ = cmd.MarkFlagRequired("endpoint")
return cmd
}
func (c *zmqCommand) topicCompletion(
cmd *cobra.Command, args []string, toComplete string,
) ([]string, cobra.ShellCompDirective) {
return zmqTopics, cobra.ShellCompDirectiveDefault
}
func (c *zmqCommand) RunE(_ *cobra.Command, _ []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := zmq.NewClient(c.endpoint, zmq.Topic(c.topic))
defer client.Close()
stream, err := client.Listen(ctx)
if err != nil {
return fmt.Errorf("listen: %w", err)
}
if err := c.consumeStream(ctx, stream); err != nil {
return fmt.Errorf("consume stream: %w", err)
}
return nil
}
func (c *zmqCommand) consumeStream(
ctx context.Context, stream *zmq.Stream,
) error {
for {
var tx interface{}
select {
case err := <-stream.ErrC:
return err
case <-ctx.Done():
return ctx.Err()
case tx = <-stream.FullChainMainC:
case tx = <-stream.FullTxPoolAddC:
case tx = <-stream.MinimalChainMainC:
case tx = <-stream.MinimalTxPoolAddC:
}
if tx != nil {
if err := display.JSON(tx); err != nil {
return fmt.Errorf("display json: %w", err)
}
}
}
}
func init() {
RootCommand.AddCommand((&zmqCommand{}).Cmd())
}

1
go.mod
View file

@ -4,6 +4,7 @@ go 1.16
require (
github.com/dustin/go-humanize v1.0.0
github.com/go-zeromq/zmq4 v0.13.0
github.com/golangci/golangci-lint v1.41.1
github.com/gosuri/uitable v0.0.4
github.com/mattn/go-isatty v0.0.13 // indirect

5
go.sum
View file

@ -199,6 +199,10 @@ github.com/go-toolsmith/typep v1.0.2 h1:8xdsa1+FSIH/RhEkgnD1j2CJOy5mNllW1Q9tRiYw
github.com/go-toolsmith/typep v1.0.2/go.mod h1:JSQCQMUPdRlMZFswiq3TGpNp1GMktqkR2Ns5AIQkATU=
github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b h1:khEcpUM4yFcxg4/FHQWkvVRmgijNXRfzkIDHh23ggEo=
github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b/go.mod h1:aUCEOzzezBEjDBbFBoSiya/gduyIiWYRP6CnSFIV8AM=
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
github.com/go-zeromq/zmq4 v0.13.0 h1:XUWXLyeRsPsv4KlKMXnv/cEm//Vew2RLuNmDFQnZQXU=
github.com/go-zeromq/zmq4 v0.13.0/go.mod h1:TrFwdPHMSLG7Rhp8OVhQBkb4bSajfucWv8rwoEFIgSY=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@ -853,6 +857,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

104
pkg/zmq/types.go Normal file
View file

@ -0,0 +1,104 @@
package zmq
type Topic string
const (
TopicUnknown Topic = "unknown"
TopicMinimalTxPoolAdd Topic = "json-minimal-txpool_add"
TopicFullTxPoolAdd Topic = "json-full-txpool_add"
TopicMinimalChainMain Topic = "json-minimal-chain_main"
TopicFullChainMain Topic = "json-full-chain_main"
)
type MinimalChainMain struct {
FirstHeight uint64 `json:"first_height"`
FirstPrevID string `json:"first_prev_id"`
Ids []string `json:"ids"`
}
type FullChainMain struct {
MajorVersion int `json:"major_version"`
MinorVersion int `json:"minor_version"`
Timestamp int64 `json:"timestamp"`
PrevID string `json:"prev_id"`
Nonce uint64 `json:"nonce"`
MinerTx struct {
Version int `json:"version"`
UnlockTime int64 `json:"unlock_time"`
Inputs []struct {
Gen struct {
Height uint64 `json:"height"`
} `json:"gen"`
} `json:"inputs"`
Outputs []struct {
Amount uint64 `json:"amount"`
ToKey struct {
Key string `json:"key"`
} `json:"to_key"`
} `json:"outputs"`
Extra string `json:"extra"`
Signatures []interface{} `json:"signatures"`
Ringct struct {
Type int `json:"type"`
Encrypted []interface{} `json:"encrypted"`
Commitments []interface{} `json:"commitments"`
Fee uint64 `json:"fee"`
} `json:"ringct"`
} `json:"miner_tx"`
TxHashes []string `json:"tx_hashes"`
}
type MinimalTxPoolAdd struct {
ID string `json:"id"`
BlobSize uint64 `json:"blob_size"`
}
type FullTxPoolAdd struct {
Version int `json:"version"`
UnlockTime int64 `json:"unlock_time"`
Inputs []struct {
ToKey struct {
Amount uint64 `json:"amount"`
KeyOffsets []uint64 `json:"key_offsets"`
KeyImage string `json:"key_image"`
} `json:"to_key"`
} `json:"inputs"`
Outputs []struct {
Amount int `json:"amount"`
ToKey struct {
Key string `json:"key"`
} `json:"to_key"`
} `json:"outputs"`
Extra string `json:"extra"`
Signatures []interface{} `json:"signatures"`
Ringct struct {
Type int `json:"type"`
Encrypted []struct {
Mask string `json:"mask"`
Amount string `json:"amount"`
} `json:"encrypted"`
Commitments []string `json:"commitments"`
Fee int `json:"fee"`
Prunable struct {
RangeProofs []interface{} `json:"range_proofs"`
Bulletproofs []struct {
V []string `json:"V"`
AUpper string `json:"A"`
S string `json:"S"`
T1 string `json:"T1"`
T2 string `json:"T2"`
Taux string `json:"taux"`
Mu string `json:"mu"`
L []string `json:"L"`
R []string `json:"R"`
ALower string `json:"a"`
B string `json:"b"`
T string `json:"t"`
} `json:"bulletproofs"`
Mlsags []interface{} `json:"mlsags"`
PseudoOuts []string `json:"pseudo_outs"`
} `json:"prunable"`
} `json:"ringct"`
}

224
pkg/zmq/zmq.go Normal file
View file

@ -0,0 +1,224 @@
package zmq
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/go-zeromq/zmq4"
)
type Client struct {
endpoint string
topic Topic
sub zmq4.Socket
}
// NewClient instantiates a new client that will receive monerod's zmq events.
//
// - `topic` is a fully-formed zmq topic to subscribe to
//
// - `endpoint` is the full address where monerod has been configured to
// publish the messages to, including the network schama. for instance,
// considering that monerod has been started with
//
// monerod --zmq-pub tcp://127.0.0.1:18085
//
// `endpoint` should be 'tcp://127.0.0.1:18085'.
//
//
func NewClient(endpoint string, topic Topic) *Client {
return &Client{
endpoint: endpoint,
topic: topic,
}
}
// Stream provides channels where instances of the desired topic object are
// sent to.
//
type Stream struct {
ErrC chan error
FullChainMainC chan *FullChainMain
FullTxPoolAddC chan *FullTxPoolAdd
MinimalChainMainC chan *MinimalChainMain
MinimalTxPoolAddC chan *MinimalTxPoolAdd
}
// Listen listens for a topic pre-configured for this client (via NewClient).
//
// Clients listen to a single topic at a time - to listen to multiple topics,
// create new clients and listen on the corresponding stream's channel.
//
func (c *Client) Listen(ctx context.Context) (*Stream, error) {
if err := c.listen(ctx, c.topic); err != nil {
return nil, fmt.Errorf("listen on '%s': %w", c.topic, err)
}
stream := &Stream{
ErrC: make(chan error),
FullChainMainC: make(chan *FullChainMain),
FullTxPoolAddC: make(chan *FullTxPoolAdd),
MinimalChainMainC: make(chan *MinimalChainMain),
MinimalTxPoolAddC: make(chan *MinimalTxPoolAdd),
}
go func() {
if err := c.loop(stream); err != nil {
stream.ErrC <- fmt.Errorf("loop: %w", err)
}
close(stream.ErrC)
close(stream.FullChainMainC)
close(stream.FullTxPoolAddC)
close(stream.MinimalChainMainC)
close(stream.MinimalTxPoolAddC)
}()
return stream, nil
}
// Close closes any established connection, if any.
//
func (c *Client) Close() error {
if c.sub == nil {
return nil
}
return c.sub.Close()
}
func (c *Client) listen(ctx context.Context, topic Topic) error {
c.sub = zmq4.NewSub(ctx)
err := c.sub.Dial(c.endpoint)
if err != nil {
return fmt.Errorf("dial '%s': %w", c.endpoint, err)
}
err = c.sub.SetOption(zmq4.OptionSubscribe, string(topic))
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
return nil
}
func (c *Client) loop(stream *Stream) error {
for {
msg, err := c.sub.Recv()
if err != nil {
return fmt.Errorf("recv: %w", err)
}
for _, frame := range msg.Frames {
err := c.ingestFrameArray(stream, frame)
if err != nil {
return fmt.Errorf("consume frame: %w", err)
}
}
}
}
func (c *Client) ingestFrameArray(stream *Stream, frame []byte) error {
topic, gson, err := jsonFromFrame(frame)
if err != nil {
return fmt.Errorf("json from frame: %w", err)
}
if c.topic != topic {
return fmt.Errorf("topic '%s' doesn't match "+
"expected '%s'", topic, c.topic)
}
switch c.topic {
case TopicFullChainMain:
return c.transmitFullChainMain(stream, gson)
case TopicFullTxPoolAdd:
return c.transmitFullTxPoolAdd(stream, gson)
case TopicMinimalChainMain:
return c.transmitMinimalChainMain(stream, gson)
case TopicMinimalTxPoolAdd:
return c.transmitMinimalTxPoolAdd(stream, gson)
default:
return fmt.Errorf("unhandled topic '%s'", topic)
}
}
func (c *Client) transmitFullChainMain(stream *Stream, gson []byte) error {
arr := []*FullChainMain{}
if err := json.Unmarshal(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.FullChainMainC <- element
}
return nil
}
func (c *Client) transmitFullTxPoolAdd(stream *Stream, gson []byte) error {
arr := []*FullTxPoolAdd{}
if err := json.Unmarshal(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.FullTxPoolAddC <- element
}
return nil
}
func (c *Client) transmitMinimalChainMain(stream *Stream, gson []byte) error {
element := &MinimalChainMain{}
if err := json.Unmarshal(gson, element); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
stream.MinimalChainMainC <- element
return nil
}
func (c *Client) transmitMinimalTxPoolAdd(stream *Stream, gson []byte) error {
arr := []*MinimalTxPoolAdd{}
if err := json.Unmarshal(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.MinimalTxPoolAddC <- element
}
return nil
}
func jsonFromFrame(frame []byte) (Topic, []byte, error) {
unknown := TopicUnknown
parts := bytes.SplitN(frame, []byte(":"), 2)
if len(parts) != 2 {
return unknown, nil, fmt.Errorf(
"malformed: expected 2 parts, got %d", len(parts))
}
topic, gson := string(parts[0]), parts[1]
switch topic {
case string(TopicMinimalChainMain):
return TopicMinimalChainMain, gson, nil
case string(TopicFullChainMain):
return TopicFullChainMain, gson, nil
case string(TopicFullTxPoolAdd):
return TopicFullTxPoolAdd, gson, nil
case string(TopicMinimalTxPoolAdd):
return TopicMinimalTxPoolAdd, gson, nil
}
return unknown, nil, fmt.Errorf("unknown topic '%s'", topic)
}

View file

@ -0,0 +1,3 @@
package zmq
var JSONFromFrame = jsonFromFrame

64
pkg/zmq/zmq_test.go Normal file
View file

@ -0,0 +1,64 @@
package zmq_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cirocosta/go-monero/pkg/zmq"
)
func TestJSONFromFrame(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
input []byte
expectedJSON []byte
expectedTopic zmq.Topic
err string
}{
{
name: "nil",
input: nil,
err: "malformed",
},
{
name: "empty",
input: []byte{},
err: "malformed",
},
{
name: "unknown-topic",
input: []byte(`foobar:[{"foo":"bar"}]`),
err: "unknown topic",
},
{
name: "proper w/ known-topic",
input: []byte(`json-minimal-txpool_add:[{"foo":"bar"}]`),
expectedTopic: zmq.TopicMinimalTxPoolAdd,
expectedJSON: []byte(`[{"foo":"bar"}]`),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
aTopic, aJSON, err := zmq.JSONFromFrame(tc.input)
if tc.err != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tc.err)
return
}
assert.NoError(t, err)
assert.Equal(t, tc.expectedTopic, aTopic)
assert.Equal(t, tc.expectedJSON, aJSON)
})
}
}