zmq4: add multi-part Send API

benchstat between Send and SendMulti:
name               old time/op  new time/op  delta
PubSubSendMulti-8   26.9s ± 0%    7.2s ± 0%   ~     (p=1.000 n=1+1)
This commit is contained in:
Murphy Law 2019-11-01 17:09:05 -04:00 committed by Sebastien Binet
parent 33d608acca
commit 78ce94b745
17 changed files with 251 additions and 8 deletions

52
conn.go
View file

@ -177,6 +177,10 @@ func (c *Conn) SendMsg(msg Msg) error {
if c.Closed() {
return ErrClosedConn
}
if msg.multipart {
return c.sendMulti(msg)
}
nframes := len(msg.Frames)
for i, frame := range msg.Frames {
var flag byte
@ -276,6 +280,54 @@ func (c *Conn) RecvCmd() (Cmd, error) {
return cmd, nil
}
func (c *Conn) sendMulti(msg Msg) error {
var buffers net.Buffers
nframes := len(msg.Frames)
for i, frame := range msg.Frames {
var flag byte
if i < nframes-1 {
flag ^= hasMoreBitFlag
}
size := len(frame)
isLong := size > 255
if isLong {
flag ^= isLongBitFlag
}
var (
hdr = [8 + 1]byte{flag}
hsz int
)
if isLong {
hsz = 9
binary.BigEndian.PutUint64(hdr[1:], uint64(size))
} else {
hsz = 2
hdr[1] = uint8(size)
}
switch c.sec.Type() {
case NullSecurity:
buffers = append(buffers, hdr[:hsz], frame)
default:
var secBuf bytes.Buffer
if _, err := c.sec.Encrypt(&secBuf, frame); err != nil {
return err
}
buffers = append(buffers, hdr[:hsz], secBuf.Bytes())
}
}
if _, err := buffers.WriteTo(c.rw); err != nil {
c.checkIO(err)
return err
}
return nil
}
func (c *Conn) send(isCommand bool, body []byte, flag byte) error {
// Long flag
size := len(body)

View file

@ -83,6 +83,13 @@ func (sck *csocket) Send(msg Msg) error {
return sck.sock.SendMessage(msg.Frames)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (sck *csocket) SendMulti(msg Msg) error {
return sck.sock.SendMessage(msg.Frames)
}
// Recv receives a complete message.
func (sck *csocket) Recv() (Msg, error) {
frames, err := sck.sock.RecvMessage()

View file

@ -32,6 +32,13 @@ func (dealer *dealerSocket) Send(msg Msg) error {
return dealer.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (dealer *dealerSocket) SendMulti(msg Msg) error {
return dealer.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (dealer *dealerSocket) Recv() (Msg, error) {
return dealer.sck.Recv()

7
msg.go
View file

@ -19,9 +19,10 @@ const (
// Msg is a ZMTP message, possibly composed of multiple frames.
type Msg struct {
Frames [][]byte
Type MsgType
err error
Frames [][]byte
Type MsgType
multipart bool
err error
}
func NewMsg(frame []byte) Msg {

View file

@ -32,6 +32,13 @@ func (pair *pairSocket) Send(msg Msg) error {
return pair.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (pair *pairSocket) SendMulti(msg Msg) error {
return pair.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (pair *pairSocket) Recv() (Msg, error) {
return pair.sck.Recv()

11
pub.go
View file

@ -46,6 +46,16 @@ func (pub *pubSocket) Send(msg Msg) error {
return pub.sck.w.write(ctx, msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (pub *pubSocket) SendMulti(msg Msg) error {
msg.multipart = true
ctx, cancel := context.WithTimeout(pub.sck.ctx, pub.sck.timeout())
defer cancel()
return pub.sck.w.write(ctx, msg)
}
// Recv receives a complete message.
func (*pubSocket) Recv() (Msg, error) {
msg := Msg{err: xerrors.Errorf("zmq4: PUB sockets can't recv messages")}
@ -322,6 +332,7 @@ func (w *pubMWriter) sendMsg(msg Msg) {
topic := string(msg.Frames[0])
w.mu.Lock()
defer w.mu.Unlock()
// TODO(inphi): distribute messages across subscribers at once
for i := range w.ws {
ww := w.ws[i]
if ww.subscribed(topic) {

View file

@ -35,6 +35,13 @@ func (*pullSocket) Send(msg Msg) error {
return xerrors.Errorf("zmq4: PULL sockets can't send messages")
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (pull *pullSocket) SendMulti(msg Msg) error {
return xerrors.Errorf("zmq4: PULL sockets can't send messages")
}
// Recv receives a complete message.
func (pull *pullSocket) Recv() (Msg, error) {
return pull.sck.Recv()

View file

@ -35,6 +35,13 @@ func (push *pushSocket) Send(msg Msg) error {
return push.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (push *pushSocket) SendMulti(msg Msg) error {
return push.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (*pushSocket) Recv() (Msg, error) {
return Msg{}, xerrors.Errorf("zmq4: PUSH sockets can't recv messages")

8
rep.go
View file

@ -33,6 +33,14 @@ func (rep *repSocket) Send(msg Msg) error {
return rep.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (rep *repSocket) SendMulti(msg Msg) error {
msg.Frames = append([][]byte{nil}, msg.Frames...)
return rep.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (rep *repSocket) Recv() (Msg, error) {
msg, err := rep.sck.Recv()

8
req.go
View file

@ -33,6 +33,14 @@ func (req *reqSocket) Send(msg Msg) error {
return req.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (req *reqSocket) SendMulti(msg Msg) error {
msg.Frames = append([][]byte{nil}, msg.Frames...)
return req.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (req *reqSocket) Recv() (Msg, error) {
msg, err := req.sck.Recv()

View file

@ -40,6 +40,14 @@ func (router *routerSocket) Send(msg Msg) error {
return router.sck.w.write(ctx, msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (router *routerSocket) SendMulti(msg Msg) error {
msg.multipart = true
return router.Send(msg)
}
// Recv receives a complete message.
func (router *routerSocket) Recv() (Msg, error) {
return router.sck.Recv()

View file

@ -124,6 +124,16 @@ func (sck *socket) Send(msg Msg) error {
return sck.w.write(ctx, msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (sck *socket) SendMulti(msg Msg) error {
msg.multipart = true
ctx, cancel := context.WithTimeout(sck.ctx, sck.timeout())
defer cancel()
return sck.w.write(ctx, msg)
}
// Recv receives a complete message.
func (sck *socket) Recv() (Msg, error) {
ctx, cancel := context.WithCancel(sck.ctx)

7
sub.go
View file

@ -39,6 +39,13 @@ func (sub *subSocket) Send(msg Msg) error {
return sub.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (sub *subSocket) SendMulti(msg Msg) error {
return sub.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (sub *subSocket) Recv() (Msg, error) {
return sub.sck.Recv()

View file

@ -32,6 +32,13 @@ func (xpub *xpubSocket) Send(msg Msg) error {
return xpub.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (xpub *xpubSocket) SendMulti(msg Msg) error {
return xpub.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (xpub *xpubSocket) Recv() (Msg, error) {
return xpub.sck.Recv()

View file

@ -32,6 +32,13 @@ func (xsub *xsubSocket) Send(msg Msg) error {
return xsub.sck.Send(msg)
}
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (xsub *xsubSocket) SendMulti(msg Msg) error {
return xsub.sck.SendMulti(msg)
}
// Recv receives a complete message.
func (xsub *xsubSocket) Recv() (Msg, error) {
return xsub.sck.Recv()

View file

@ -18,6 +18,11 @@ type Socket interface {
// Send blocks until the message can be queued or the send deadline expires.
Send(msg Msg) error
// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
SendMulti(msg Msg) error
// Recv receives a complete message.
Recv() (Msg, error)

View file

@ -293,6 +293,82 @@ func TestPubSubClosedSub(t *testing.T) {
}
}
func TestPubSubMultiPart(t *testing.T) {
msg := zmq4.NewMsgFrom([]byte("msgA"), []byte("msgB"), []byte("msgC"))
pub := zmq4.NewPub(bkg)
sub := zmq4.NewSub(bkg)
defer pub.Close()
defer sub.Close()
ep := must(EndPoint("tcp"))
cleanUp(ep)
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
defer timeout()
grp, _ := errgroup.WithContext(ctx)
pss := newPubSubSync(1)
grp.Go(func() error {
var err error
err = pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
}
if addr := pub.Addr(); addr == nil {
return xerrors.Errorf("listener with nil Addr")
}
pss.WaitForSubscriptions()
time.Sleep(1 * time.Second)
err = pub.SendMulti(msg)
if err != nil {
return xerrors.Errorf("could not send message %v: %w", msg, err)
}
return nil
})
grp.Go(func() error {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial: %w", err)
}
if addr := sub.Addr(); addr != nil {
return xerrors.Errorf("dialer with non-nil Addr")
}
pss.DialComplete()
pss.WaitForDialers()
err = sub.SetOption(zmq4.OptionSubscribe, "msg")
if err != nil {
return xerrors.Errorf("could not subscribe to topic: %w", err)
}
pss.SubscriptionComplete()
pss.WaitForSubscriptions()
newMsg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message %v: %w", msg, err)
}
if !reflect.DeepEqual(newMsg, msg) {
return xerrors.Errorf("got = %v, want= %v", newMsg, msg)
}
return err
})
if err := grp.Wait(); err != nil {
t.Fatalf("error: %+v", err)
}
}
func TestTopics(t *testing.T) {
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
defer timeout()
@ -461,7 +537,7 @@ func TestPubOptionHWM(t *testing.T) {
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
defer timeout()
grp, ctx := errgroup.WithContext(ctx)
grp, _ := errgroup.WithContext(ctx)
pss := newPubSubSync(1)
grp.Go(func() error {
@ -535,7 +611,11 @@ func TestPubOptionHWM(t *testing.T) {
func BenchmarkPubSub(b *testing.B) {
topic := "msg"
msg := zmq4.NewMsg([]byte("msg"))
msgs := make([][]byte, 10)
for i := range msgs {
msgs[i] = []byte("msg")
}
msg := zmq4.NewMsgFrom(msgs...)
pub := zmq4.NewPub(bkg)
sub := zmq4.NewSub(bkg)
@ -548,7 +628,7 @@ func BenchmarkPubSub(b *testing.B) {
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
defer timeout()
grp, ctx := errgroup.WithContext(ctx)
grp, _ := errgroup.WithContext(ctx)
msgCount := 1 << 18
pss := newPubSubSync(1)
@ -564,7 +644,7 @@ func BenchmarkPubSub(b *testing.B) {
time.Sleep(1 * time.Second)
for i := 0; i < msgCount; i++ {
err = pub.Send(msg)
err = pub.SendMulti(msg)
if err != nil {
return xerrors.Errorf("error sending message: %v\n", err)
}
@ -591,11 +671,15 @@ func BenchmarkPubSub(b *testing.B) {
pss.SubscriptionComplete()
pss.WaitForSubscriptions()
var siz int
for i := 0; i < msgCount; i++ {
_, err := sub.Recv()
msg, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message: %v", err)
}
for _, frame := range msg.Frames {
siz += len(frame)
}
}
return err