all: drop x/xerrors in favor of fmt+errors

This commit is contained in:
Sebastien Binet 2020-10-21 13:53:21 +02:00
parent 361b05fb45
commit 2e2a862550
30 changed files with 287 additions and 299 deletions

46
conn.go
View file

@ -7,16 +7,16 @@ package zmq4
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"golang.org/x/xerrors"
)
var ErrClosedConn = xerrors.New("zmq4: read/write on closed connection")
var ErrClosedConn = errors.New("zmq4: read/write on closed connection")
// Conn implements the ZeroMQ Message Transport Protocol as defined
// in https://rfc.zeromq.org/spec:23/ZMTP/.
@ -66,11 +66,11 @@ func (c *Conn) Write(p []byte) (int, error) {
// Open performs a complete ZMTP handshake.
func Open(rw net.Conn, sec Security, sockType SocketType, sockID SocketIdentity, server bool, onCloseErrorCB func(c *Conn)) (*Conn, error) {
if rw == nil {
return nil, xerrors.Errorf("zmq4: invalid nil read-writer")
return nil, fmt.Errorf("zmq4: invalid nil read-writer")
}
if sec == nil {
return nil, xerrors.Errorf("zmq4: invalid nil security")
return nil, fmt.Errorf("zmq4: invalid nil security")
}
conn := &Conn{
@ -89,7 +89,7 @@ func Open(rw net.Conn, sec Security, sockType SocketType, sockID SocketIdentity,
err := conn.init(sec)
if err != nil {
return nil, xerrors.Errorf("zmq4: could not initialize ZMTP connection: %w", err)
return nil, fmt.Errorf("zmq4: could not initialize ZMTP connection: %w", err)
}
return conn, nil
@ -101,17 +101,17 @@ func (conn *Conn) init(sec Security) error {
err = conn.greet(conn.Server)
if err != nil {
return xerrors.Errorf("zmq4: could not exchange greetings: %w", err)
return fmt.Errorf("zmq4: could not exchange greetings: %w", err)
}
err = conn.sec.Handshake(conn, conn.Server)
if err != nil {
return xerrors.Errorf("zmq4: could not perform security handshake: %w", err)
return fmt.Errorf("zmq4: could not perform security handshake: %w", err)
}
peer := SocketType(conn.Peer.Meta[sysSockType])
if !peer.IsCompatible(conn.typ) {
return xerrors.Errorf("zmq4: peer=%q not compatible with %q", peer, conn.typ)
return fmt.Errorf("zmq4: peer=%q not compatible with %q", peer, conn.typ)
}
// FIXME(sbinet): if security mechanism does not define a client/server
@ -136,14 +136,14 @@ func (conn *Conn) greet(server bool) error {
err = send.write(conn.rw)
if err != nil {
conn.checkIO(err)
return xerrors.Errorf("zmq4: could not send greeting: %w", err)
return fmt.Errorf("zmq4: could not send greeting: %w", err)
}
var recv greeting
err = recv.read(conn.rw)
if err != nil {
conn.checkIO(err)
return xerrors.Errorf("zmq4: could not recv greeting: %w", err)
return fmt.Errorf("zmq4: could not recv greeting: %w", err)
}
peerKind := asString(recv.Mechanism[:])
@ -153,7 +153,7 @@ func (conn *Conn) greet(server bool) error {
conn.Peer.Server, err = asBool(recv.Server)
if err != nil {
return xerrors.Errorf("zmq4: could not get peer server flag: %w", err)
return fmt.Errorf("zmq4: could not get peer server flag: %w", err)
}
return nil
@ -189,7 +189,7 @@ func (c *Conn) SendMsg(msg Msg) error {
}
err := c.send(false, frame, flag)
if err != nil {
return xerrors.Errorf("zmq4: error sending frame %d/%d: %w", i+1, nframes, err)
return fmt.Errorf("zmq4: error sending frame %d/%d: %w", i+1, nframes, err)
}
}
return nil
@ -202,7 +202,7 @@ func (c *Conn) RecvMsg() (Msg, error) {
}
msg := c.read()
if msg.err != nil {
return msg, xerrors.Errorf("zmq4: could not read recv msg: %w", msg.err)
return msg, fmt.Errorf("zmq4: could not read recv msg: %w", msg.err)
}
if !msg.isCmd() {
@ -211,19 +211,19 @@ func (c *Conn) RecvMsg() (Msg, error) {
switch len(msg.Frames) {
case 0:
msg.err = xerrors.Errorf("zmq4: empty command")
msg.err = fmt.Errorf("zmq4: empty command")
return msg, msg.err
case 1:
// ok
default:
msg.err = xerrors.Errorf("zmq4: invalid length command")
msg.err = fmt.Errorf("zmq4: invalid length command")
return msg, msg.err
}
var cmd Cmd
msg.err = cmd.unmarshalZMTP(msg.Frames[0])
if msg.err != nil {
return msg, xerrors.Errorf("zmq4: could not unmarshal ZMTP recv msg: %w", msg.err)
return msg, fmt.Errorf("zmq4: could not unmarshal ZMTP recv msg: %w", msg.err)
}
switch cmd.Name {
@ -254,7 +254,7 @@ func (c *Conn) RecvCmd() (Cmd, error) {
msg := c.read()
if msg.err != nil {
return cmd, xerrors.Errorf("zmq4: could not read recv cmd: %w", msg.err)
return cmd, fmt.Errorf("zmq4: could not read recv cmd: %w", msg.err)
}
if !msg.isCmd() {
@ -263,18 +263,18 @@ func (c *Conn) RecvCmd() (Cmd, error) {
switch len(msg.Frames) {
case 0:
msg.err = xerrors.Errorf("zmq4: empty command")
msg.err = fmt.Errorf("zmq4: empty command")
return cmd, msg.err
case 1:
// ok
default:
msg.err = xerrors.Errorf("zmq4: invalid length command")
msg.err = fmt.Errorf("zmq4: invalid length command")
return cmd, msg.err
}
err := cmd.unmarshalZMTP(msg.Frames[0])
if err != nil {
return cmd, xerrors.Errorf("zmq4: could not unmarshal ZMTP recv cmd: %w", err)
return cmd, fmt.Errorf("zmq4: could not unmarshal ZMTP recv cmd: %w", err)
}
return cmd, nil
@ -482,13 +482,13 @@ func (conn *Conn) checkIO(err error) {
return
}
if err == io.EOF || xerrors.Is(err, io.EOF) {
if err == io.EOF || errors.Is(err, io.EOF) {
conn.SetClosed()
return
}
var e net.Error
if xerrors.As(err, &e); e != nil && !e.Timeout() {
if errors.As(err, &e); e != nil && !e.Timeout() {
conn.SetClosed()
}
}

1
go.mod
View file

@ -5,5 +5,4 @@ go 1.13
require (
github.com/go-zeromq/goczmq/v4 v4.2.2
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
)

2
go.sum
View file

@ -2,5 +2,3 @@ github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8b
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View file

@ -10,18 +10,18 @@
package inproc
import (
"errors"
"fmt"
"net"
"strings"
"sync"
"golang.org/x/xerrors"
)
var (
mgr = context{db: make(map[string]*Listener)}
ErrClosed = xerrors.New("inproc: connection closed")
ErrConnRefused = xerrors.New("inproc: connection refused")
ErrClosed = errors.New("inproc: connection closed")
ErrConnRefused = errors.New("inproc: connection refused")
)
func init() {
@ -96,7 +96,7 @@ func Listen(addr string) (*Listener, error) {
_, dup := mgr.db[addr]
if dup {
mgr.mu.Unlock()
return nil, xerrors.Errorf("inproc: address %q already in use", addr)
return nil, fmt.Errorf("inproc: address %q already in use", addr)
}
l := &Listener{

View file

@ -6,13 +6,13 @@ package inproc
import (
"bytes"
"fmt"
"io"
"math/rand"
"reflect"
"testing"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
func TestBasicIO(t *testing.T) {
@ -84,7 +84,7 @@ func TestRW(t *testing.T) {
grp.Go(func() error {
conn, err := lst.Accept()
if err != nil {
return xerrors.Errorf("could not accept connection: %w", err)
return fmt.Errorf("could not accept connection: %w", err)
}
defer conn.Close()
@ -101,26 +101,26 @@ func TestRW(t *testing.T) {
raw := make([]byte, len("HELLO"))
_, err = io.ReadFull(conn, raw)
if err != nil {
return xerrors.Errorf("could not read request: %w", err)
return fmt.Errorf("could not read request: %w", err)
}
if got, want := raw, []byte("HELLO"); !reflect.DeepEqual(got, want) {
return xerrors.Errorf("invalid request: got=%v, want=%v", got, want)
return fmt.Errorf("invalid request: got=%v, want=%v", got, want)
}
_, err = conn.Write([]byte("HELLO"))
if err != nil {
return xerrors.Errorf("could not write reply: %w", err)
return fmt.Errorf("could not write reply: %w", err)
}
raw = make([]byte, len("QUIT"))
_, err = io.ReadFull(conn, raw)
if err != nil {
return xerrors.Errorf("could not read final request: %w", err)
return fmt.Errorf("could not read final request: %w", err)
}
if got, want := raw, []byte("QUIT"); !reflect.DeepEqual(got, want) {
return xerrors.Errorf("invalid request: got=%v, want=%v", got, want)
return fmt.Errorf("invalid request: got=%v, want=%v", got, want)
}
return nil
@ -129,7 +129,7 @@ func TestRW(t *testing.T) {
grp.Go(func() error {
conn, err := Dial("inproc://rw-srv")
if err != nil {
return xerrors.Errorf("could not dial server: %w", err)
return fmt.Errorf("could not dial server: %w", err)
}
defer conn.Close()
@ -145,22 +145,22 @@ func TestRW(t *testing.T) {
_, err = conn.Write([]byte("HELLO"))
if err != nil {
return xerrors.Errorf("could not send request: %w", err)
return fmt.Errorf("could not send request: %w", err)
}
raw := make([]byte, len("HELLO"))
_, err = io.ReadFull(conn, raw)
if err != nil {
return xerrors.Errorf("could not read reply: %w", err)
return fmt.Errorf("could not read reply: %w", err)
}
if got, want := raw, []byte("HELLO"); !reflect.DeepEqual(got, want) {
return xerrors.Errorf("invalid reply: got=%v, want=%v", got, want)
return fmt.Errorf("invalid reply: got=%v, want=%v", got, want)
}
_, err = conn.Write([]byte("QUIT"))
if err != nil {
return xerrors.Errorf("could not write final request: %w", err)
return fmt.Errorf("could not write final request: %w", err)
}
return nil

View file

@ -7,22 +7,22 @@ package zmq4
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"strings"
"golang.org/x/xerrors"
)
var (
errGreeting = xerrors.New("zmq4: invalid greeting received")
errSecMech = xerrors.New("zmq4: invalid security mechanism")
errBadSec = xerrors.New("zmq4: invalid or unsupported security mechanism")
ErrBadCmd = xerrors.New("zmq4: invalid command name")
ErrBadFrame = xerrors.New("zmq4: invalid frame")
errOverflow = xerrors.New("zmq4: overflow")
errEmptyAppMDKey = xerrors.New("zmq4: empty application metadata key")
errDupAppMDKey = xerrors.New("zmq4: duplicate application metadata key")
errBoolCnv = xerrors.New("zmq4: invalid byte to bool conversion")
errGreeting = errors.New("zmq4: invalid greeting received")
errSecMech = errors.New("zmq4: invalid security mechanism")
errBadSec = errors.New("zmq4: invalid or unsupported security mechanism")
ErrBadCmd = errors.New("zmq4: invalid command name")
ErrBadFrame = errors.New("zmq4: invalid frame")
errOverflow = errors.New("zmq4: overflow")
errEmptyAppMDKey = errors.New("zmq4: empty application metadata key")
errDupAppMDKey = errors.New("zmq4: duplicate application metadata key")
errBoolCnv = errors.New("zmq4: invalid byte to bool conversion")
)
const (
@ -95,21 +95,21 @@ func (g *greeting) read(r io.Reader) error {
var data [zmtpMsgLen]byte
_, err := io.ReadFull(r, data[:])
if err != nil {
return xerrors.Errorf("could not read ZMTP greeting: %w", err)
return fmt.Errorf("could not read ZMTP greeting: %w", err)
}
g.unmarshal(data[:])
if g.Sig.Header != sigHeader {
return xerrors.Errorf("invalid ZMTP signature header: %w", errGreeting)
return fmt.Errorf("invalid ZMTP signature header: %w", errGreeting)
}
if g.Sig.Footer != sigFooter {
return xerrors.Errorf("invalid ZMTP signature footer: %w", errGreeting)
return fmt.Errorf("invalid ZMTP signature footer: %w", errGreeting)
}
if !g.validate(defaultVersion) {
return xerrors.Errorf(
return fmt.Errorf(
"invalid ZMTP version (got=%v, want=%v): %w",
g.Version, defaultVersion, errGreeting,
)

View file

@ -6,10 +6,9 @@ package zmq4
import (
"bytes"
"fmt"
"io"
"testing"
"golang.org/x/xerrors"
)
func TestGreeting(t *testing.T) {
@ -37,12 +36,12 @@ func TestGreeting(t *testing.T) {
{
name: "empty-buffer",
data: nil,
want: xerrors.Errorf("could not read ZMTP greeting: %w", io.EOF),
want: fmt.Errorf("could not read ZMTP greeting: %w", io.EOF),
},
{
name: "unexpected-EOF",
data: make([]byte, 1),
want: xerrors.Errorf("could not read ZMTP greeting: %w", io.ErrUnexpectedEOF),
want: fmt.Errorf("could not read ZMTP greeting: %w", io.ErrUnexpectedEOF),
},
{
name: "invalid-header",
@ -59,7 +58,7 @@ func TestGreeting(t *testing.T) {
}
return w.Bytes()
}(),
want: xerrors.Errorf("invalid ZMTP signature header: %w", errGreeting),
want: fmt.Errorf("invalid ZMTP signature header: %w", errGreeting),
},
{
name: "invalid-footer",
@ -76,7 +75,7 @@ func TestGreeting(t *testing.T) {
}
return w.Bytes()
}(),
want: xerrors.Errorf("invalid ZMTP signature footer: %w", errGreeting),
want: fmt.Errorf("invalid ZMTP signature footer: %w", errGreeting),
},
{
name: "higher-major-version",
@ -125,7 +124,7 @@ func TestGreeting(t *testing.T) {
}
return w.Bytes()
}(),
want: xerrors.Errorf("invalid ZMTP version (got=%v, want=%v): %w",
want: fmt.Errorf("invalid ZMTP version (got=%v, want=%v): %w",
[2]uint8{defaultVersion[0] - 1, defaultVersion[1]},
defaultVersion,
errGreeting,

View file

@ -6,11 +6,11 @@ package zmq4
import (
"context"
"fmt"
"log"
"sync"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
// Proxy connects a frontend socket to a backend socket.
@ -152,7 +152,7 @@ func (p *Proxy) init(front, back, capture Socket) {
return nil
default:
// API error. panic.
panic(xerrors.Errorf("invalid control socket command: %v", cmd))
panic(fmt.Errorf("invalid control socket command: %v", cmd))
}
}
}

View file

@ -6,13 +6,13 @@ package zmq4_test
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
func TestProxy(t *testing.T) {
@ -67,7 +67,7 @@ func TestProxy(t *testing.T) {
defer frontIn.Close()
err := frontIn.Dial(epFront)
if err != nil {
return xerrors.Errorf("front-in could not dial %q: %w", epFront, err)
return fmt.Errorf("front-in could not dial %q: %w", epFront, err)
}
wg1.Done()
@ -79,7 +79,7 @@ func TestProxy(t *testing.T) {
t.Logf("front-in sending %v...", msg)
err = frontIn.Send(msg)
if err != nil {
return xerrors.Errorf("could not send front-in %q: %w", msg, err)
return fmt.Errorf("could not send front-in %q: %w", msg, err)
}
t.Logf("front-in sending %v... [done]", msg)
}
@ -98,7 +98,7 @@ func TestProxy(t *testing.T) {
defer front.Close()
err := front.Listen(epFront)
if err != nil {
return xerrors.Errorf("front could not listen %q: %w", epFront, err)
return fmt.Errorf("front could not listen %q: %w", epFront, err)
}
wg1.Done()
@ -119,7 +119,7 @@ func TestProxy(t *testing.T) {
defer back.Close()
err := back.Listen(epBack)
if err != nil {
return xerrors.Errorf("back could not listen %q: %w", epBack, err)
return fmt.Errorf("back could not listen %q: %w", epBack, err)
}
wg1.Done()
@ -140,7 +140,7 @@ func TestProxy(t *testing.T) {
defer backOut.Close()
err := backOut.Dial(epBack)
if err != nil {
return xerrors.Errorf("back-out could not dial %q: %w", epBack, err)
return fmt.Errorf("back-out could not dial %q: %w", epBack, err)
}
wg1.Done()
@ -152,10 +152,10 @@ func TestProxy(t *testing.T) {
t.Logf("back-out recving %v...", want)
msg, err := backOut.Recv()
if err != nil {
return xerrors.Errorf("back-out could not recv: %w", err)
return fmt.Errorf("back-out could not recv: %w", err)
}
if msg.String() != want.String() {
return xerrors.Errorf("invalid message: got=%v, want=%v", msg, want)
return fmt.Errorf("invalid message: got=%v, want=%v", msg, want)
}
t.Logf("back-out recving %v... [done]", msg)
}
@ -175,7 +175,7 @@ func TestProxy(t *testing.T) {
defer captOut.Close()
err := captOut.Listen(epCapt)
if err != nil {
return xerrors.Errorf("capt-out could not listen %q: %w", epCapt, err)
return fmt.Errorf("capt-out could not listen %q: %w", epCapt, err)
}
wg1.Done()
@ -188,10 +188,10 @@ func TestProxy(t *testing.T) {
t.Logf("capt-out recving %v...", want)
msg, err := captOut.Recv()
if err != nil {
return xerrors.Errorf("capt-out could not recv msg: %w", err)
return fmt.Errorf("capt-out could not recv msg: %w", err)
}
if msg.String() != want.String() {
return xerrors.Errorf("capt-out: invalid message: got=%v, want=%v", msg, want)
return fmt.Errorf("capt-out: invalid message: got=%v, want=%v", msg, want)
}
t.Logf("capt-out recving %v... [done]", msg)
}
@ -210,7 +210,7 @@ func TestProxy(t *testing.T) {
defer capt.Close()
err := capt.Dial(epCapt)
if err != nil {
return xerrors.Errorf("capt could not dial %q: %w", epCapt, err)
return fmt.Errorf("capt could not dial %q: %w", epCapt, err)
}
wg1.Done()

5
pub.go
View file

@ -6,10 +6,9 @@ package zmq4
import (
"context"
"fmt"
"net"
"sync"
"golang.org/x/xerrors"
)
// Topics is an interface that wraps the basic Topics method.
@ -57,7 +56,7 @@ func (pub *pubSocket) SendMulti(msg Msg) error {
// Recv receives a complete message.
func (*pubSocket) Recv() (Msg, error) {
msg := Msg{err: xerrors.Errorf("zmq4: PUB sockets can't recv messages")}
msg := Msg{err: fmt.Errorf("zmq4: PUB sockets can't recv messages")}
return msg, msg.err
}

View file

@ -6,9 +6,8 @@ package zmq4
import (
"context"
"fmt"
"net"
"golang.org/x/xerrors"
)
// NewPull returns a new PULL ZeroMQ socket.
@ -32,14 +31,14 @@ func (pull *pullSocket) Close() error {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
func (*pullSocket) Send(msg Msg) error {
return xerrors.Errorf("zmq4: PULL sockets can't send messages")
return fmt.Errorf("zmq4: PULL sockets can't send messages")
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (pull *pullSocket) SendMulti(msg Msg) error {
return xerrors.Errorf("zmq4: PULL sockets can't send messages")
return fmt.Errorf("zmq4: PULL sockets can't send messages")
}
// Recv receives a complete message.

View file

@ -6,9 +6,8 @@ package zmq4
import (
"context"
"fmt"
"net"
"golang.org/x/xerrors"
)
// NewPush returns a new PUSH ZeroMQ socket.
@ -44,7 +43,7 @@ func (push *pushSocket) SendMulti(msg Msg) error {
// Recv receives a complete message.
func (*pushSocket) Recv() (Msg, error) {
return Msg{}, xerrors.Errorf("zmq4: PUSH sockets can't recv messages")
return Msg{}, fmt.Errorf("zmq4: PUSH sockets can't recv messages")
}
// Listen connects a local endpoint to the Socket.

5
rep.go
View file

@ -6,10 +6,9 @@ package zmq4
import (
"context"
"fmt"
"net"
"sync"
"golang.org/x/xerrors"
)
// NewRep returns a new REP ZeroMQ socket.
@ -150,7 +149,7 @@ func (r *repReader) read(ctx context.Context, msg *Msg) error {
}
pre, innerMsg := splitReq(repMsg.msg)
if pre == nil {
return xerrors.Errorf("zmq4: invalid REP message")
return fmt.Errorf("zmq4: invalid REP message")
}
*msg = innerMsg
r.state.Set(repMsg.conn, pre)

7
req.go
View file

@ -6,10 +6,9 @@ package zmq4
import (
"context"
"fmt"
"net"
"sync"
"golang.org/x/xerrors"
)
// NewReq returns a new REQ ZeroMQ socket.
@ -120,7 +119,7 @@ func (r *reqWriter) write(ctx context.Context, msg Msg) error {
return nil
}
}
return xerrors.Errorf("zmq4: no connections available: %w", err)
return fmt.Errorf("zmq4: no connections available: %w", err)
}
func (r *reqWriter) addConn(c *Conn) {
@ -182,7 +181,7 @@ func (r *reqReader) Close() error {
func (r *reqReader) read(ctx context.Context, msg *Msg) error {
curConn := r.state.Get()
if curConn == nil {
return xerrors.Errorf("zmq4: no connections available")
return fmt.Errorf("zmq4: no connections available")
}
*msg = curConn.read()
if msg.err != nil {

View file

@ -5,9 +5,8 @@
package zmq4
import (
"fmt"
"io"
"golang.org/x/xerrors"
)
// Security is an interface for ZMTP security mechanisms
@ -65,17 +64,17 @@ func (nullSecurity) Type() SecurityType {
func (nullSecurity) Handshake(conn *Conn, server bool) error {
raw, err := conn.Meta.MarshalZMTP()
if err != nil {
return xerrors.Errorf("zmq4: could not marshal metadata: %w", err)
return fmt.Errorf("zmq4: could not marshal metadata: %w", err)
}
err = conn.SendCmd(CmdReady, raw)
if err != nil {
return xerrors.Errorf("zmq4: could not send metadata to peer: %w", err)
return fmt.Errorf("zmq4: could not send metadata to peer: %w", err)
}
cmd, err := conn.RecvCmd()
if err != nil {
return xerrors.Errorf("zmq4: could not recv metadata from peer: %w", err)
return fmt.Errorf("zmq4: could not recv metadata from peer: %w", err)
}
if cmd.Name != CmdReady {
@ -84,7 +83,7 @@ func (nullSecurity) Handshake(conn *Conn, server bool) error {
err = conn.Peer.Meta.UnmarshalZMTP(cmd.Body)
if err != nil {
return xerrors.Errorf("zmq4: could not unmarshal peer metadata: %w", err)
return fmt.Errorf("zmq4: could not unmarshal peer metadata: %w", err)
}
return nil

View file

@ -6,10 +6,10 @@
package null
import (
"fmt"
"io"
"github.com/go-zeromq/zmq4"
"golang.org/x/xerrors"
)
// security implements the NULL security mechanism.
@ -34,17 +34,17 @@ func (security) Type() zmq4.SecurityType {
func (security) Handshake(conn *zmq4.Conn, server bool) error {
raw, err := conn.Meta.MarshalZMTP()
if err != nil {
return xerrors.Errorf("security/null: could not marshal metadata: %w", err)
return fmt.Errorf("security/null: could not marshal metadata: %w", err)
}
err = conn.SendCmd(zmq4.CmdReady, raw)
if err != nil {
return xerrors.Errorf("security/null: could not send metadata to peer: %w", err)
return fmt.Errorf("security/null: could not send metadata to peer: %w", err)
}
cmd, err := conn.RecvCmd()
if err != nil {
return xerrors.Errorf("security/null: could not recv metadata from peer: %w", err)
return fmt.Errorf("security/null: could not recv metadata from peer: %w", err)
}
if cmd.Name != zmq4.CmdReady {
@ -53,7 +53,7 @@ func (security) Handshake(conn *zmq4.Conn, server bool) error {
err = conn.Peer.Meta.UnmarshalZMTP(cmd.Body)
if err != nil {
return xerrors.Errorf("security/null: could not unmarshal peer metadata: %w", err)
return fmt.Errorf("security/null: could not unmarshal peer metadata: %w", err)
}
return nil

View file

@ -7,6 +7,7 @@ package null_test
import (
"bytes"
"context"
"fmt"
"os"
"reflect"
"strings"
@ -16,7 +17,6 @@ import (
"github.com/go-zeromq/zmq4"
"github.com/go-zeromq/zmq4/security/null"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
func TestSecurity(t *testing.T) {
@ -68,21 +68,21 @@ func TestHandshakeReqRep(t *testing.T) {
grp.Go(func() error {
err := rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
msg, err := rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, reqQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
err = rep.Send(repQuit)
if err != nil {
return xerrors.Errorf("could not send REP message: %w", err)
return fmt.Errorf("could not send REP message: %w", err)
}
return nil
@ -91,12 +91,12 @@ func TestHandshakeReqRep(t *testing.T) {
grp.Go(func() error {
err := req.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = req.Send(reqQuit)
if err != nil {
return xerrors.Errorf("could not send REQ message: %w", err)
return fmt.Errorf("could not send REQ message: %w", err)
}
return nil
})

View file

@ -7,10 +7,10 @@
package plain
import (
"fmt"
"io"
"github.com/go-zeromq/zmq4"
"golang.org/x/xerrors"
)
// security implements the PLAIN security mechanism.
@ -40,44 +40,44 @@ func (sec *security) Handshake(conn *zmq4.Conn, server bool) error {
case server:
cmd, err := conn.RecvCmd()
if err != nil {
return xerrors.Errorf("security/plain: could not receive HELLO from client: %w", err)
return fmt.Errorf("security/plain: could not receive HELLO from client: %w", err)
}
if cmd.Name != zmq4.CmdHello {
return xerrors.Errorf("security/plain: expected HELLO command")
return fmt.Errorf("security/plain: expected HELLO command")
}
// FIXME(sbinet): perform a real authentication
err = validateHello(cmd.Body)
if err != nil {
conn.SendCmd(zmq4.CmdError, []byte("invalid")) // FIXME(sbinet) correct ERROR reason
return xerrors.Errorf("security/plain: could not authenticate client: %w", err)
return fmt.Errorf("security/plain: could not authenticate client: %w", err)
}
err = conn.SendCmd(zmq4.CmdWelcome, nil)
if err != nil {
return xerrors.Errorf("security/plain: could not send WELCOME to client: %w", err)
return fmt.Errorf("security/plain: could not send WELCOME to client: %w", err)
}
cmd, err = conn.RecvCmd()
if err != nil {
return xerrors.Errorf("security/plain: could not receive INITIATE from client: %w", err)
return fmt.Errorf("security/plain: could not receive INITIATE from client: %w", err)
}
err = conn.Peer.Meta.UnmarshalZMTP(cmd.Body)
if err != nil {
return xerrors.Errorf("security/plain: could not unmarshal peer metadata: %w", err)
return fmt.Errorf("security/plain: could not unmarshal peer metadata: %w", err)
}
raw, err := conn.Meta.MarshalZMTP()
if err != nil {
conn.SendCmd(zmq4.CmdError, []byte("invalid")) // FIXME(sbinet) correct ERROR reason
return xerrors.Errorf("security/plain: could not serialize metadata: %w", err)
return fmt.Errorf("security/plain: could not serialize metadata: %w", err)
}
err = conn.SendCmd(zmq4.CmdReady, raw)
if err != nil {
return xerrors.Errorf("security/plain: could not send READY to client: %w", err)
return fmt.Errorf("security/plain: could not send READY to client: %w", err)
}
case !server:
@ -89,41 +89,41 @@ func (sec *security) Handshake(conn *zmq4.Conn, server bool) error {
err := conn.SendCmd(zmq4.CmdHello, hello)
if err != nil {
return xerrors.Errorf("security/plain: could not send HELLO to server: %w", err)
return fmt.Errorf("security/plain: could not send HELLO to server: %w", err)
}
cmd, err := conn.RecvCmd()
if err != nil {
return xerrors.Errorf("security/plain: could not receive WELCOME from server: %w", err)
return fmt.Errorf("security/plain: could not receive WELCOME from server: %w", err)
}
if cmd.Name != zmq4.CmdWelcome {
conn.SendCmd(zmq4.CmdError, []byte("invalid command")) // FIXME(sbinet) correct ERROR reason
return xerrors.Errorf("security/plain: expected a WELCOME command from server: %w", err)
return fmt.Errorf("security/plain: expected a WELCOME command from server: %w", err)
}
raw, err := conn.Meta.MarshalZMTP()
if err != nil {
conn.SendCmd(zmq4.CmdError, []byte("internal error")) // FIXME(sbinet) correct ERROR reason
return xerrors.Errorf("security/plain: could not serialize metadata: %w", err)
return fmt.Errorf("security/plain: could not serialize metadata: %w", err)
}
err = conn.SendCmd(zmq4.CmdInitiate, raw)
if err != nil {
return xerrors.Errorf("security/plain: could not send INITIATE to server: %w", err)
return fmt.Errorf("security/plain: could not send INITIATE to server: %w", err)
}
cmd, err = conn.RecvCmd()
if err != nil {
return xerrors.Errorf("security/plain: could not receive READY from server: %w", err)
return fmt.Errorf("security/plain: could not receive READY from server: %w", err)
}
if cmd.Name != zmq4.CmdReady {
conn.SendCmd(zmq4.CmdError, []byte("invalid command")) // FIXME(sbinet) correct ERROR reason
return xerrors.Errorf("security/plain: expected a READY command from server: %w", err)
return fmt.Errorf("security/plain: expected a READY command from server: %w", err)
}
err = conn.Peer.Meta.UnmarshalZMTP(cmd.Body)
if err != nil {
return xerrors.Errorf("security/plain: could not unmarshal peer metadata: %w", err)
return fmt.Errorf("security/plain: could not unmarshal peer metadata: %w", err)
}
sec.user = nil

View file

@ -8,6 +8,7 @@ package plain_test
import (
"context"
"fmt"
"os"
"reflect"
"testing"
@ -17,7 +18,6 @@ import (
"github.com/go-zeromq/zmq4"
"github.com/go-zeromq/zmq4/security/plain"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
func TestMain(m *testing.M) {
@ -66,21 +66,21 @@ func TestHandshakeReqCRep(t *testing.T) {
grp.Go(func() error {
err := rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
msg, err := rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, reqQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
err = rep.Send(repQuit)
if err != nil {
return xerrors.Errorf("could not send REP message: %w", err)
return fmt.Errorf("could not send REP message: %w", err)
}
return nil
@ -89,20 +89,20 @@ func TestHandshakeReqCRep(t *testing.T) {
grp.Go(func() error {
err := req.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = req.Send(reqQuit)
if err != nil {
return xerrors.Errorf("could not send REQ message: %w", err)
return fmt.Errorf("could not send REQ message: %w", err)
}
msg, err := req.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, repQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
return nil
})
@ -135,21 +135,21 @@ func TestHandshakeCReqRep(t *testing.T) {
grp.Go(func() error {
err := rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
msg, err := rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, reqQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
err = rep.Send(repQuit)
if err != nil {
return xerrors.Errorf("could not send REP message: %w", err)
return fmt.Errorf("could not send REP message: %w", err)
}
return nil
@ -158,20 +158,20 @@ func TestHandshakeCReqRep(t *testing.T) {
grp.Go(func() error {
err := req.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = req.Send(reqQuit)
if err != nil {
return xerrors.Errorf("could not send REQ message: %w", err)
return fmt.Errorf("could not send REQ message: %w", err)
}
msg, err := req.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, repQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
return nil
})
@ -204,21 +204,21 @@ func TestHandshakeCReqCRep(t *testing.T) {
grp.Go(func() error {
err := rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
msg, err := rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, reqQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
err = rep.Send(repQuit)
if err != nil {
return xerrors.Errorf("could not send REP message: %w", err)
return fmt.Errorf("could not send REP message: %w", err)
}
return nil
@ -227,20 +227,20 @@ func TestHandshakeCReqCRep(t *testing.T) {
grp.Go(func() error {
err := req.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = req.Send(reqQuit)
if err != nil {
return xerrors.Errorf("could not send REQ message: %w", err)
return fmt.Errorf("could not send REQ message: %w", err)
}
msg, err := req.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, repQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
return nil
})

View file

@ -18,7 +18,6 @@ import (
"github.com/go-zeromq/zmq4"
"github.com/go-zeromq/zmq4/security/plain"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
var (
@ -73,15 +72,15 @@ func TestHandshakeReqRep(t *testing.T) {
grp.Go(func() error {
err := rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
msg, err := rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if string(msg.Frames[0]) != "QUIT" {
return xerrors.Errorf("received wrong REQ message: %#v", msg)
return fmt.Errorf("received wrong REQ message: %#v", msg)
}
return nil
})
@ -89,12 +88,12 @@ func TestHandshakeReqRep(t *testing.T) {
grp.Go(func() error {
err := req.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = req.Send(reqQuit)
if err != nil {
return xerrors.Errorf("could not send REQ message: %w", err)
return fmt.Errorf("could not send REQ message: %w", err)
}
return nil
})
@ -116,11 +115,11 @@ func EndPoint(transport string) (string, error) {
case "tcp":
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
return "", xerrors.Errorf("could not resolve TCP address: %w", err)
return "", fmt.Errorf("could not resolve TCP address: %w", err)
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return "", xerrors.Errorf("could not listen to TCP addr=%q: %w", addr, err)
return "", fmt.Errorf("could not listen to TCP addr=%q: %w", addr, err)
}
defer l.Close()
return fmt.Sprintf("tcp://%s", l.Addr()), nil

View file

@ -7,6 +7,7 @@ package zmq4
import (
"bytes"
"context"
"fmt"
"os"
"reflect"
"strings"
@ -14,7 +15,6 @@ import (
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
func TestNullSecurity(t *testing.T) {
@ -66,21 +66,21 @@ func TestNullHandshakeReqRep(t *testing.T) {
grp.Go(func() error {
err := rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
msg, err := rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
if !reflect.DeepEqual(msg, reqQuit) {
return xerrors.Errorf("got = %v, want = %v", msg, repQuit)
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
}
err = rep.Send(repQuit)
if err != nil {
return xerrors.Errorf("could not send REP message: %w", err)
return fmt.Errorf("could not send REP message: %w", err)
}
return nil
@ -89,12 +89,12 @@ func TestNullHandshakeReqRep(t *testing.T) {
grp.Go(func() error {
err := req.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = req.Send(reqQuit)
if err != nil {
return xerrors.Errorf("could not send REQ message: %w", err)
return fmt.Errorf("could not send REQ message: %w", err)
}
return nil
})

View file

@ -6,6 +6,8 @@ package zmq4
import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
@ -15,7 +17,6 @@ import (
"time"
"github.com/go-zeromq/zmq4/internal/inproc"
"golang.org/x/xerrors"
)
const (
@ -24,9 +25,9 @@ const (
)
var (
errInvalidAddress = xerrors.New("zmq4: invalid address")
errInvalidAddress = errors.New("zmq4: invalid address")
ErrBadProperty = xerrors.New("zmq4: bad property")
ErrBadProperty = errors.New("zmq4: bad property")
)
// socket implements the ZeroMQ socket interface
@ -193,7 +194,7 @@ func (sck *socket) Listen(endpoint string) error {
}
if err != nil {
return xerrors.Errorf("zmq4: could not listen to %q: %w", endpoint, err)
return fmt.Errorf("zmq4: could not listen to %q: %w", endpoint, err)
}
sck.listener = l
@ -261,19 +262,19 @@ connect:
time.Sleep(sck.retry)
goto connect
}
return xerrors.Errorf("zmq4: could not dial to %q: %w", endpoint, err)
return fmt.Errorf("zmq4: could not dial to %q: %w", endpoint, err)
}
if conn == nil {
return xerrors.Errorf("zmq4: got a nil dial-conn to %q", endpoint)
return fmt.Errorf("zmq4: got a nil dial-conn to %q", endpoint)
}
zconn, err := Open(conn, sck.sec, sck.typ, sck.id, false, sck.scheduleRmConn)
if err != nil {
return xerrors.Errorf("zmq4: could not open a ZMTP connection: %w", err)
return fmt.Errorf("zmq4: could not open a ZMTP connection: %w", err)
}
if zconn == nil {
return xerrors.Errorf("zmq4: got a nil ZMTP connection to %q", endpoint)
return fmt.Errorf("zmq4: got a nil ZMTP connection to %q", endpoint)
}
go sck.connReaper()

View file

@ -6,6 +6,7 @@ package zmq4_test
import (
"context"
"fmt"
"io"
"net"
"testing"
@ -13,7 +14,6 @@ import (
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
func TestInvalidConn(t *testing.T) {
@ -37,17 +37,17 @@ func TestInvalidConn(t *testing.T) {
grp.Go(func() error {
conn, err := net.Dial("tcp", ep[len("tcp://"):])
if err != nil {
return xerrors.Errorf("could not dial %q: %w", ep, err)
return fmt.Errorf("could not dial %q: %w", ep, err)
}
defer conn.Close()
var reply = make([]byte, 64)
_, err = io.ReadFull(conn, reply)
if err != nil {
return xerrors.Errorf("could not read reply bytes...: %w", err)
return fmt.Errorf("could not read reply bytes...: %w", err)
}
_, err = conn.Write(make([]byte, 64))
if err != nil {
return xerrors.Errorf("could not send bytes...: %w", err)
return fmt.Errorf("could not send bytes...: %w", err)
}
time.Sleep(1 * time.Second) // FIXME(sbinet): hugly.
return nil
@ -160,7 +160,7 @@ func TestConnPairs(t *testing.T) {
if err == nil {
t.Fatalf("dialed %q", ep)
}
want := xerrors.Errorf("zmq4: could not open a ZMTP connection: zmq4: could not initialize ZMTP connection: zmq4: peer=%q not compatible with %q", tc.srv.Type(), tc.wrong.Type())
want := fmt.Errorf("zmq4: could not open a ZMTP connection: zmq4: could not initialize ZMTP connection: zmq4: peer=%q not compatible with %q", tc.srv.Type(), tc.wrong.Type())
if got, want := err.Error(), want.Error(); got != want {
t.Fatalf("invalid error:\ngot = %v\nwant= %v", got, want)
}

View file

@ -11,8 +11,6 @@ import (
"log"
"net"
"strings"
"golang.org/x/xerrors"
)
// splitAddr returns the triplet (network, addr, error)
@ -52,7 +50,7 @@ func splitAddr(v string) (network, addr string, err error) {
host = ep[1]
return "inproc", host, nil
default:
err = xerrors.Errorf("zmq4: unknown protocol %q", network)
err = fmt.Errorf("zmq4: unknown protocol %q", network)
}
return network, addr, err

View file

@ -7,13 +7,13 @@ package zmq4_test
import (
"bytes"
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
var (
@ -89,11 +89,11 @@ func TestPair(t *testing.T) {
err := tc.srv.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
if addr := tc.srv.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
wg1.Wait()
@ -102,25 +102,25 @@ func TestPair(t *testing.T) {
for _, msg := range msgs {
err = tc.srv.Send(msg)
if err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
reply, err := tc.srv.Recv()
if err != nil {
return xerrors.Errorf("could not recv reply to %v: %w", msg, err)
return fmt.Errorf("could not recv reply to %v: %w", msg, err)
}
if got, want := reply, zmq4.NewMsgString("reply: "+string(msg.Bytes())); !bytes.Equal(got.Bytes(), want.Bytes()) {
return xerrors.Errorf("invalid cli reply for msg #%d: got=%v, want=%v", i, got, want)
return fmt.Errorf("invalid cli reply for msg #%d: got=%v, want=%v", i, got, want)
}
}
quit, err := tc.srv.Recv()
if err != nil {
return xerrors.Errorf("could not recv QUIT message: %w", err)
return fmt.Errorf("could not recv QUIT message: %w", err)
}
if got, want := quit, zmq4.NewMsgString("QUIT"); !bytes.Equal(got.Bytes(), want.Bytes()) {
return xerrors.Errorf("invalid QUIT message from cli: got=%v, want=%v", got, want)
return fmt.Errorf("invalid QUIT message from cli: got=%v, want=%v", got, want)
}
return err
@ -130,7 +130,7 @@ func TestPair(t *testing.T) {
err := tc.cli.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
wg1.Done()
@ -139,23 +139,23 @@ func TestPair(t *testing.T) {
for i := range msgs {
msg, err := tc.cli.Recv()
if err != nil {
return xerrors.Errorf("could not recv #%d msg from srv: %w", i, err)
return fmt.Errorf("could not recv #%d msg from srv: %w", i, err)
}
if !bytes.Equal(msg.Bytes(), msgs[i].Bytes()) {
return xerrors.Errorf("invalid #%d msg from srv: got=%v, want=%v",
msg, msgs[i],
return fmt.Errorf("invalid #%d msg from srv: got=%v, want=%v",
i, msg, msgs[i],
)
}
err = tc.cli.Send(zmq4.NewMsgString("reply: " + string(msg.Bytes())))
if err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
}
err = tc.cli.Send(zmq4.NewMsgString("QUIT"))
if err != nil {
return xerrors.Errorf("could not send QUIT message: %w", err)
return fmt.Errorf("could not send QUIT message: %w", err)
}
return err

View file

@ -6,6 +6,8 @@ package zmq4_test
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync"
@ -14,7 +16,6 @@ import (
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
var (
@ -127,11 +128,11 @@ func TestPubSub(t *testing.T) {
err := tc.pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
if addr := tc.pub.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
wg1.Wait()
@ -142,7 +143,7 @@ func TestPubSub(t *testing.T) {
for _, msg := range msgs[0] {
err = tc.pub.Send(msg)
if err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
}
@ -155,11 +156,11 @@ func TestPubSub(t *testing.T) {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := sub.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
wg1.Done()
@ -167,7 +168,7 @@ func TestPubSub(t *testing.T) {
err = sub.SetOption(zmq4.OptionSubscribe, topics[isub])
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topics[isub], err)
return fmt.Errorf("could not subscribe to topic %q: %w", topics[isub], err)
}
wg2.Done()
@ -177,10 +178,10 @@ func TestPubSub(t *testing.T) {
for imsg, want := range msgs {
msg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message %v: %w", want, err)
return fmt.Errorf("could not recv message %v: %w", want, err)
}
if !reflect.DeepEqual(msg, want) {
return xerrors.Errorf("sub[%d][msg=%d]: got = %v, want= %v", isub, imsg, msg, want)
return fmt.Errorf("sub[%d][msg=%d]: got = %v, want= %v", isub, imsg, msg, want)
}
nmsgs[isub]++
}
@ -228,7 +229,7 @@ func TestPubSubClosedSub(t *testing.T) {
grp.Go(func() error {
err := pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
return fmt.Errorf("could not listen on end point: %+v", err)
}
<-subReady
@ -236,7 +237,7 @@ func TestPubSubClosedSub(t *testing.T) {
for i := 0; i < nmsgs; i++ {
if err := pub.Send(msg); err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
}
@ -246,7 +247,7 @@ func TestPubSubClosedSub(t *testing.T) {
for i := 0; i < nmsgs; i++ {
if err := pub.Send(msg); err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
}
@ -262,12 +263,12 @@ func TestPubSubClosedSub(t *testing.T) {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = sub.SetOption(zmq4.OptionSubscribe, topic)
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topic, err)
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
}
close(subReady)
@ -278,10 +279,10 @@ func TestPubSubClosedSub(t *testing.T) {
break
}
if err != nil {
return xerrors.Errorf("could not recv message: %w", err)
return fmt.Errorf("could not recv message: %w", err)
}
if !reflect.DeepEqual(rmsg, msg) {
return xerrors.Errorf("sub: got = %v, want= %v", rmsg, msg)
return fmt.Errorf("sub: got = %v, want= %v", rmsg, msg)
}
}
@ -315,10 +316,10 @@ func TestPubSubMultiPart(t *testing.T) {
var err error
err = pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
return fmt.Errorf("could not listen on end point: %+v", err)
}
if addr := pub.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
pss.WaitForSubscriptions()
@ -326,7 +327,7 @@ func TestPubSubMultiPart(t *testing.T) {
err = pub.SendMulti(msg)
if err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
return nil
@ -336,11 +337,11 @@ func TestPubSubMultiPart(t *testing.T) {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := sub.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
pss.DialComplete()
@ -348,7 +349,7 @@ func TestPubSubMultiPart(t *testing.T) {
err = sub.SetOption(zmq4.OptionSubscribe, "msg")
if err != nil {
return xerrors.Errorf("could not subscribe to topic: %w", err)
return fmt.Errorf("could not subscribe to topic: %w", err)
}
pss.SubscriptionComplete()
@ -356,10 +357,10 @@ func TestPubSubMultiPart(t *testing.T) {
newMsg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message %v: %w", msg, err)
return fmt.Errorf("could not recv message %v: %w", msg, err)
}
if !reflect.DeepEqual(newMsg, msg) {
return xerrors.Errorf("got = %v, want= %v", newMsg, msg)
return fmt.Errorf("got = %v, want= %v", newMsg, msg)
}
return err
})
@ -459,7 +460,7 @@ func TestPubSubDeadPub(t *testing.T) {
err := pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
return fmt.Errorf("could not listen on end point: %+v", err)
}
<-subReady
@ -467,7 +468,7 @@ func TestPubSubDeadPub(t *testing.T) {
for i := 0; i < nmsgs; i++ {
if err := pub.Send(msg); err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
}
<-subDoneReading
@ -479,12 +480,12 @@ func TestPubSubDeadPub(t *testing.T) {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
err = sub.SetOption(zmq4.OptionSubscribe, topic)
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topic, err)
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
}
close(subReady)
@ -492,10 +493,10 @@ func TestPubSubDeadPub(t *testing.T) {
for i := 0; i < nmsgs; i++ {
rmsg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message: %w", err)
return fmt.Errorf("could not recv message: %w", err)
}
if !reflect.DeepEqual(rmsg, msg) {
return xerrors.Errorf("sub: got = %v, want= %v", rmsg, msg)
return fmt.Errorf("sub: got = %v, want= %v", rmsg, msg)
}
}
@ -504,7 +505,7 @@ func TestPubSubDeadPub(t *testing.T) {
_, err = sub.Recv() // make sure we aren't deadlocked
if err == nil {
return xerrors.New("expected an error")
return errors.New("expected an error")
}
return nil
@ -544,7 +545,7 @@ func TestPubOptionHWM(t *testing.T) {
var err error
err = pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
return fmt.Errorf("could not listen on end point: %+v", err)
}
pss.WaitForSubscriptions()
@ -553,7 +554,7 @@ func TestPubOptionHWM(t *testing.T) {
msg := zmq4.NewMsgFrom([]byte("msg"), []byte(string(rune(i))))
err = pub.Send(msg)
if err != nil {
return xerrors.Errorf("error sending message. [%d] got: %v", i, err)
return fmt.Errorf("error sending message. [%d] got: %v", i, err)
}
}
@ -568,7 +569,7 @@ func TestPubOptionHWM(t *testing.T) {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial end point: %+v", err)
return fmt.Errorf("could not dial end point: %+v", err)
}
pss.DialComplete()
@ -576,7 +577,7 @@ func TestPubOptionHWM(t *testing.T) {
err = sub.SetOption(zmq4.OptionSubscribe, topic)
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topic, err)
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
}
pss.SubscriptionComplete()
@ -592,13 +593,13 @@ func TestPubOptionHWM(t *testing.T) {
break
}
if err != nil {
return xerrors.Errorf("could not recv message: %v", err)
return fmt.Errorf("could not recv message: %v", err)
}
nmsgs++
}
if nmsgs >= msgCount {
return xerrors.Errorf("Expected dropped messages")
return fmt.Errorf("Expected dropped messages")
}
return err
@ -637,7 +638,7 @@ func BenchmarkPubSub(b *testing.B) {
var err error
err = pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
return fmt.Errorf("could not listen on end point: %+v", err)
}
pss.WaitForSubscriptions()
@ -646,7 +647,7 @@ func BenchmarkPubSub(b *testing.B) {
for i := 0; i < msgCount; i++ {
err = pub.SendMulti(msg)
if err != nil {
return xerrors.Errorf("error sending message: %v\n", err)
return fmt.Errorf("error sending message: %v\n", err)
}
}
@ -657,7 +658,7 @@ func BenchmarkPubSub(b *testing.B) {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial end point: %+v", err)
return fmt.Errorf("could not dial end point: %+v", err)
}
pss.DialComplete()
@ -665,7 +666,7 @@ func BenchmarkPubSub(b *testing.B) {
err = sub.SetOption(zmq4.OptionSubscribe, topic)
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topic, err)
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
}
pss.SubscriptionComplete()
@ -675,7 +676,7 @@ func BenchmarkPubSub(b *testing.B) {
for i := 0; i < msgCount; i++ {
msg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message: %v", err)
return fmt.Errorf("could not recv message: %v", err)
}
for _, frame := range msg.Frames {
siz += len(frame)

View file

@ -6,13 +6,13 @@ package zmq4_test
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
var (
@ -74,21 +74,21 @@ func TestPushPull(t *testing.T) {
err := tc.push.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
if addr := tc.push.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
err = tc.push.Send(hello)
if err != nil {
return xerrors.Errorf("could not send %v: %w", hello, err)
return fmt.Errorf("could not send %v: %w", hello, err)
}
err = tc.push.Send(bye)
if err != nil {
return xerrors.Errorf("could not send %v: %w", bye, err)
return fmt.Errorf("could not send %v: %w", bye, err)
}
return err
})
@ -96,29 +96,29 @@ func TestPushPull(t *testing.T) {
err := tc.pull.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := tc.pull.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
msg, err := tc.pull.Recv()
if err != nil {
return xerrors.Errorf("could not recv %v: %w", hello, err)
return fmt.Errorf("could not recv %v: %w", hello, err)
}
if got, want := msg, hello; !reflect.DeepEqual(got, want) {
return xerrors.Errorf("recv1: got = %v, want= %v", got, want)
return fmt.Errorf("recv1: got = %v, want= %v", got, want)
}
msg, err = tc.pull.Recv()
if err != nil {
return xerrors.Errorf("could not recv %v: %w", bye, err)
return fmt.Errorf("could not recv %v: %w", bye, err)
}
if got, want := msg, bye; !reflect.DeepEqual(got, want) {
return xerrors.Errorf("recv2: got = %v, want= %v", got, want)
return fmt.Errorf("recv2: got = %v, want= %v", got, want)
}
return err

View file

@ -6,13 +6,13 @@ package zmq4_test
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
var (
@ -79,18 +79,18 @@ func TestReqRep(t *testing.T) {
err := tc.rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
if addr := tc.rep.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
loop := true
for loop {
msg, err := tc.rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
var rep zmq4.Msg
switch string(msg.Frames[0]) {
@ -105,7 +105,7 @@ func TestReqRep(t *testing.T) {
err = tc.rep.Send(rep)
if err != nil {
return xerrors.Errorf("could not send REP message to %v: %w", msg, err)
return fmt.Errorf("could not send REP message to %v: %w", msg, err)
}
}
@ -115,11 +115,11 @@ func TestReqRep(t *testing.T) {
err := tc.req1.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := tc.req1.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
for _, msg := range []struct {
@ -132,15 +132,15 @@ func TestReqRep(t *testing.T) {
} {
err = tc.req1.Send(msg.req)
if err != nil {
return xerrors.Errorf("could not send REQ message %v: %w", msg.req, err)
return fmt.Errorf("could not send REQ message %v: %w", msg.req, err)
}
rep, err := tc.req1.Recv()
if err != nil {
return xerrors.Errorf("could not recv REP message %v: %w", msg.req, err)
return fmt.Errorf("could not recv REP message %v: %w", msg.req, err)
}
if got, want := rep, msg.rep; !reflect.DeepEqual(got, want) {
return xerrors.Errorf("got = %v, want= %v", got, want)
return fmt.Errorf("got = %v, want= %v", got, want)
}
}
@ -215,18 +215,18 @@ func TestMultiReqRepIssue70(t *testing.T) {
grp.Go(func() error {
err := tc.rep.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
if addr := tc.rep.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
loop1, loop2 := true, true
for loop1 || loop2 {
msg, err := tc.rep.Recv()
if err != nil {
return xerrors.Errorf("could not recv REQ message: %w", err)
return fmt.Errorf("could not recv REQ message: %w", err)
}
var rep zmq4.Msg
switch string(msg.Frames[0]) {
@ -248,7 +248,7 @@ func TestMultiReqRepIssue70(t *testing.T) {
err = tc.rep.Send(rep)
if err != nil {
return xerrors.Errorf("could not send REP message to %v: %w", msg, err)
return fmt.Errorf("could not send REP message to %v: %w", msg, err)
}
}
return err
@ -257,11 +257,11 @@ func TestMultiReqRepIssue70(t *testing.T) {
err := tc.req2.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := tc.req2.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
for _, msg := range []struct {
@ -274,15 +274,15 @@ func TestMultiReqRepIssue70(t *testing.T) {
} {
err = tc.req2.Send(msg.req)
if err != nil {
return xerrors.Errorf("could not send REQ message %v: %w", msg.req, err)
return fmt.Errorf("could not send REQ message %v: %w", msg.req, err)
}
rep, err := tc.req2.Recv()
if err != nil {
return xerrors.Errorf("could not recv REP message %v: %w", msg.req, err)
return fmt.Errorf("could not recv REP message %v: %w", msg.req, err)
}
if got, want := rep, msg.rep; !reflect.DeepEqual(got, want) {
return xerrors.Errorf("got = %v, want= %v", got, want)
return fmt.Errorf("got = %v, want= %v", got, want)
}
}
return err
@ -291,11 +291,11 @@ func TestMultiReqRepIssue70(t *testing.T) {
err := tc.req1.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := tc.req1.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
for _, msg := range []struct {
@ -308,15 +308,15 @@ func TestMultiReqRepIssue70(t *testing.T) {
} {
err = tc.req1.Send(msg.req)
if err != nil {
return xerrors.Errorf("could not send REQ message %v: %w", msg.req, err)
return fmt.Errorf("could not send REQ message %v: %w", msg.req, err)
}
rep, err := tc.req1.Recv()
if err != nil {
return xerrors.Errorf("could not recv REP message %v: %w", msg.req, err)
return fmt.Errorf("could not recv REP message %v: %w", msg.req, err)
}
if got, want := rep, msg.rep; !reflect.DeepEqual(got, want) {
return xerrors.Errorf("got = %v, want= %v", got, want)
return fmt.Errorf("got = %v, want= %v", got, want)
}
}
return err

View file

@ -16,7 +16,6 @@ import (
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
var (
@ -131,11 +130,11 @@ func TestRouterDealer(t *testing.T) {
err := router.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
if addr := router.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
wgd.Wait()
@ -146,14 +145,14 @@ func TestRouterDealer(t *testing.T) {
for i := 0; i < len(dealers)*N+1 && fired < N; i++ {
msg, err := router.Recv()
if err != nil {
return xerrors.Errorf("could not recv message: %w", err)
return fmt.Errorf("could not recv message: %w", err)
}
if len(msg.Frames) == 0 {
seenMu.RLock()
str := fmt.Sprintf("%v", seen)
seenMu.RUnlock()
return xerrors.Errorf("router received empty message (test=%q, iter=%d, seen=%v)", tc.name, i, str)
return fmt.Errorf("router received empty message (test=%q, iter=%d, seen=%v)", tc.name, i, str)
}
id := string(msg.Frames[0])
seenMu.Lock()
@ -169,11 +168,11 @@ func TestRouterDealer(t *testing.T) {
}
err = router.Send(msg)
if err != nil {
return xerrors.Errorf("could not send %v: %w", msg, err)
return fmt.Errorf("could not send %v: %w", msg, err)
}
}
if fired != N {
return xerrors.Errorf("did not fire everybody (fired=%d, want=%d)", fired, N)
return fmt.Errorf("did not fire everybody (fired=%d, want=%d)", fired, N)
}
return nil
})
@ -183,11 +182,11 @@ func TestRouterDealer(t *testing.T) {
err := dealer.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := dealer.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
wgd.Done()
@ -200,19 +199,19 @@ func TestRouterDealer(t *testing.T) {
// tell the broker we are ready for work
err = dealer.Send(ready)
if err != nil {
return xerrors.Errorf("could not send %v: %w", ready, err)
return fmt.Errorf("could not send %v: %w", ready, err)
}
// get workload from broker
msg, err := dealer.Recv()
if err != nil {
return xerrors.Errorf("could not recv msg: %w", err)
return fmt.Errorf("could not recv msg: %w", err)
}
if len(msg.Frames) < 2 {
seenMu.RLock()
str := fmt.Sprintf("%v", seen)
seenMu.RUnlock()
return xerrors.Errorf("dealer-%d received invalid msg %v (test=%q, iter=%d, seen=%v)", idealer, msg, tc.name, n, str)
return fmt.Errorf("dealer-%d received invalid msg %v (test=%q, iter=%d, seen=%v)", idealer, msg, tc.name, n, str)
}
work := msg.Frames[1]
fired[idealer]++

View file

@ -6,6 +6,7 @@ package zmq4_test
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
@ -13,7 +14,6 @@ import (
"github.com/go-zeromq/zmq4"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
var (
@ -101,11 +101,11 @@ func TestXPubSub(t *testing.T) {
err := tc.xpub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen: %w", err)
return fmt.Errorf("could not listen: %w", err)
}
if addr := tc.xpub.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
return fmt.Errorf("listener with nil Addr")
}
wg1.Wait()
@ -123,7 +123,7 @@ func TestXPubSub(t *testing.T) {
for _, msg := range msgs[0] {
err = tc.xpub.Send(msg)
if err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
return fmt.Errorf("could not send message %v: %w", msg, err)
}
}
@ -136,11 +136,11 @@ func TestXPubSub(t *testing.T) {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
return fmt.Errorf("could not dial: %w", err)
}
if addr := sub.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
return fmt.Errorf("dialer with non-nil Addr")
}
wg1.Done()
@ -148,7 +148,7 @@ func TestXPubSub(t *testing.T) {
err = sub.SetOption(zmq4.OptionSubscribe, topics[isub])
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topics[isub], err)
return fmt.Errorf("could not subscribe to topic %q: %w", topics[isub], err)
}
wg2.Done()
@ -158,10 +158,10 @@ func TestXPubSub(t *testing.T) {
for imsg, want := range msgs {
msg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message %v: %w", want, err)
return fmt.Errorf("could not recv message %v: %w", want, err)
}
if !reflect.DeepEqual(msg, want) {
return xerrors.Errorf("sub[%d][msg=%d]: got = %v, want= %v", isub, imsg, msg, want)
return fmt.Errorf("sub[%d][msg=%d]: got = %v, want= %v", isub, imsg, msg, want)
}
nmsgs[isub]++
}