zmq4: optimize pub socket Send

This also changes the semantics of the pub socket to match that of czmq;
pubSocket.Send now returns immediately once the msg has been written
onto an outbound queue.

A new option, OptionHWM is added, per czmq semantics, to control the
high-water mark. Only the pub socket supports this option at the moment.
This commit is contained in:
Murphy Law 2019-10-14 19:21:05 -07:00 committed by Sebastien Binet
parent 4bfbf6ceda
commit 33d608acca
5 changed files with 447 additions and 18 deletions

View file

@ -62,4 +62,5 @@ func WithRecvBufferSize(size int) Option {
const (
OptionSubscribe = "SUBSCRIBE"
OptionUnsubscribe = "UNSUBSCRIBE"
OptionHWM = "HWM"
)

87
pub.go
View file

@ -10,7 +10,6 @@ import (
"sort"
"sync"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
@ -81,7 +80,25 @@ func (pub *pubSocket) GetOption(name string) (interface{}, error) {
// SetOption is used to set an option for a socket.
func (pub *pubSocket) SetOption(name string, value interface{}) error {
return pub.sck.SetOption(name, value)
err := pub.sck.SetOption(name, value)
if err != nil {
return err
}
if name != OptionHWM {
return ErrBadProperty
}
hwm, ok := value.(int)
if !ok {
return ErrBadProperty
}
w := pub.sck.w.(*pubMWriter)
w.qmu.Lock()
w.hwm = hwm
w.qmu.Unlock()
return nil
}
// Topics returns the sorted list of topics a socket is subscribed to.
@ -213,15 +230,46 @@ type pubMWriter struct {
ctx context.Context
mu sync.Mutex
ws []*Conn
qmu sync.Mutex
qcond *sync.Cond
q *Queue
hwm int
closed bool
}
func newPubMWriter(ctx context.Context) *pubMWriter {
return &pubMWriter{
p := &pubMWriter{
ctx: ctx,
q: NewQueue(),
}
p.qcond = sync.NewCond(&p.qmu)
go p.run()
return p
}
func (w *pubMWriter) run() {
for {
w.qmu.Lock()
for w.q.Len() == 0 {
w.qcond.Wait()
if w.closed {
return
}
}
msg, _ := w.q.Peek()
w.q.Pop()
w.qmu.Unlock()
w.sendMsg(msg)
}
}
func (w *pubMWriter) Close() error {
w.qmu.Lock()
w.closed = true
w.qcond.Signal()
w.qmu.Unlock()
w.mu.Lock()
var err error
for _, ww := range w.ws {
@ -249,6 +297,7 @@ func (mw *pubMWriter) rmConn(w *Conn) {
for i := range mw.ws {
if mw.ws[i] == w {
cur = i
mw.ws[i].Close()
break
}
}
@ -258,25 +307,27 @@ func (mw *pubMWriter) rmConn(w *Conn) {
}
func (w *pubMWriter) write(ctx context.Context, msg Msg) error {
grp, ctx := errgroup.WithContext(ctx)
w.mu.Lock()
w.qmu.Lock()
defer w.qmu.Unlock()
if w.hwm != 0 && w.q.Len() >= w.hwm {
//TODO(inphi): per subscriber hwm
return nil
}
w.q.Push(msg)
w.qcond.Signal()
return nil
}
func (w *pubMWriter) sendMsg(msg Msg) {
topic := string(msg.Frames[0])
w.mu.Lock()
defer w.mu.Unlock()
for i := range w.ws {
ww := w.ws[i]
grp.Go(func() error {
if !ww.subscribed(topic) {
return nil
}
err := ww.SendMsg(msg)
if err != nil && ww.Closed() {
err = nil
}
return err
})
if ww.subscribed(topic) {
_ = ww.SendMsg(msg)
}
}
err := grp.Wait()
w.mu.Unlock()
return err
}
var (

79
queue.go Normal file
View file

@ -0,0 +1,79 @@
// Copyright 2019 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmq4
import (
"container/list"
)
const innerCap = 512
type Queue struct {
rep *list.List
len int
}
func NewQueue() *Queue {
q := &Queue{list.New(), 0}
return q
}
func (q *Queue) Len() int {
return q.len
}
func (q *Queue) Init() {
q.rep.Init()
q.len = 0
}
func (q *Queue) Push(val Msg) {
q.len++
var i []interface{}
elem := q.rep.Back()
if elem != nil {
i = elem.Value.([]interface{})
}
if i == nil || len(i) == innerCap {
elem = q.rep.PushBack(make([]interface{}, 0, innerCap))
i = elem.Value.([]interface{})
}
elem.Value = append(i, val)
}
func (q *Queue) Peek() (Msg, bool) {
i := q.front()
if i == nil {
return Msg{}, false
}
return i[0].(Msg), true
}
func (q *Queue) Pop() {
elem := q.rep.Front()
if elem == nil {
panic("attempting to Pop on an empty Queue")
}
q.len--
i := elem.Value.([]interface{})
i[0] = nil // remove ref to poped element
i = i[1:]
if len(i) == 0 {
q.rep.Remove(elem)
} else {
elem.Value = i
}
}
func (q *Queue) front() []interface{} {
elem := q.rep.Front()
if elem == nil {
return nil
}
return elem.Value.([]interface{})
}

100
queue_test.go Normal file
View file

@ -0,0 +1,100 @@
// Copyright 2019 The go-zeromq Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package zmq4
import (
"reflect"
"testing"
)
func makeMsg(i int) Msg {
return NewMsgString(string(i))
}
func TestQueue(t *testing.T) {
q := NewQueue()
if q.Len() != 0 {
t.Fatal("queue should be empty")
}
if _, exists := q.Peek(); exists {
t.Fatal("Queue should be empty")
}
q.Push(makeMsg(1))
if q.Len() != 1 {
t.Fatal("queue should contain 1 element")
}
msg, ok := q.Peek()
if !ok || !reflect.DeepEqual(msg, makeMsg(1)) {
t.Fatal("unexpected value in queue")
}
q.Push(makeMsg(2))
if q.Len() != 2 {
t.Fatal("queue should contain 2 elements")
}
msg, ok = q.Peek()
if !ok || !reflect.DeepEqual(msg, makeMsg(1)) {
t.Fatal("unexpected value in queue")
}
q.Pop()
if q.Len() != 1 {
t.Fatal("queue should contain 1 element")
}
msg, ok = q.Peek()
if !ok || !reflect.DeepEqual(msg, makeMsg(2)) {
t.Fatal("unexpected value in queue")
}
q.Pop()
if q.Len() != 0 {
t.Fatal("queue should be empty")
}
q.Push(makeMsg(1))
q.Push(makeMsg(2))
q.Init()
if q.Len() != 0 {
t.Fatal("queue should be empty")
}
}
func TestQueueNewInnerList(t *testing.T) {
q := NewQueue()
for i := 1; i <= innerCap; i++ {
q.Push(makeMsg(i))
}
if q.Len() != innerCap {
t.Fatalf("queue should contain %d elements", innerCap)
}
// next push will create a new inner slice
q.Push(makeMsg(innerCap + 1))
if q.Len() != innerCap+1 {
t.Fatalf("queue should contain %d elements", innerCap+1)
}
msg, ok := q.Peek()
if !ok || !reflect.DeepEqual(msg, makeMsg(1)) {
t.Fatal("unexpected value in queue")
}
q.Pop()
if q.Len() != innerCap {
t.Fatalf("queue should contain %d elements", innerCap)
}
msg, ok = q.Peek()
if !ok || !reflect.DeepEqual(msg, makeMsg(2)) {
t.Fatal("unexpected value in queue")
}
q.Push(makeMsg(innerCap + 1))
q.Init()
if q.Len() != 0 {
t.Fatal("queue should be empty")
}
}

View file

@ -371,6 +371,7 @@ func TestPubSubDeadPub(t *testing.T) {
defer sub.Close()
subReady := make(chan struct{})
subDoneReading := make(chan struct{})
pubClosed := make(chan struct{})
const nmsgs = 4 // the number of messages do not matter
@ -393,6 +394,7 @@ func TestPubSubDeadPub(t *testing.T) {
return xerrors.Errorf("could not send message %v: %w", msg, err)
}
}
<-subDoneReading
return err
})
@ -421,6 +423,7 @@ func TestPubSubDeadPub(t *testing.T) {
}
}
close(subDoneReading)
<-pubClosed
_, err = sub.Recv() // make sure we aren't deadlocked
@ -435,3 +438,198 @@ func TestPubSubDeadPub(t *testing.T) {
t.Fatalf("error: %+v", err)
}
}
func TestPubOptionHWM(t *testing.T) {
topic := "msg"
pub := zmq4.NewPub(bkg)
subCtx, subCancel := context.WithCancel(bkg)
sub := zmq4.NewSub(subCtx)
defer pub.Close()
defer sub.Close()
msgCount := 100
hwm := 10
if err := pub.SetOption(zmq4.OptionHWM, hwm); err != nil {
t.Fatalf("unable to set HWM")
}
ep := must(EndPoint("tcp"))
cleanUp(ep)
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
defer timeout()
grp, ctx := errgroup.WithContext(ctx)
pss := newPubSubSync(1)
grp.Go(func() error {
var err error
err = pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
}
pss.WaitForSubscriptions()
for i := 1; i <= msgCount; i++ {
msg := zmq4.NewMsgFrom([]byte("msg"), []byte(string(i)))
err = pub.Send(msg)
if err != nil {
return xerrors.Errorf("error sending message. [%d] got: %v", i, err)
}
}
// give the subscriber time to receive the last message
time.Sleep(time.Second * 2)
// Inform the subscriber that there are no more messages, otherwise it'll wait indefinitely while trying to receive dropped messages
subCancel()
return nil
})
grp.Go(func() error {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial end point: %+v", err)
}
pss.DialComplete()
pss.WaitForDialers()
err = sub.SetOption(zmq4.OptionSubscribe, topic)
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topic, err)
}
pss.SubscriptionComplete()
pss.WaitForSubscriptions()
time.Sleep(time.Second * 1) // slow down for a bit
nmsgs := 0
for i := 1; i <= msgCount; i++ {
_, err = sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message: %v", err)
}
if subCtx.Err() != nil {
break
}
nmsgs++
}
if nmsgs >= msgCount {
return xerrors.Errorf("Expected dropped messages")
}
return err
})
if err := grp.Wait(); err != nil {
t.Fatalf("error: %+v", err)
}
}
func BenchmarkPubSub(b *testing.B) {
topic := "msg"
msg := zmq4.NewMsg([]byte("msg"))
pub := zmq4.NewPub(bkg)
sub := zmq4.NewSub(bkg)
defer pub.Close()
defer sub.Close()
ep := must(EndPoint("tcp"))
cleanUp(ep)
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
defer timeout()
grp, ctx := errgroup.WithContext(ctx)
msgCount := 1 << 18
pss := newPubSubSync(1)
grp.Go(func() error {
var err error
err = pub.Listen(ep)
if err != nil {
return xerrors.Errorf("could not listen on end point: %+v", err)
}
pss.WaitForSubscriptions()
time.Sleep(1 * time.Second)
for i := 0; i < msgCount; i++ {
err = pub.Send(msg)
if err != nil {
return xerrors.Errorf("error sending message: %v\n", err)
}
}
return err
})
grp.Go(func() error {
var err error
err = sub.Dial(ep)
if err != nil {
return xerrors.Errorf("could not dial end point: %+v", err)
}
pss.DialComplete()
pss.WaitForDialers()
err = sub.SetOption(zmq4.OptionSubscribe, topic)
if err != nil {
return xerrors.Errorf("could not subscribe to topic %q: %w", topic, err)
}
pss.SubscriptionComplete()
pss.WaitForSubscriptions()
for i := 0; i < msgCount; i++ {
_, err := sub.Recv()
if err != nil {
return xerrors.Errorf("could not recv message: %v", err)
}
}
return err
})
if err := grp.Wait(); err != nil {
b.Fatalf("error: %+v", err)
}
}
type pubSubSync struct {
wg1 sync.WaitGroup
wg2 sync.WaitGroup
}
func newPubSubSync(nrSubs int) *pubSubSync {
p := &pubSubSync{}
p.wg1.Add(nrSubs)
p.wg2.Add(nrSubs)
return p
}
func (p *pubSubSync) DialComplete() {
p.wg1.Done()
}
func (p *pubSubSync) WaitForDialers() {
p.wg1.Wait()
}
func (p *pubSubSync) SubscriptionComplete() {
p.wg2.Done()
}
func (p *pubSubSync) WaitForSubscriptions() {
p.wg2.Wait()
}