zmq4: merge zmtp package into zmq4

This commit is contained in:
Sebastien Binet 2018-04-23 10:38:56 +02:00
parent d64156cd31
commit 690b787e90
27 changed files with 353 additions and 339 deletions

View file

@ -2,67 +2,17 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package zmtp implements the ZeroMQ Message Transport Protocol as defined
// in https://rfc.zeromq.org/spec:23/ZMTP/.
package zmtp
package zmq4
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"strings"
"github.com/pkg/errors"
)
// Msg is a ZMTP message, possibly composed of multiple frames.
type Msg struct {
Frames [][]byte
}
func NewMsg(frame []byte) Msg {
return Msg{Frames: [][]byte{frame}}
}
func NewMsgFrom(frames ...[]byte) Msg {
return Msg{Frames: frames}
}
func NewMsgString(frame string) Msg {
return NewMsg([]byte(frame))
}
func NewMsgFromString(frames []string) Msg {
msg := Msg{Frames: make([][]byte, len(frames))}
for i, frame := range frames {
msg.Frames[i] = append(msg.Frames[i], []byte(frame)...)
}
return msg
}
func (msg Msg) String() string {
buf := new(bytes.Buffer)
buf.WriteString("Msg{Frames:{")
for i, frame := range msg.Frames {
if i > 0 {
buf.WriteString(", ")
}
fmt.Fprintf(buf, "%q", frame)
}
buf.WriteString("}}")
return buf.String()
}
func (msg Msg) Clone() Msg {
o := Msg{Frames: make([][]byte, len(msg.Frames))}
for i, frame := range msg.Frames {
o.Frames[i] = make([]byte, len(frame))
copy(o.Frames[i], frame)
}
return o
}
// Conn implements the ZeroMQ Message Transport Protocol as defined
// in https://rfc.zeromq.org/spec:23/ZMTP/.
type Conn struct {
@ -85,11 +35,11 @@ func (c *Conn) Close() error {
// Open performs a complete ZMTP handshake.
func Open(rw io.ReadWriteCloser, sec Security, sockType SocketType, sockID SocketIdentity, server bool) (*Conn, error) {
if rw == nil {
return nil, errors.Errorf("zmtp: invalid nil read-writer")
return nil, errors.Errorf("zmq4: invalid nil read-writer")
}
if sec == nil {
return nil, errors.Errorf("zmtp: invalid nil security")
return nil, errors.Errorf("zmq4: invalid nil security")
}
conn := &Conn{
@ -114,22 +64,22 @@ func (conn *Conn) init(sec Security, md map[string]string) error {
err = conn.greet(conn.server)
if err != nil {
return errors.Wrapf(err, "zmtp: could not exchange greetings")
return errors.Wrapf(err, "zmq4: could not exchange greetings")
}
err = conn.sec.Handshake()
if err != nil {
return errors.Wrapf(err, "zmtp: could not perform security handshake")
return errors.Wrapf(err, "zmq4: could not perform security handshake")
}
err = conn.sendMD(md)
if err != nil {
return errors.Wrapf(err, "zmtp: could not send metadata to peer")
return errors.Wrapf(err, "zmq4: could not send metadata to peer")
}
conn.Peer.MD, err = conn.recvMD()
if err != nil {
return errors.Wrapf(err, "zmtp: could not recv metadata from peer")
return errors.Wrapf(err, "zmq4: could not recv metadata from peer")
}
// FIXME(sbinet): if security mechanism does not define a client/server
@ -153,13 +103,13 @@ func (conn *Conn) greet(server bool) error {
err = send.write(conn.rw)
if err != nil {
return errors.Wrapf(err, "zmtp: could not send greeting")
return errors.Wrapf(err, "zmq4: could not send greeting")
}
var recv greeting
err = recv.read(conn.rw)
if err != nil {
return errors.Wrapf(err, "zmtp: could not recv greeting")
return errors.Wrapf(err, "zmq4: could not recv greeting")
}
peerKind := asString(recv.Mechanism[:])
@ -169,7 +119,7 @@ func (conn *Conn) greet(server bool) error {
conn.Peer.Server, err = asBool(recv.Server)
if err != nil {
return errors.Wrapf(err, "zmtp: could not get peer server flag")
return errors.Wrapf(err, "zmq4: could not get peer server flag")
}
return nil
@ -245,7 +195,7 @@ func (c *Conn) recvMD() (map[string]string, error) {
peer := SocketType(sysMetadata[sysSockType])
if !peer.IsCompatible(c.typ) {
return nil, errors.Errorf("zmtp: peer=%q not compatible with %q", peer, c.typ)
return nil, errors.Errorf("zmq4: peer=%q not compatible with %q", peer, c.typ)
}
return appMetadata, nil
}
@ -270,7 +220,7 @@ func (c *Conn) SendMsg(msg Msg) error {
}
err := c.send(false, frame, flag)
if err != nil {
return errors.Wrapf(err, "zmtp: error sending frame %d/%d", i+1, nframes)
return errors.Wrapf(err, "zmq4: error sending frame %d/%d", i+1, nframes)
}
}
return nil
@ -289,11 +239,11 @@ func (c *Conn) RecvMsg() (Msg, error) {
switch len(msg.Frames) {
case 0:
return Msg{}, errors.Errorf("zmtp: empty command")
return Msg{}, errors.Errorf("zmq4: empty command")
case 1:
// ok
default:
return msg, errors.Errorf("zmtp: invalid length command")
return msg, errors.Errorf("zmq4: invalid length command")
}
var cmd command

View file

@ -4,58 +4,56 @@
// +build czmq4
package zmq4_test
package zmq4
import (
"context"
"net"
"github.com/go-zeromq/zmq4"
"github.com/go-zeromq/zmq4/zmtp"
czmq4 "github.com/zeromq/goczmq"
)
func NewCPair(ctx context.Context) zmq4.Socket {
func NewCPair(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Pair)}
}
func NewCPub(ctx context.Context) zmq4.Socket {
func NewCPub(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Pub)}
}
func NewCSub(ctx context.Context) zmq4.Socket {
func NewCSub(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Sub)}
}
func NewCReq(ctx context.Context) zmq4.Socket {
func NewCReq(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Req)}
}
func NewCRep(ctx context.Context) zmq4.Socket {
func NewCRep(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Rep)}
}
func NewCDealer(ctx context.Context) zmq4.Socket {
func NewCDealer(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Dealer)}
}
func NewCRouter(ctx context.Context) zmq4.Socket {
func NewCRouter(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Router)}
}
func NewCPull(ctx context.Context) zmq4.Socket {
func NewCPull(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Pull)}
}
func NewCPush(ctx context.Context) zmq4.Socket {
func NewCPush(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.Push)}
}
func NewCXPub(ctx context.Context) zmq4.Socket {
func NewCXPub(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.XPub)}
}
func NewCXSub(ctx context.Context) zmq4.Socket {
func NewCXSub(ctx context.Context) Socket {
return &csocket{czmq4.NewSock(czmq4.XSub)}
}
@ -70,14 +68,14 @@ func (sck *csocket) Close() error {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
func (sck *csocket) Send(msg zmtp.Msg) error {
func (sck *csocket) Send(msg Msg) error {
return sck.sock.SendMessage(msg.Frames)
}
// Recv receives a complete message.
func (sck *csocket) Recv() (zmtp.Msg, error) {
func (sck *csocket) Recv() (Msg, error) {
frames, err := sck.sock.RecvMessage()
return zmtp.Msg{Frames: frames}, err
return Msg{Frames: frames}, err
}
// Listen connects a local endpoint to the Socket.
@ -92,30 +90,30 @@ func (sck *csocket) Dial(addr string) error {
}
// Type returns the type of this Socket (PUB, SUB, ...)
func (sck *csocket) Type() zmtp.SocketType {
func (sck *csocket) Type() SocketType {
switch sck.sock.GetType() {
case czmq4.Pair:
return zmtp.Pair
return Pair
case czmq4.Pub:
return zmtp.Pub
return Pub
case czmq4.Sub:
return zmtp.Sub
return Sub
case czmq4.Req:
return zmtp.Req
return Req
case czmq4.Rep:
return zmtp.Rep
return Rep
case czmq4.Dealer:
return zmtp.Dealer
return Dealer
case czmq4.Router:
return zmtp.Router
return Router
case czmq4.Pull:
return zmtp.Pull
return Pull
case czmq4.Push:
return zmtp.Push
return Push
case czmq4.XPub:
return zmtp.XPub
return XPub
case czmq4.XSub:
return zmtp.XSub
return XSub
}
panic("invalid C-socket type")
}
@ -133,11 +131,11 @@ func (sck *csocket) GetOption(name string) (interface{}, error) {
// SetOption is used to set an option for a socket.
func (sck *csocket) SetOption(name string, value interface{}) error {
switch name {
case zmq4.OptionSubscribe:
case OptionSubscribe:
topic := value.(string)
sck.sock.SetOption(czmq4.SockSetSubscribe(topic))
return nil
case zmq4.OptionUnsubscribe:
case OptionUnsubscribe:
topic := value.(string)
sck.sock.SetUnsubscribe(topic)
return nil
@ -148,5 +146,5 @@ func (sck *csocket) SetOption(name string, value interface{}) error {
}
var (
_ zmq4.Socket = (*csocket)(nil)
_ Socket = (*csocket)(nil)
)

View file

@ -15,44 +15,44 @@ var (
{
name: "tcp-cpush-pull",
endpoint: must(EndPoint("tcp")),
push: NewCPush(bkg),
push: zmq4.NewCPush(bkg),
pull: zmq4.NewPull(bkg),
},
{
name: "tcp-push-cpull",
endpoint: must(EndPoint("tcp")),
push: zmq4.NewPush(bkg),
pull: NewCPull(bkg),
pull: zmq4.NewCPull(bkg),
},
{
name: "tcp-cpush-cpull",
endpoint: must(EndPoint("tcp")),
push: NewCPush(bkg),
pull: NewCPull(bkg),
push: zmq4.NewCPush(bkg),
pull: zmq4.NewCPull(bkg),
},
{
name: "ipc-cpush-pull",
endpoint: "ipc://ipc-cpush-pull",
push: NewCPush(bkg),
push: zmq4.NewCPush(bkg),
pull: zmq4.NewPull(bkg),
},
{
name: "ipc-push-cpull",
endpoint: "ipc://ipc-push-cpull",
push: zmq4.NewPush(bkg),
pull: NewCPull(bkg),
pull: zmq4.NewCPull(bkg),
},
{
name: "ipc-cpush-cpull",
endpoint: "ipc://ipc-cpush-cpull",
push: NewCPush(bkg),
pull: NewCPull(bkg),
push: zmq4.NewCPush(bkg),
pull: zmq4.NewCPull(bkg),
},
//{
// name: "udp-cpush-cpull",
// endpoint: "udp://127.0.0.1:55555",
// push: NewCPush(),
// pull: NewCPull(),
// push: zmq4.NewCPush(),
// pull: zmq4.NewCPull(),
//},
}
@ -60,38 +60,38 @@ var (
{
name: "tcp-creq-rep",
endpoint: must(EndPoint("tcp")),
req: NewCReq(bkg),
req: zmq4.NewCReq(bkg),
rep: zmq4.NewRep(bkg),
},
{
name: "tcp-req-crep",
endpoint: must(EndPoint("tcp")),
req: zmq4.NewReq(bkg),
rep: NewCRep(bkg),
rep: zmq4.NewCRep(bkg),
},
{
name: "tcp-creq-crep",
endpoint: must(EndPoint("tcp")),
req: NewCReq(bkg),
rep: NewCRep(bkg),
req: zmq4.NewCReq(bkg),
rep: zmq4.NewCRep(bkg),
},
{
name: "ipc-creq-rep",
endpoint: "ipc://ipc-creq-rep",
req: NewCReq(bkg),
req: zmq4.NewCReq(bkg),
rep: zmq4.NewRep(bkg),
},
{
name: "ipc-req-crep",
endpoint: "ipc://ipc-req-crep",
req: zmq4.NewReq(bkg),
rep: NewCRep(bkg),
rep: zmq4.NewCRep(bkg),
},
{
name: "ipc-creq-crep",
endpoint: "ipc://ipc-creq-crep",
req: NewCReq(bkg),
rep: NewCRep(bkg),
req: zmq4.NewCReq(bkg),
rep: zmq4.NewCRep(bkg),
},
}
@ -99,7 +99,7 @@ var (
{
name: "tcp-cpub-sub",
endpoint: must(EndPoint("tcp")),
pub: NewCPub(bkg),
pub: zmq4.NewCPub(bkg),
sub1: zmq4.NewSub(bkg),
sub2: zmq4.NewSub(bkg),
},
@ -107,20 +107,20 @@ var (
name: "tcp-pub-csub",
endpoint: must(EndPoint("tcp")),
pub: zmq4.NewPub(bkg),
sub1: NewCSub(bkg),
sub2: NewCSub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
{
name: "tcp-cpub-csub",
endpoint: must(EndPoint("tcp")),
pub: NewCPub(bkg),
sub1: NewCSub(bkg),
sub2: NewCSub(bkg),
pub: zmq4.NewCPub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
{
name: "ipc-cpub-sub",
endpoint: "ipc://ipc-cpub-sub",
pub: NewCPub(bkg),
pub: zmq4.NewCPub(bkg),
sub1: zmq4.NewSub(bkg),
sub2: zmq4.NewSub(bkg),
},
@ -128,15 +128,15 @@ var (
name: "ipc-pub-csub",
endpoint: "ipc://ipc-pub-csub",
pub: zmq4.NewPub(bkg),
sub1: NewCSub(bkg),
sub2: NewCSub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
{
name: "ipc-cpub-csub",
endpoint: "ipc://ipc-cpub-csub",
pub: NewCPub(bkg),
sub1: NewCSub(bkg),
sub2: NewCSub(bkg),
pub: zmq4.NewCPub(bkg),
sub1: zmq4.NewCSub(bkg),
sub2: zmq4.NewCSub(bkg),
},
}
)

View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewDealer returns a new DEALER ZeroMQ socket.
// The returned socket value is initially unbound.
func NewDealer(ctx context.Context, opts ...Option) Socket {
return &dealerSocket{newSocket(ctx, zmtp.Dealer, opts...)}
return &dealerSocket{newSocket(ctx, Dealer, opts...)}
}
// dealerSocket is a DEALER ZeroMQ socket.

103
msg.go Normal file
View file

@ -0,0 +1,103 @@
// Copyright 2018 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmq4
import (
"bytes"
"fmt"
"io"
)
// Msg is a ZMTP message, possibly composed of multiple frames.
type Msg struct {
Frames [][]byte
}
func NewMsg(frame []byte) Msg {
return Msg{Frames: [][]byte{frame}}
}
func NewMsgFrom(frames ...[]byte) Msg {
return Msg{Frames: frames}
}
func NewMsgString(frame string) Msg {
return NewMsg([]byte(frame))
}
func NewMsgFromString(frames []string) Msg {
msg := Msg{Frames: make([][]byte, len(frames))}
for i, frame := range frames {
msg.Frames[i] = append(msg.Frames[i], []byte(frame)...)
}
return msg
}
func (msg Msg) String() string {
buf := new(bytes.Buffer)
buf.WriteString("Msg{Frames:{")
for i, frame := range msg.Frames {
if i > 0 {
buf.WriteString(", ")
}
fmt.Fprintf(buf, "%q", frame)
}
buf.WriteString("}}")
return buf.String()
}
func (msg Msg) Clone() Msg {
o := Msg{Frames: make([][]byte, len(msg.Frames))}
for i, frame := range msg.Frames {
o.Frames[i] = make([]byte, len(frame))
copy(o.Frames[i], frame)
}
return o
}
// command is a ZMTP command as per:
// https://rfc.zeromq.org/spec:23/ZMTP/#formal-grammar
type command struct {
Name string
Body []byte
}
func (cmd *command) unmarshalZMTP(data []byte) error {
if len(data) == 0 {
return io.ErrUnexpectedEOF
}
n := int(data[0])
if n > len(data)-1 {
return errBadCmd
}
cmd.Name = string(data[1 : n+1])
cmd.Body = data[n+1:]
return nil
}
func (cmd *command) marshalZMTP() ([]byte, error) {
n := len(cmd.Name)
if n > 255 {
return nil, errBadCmd
}
buf := make([]byte, 0, 1+n+len(cmd.Body))
buf = append(buf, byte(n))
buf = append(buf, []byte(cmd.Name)...)
buf = append(buf, cmd.Body...)
return buf, nil
}
// ZMTP commands as per:
// https://rfc.zeromq.org/spec:23/ZMTP/#commands
const (
cmdCancel = "CANCEL"
cmdError = "ERROR"
cmdHello = "HELLO"
cmdPing = "PING"
cmdPong = "PONG"
cmdReady = "READY"
cmdSubscribe = "SUBSCRIBE"
)

5
null_security.go Normal file
View file

@ -0,0 +1,5 @@
// Copyright 2018 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmq4

View file

@ -6,9 +6,6 @@ package zmq4
import (
"time"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/go-zeromq/zmq4/zmtp/security/null"
)
// Option configures some aspect of a ZeroMQ socket.
@ -16,7 +13,7 @@ import (
type Option func(s *socket)
// WithID configures a ZeroMQ socket identity.
func WithID(id zmtp.SocketIdentity) Option {
func WithID(id SocketIdentity) Option {
return func(s *socket) {
s.id = id
}
@ -24,11 +21,8 @@ func WithID(id zmtp.SocketIdentity) Option {
// WithSecurity configures a ZeroMQ socket to use the given security mechanism.
// If the security mechanims is nil, the NULL mechanism is used.
func WithSecurity(sec zmtp.Security) Option {
func WithSecurity(sec Security) Option {
return func(s *socket) {
if sec == nil {
sec = null.Security()
}
s.sec = sec
}
}

View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewPair returns a new PAIR ZeroMQ socket.
// The returned socket value is initially unbound.
func NewPair(ctx context.Context, opts ...Option) Socket {
return &pairSocket{newSocket(ctx, zmtp.Pair, opts...)}
return &pairSocket{newSocket(ctx, Pair, opts...)}
}
// pairSocket is a PAIR ZeroMQ socket.

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmtp
package zmq4
import (
"bytes"
@ -14,16 +14,15 @@ import (
)
var (
errGreeting = errors.New("zmtp: invalid greeting received")
errSecMech = errors.New("zmtp: invalid security mechanism")
errBadSec = errors.New("zmtp: invalid or unsupported security mechanism")
errBadCmd = errors.New("zmtp: invalid command name")
errBadFrame = errors.New("zmtp: invalid frame")
errOverflow = errors.New("zmtp: overflow")
errEmptyAppMDKey = errors.New("zmtp: empty application metadata key")
errDupAppMDKey = errors.New("zmtp: duplicate application metadata key")
errBoolCnv = errors.New("zmtp: invalid byte to bool conversion")
errMoreCmd = errors.New("zmtp: MORE not supported") // FIXME(sbinet)
errGreeting = errors.New("zmq4: invalid greeting received")
errSecMech = errors.New("zmq4: invalid security mechanism")
errBadSec = errors.New("zmq4: invalid or unsupported security mechanism")
errBadCmd = errors.New("zmq4: invalid command name")
errBadFrame = errors.New("zmq4: invalid frame")
errOverflow = errors.New("zmq4: overflow")
errEmptyAppMDKey = errors.New("zmq4: empty application metadata key")
errDupAppMDKey = errors.New("zmq4: duplicate application metadata key")
errBoolCnv = errors.New("zmq4: invalid byte to bool conversion")
)
const (
@ -151,51 +150,6 @@ func (g *greeting) marshal() []byte {
return buf[:]
}
// command is a ZMTP command as per:
// https://rfc.zeromq.org/spec:23/ZMTP/#formal-grammar
type command struct {
Name string
Body []byte
}
func (cmd *command) unmarshalZMTP(data []byte) error {
if len(data) == 0 {
return io.ErrUnexpectedEOF
}
n := int(data[0])
if n > len(data)-1 {
return errBadCmd
}
cmd.Name = string(data[1 : n+1])
cmd.Body = data[n+1:]
return nil
}
func (cmd *command) marshalZMTP() ([]byte, error) {
n := len(cmd.Name)
if n > 255 {
return nil, errBadCmd
}
buf := make([]byte, 0, 1+n+len(cmd.Body))
buf = append(buf, byte(n))
buf = append(buf, []byte(cmd.Name)...)
buf = append(buf, cmd.Body...)
return buf, nil
}
// ZMTP commands as per:
// https://rfc.zeromq.org/spec:23/ZMTP/#commands
const (
cmdCancel = "CANCEL"
cmdError = "ERROR"
cmdHello = "HELLO"
cmdPing = "PING"
cmdPong = "PONG"
cmdReady = "READY"
cmdSubscribe = "SUBSCRIBE"
)
const (
sysSockType = "Socket-Type"
sysSockID = "Identity"

6
pub.go
View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewPub returns a new PUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewPub(ctx context.Context, opts ...Option) Socket {
return &pubSocket{newSocket(ctx, zmtp.Pub, opts...)}
return &pubSocket{newSocket(ctx, Pub, opts...)}
}
// pubSocket is a PUB ZeroMQ socket.
@ -23,7 +21,7 @@ type pubSocket struct {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
func (pub *pubSocket) Send(msg zmtp.Msg) error {
func (pub *pubSocket) Send(msg Msg) error {
pub.socket.mu.RLock()
var err error
// FIXME(sbinet): only send to correct subscribers...

View file

@ -7,14 +7,13 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/pkg/errors"
)
// NewPull returns a new PULL ZeroMQ socket.
// The returned socket value is initially unbound.
func NewPull(ctx context.Context, opts ...Option) Socket {
return &pullSocket{newSocket(ctx, zmtp.Pull, opts...)}
return &pullSocket{newSocket(ctx, Pull, opts...)}
}
// pullSocket is a PULL ZeroMQ socket.
@ -24,7 +23,7 @@ type pullSocket struct {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
func (*pullSocket) Send(msg zmtp.Msg) error {
func (*pullSocket) Send(msg Msg) error {
return errors.Errorf("zmq4: PULL sockets can't send messages")
}

View file

@ -7,14 +7,13 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/pkg/errors"
)
// NewPush returns a new PUSH ZeroMQ socket.
// The returned socket value is initially unbound.
func NewPush(ctx context.Context, opts ...Option) Socket {
return &pushSocket{newSocket(ctx, zmtp.Push, opts...)}
return &pushSocket{newSocket(ctx, Push, opts...)}
}
// pushSocket is a PUSH ZeroMQ socket.
@ -23,8 +22,8 @@ type pushSocket struct {
}
// Recv receives a complete message.
func (*pushSocket) Recv() (zmtp.Msg, error) {
return zmtp.Msg{}, errors.Errorf("zmq4: PUSH sockets can't recv messages")
func (*pushSocket) Recv() (Msg, error) {
return Msg{}, errors.Errorf("zmq4: PUSH sockets can't recv messages")
}
var (

8
rep.go
View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewRep returns a new REP ZeroMQ socket.
// The returned socket value is initially unbound.
func NewRep(ctx context.Context, opts ...Option) Socket {
return &repSocket{newSocket(ctx, zmtp.Rep, opts...)}
return &repSocket{newSocket(ctx, Rep, opts...)}
}
// repSocket is a REP ZeroMQ socket.
@ -23,13 +21,13 @@ type repSocket struct {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
func (rep *repSocket) Send(msg zmtp.Msg) error {
func (rep *repSocket) Send(msg Msg) error {
msg.Frames = append([][]byte{nil}, msg.Frames...)
return rep.socket.Send(msg)
}
// Recv receives a complete message.
func (rep *repSocket) Recv() (zmtp.Msg, error) {
func (rep *repSocket) Recv() (Msg, error) {
msg, err := rep.socket.Recv()
if len(msg.Frames) > 1 {
msg.Frames = msg.Frames[1:]

8
req.go
View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewReq returns a new REQ ZeroMQ socket.
// The returned socket value is initially unbound.
func NewReq(ctx context.Context, opts ...Option) Socket {
return &reqSocket{newSocket(ctx, zmtp.Req, opts...)}
return &reqSocket{newSocket(ctx, Req, opts...)}
}
// reqSocket is a REQ ZeroMQ socket.
@ -23,13 +21,13 @@ type reqSocket struct {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
func (req *reqSocket) Send(msg zmtp.Msg) error {
func (req *reqSocket) Send(msg Msg) error {
msg.Frames = append([][]byte{nil}, msg.Frames...)
return req.socket.Send(msg)
}
// Recv receives a complete message.
func (req *reqSocket) Recv() (zmtp.Msg, error) {
func (req *reqSocket) Recv() (Msg, error) {
msg, err := req.socket.Recv()
if len(msg.Frames) > 1 {
msg.Frames = msg.Frames[1:]

View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewRouter returns a new ROUTER ZeroMQ socket.
// The returned socket value is initially unbound.
func NewRouter(ctx context.Context, opts ...Option) Socket {
return &routerSocket{newSocket(ctx, zmtp.Router, opts...)}
return &routerSocket{newSocket(ctx, Router, opts...)}
}
// routerSocket is a ROUTER ZeroMQ socket.

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmtp
package zmq4
import (
"io"
@ -34,12 +34,40 @@ const (
// that does no authentication nor encryption.
NullSecurity SecurityType = "NULL"
// PlainSecurityType is a security mechanism that uses
// PlainSecurity is a security mechanism that uses
// plaintext passwords. It is a reference implementation and
// should not be used to anything important.
PlainSecurityType SecurityType = "PLAIN"
PlainSecurity SecurityType = "PLAIN"
// CurveSecurityType uses ZMQ_CURVE for authentication
// CurveSecurity uses ZMQ_CURVE for authentication
// and encryption.
CurveSecurityType SecurityType = "CURVE"
CurveSecurity SecurityType = "CURVE"
)
// security implements the NULL security mechanism.
type nullSecurity struct{}
// Type returns the security mechanism type.
func (nullSecurity) Type() SecurityType {
return NullSecurity
}
// Handshake implements the ZMTP security handshake according to
// this security mechanism.
// see:
// https://rfc.zeromq.org/spec:23/ZMTP/
// https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/
// https://rfc.zeromq.org/spec:25/ZMTP-CURVE/
func (nullSecurity) Handshake() error {
return nil
}
// Encrypt writes the encrypted form of data to w.
func (nullSecurity) Encrypt(w io.Writer, data []byte) (int, error) {
return w.Write(data)
}
// Decrypt writes the decrypted form of data to w.
func (nullSecurity) Decrypt(w io.Writer, data []byte) (int, error) {
return w.Write(data)
}

View file

@ -8,20 +8,20 @@ package null
import (
"io"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/go-zeromq/zmq4"
)
// security implements the NULL security mechanism.
type security struct{}
// Security returns a value that implements the NULL security mechanism
func Security() zmtp.Security {
func Security() zmq4.Security {
return security{}
}
// Type returns the security mechanism type.
func (security) Type() zmtp.SecurityType {
return zmtp.NullSecurity
func (security) Type() zmq4.SecurityType {
return zmq4.NullSecurity
}
// Handshake implements the ZMTP security handshake according to

View file

@ -6,17 +6,10 @@ package zmq4
import (
"context"
"crypto/rand"
"fmt"
"io"
"log"
"net"
"strings"
"sync"
"time"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/go-zeromq/zmq4/zmtp/security/null"
"github.com/pkg/errors"
)
@ -37,14 +30,14 @@ type socket struct {
ready chan struct{} // ready when at least 1 connection is live
once *sync.Once
ep string // socket end-point
typ zmtp.SocketType
id zmtp.SocketIdentity
typ SocketType
id SocketIdentity
retry time.Duration
sec zmtp.Security
sec Security
mu sync.RWMutex
ids map[string]*zmtp.Conn // ZMTP connection IDs
conns []*zmtp.Conn // ZMTP connections
ids map[string]*Conn // ZMTP connection IDs
conns []*Conn // ZMTP connections
props map[string]interface{} // properties of this socket
@ -54,7 +47,7 @@ type socket struct {
dialer net.Dialer
}
func newDefaultSocket(ctx context.Context, sockType zmtp.SocketType) *socket {
func newDefaultSocket(ctx context.Context, sockType SocketType) *socket {
if ctx == nil {
ctx = context.Background()
}
@ -64,8 +57,8 @@ func newDefaultSocket(ctx context.Context, sockType zmtp.SocketType) *socket {
ready: make(chan struct{}),
typ: sockType,
retry: defaultRetry,
sec: null.Security(),
ids: make(map[string]*zmtp.Conn),
sec: nullSecurity{},
ids: make(map[string]*Conn),
conns: nil,
props: make(map[string]interface{}),
ctx: ctx,
@ -74,7 +67,7 @@ func newDefaultSocket(ctx context.Context, sockType zmtp.SocketType) *socket {
}
}
func newSocket(ctx context.Context, sockType zmtp.SocketType, opts ...Option) *socket {
func newSocket(ctx context.Context, sockType SocketType, opts ...Option) *socket {
sck := newDefaultSocket(ctx, sockType)
for _, opt := range opts {
opt(sck)
@ -93,18 +86,20 @@ func (sck *socket) Close() error {
return errInvalidSocket
}
var err error
sck.mu.RLock()
for _, conn := range sck.conns {
e := conn.Close()
if e != nil && err == nil {
err = e
}
}
sck.mu.RUnlock()
return err
}
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
func (sck *socket) Send(msg zmtp.Msg) error {
func (sck *socket) Send(msg Msg) error {
sck.isReady()
sck.mu.RLock()
err := sck.conns[0].SendMsg(msg)
@ -113,7 +108,7 @@ func (sck *socket) Send(msg zmtp.Msg) error {
}
// Recv receives a complete message.
func (sck *socket) Recv() (zmtp.Msg, error) {
func (sck *socket) Recv() (Msg, error) {
sck.isReady()
sck.mu.RLock()
msg, err := sck.conns[0].RecvMsg()
@ -168,7 +163,7 @@ func (sck *socket) accept() {
continue
}
zconn, err := zmtp.Open(conn, sck.sec, sck.typ, sck.id, true)
zconn, err := Open(conn, sck.sec, sck.typ, sck.id, true)
if err != nil {
panic(err)
// return errors.Wrapf(err, "could not open a ZMTP connection")
@ -222,7 +217,7 @@ connect:
return errors.Wrapf(err, "got a nil dial-conn to %q", endpoint)
}
zconn, err := zmtp.Open(conn, sck.sec, sck.typ, sck.id, false)
zconn, err := Open(conn, sck.sec, sck.typ, sck.id, false)
if err != nil {
return errors.Wrapf(err, "could not open a ZMTP connection")
}
@ -240,7 +235,7 @@ connect:
return nil
}
func (sck *socket) addConn(c *zmtp.Conn) {
func (sck *socket) addConn(c *Conn) {
sck.mu.Lock()
sck.conns = append(sck.conns, c)
uuid, ok := c.Peer.MD["Identity"]
@ -252,7 +247,7 @@ func (sck *socket) addConn(c *zmtp.Conn) {
}
// Type returns the type of this Socket (PUB, SUB, ...)
func (sck *socket) Type() zmtp.SocketType {
func (sck *socket) Type() SocketType {
return sck.typ
}
@ -282,55 +277,3 @@ func (sck *socket) isReady() {
var (
_ Socket = (*socket)(nil)
)
// splitAddr returns the triplet (network, addr, error)
func splitAddr(v string) (network, addr string, err error) {
ep := strings.Split(v, "://")
if len(ep) != 2 {
err = errInvalidAddress
return network, addr, err
}
var (
host string
port string
)
network = ep[0]
switch network {
case "tcp", "udp":
host, port, err = net.SplitHostPort(ep[1])
if err != nil {
return network, addr, err
}
switch port {
case "0", "*", "":
port = "0"
}
switch host {
case "", "*":
host = "0.0.0.0"
}
addr = host + ":" + port
return network, addr, err
case "ipc":
host = ep[1]
port = ""
return network, host, nil
case "inproc":
err = fmt.Errorf("zmq4: protocol %q not implemented", network)
default:
err = fmt.Errorf("zmq4: unknown protocol %q", network)
}
return network, addr, err
}
func newUUID() string {
var uuid [16]byte
if _, err := io.ReadFull(rand.Reader, uuid[:]); err != nil {
log.Fatalf("cannot generate random data for UUID: %v", err)
}
uuid[8] = uuid[8]&^0xc0 | 0x80
uuid[6] = uuid[6]&^0xf0 | 0x40
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:])
}

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmtp
package zmq4
// SocketType is a ZeroMQ socket type.
type SocketType string

8
sub.go
View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewSub returns a new SUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewSub(ctx context.Context, opts ...Option) Socket {
return &subSocket{newSocket(ctx, zmtp.Sub, opts...)}
return &subSocket{newSocket(ctx, Sub, opts...)}
}
// subSocket is a SUB ZeroMQ socket.
@ -30,9 +28,9 @@ func (sub *subSocket) SetOption(name string, value interface{}) error {
switch name {
case OptionSubscribe:
err = sub.Send(zmtp.NewMsgFrom([]byte{1}, []byte(value.(string))))
err = sub.Send(NewMsgFrom([]byte{1}, []byte(value.(string))))
case OptionUnsubscribe:
err = sub.Send(zmtp.NewMsgFrom([]byte{0}, []byte(value.(string))))
err = sub.Send(NewMsgFrom([]byte{0}, []byte(value.(string))))
}
return err

66
utils.go Normal file
View file

@ -0,0 +1,66 @@
// Copyright 2018 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmq4
import (
"crypto/rand"
"fmt"
"io"
"log"
"net"
"strings"
)
// splitAddr returns the triplet (network, addr, error)
func splitAddr(v string) (network, addr string, err error) {
ep := strings.Split(v, "://")
if len(ep) != 2 {
err = errInvalidAddress
return network, addr, err
}
var (
host string
port string
)
network = ep[0]
switch network {
case "tcp", "udp":
host, port, err = net.SplitHostPort(ep[1])
if err != nil {
return network, addr, err
}
switch port {
case "0", "*", "":
port = "0"
}
switch host {
case "", "*":
host = "0.0.0.0"
}
addr = host + ":" + port
return network, addr, err
case "ipc":
host = ep[1]
port = ""
return network, host, nil
case "inproc":
err = fmt.Errorf("zmq4: protocol %q not implemented", network)
default:
err = fmt.Errorf("zmq4: unknown protocol %q", network)
}
return network, addr, err
}
func newUUID() string {
var uuid [16]byte
if _, err := io.ReadFull(rand.Reader, uuid[:]); err != nil {
log.Fatalf("cannot generate random data for UUID: %v", err)
}
uuid[8] = uuid[8]&^0xc0 | 0x80
uuid[6] = uuid[6]&^0xf0 | 0x40
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:])
}

View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewXPub returns a new XPUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewXPub(ctx context.Context, opts ...Option) Socket {
return &xpubSocket{newSocket(ctx, zmtp.XPub, opts...)}
return &xpubSocket{newSocket(ctx, XPub, opts...)}
}
// xpubSocket is a XPUB ZeroMQ socket.

View file

@ -6,14 +6,12 @@ package zmq4
import (
"context"
"github.com/go-zeromq/zmq4/zmtp"
)
// NewXSub returns a new XSUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewXSub(ctx context.Context, opts ...Option) Socket {
return &xsubSocket{newSocket(ctx, zmtp.XSub, opts...)}
return &xsubSocket{newSocket(ctx, XSub, opts...)}
}
// xsubSocket is a XSUB ZeroMQ socket.

10
zmq4.go
View file

@ -7,10 +7,6 @@
// For more informations, see http://zeromq.org.
package zmq4
import (
"github.com/go-zeromq/zmq4/zmtp"
)
// Socket represents a ZeroMQ socket.
type Socket interface {
// Close closes the open Socket
@ -18,10 +14,10 @@ type Socket interface {
// Send puts the message on the outbound send queue.
// Send blocks until the message can be queued or the send deadline expires.
Send(msg zmtp.Msg) error
Send(msg Msg) error
// Recv receives a complete message.
Recv() (zmtp.Msg, error)
Recv() (Msg, error)
// Listen connects a local endpoint to the Socket.
Listen(ep string) error
@ -30,7 +26,7 @@ type Socket interface {
Dial(ep string) error
// Type returns the type of this Socket (PUB, SUB, ...)
Type() zmtp.SocketType
Type() SocketType
// GetOption is used to retrieve an option for a socket.
GetOption(name string) (interface{}, error)

View file

@ -13,7 +13,6 @@ import (
"time"
"github.com/go-zeromq/zmq4"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
@ -49,9 +48,9 @@ type testCasePubSub struct {
func TestPubSub(t *testing.T) {
var (
topic = "MSG"
msg1 = zmtp.NewMsgString("MSG 1")
msg2 = zmtp.NewMsgString("MSG 2")
msgs = []zmtp.Msg{msg1, msg2}
msg1 = zmq4.NewMsgString("MSG 1")
msg2 = zmq4.NewMsgString("MSG 2")
msgs = []zmq4.Msg{msg1, msg2}
)
for _, tc := range pubsubs {

View file

@ -13,7 +13,6 @@ import (
"time"
"github.com/go-zeromq/zmq4"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
@ -45,8 +44,8 @@ type testCasePushPull struct {
func TestPushPull(t *testing.T) {
var (
hello = zmtp.NewMsg([]byte("HELLO WORLD"))
bye = zmtp.NewMsgFrom([]byte("GOOD"), []byte("BYE"))
hello = zmq4.NewMsg([]byte("HELLO WORLD"))
bye = zmq4.NewMsgFrom([]byte("GOOD"), []byte("BYE"))
)
for _, tc := range pushpulls {

View file

@ -13,7 +13,6 @@ import (
"time"
"github.com/go-zeromq/zmq4"
"github.com/go-zeromq/zmq4/zmtp"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
@ -45,12 +44,12 @@ type testCaseReqRep struct {
func TestReqRep(t *testing.T) {
var (
reqName = zmtp.NewMsgString("NAME")
reqLang = zmtp.NewMsgString("LANG")
reqQuit = zmtp.NewMsgString("QUIT")
repName = zmtp.NewMsgString("zmq4")
repLang = zmtp.NewMsgString("Go")
repQuit = zmtp.NewMsgString("bye")
reqName = zmq4.NewMsgString("NAME")
reqLang = zmq4.NewMsgString("LANG")
reqQuit = zmq4.NewMsgString("QUIT")
repName = zmq4.NewMsgString("zmq4")
repLang = zmq4.NewMsgString("Go")
repQuit = zmq4.NewMsgString("bye")
)
for _, tc := range reqreps {
@ -86,7 +85,7 @@ func TestReqRep(t *testing.T) {
if err != nil {
return errors.Wrapf(err, "could not recv REQ message")
}
var rep zmtp.Msg
var rep zmq4.Msg
switch string(msg.Frames[0]) {
case "NAME":
rep = repName
@ -113,8 +112,8 @@ func TestReqRep(t *testing.T) {
}
for _, msg := range []struct {
req zmtp.Msg
rep zmtp.Msg
req zmq4.Msg
rep zmq4.Msg
}{
{reqName, repName},
{reqLang, repLang},