This repository has been archived on 2024-04-07. You can view files and clone it, but cannot push or open issues or pull requests.
go-monero/pkg/zmq/zmq.go
Ciro S. Costa ac58bd1fd1 add zmq support
here i add `pkg/zmq`, a package aimed at providing one with the ability
of subscribing to zmq topics that the monero daemon published messages
to.

as exaplined under [1], there are four topics that one can subscribe to:

	-  json-minimal-txpool_add
	-  json-full-txpool_add
	-  json-minimal-chain_main
	-  json-full-chain_main

in the implementation provided here, one goes about listening to these
by:

	1. creating a client aiming at a topic
	2. telling the client to listen
	3. consuming typed objects from a "stream" object

e.g.:

	client := zmq.NewClient(endpoint, zmq.TopicMinimalTxPoolAdd)
	defer client.Close()

	stream, _ := client.Listen(ctx)
	for {
		select {
		case err := <-stream.ErrC:
			panic(err)
		case tx := <-stream.MinimalTxPoolAddC:
			fmt.Println(tx)
		}
	}

CLI users can also make use of it via `monero daemon zmq`:

	$ monero daemon zmq \
		--topic json-minimal-chain_main  \
		--endpoint tcp://127.0.0.1:18085

[1]: https://github.com/monero-project/monero/blob/master/docs/ZMQ.md

Signed-off-by: Ciro S. Costa <utxobr@protonmail.com>
2021-07-28 07:37:23 -04:00

225 lines
5.1 KiB
Go

package zmq
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/go-zeromq/zmq4"
)
type Client struct {
endpoint string
topic Topic
sub zmq4.Socket
}
// NewClient instantiates a new client that will receive monerod's zmq events.
//
// - `topic` is a fully-formed zmq topic to subscribe to
//
// - `endpoint` is the full address where monerod has been configured to
// publish the messages to, including the network schama. for instance,
// considering that monerod has been started with
//
// monerod --zmq-pub tcp://127.0.0.1:18085
//
// `endpoint` should be 'tcp://127.0.0.1:18085'.
//
//
func NewClient(endpoint string, topic Topic) *Client {
return &Client{
endpoint: endpoint,
topic: topic,
}
}
// Stream provides channels where instances of the desired topic object are
// sent to.
//
type Stream struct {
ErrC chan error
FullChainMainC chan *FullChainMain
FullTxPoolAddC chan *FullTxPoolAdd
MinimalChainMainC chan *MinimalChainMain
MinimalTxPoolAddC chan *MinimalTxPoolAdd
}
// Listen listens for a topic pre-configured for this client (via NewClient).
//
// Clients listen to a single topic at a time - to listen to multiple topics,
// create new clients and listen on the corresponding stream's channel.
//
func (c *Client) Listen(ctx context.Context) (*Stream, error) {
if err := c.listen(ctx, c.topic); err != nil {
return nil, fmt.Errorf("listen on '%s': %w", c.topic, err)
}
stream := &Stream{
ErrC: make(chan error),
FullChainMainC: make(chan *FullChainMain),
FullTxPoolAddC: make(chan *FullTxPoolAdd),
MinimalChainMainC: make(chan *MinimalChainMain),
MinimalTxPoolAddC: make(chan *MinimalTxPoolAdd),
}
go func() {
if err := c.loop(stream); err != nil {
stream.ErrC <- fmt.Errorf("loop: %w", err)
}
close(stream.ErrC)
close(stream.FullChainMainC)
close(stream.FullTxPoolAddC)
close(stream.MinimalChainMainC)
close(stream.MinimalTxPoolAddC)
}()
return stream, nil
}
// Close closes any established connection, if any.
//
func (c *Client) Close() error {
if c.sub == nil {
return nil
}
return c.sub.Close()
}
func (c *Client) listen(ctx context.Context, topic Topic) error {
c.sub = zmq4.NewSub(ctx)
err := c.sub.Dial(c.endpoint)
if err != nil {
return fmt.Errorf("dial '%s': %w", c.endpoint, err)
}
err = c.sub.SetOption(zmq4.OptionSubscribe, string(topic))
if err != nil {
return fmt.Errorf("subscribe: %w", err)
}
return nil
}
func (c *Client) loop(stream *Stream) error {
for {
msg, err := c.sub.Recv()
if err != nil {
return fmt.Errorf("recv: %w", err)
}
for _, frame := range msg.Frames {
err := c.ingestFrameArray(stream, frame)
if err != nil {
return fmt.Errorf("consume frame: %w", err)
}
}
}
}
func (c *Client) ingestFrameArray(stream *Stream, frame []byte) error {
topic, gson, err := jsonFromFrame(frame)
if err != nil {
return fmt.Errorf("json from frame: %w", err)
}
if c.topic != topic {
return fmt.Errorf("topic '%s' doesn't match "+
"expected '%s'", topic, c.topic)
}
switch c.topic {
case TopicFullChainMain:
return c.transmitFullChainMain(stream, gson)
case TopicFullTxPoolAdd:
return c.transmitFullTxPoolAdd(stream, gson)
case TopicMinimalChainMain:
return c.transmitMinimalChainMain(stream, gson)
case TopicMinimalTxPoolAdd:
return c.transmitMinimalTxPoolAdd(stream, gson)
default:
return fmt.Errorf("unhandled topic '%s'", topic)
}
}
func (c *Client) transmitFullChainMain(stream *Stream, gson []byte) error {
arr := []*FullChainMain{}
if err := json.Unmarshal(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.FullChainMainC <- element
}
return nil
}
func (c *Client) transmitFullTxPoolAdd(stream *Stream, gson []byte) error {
arr := []*FullTxPoolAdd{}
if err := json.Unmarshal(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.FullTxPoolAddC <- element
}
return nil
}
func (c *Client) transmitMinimalChainMain(stream *Stream, gson []byte) error {
element := &MinimalChainMain{}
if err := json.Unmarshal(gson, element); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
stream.MinimalChainMainC <- element
return nil
}
func (c *Client) transmitMinimalTxPoolAdd(stream *Stream, gson []byte) error {
arr := []*MinimalTxPoolAdd{}
if err := json.Unmarshal(gson, &arr); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for _, element := range arr {
stream.MinimalTxPoolAddC <- element
}
return nil
}
func jsonFromFrame(frame []byte) (Topic, []byte, error) {
unknown := TopicUnknown
parts := bytes.SplitN(frame, []byte(":"), 2)
if len(parts) != 2 {
return unknown, nil, fmt.Errorf(
"malformed: expected 2 parts, got %d", len(parts))
}
topic, gson := string(parts[0]), parts[1]
switch topic {
case string(TopicMinimalChainMain):
return TopicMinimalChainMain, gson, nil
case string(TopicFullChainMain):
return TopicFullChainMain, gson, nil
case string(TopicFullTxPoolAdd):
return TopicFullTxPoolAdd, gson, nil
case string(TopicMinimalTxPoolAdd):
return TopicMinimalTxPoolAdd, gson, nil
}
return unknown, nil, fmt.Errorf("unknown topic '%s'", topic)
}