Remove unnecessary github.com/go-zeromq/goczmq/v4 dependency
This commit is contained in:
parent
e75c615ba1
commit
6f01655832
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
|
@ -7,7 +7,7 @@ on:
|
|||
branches: [main]
|
||||
|
||||
env:
|
||||
TAGS: "-tags=czmq4"
|
||||
TAGS: ""
|
||||
|
||||
jobs:
|
||||
main:
|
||||
|
|
|
@ -1,219 +0,0 @@
|
|||
// Copyright 2018 The go-zeromq Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build czmq4
|
||||
// +build czmq4
|
||||
|
||||
package zmq4
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
czmq4 "github.com/go-zeromq/goczmq/v4"
|
||||
)
|
||||
|
||||
func NewCPair(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Pair, opts...)
|
||||
}
|
||||
|
||||
func NewCPub(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Pub, opts...)
|
||||
}
|
||||
|
||||
func NewCSub(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Sub, opts...)
|
||||
}
|
||||
|
||||
func NewCReq(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Req, opts...)
|
||||
}
|
||||
|
||||
func NewCRep(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Rep, opts...)
|
||||
}
|
||||
|
||||
func NewCDealer(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Dealer, opts...)
|
||||
}
|
||||
|
||||
func NewCRouter(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Router, opts...)
|
||||
}
|
||||
|
||||
func NewCPull(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Pull, opts...)
|
||||
}
|
||||
|
||||
func NewCPush(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.Push, opts...)
|
||||
}
|
||||
|
||||
func NewCXPub(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.XPub, opts...)
|
||||
}
|
||||
|
||||
func NewCXSub(ctx context.Context, opts ...czmq4.SockOption) Socket {
|
||||
return newCSocket(czmq4.XSub, opts...)
|
||||
}
|
||||
|
||||
type csocket struct {
|
||||
sock *czmq4.Sock
|
||||
addr net.Addr
|
||||
}
|
||||
|
||||
func newCSocket(ctyp int, opts ...czmq4.SockOption) *csocket {
|
||||
sck := &csocket{sock: czmq4.NewSock(ctyp)}
|
||||
for _, opt := range opts {
|
||||
opt(sck.sock)
|
||||
}
|
||||
return sck
|
||||
}
|
||||
|
||||
func (sck *csocket) Close() error {
|
||||
sck.sock.Destroy()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send puts the message on the outbound send queue.
|
||||
// Send blocks until the message can be queued or the send deadline expires.
|
||||
func (sck *csocket) Send(msg Msg) error {
|
||||
return sck.sock.SendMessage(msg.Frames)
|
||||
}
|
||||
|
||||
// SendMulti puts the message on the outbound send queue.
|
||||
// SendMulti blocks until the message can be queued or the send deadline expires.
|
||||
// The message will be sent as a multipart message.
|
||||
func (sck *csocket) SendMulti(msg Msg) error {
|
||||
return sck.sock.SendMessage(msg.Frames)
|
||||
}
|
||||
|
||||
// Recv receives a complete message.
|
||||
func (sck *csocket) Recv() (Msg, error) {
|
||||
frames, err := sck.sock.RecvMessage()
|
||||
return Msg{Frames: frames}, err
|
||||
}
|
||||
|
||||
// Listen connects a local endpoint to the Socket.
|
||||
func (sck *csocket) Listen(addr string) error {
|
||||
port, err := sck.sock.Bind(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sck.addr = netAddrFrom(port, addr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Dial connects a remote endpoint to the Socket.
|
||||
func (sck *csocket) Dial(addr string) error {
|
||||
return sck.sock.Connect(addr)
|
||||
}
|
||||
|
||||
// Type returns the type of this Socket (PUB, SUB, ...)
|
||||
func (sck *csocket) Type() SocketType {
|
||||
switch sck.sock.GetType() {
|
||||
case czmq4.Pair:
|
||||
return Pair
|
||||
case czmq4.Pub:
|
||||
return Pub
|
||||
case czmq4.Sub:
|
||||
return Sub
|
||||
case czmq4.Req:
|
||||
return Req
|
||||
case czmq4.Rep:
|
||||
return Rep
|
||||
case czmq4.Dealer:
|
||||
return Dealer
|
||||
case czmq4.Router:
|
||||
return Router
|
||||
case czmq4.Pull:
|
||||
return Pull
|
||||
case czmq4.Push:
|
||||
return Push
|
||||
case czmq4.XPub:
|
||||
return XPub
|
||||
case czmq4.XSub:
|
||||
return XSub
|
||||
}
|
||||
panic("invalid C-socket type")
|
||||
}
|
||||
|
||||
// Addr returns the listener's address.
|
||||
// Addr returns nil if the socket isn't a listener.
|
||||
func (sck *csocket) Addr() net.Addr {
|
||||
return sck.addr
|
||||
}
|
||||
|
||||
// Conn returns the underlying net.Conn the socket is bound to.
|
||||
func (sck *csocket) Conn() net.Conn {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// GetOption is used to retrieve an option for a socket.
|
||||
func (sck *csocket) GetOption(name string) (interface{}, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// SetOption is used to set an option for a socket.
|
||||
func (sck *csocket) SetOption(name string, value interface{}) error {
|
||||
switch name {
|
||||
case OptionSubscribe:
|
||||
topic := value.(string)
|
||||
sck.sock.SetOption(czmq4.SockSetSubscribe(topic))
|
||||
return nil
|
||||
case OptionUnsubscribe:
|
||||
topic := value.(string)
|
||||
sck.sock.SetOption(czmq4.SockSetUnsubscribe(topic))
|
||||
return nil
|
||||
default:
|
||||
panic("unknown set option name [" + name + "]")
|
||||
}
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// CWithID configures a ZeroMQ socket identity.
|
||||
func CWithID(id SocketIdentity) czmq4.SockOption {
|
||||
return czmq4.SockSetIdentity(string(id))
|
||||
}
|
||||
|
||||
func netAddrFrom(port int, ep string) net.Addr {
|
||||
network, addr, err := splitAddr(ep)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
switch network {
|
||||
case "ipc":
|
||||
network = "unix"
|
||||
case "tcp":
|
||||
network = "tcp"
|
||||
case "udp":
|
||||
network = "udp"
|
||||
case "inproc":
|
||||
network = "inproc"
|
||||
default:
|
||||
panic("zmq4: unknown protocol [" + network + "]")
|
||||
}
|
||||
if idx := strings.Index(addr, ":"); idx != -1 {
|
||||
addr = string(addr[:idx])
|
||||
}
|
||||
return caddr{host: addr, port: fmt.Sprintf("%d", port), net: network}
|
||||
}
|
||||
|
||||
type caddr struct {
|
||||
host string
|
||||
port string
|
||||
net string
|
||||
}
|
||||
|
||||
func (addr caddr) Network() string { return addr.net }
|
||||
func (addr caddr) String() string {
|
||||
return addr.host + ":" + addr.port
|
||||
}
|
||||
|
||||
var (
|
||||
_ Socket = (*csocket)(nil)
|
||||
_ net.Addr = (*caddr)(nil)
|
||||
)
|
418
czmq4_test.go
418
czmq4_test.go
|
@ -1,418 +0,0 @@
|
|||
// Copyright 2018 The go-zeromq Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build czmq4
|
||||
// +build czmq4
|
||||
|
||||
package zmq4_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/go-zeromq/zmq4"
|
||||
)
|
||||
|
||||
var (
|
||||
cpushpulls = []testCasePushPull{
|
||||
{
|
||||
name: "tcp-cpush-pull",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
push: zmq4.NewCPush(bkg),
|
||||
pull: zmq4.NewPull(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-push-cpull",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
push: zmq4.NewPush(bkg),
|
||||
pull: zmq4.NewCPull(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-cpush-cpull",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
push: zmq4.NewCPush(bkg),
|
||||
pull: zmq4.NewCPull(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cpush-pull",
|
||||
endpoint: "ipc://ipc-cpush-pull",
|
||||
push: zmq4.NewCPush(bkg),
|
||||
pull: zmq4.NewPull(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-push-cpull",
|
||||
endpoint: "ipc://ipc-push-cpull",
|
||||
push: zmq4.NewPush(bkg),
|
||||
pull: zmq4.NewCPull(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cpush-cpull",
|
||||
endpoint: "ipc://ipc-cpush-cpull",
|
||||
push: zmq4.NewCPush(bkg),
|
||||
pull: zmq4.NewCPull(bkg),
|
||||
},
|
||||
//{
|
||||
// name: "udp-cpush-cpull",
|
||||
// endpoint: "udp://127.0.0.1:55555",
|
||||
// push: zmq4.NewCPush(),
|
||||
// pull: zmq4.NewCPull(),
|
||||
//},
|
||||
{
|
||||
name: "inproc-cpush-cpull",
|
||||
endpoint: "inproc://cpush-cpull",
|
||||
push: zmq4.NewCPush(bkg),
|
||||
pull: zmq4.NewCPull(bkg),
|
||||
},
|
||||
}
|
||||
|
||||
creqreps = []testCaseReqRep{
|
||||
{
|
||||
name: "tcp-creq-rep",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
req1: zmq4.NewCReq(bkg),
|
||||
rep: zmq4.NewRep(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-req-crep",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
req1: zmq4.NewReq(bkg),
|
||||
rep: zmq4.NewCRep(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-creq-crep",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
req1: zmq4.NewCReq(bkg),
|
||||
rep: zmq4.NewCRep(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-creq-rep",
|
||||
endpoint: "ipc://ipc-creq-rep",
|
||||
req1: zmq4.NewCReq(bkg),
|
||||
rep: zmq4.NewRep(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-req-crep",
|
||||
endpoint: "ipc://ipc-req-crep",
|
||||
req1: zmq4.NewReq(bkg),
|
||||
rep: zmq4.NewCRep(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-creq-crep",
|
||||
endpoint: "ipc://ipc-creq-crep",
|
||||
req1: zmq4.NewCReq(bkg),
|
||||
rep: zmq4.NewCRep(bkg),
|
||||
},
|
||||
{
|
||||
name: "inproc-creq-crep",
|
||||
endpoint: "inproc://inproc-creq-crep",
|
||||
req1: zmq4.NewCReq(bkg),
|
||||
rep: zmq4.NewCRep(bkg),
|
||||
},
|
||||
}
|
||||
|
||||
cpubsubs = []testCasePubSub{
|
||||
{
|
||||
name: "tcp-cpub-sub",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
pub: zmq4.NewCPub(bkg),
|
||||
sub0: zmq4.NewSub(bkg),
|
||||
sub1: zmq4.NewSub(bkg),
|
||||
sub2: zmq4.NewSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-pub-csub",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
pub: zmq4.NewPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-cpub-csub",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
pub: zmq4.NewCPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cpub-sub",
|
||||
endpoint: "ipc://ipc-cpub-sub",
|
||||
pub: zmq4.NewCPub(bkg),
|
||||
sub0: zmq4.NewSub(bkg),
|
||||
sub1: zmq4.NewSub(bkg),
|
||||
sub2: zmq4.NewSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-pub-csub",
|
||||
endpoint: "ipc://ipc-pub-csub",
|
||||
pub: zmq4.NewPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cpub-csub",
|
||||
endpoint: "ipc://ipc-cpub-csub",
|
||||
pub: zmq4.NewCPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "inproc-cpub-csub",
|
||||
endpoint: "inproc://inproc-cpub-csub",
|
||||
pub: zmq4.NewCPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
}
|
||||
|
||||
cxpubsubs = []testCaseXPubSub{
|
||||
{
|
||||
name: "tcp-cxpub-sub",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
xpub: zmq4.NewCXPub(bkg),
|
||||
sub0: zmq4.NewSub(bkg),
|
||||
sub1: zmq4.NewSub(bkg),
|
||||
sub2: zmq4.NewSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-xpub-csub",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
xpub: zmq4.NewXPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-cxpub-csub",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
xpub: zmq4.NewCXPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cxpub-sub",
|
||||
endpoint: "ipc://ipc-cxpub-sub",
|
||||
xpub: zmq4.NewCXPub(bkg),
|
||||
sub0: zmq4.NewSub(bkg),
|
||||
sub1: zmq4.NewSub(bkg),
|
||||
sub2: zmq4.NewSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-xpub-csub",
|
||||
endpoint: "ipc://ipc-xpub-csub",
|
||||
xpub: zmq4.NewXPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cxpub-csub",
|
||||
endpoint: "ipc://ipc-cxpub-csub",
|
||||
xpub: zmq4.NewCXPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
{
|
||||
name: "inproc-cxpub-csub",
|
||||
endpoint: "inproc://inproc-cxpub-csub",
|
||||
xpub: zmq4.NewCXPub(bkg),
|
||||
sub0: zmq4.NewCSub(bkg),
|
||||
sub1: zmq4.NewCSub(bkg),
|
||||
sub2: zmq4.NewCSub(bkg),
|
||||
},
|
||||
}
|
||||
|
||||
crouterdealers = []testCaseRouterDealer{
|
||||
{
|
||||
name: "tcp-router-cdealer",
|
||||
skip: true,
|
||||
endpoint: func() string { return must(EndPoint("tcp")) },
|
||||
router: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewRouter(ctx, zmq4.WithID(zmq4.SocketIdentity("router")))
|
||||
},
|
||||
dealer0: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-0")))
|
||||
},
|
||||
dealer1: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-1")))
|
||||
},
|
||||
dealer2: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-2")))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "tcp-crouter-dealer",
|
||||
skip: true,
|
||||
endpoint: func() string { return must(EndPoint("tcp")) },
|
||||
router: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCRouter(ctx, zmq4.CWithID(zmq4.SocketIdentity("router")))
|
||||
},
|
||||
dealer0: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewDealer(ctx, zmq4.WithID(zmq4.SocketIdentity("dealer-0")))
|
||||
},
|
||||
dealer1: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewDealer(ctx, zmq4.WithID(zmq4.SocketIdentity("dealer-1")))
|
||||
},
|
||||
dealer2: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewDealer(ctx, zmq4.WithID(zmq4.SocketIdentity("dealer-2")))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "tcp-crouter-cdealer",
|
||||
skip: true,
|
||||
endpoint: func() string { return must(EndPoint("tcp")) },
|
||||
router: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCRouter(ctx, zmq4.CWithID(zmq4.SocketIdentity("router")))
|
||||
},
|
||||
dealer0: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-0")))
|
||||
},
|
||||
dealer1: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-1")))
|
||||
},
|
||||
dealer2: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-2")))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ipc-router-cdealer",
|
||||
skip: true,
|
||||
endpoint: func() string { return "ipc://ipc-router-cdealer" },
|
||||
router: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewRouter(ctx, zmq4.WithID(zmq4.SocketIdentity("router")))
|
||||
},
|
||||
dealer0: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-0")))
|
||||
},
|
||||
dealer1: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-1")))
|
||||
},
|
||||
dealer2: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-2")))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ipc-crouter-dealer",
|
||||
skip: true,
|
||||
endpoint: func() string { return "ipc://crouter-dealer" },
|
||||
router: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCRouter(ctx, zmq4.CWithID(zmq4.SocketIdentity("router")))
|
||||
},
|
||||
dealer0: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewDealer(ctx, zmq4.WithID(zmq4.SocketIdentity("dealer-0")))
|
||||
},
|
||||
dealer1: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewDealer(ctx, zmq4.WithID(zmq4.SocketIdentity("dealer-1")))
|
||||
},
|
||||
dealer2: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewDealer(ctx, zmq4.WithID(zmq4.SocketIdentity("dealer-2")))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ipc-crouter-cdealer",
|
||||
skip: true,
|
||||
endpoint: func() string { return "ipc://crouter-cdealer" },
|
||||
router: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCRouter(ctx, zmq4.CWithID(zmq4.SocketIdentity("router")))
|
||||
},
|
||||
dealer0: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-0")))
|
||||
},
|
||||
dealer1: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-1")))
|
||||
},
|
||||
dealer2: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-2")))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "inproc-crouter-cdealer",
|
||||
skip: true,
|
||||
endpoint: func() string { return "inproc://crouter-cdealer" },
|
||||
router: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCRouter(ctx, zmq4.CWithID(zmq4.SocketIdentity("router")))
|
||||
},
|
||||
dealer0: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-0")))
|
||||
},
|
||||
dealer1: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-1")))
|
||||
},
|
||||
dealer2: func(ctx context.Context) zmq4.Socket {
|
||||
return zmq4.NewCDealer(ctx, zmq4.CWithID(zmq4.SocketIdentity("dealer-2")))
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
cpairs = []testCasePair{
|
||||
{
|
||||
name: "tcp-cpair-pair",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
srv: zmq4.NewCPair(bkg),
|
||||
cli: zmq4.NewPair(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-pair-cpair",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
srv: zmq4.NewPair(bkg),
|
||||
cli: zmq4.NewCPair(bkg),
|
||||
},
|
||||
{
|
||||
name: "tcp-cpair-cpair",
|
||||
endpoint: must(EndPoint("tcp")),
|
||||
srv: zmq4.NewCPair(bkg),
|
||||
cli: zmq4.NewCPair(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cpair-pair",
|
||||
endpoint: "ipc://ipc-cpair-pair",
|
||||
srv: zmq4.NewCPair(bkg),
|
||||
cli: zmq4.NewPair(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-pair-cpair",
|
||||
endpoint: "ipc://ipc-pair-cpair",
|
||||
srv: zmq4.NewPair(bkg),
|
||||
cli: zmq4.NewCPair(bkg),
|
||||
},
|
||||
{
|
||||
name: "ipc-cpair-cpair",
|
||||
endpoint: "ipc://ipc-cpair-cpair",
|
||||
srv: zmq4.NewCPair(bkg),
|
||||
cli: zmq4.NewCPair(bkg),
|
||||
},
|
||||
// { // FIXME(sbinet)
|
||||
// name: "inproc-cpair-pair",
|
||||
// endpoint: "inproc://inproc-cpair-pair",
|
||||
// srv: zmq4.NewCPair(bkg),
|
||||
// cli: zmq4.NewPair(bkg),
|
||||
// },
|
||||
// { // FIXME(sbinet)
|
||||
// name: "inproc-pair-cpair",
|
||||
// endpoint: "inproc://inproc-pair-cpair",
|
||||
// srv: zmq4.NewPair(bkg),
|
||||
// cli: zmq4.NewCPair(bkg),
|
||||
// },
|
||||
{
|
||||
name: "inproc-cpair-cpair",
|
||||
endpoint: "inproc://inproc-cpair-cpair",
|
||||
srv: zmq4.NewCPair(bkg),
|
||||
cli: zmq4.NewCPair(bkg),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
pushpulls = append(pushpulls, cpushpulls...)
|
||||
reqreps = append(reqreps, creqreps...)
|
||||
pairs = append(pairs, cpairs...)
|
||||
pubsubs = append(pubsubs, cpubsubs...)
|
||||
xpubsubs = append(xpubsubs, cxpubsubs...)
|
||||
routerdealers = append(routerdealers, crouterdealers...)
|
||||
}
|
1
go.mod
1
go.mod
|
@ -3,7 +3,6 @@ module github.com/go-zeromq/zmq4
|
|||
go 1.20
|
||||
|
||||
require (
|
||||
github.com/go-zeromq/goczmq/v4 v4.2.2
|
||||
golang.org/x/sync v0.3.0
|
||||
golang.org/x/text v0.13.0
|
||||
)
|
||||
|
|
2
go.sum
2
go.sum
|
@ -1,5 +1,3 @@
|
|||
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
|
||||
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
|
||||
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
|
|
|
@ -1,256 +0,0 @@
|
|||
// Copyright 2018 The go-zeromq Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
//go:build czmq4
|
||||
// +build czmq4
|
||||
|
||||
package plain_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
czmq4 "github.com/go-zeromq/goczmq/v4"
|
||||
"github.com/go-zeromq/zmq4"
|
||||
"github.com/go-zeromq/zmq4/security/plain"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var (
|
||||
repQuit = zmq4.NewMsgString("bye")
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
auth := czmq4.NewAuth()
|
||||
|
||||
err := auth.Allow("127.0.0.1")
|
||||
if err != nil {
|
||||
auth.Destroy()
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = auth.Plain("./testdata/password.txt")
|
||||
if err != nil {
|
||||
auth.Destroy()
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// call flag.Parse() here if TestMain uses flags
|
||||
|
||||
exit := m.Run()
|
||||
|
||||
auth.Destroy()
|
||||
os.Exit(exit)
|
||||
}
|
||||
|
||||
func TestHandshakeReqCRep(t *testing.T) {
|
||||
t.Skipf("REQ-CREP")
|
||||
|
||||
sec := plain.Security("user", "secret")
|
||||
if got, want := sec.Type(), zmq4.PlainSecurity; got != want {
|
||||
t.Fatalf("got=%v, want=%v", got, want)
|
||||
}
|
||||
|
||||
ctx, timeout := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer timeout()
|
||||
|
||||
ep := must(EndPoint("tcp"))
|
||||
|
||||
req := zmq4.NewReq(ctx, zmq4.WithSecurity(sec))
|
||||
defer req.Close()
|
||||
|
||||
rep := zmq4.NewCRep(ctx, czmq4.SockSetZapDomain("global"), czmq4.SockSetPlainServer(1))
|
||||
defer rep.Close()
|
||||
|
||||
grp, ctx := errgroup.WithContext(ctx)
|
||||
grp.Go(func() error {
|
||||
err := rep.Listen(ep)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not listen: %w", err)
|
||||
}
|
||||
|
||||
msg, err := rep.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not recv REQ message: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(msg, reqQuit) {
|
||||
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
|
||||
}
|
||||
|
||||
err = rep.Send(repQuit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not send REP message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
grp.Go(func() error {
|
||||
err := req.Dial(ep)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not dial: %w", err)
|
||||
}
|
||||
|
||||
err = req.Send(reqQuit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not send REQ message: %w", err)
|
||||
}
|
||||
msg, err := req.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not recv REQ message: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(msg, repQuit) {
|
||||
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := grp.Wait(); err != nil {
|
||||
t.Fatalf("error: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandshakeCReqRep(t *testing.T) {
|
||||
t.Skipf("CREQ-REP")
|
||||
|
||||
sec := plain.Security("user", "secret")
|
||||
if got, want := sec.Type(), zmq4.PlainSecurity; got != want {
|
||||
t.Fatalf("got=%v, want=%v", got, want)
|
||||
}
|
||||
|
||||
ctx, timeout := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer timeout()
|
||||
|
||||
ep := must(EndPoint("tcp"))
|
||||
|
||||
req := zmq4.NewCReq(ctx, czmq4.SockSetPlainUsername("user"), czmq4.SockSetPlainPassword("secret"))
|
||||
defer req.Close()
|
||||
|
||||
rep := zmq4.NewRep(ctx, zmq4.WithSecurity(sec))
|
||||
defer rep.Close()
|
||||
|
||||
grp, ctx := errgroup.WithContext(ctx)
|
||||
grp.Go(func() error {
|
||||
err := rep.Listen(ep)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not listen: %w", err)
|
||||
}
|
||||
|
||||
msg, err := rep.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not recv REQ message: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(msg, reqQuit) {
|
||||
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
|
||||
}
|
||||
|
||||
err = rep.Send(repQuit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not send REP message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
grp.Go(func() error {
|
||||
err := req.Dial(ep)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not dial: %w", err)
|
||||
}
|
||||
|
||||
err = req.Send(reqQuit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not send REQ message: %w", err)
|
||||
}
|
||||
msg, err := req.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not recv REQ message: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(msg, repQuit) {
|
||||
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := grp.Wait(); err != nil {
|
||||
t.Fatalf("error: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandshakeCReqCRep(t *testing.T) {
|
||||
t.Skipf("CREQ-CREP")
|
||||
|
||||
sec := plain.Security("user", "secret")
|
||||
if got, want := sec.Type(), zmq4.PlainSecurity; got != want {
|
||||
t.Fatalf("got=%v, want=%v", got, want)
|
||||
}
|
||||
|
||||
ctx, timeout := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer timeout()
|
||||
|
||||
ep := must(EndPoint("tcp"))
|
||||
|
||||
req := zmq4.NewCReq(ctx, czmq4.SockSetPlainUsername("user"), czmq4.SockSetPlainPassword("secret"))
|
||||
defer req.Close()
|
||||
|
||||
rep := zmq4.NewCRep(ctx, czmq4.SockSetZapDomain("global"), czmq4.SockSetPlainServer(1))
|
||||
defer rep.Close()
|
||||
|
||||
grp, ctx := errgroup.WithContext(ctx)
|
||||
grp.Go(func() error {
|
||||
err := rep.Listen(ep)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not listen: %w", err)
|
||||
}
|
||||
|
||||
msg, err := rep.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not recv REQ message: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(msg, reqQuit) {
|
||||
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
|
||||
}
|
||||
|
||||
err = rep.Send(repQuit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not send REP message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
grp.Go(func() error {
|
||||
err := req.Dial(ep)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not dial: %w", err)
|
||||
}
|
||||
|
||||
err = req.Send(reqQuit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not send REQ message: %w", err)
|
||||
}
|
||||
msg, err := req.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not recv REQ message: %w", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(msg, repQuit) {
|
||||
return fmt.Errorf("got = %v, want = %v", msg, repQuit)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := grp.Wait(); err != nil {
|
||||
t.Fatalf("error: %+v", err)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue