721 lines
15 KiB
Go
721 lines
15 KiB
Go
// Copyright 2018 The go-zeromq Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package zmq4_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"sort"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.gammaspectra.live/P2Pool/zmq4"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
var (
|
|
pubsubs = []testCasePubSub{
|
|
{
|
|
name: "tcp-pub-sub",
|
|
endpoint: must(EndPoint("tcp")),
|
|
pub: zmq4.NewPub(bkg),
|
|
sub0: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub0"))),
|
|
sub1: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub1"))),
|
|
sub2: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub2"))),
|
|
},
|
|
{
|
|
name: "ipc-pub-sub",
|
|
endpoint: "ipc://ipc-pub-sub",
|
|
pub: zmq4.NewPub(bkg),
|
|
sub0: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub0"))),
|
|
sub1: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub1"))),
|
|
sub2: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub2"))),
|
|
},
|
|
{
|
|
name: "inproc-pub-sub",
|
|
endpoint: "inproc://inproc-pub-sub",
|
|
pub: zmq4.NewPub(bkg),
|
|
sub0: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub0"))),
|
|
sub1: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub1"))),
|
|
sub2: zmq4.NewSub(bkg, zmq4.WithID(zmq4.SocketIdentity("sub2"))),
|
|
},
|
|
}
|
|
)
|
|
|
|
type testCasePubSub struct {
|
|
name string
|
|
skip bool
|
|
endpoint string
|
|
pub zmq4.Socket
|
|
sub0 zmq4.Socket
|
|
sub1 zmq4.Socket
|
|
sub2 zmq4.Socket
|
|
}
|
|
|
|
func TestNotBlockingSendOnPub(t *testing.T) {
|
|
|
|
pub := zmq4.NewPub(context.Background())
|
|
defer pub.Close()
|
|
|
|
err := pub.Listen(must(EndPoint("tcp")))
|
|
if err != nil {
|
|
t.Fatalf("could not listen on end point: %+v", err)
|
|
}
|
|
|
|
errc := make(chan error)
|
|
go func() {
|
|
errc <- pub.Send(zmq4.NewMsg([]byte("blocked?")))
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatalf("pub socket should not block!")
|
|
case err := <-errc:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %+v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestPubSub(t *testing.T) {
|
|
var (
|
|
topics = []string{"", "MSG", "msg"}
|
|
wantNumMsgs = []int{3, 1, 1}
|
|
msg0 = zmq4.NewMsgString("anything")
|
|
msg1 = zmq4.NewMsgString("MSG 1")
|
|
msg2 = zmq4.NewMsgString("msg 2")
|
|
msgs = [][]zmq4.Msg{
|
|
0: {msg0, msg1, msg2},
|
|
1: {msg1},
|
|
2: {msg2},
|
|
}
|
|
)
|
|
|
|
for i := range pubsubs {
|
|
tc := pubsubs[i]
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
defer tc.pub.Close()
|
|
defer tc.sub0.Close()
|
|
defer tc.sub1.Close()
|
|
defer tc.sub2.Close()
|
|
|
|
ep := tc.endpoint
|
|
cleanUp(ep)
|
|
|
|
if tc.skip {
|
|
t.Skipf(tc.name)
|
|
}
|
|
// t.Parallel()
|
|
|
|
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
|
|
defer timeout()
|
|
|
|
nmsgs := []int{0, 0, 0}
|
|
subs := []zmq4.Socket{tc.sub0, tc.sub1, tc.sub2}
|
|
|
|
var wg1 sync.WaitGroup
|
|
var wg2 sync.WaitGroup
|
|
wg1.Add(len(subs))
|
|
wg2.Add(len(subs))
|
|
|
|
grp, _ := errgroup.WithContext(ctx)
|
|
grp.Go(func() error {
|
|
|
|
err := tc.pub.Listen(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not listen: %w", err)
|
|
}
|
|
|
|
if addr := tc.pub.Addr(); addr == nil {
|
|
return fmt.Errorf("listener with nil Addr")
|
|
}
|
|
|
|
wg1.Wait()
|
|
wg2.Wait()
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
for _, msg := range msgs[0] {
|
|
err = tc.pub.Send(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("could not send message %v: %w", msg, err)
|
|
}
|
|
}
|
|
|
|
return err
|
|
})
|
|
|
|
for isub := range subs {
|
|
func(isub int, sub zmq4.Socket) {
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = sub.Dial(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not dial: %w", err)
|
|
}
|
|
|
|
if addr := sub.Addr(); addr != nil {
|
|
return fmt.Errorf("dialer with non-nil Addr")
|
|
}
|
|
|
|
wg1.Done()
|
|
wg1.Wait()
|
|
|
|
err = sub.SetOption(zmq4.OptionSubscribe, topics[isub])
|
|
if err != nil {
|
|
return fmt.Errorf("could not subscribe to topic %q: %w", topics[isub], err)
|
|
}
|
|
|
|
wg2.Done()
|
|
wg2.Wait()
|
|
|
|
msgs := msgs[isub]
|
|
for imsg, want := range msgs {
|
|
msg, err := sub.Recv()
|
|
if err != nil {
|
|
return fmt.Errorf("could not recv message %v: %w", want, err)
|
|
}
|
|
if !reflect.DeepEqual(msg, want) {
|
|
return fmt.Errorf("sub[%d][msg=%d]: got = %v, want= %v", isub, imsg, msg, want)
|
|
}
|
|
nmsgs[isub]++
|
|
}
|
|
|
|
return err
|
|
})
|
|
}(isub, subs[isub])
|
|
}
|
|
|
|
if err := grp.Wait(); err != nil {
|
|
t.Fatalf("error: %+v", err)
|
|
}
|
|
|
|
for i, want := range wantNumMsgs {
|
|
if want != nmsgs[i] {
|
|
t.Errorf("sub[%d]: got %d messages, want %d msgs=%v", i, nmsgs[i], want, nmsgs)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestPubSubClosedSub ensures that publishers do not return errors even after a subscriber is closed/disconnected.
|
|
func TestPubSubClosedSub(t *testing.T) {
|
|
ep := must(EndPoint("tcp"))
|
|
topic := "msg"
|
|
msg := zmq4.NewMsgString("msg")
|
|
|
|
bkg := context.Background()
|
|
ctx, timeout := context.WithTimeout(bkg, 20*time.Second)
|
|
defer timeout()
|
|
|
|
pub := zmq4.NewPub(ctx)
|
|
defer pub.Close()
|
|
|
|
subCtx, cancelSub := context.WithCancel(ctx)
|
|
sub := zmq4.NewSub(subCtx)
|
|
|
|
subReady := make(chan struct{})
|
|
subClosed := make(chan struct{})
|
|
|
|
const nmsgs = 100 // the number of messages do not matter
|
|
|
|
grp, _ := errgroup.WithContext(ctx)
|
|
grp.Go(func() error {
|
|
err := pub.Listen(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not listen on end point: %+v", err)
|
|
}
|
|
|
|
<-subReady
|
|
time.Sleep(time.Second * 1)
|
|
|
|
for i := 0; i < nmsgs; i++ {
|
|
if err := pub.Send(msg); err != nil {
|
|
return fmt.Errorf("could not send message %v: %w", msg, err)
|
|
}
|
|
}
|
|
|
|
cancelSub()
|
|
<-subClosed
|
|
time.Sleep(time.Second * 1)
|
|
|
|
for i := 0; i < nmsgs; i++ {
|
|
if err := pub.Send(msg); err != nil {
|
|
return fmt.Errorf("could not send message %v: %w", msg, err)
|
|
}
|
|
}
|
|
|
|
return err
|
|
})
|
|
|
|
grp.Go(func() error {
|
|
defer func() {
|
|
sub.Close()
|
|
close(subClosed)
|
|
}()
|
|
|
|
var err error
|
|
err = sub.Dial(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not dial: %w", err)
|
|
}
|
|
|
|
err = sub.SetOption(zmq4.OptionSubscribe, topic)
|
|
if err != nil {
|
|
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
|
|
}
|
|
|
|
close(subReady)
|
|
|
|
for {
|
|
rmsg, err := sub.Recv()
|
|
if subCtx.Err() == context.Canceled {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("could not recv message: %w", err)
|
|
}
|
|
if !reflect.DeepEqual(rmsg, msg) {
|
|
return fmt.Errorf("sub: got = %v, want= %v", rmsg, msg)
|
|
}
|
|
}
|
|
|
|
return err
|
|
})
|
|
|
|
if err := grp.Wait(); err != nil {
|
|
t.Fatalf("error: %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestPubSubMultiPart(t *testing.T) {
|
|
msg := zmq4.NewMsgFrom([]byte("msgA"), []byte("msgB"), []byte("msgC"))
|
|
pub := zmq4.NewPub(bkg)
|
|
sub := zmq4.NewSub(bkg)
|
|
|
|
defer pub.Close()
|
|
defer sub.Close()
|
|
|
|
ep := must(EndPoint("tcp"))
|
|
cleanUp(ep)
|
|
|
|
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
|
|
defer timeout()
|
|
|
|
grp, _ := errgroup.WithContext(ctx)
|
|
|
|
pss := newPubSubSync(1)
|
|
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = pub.Listen(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not listen on end point: %+v", err)
|
|
}
|
|
if addr := pub.Addr(); addr == nil {
|
|
return fmt.Errorf("listener with nil Addr")
|
|
}
|
|
|
|
pss.WaitForSubscriptions()
|
|
time.Sleep(1 * time.Second)
|
|
|
|
err = pub.SendMulti(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("could not send message %v: %w", msg, err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = sub.Dial(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not dial: %w", err)
|
|
}
|
|
|
|
if addr := sub.Addr(); addr != nil {
|
|
return fmt.Errorf("dialer with non-nil Addr")
|
|
}
|
|
|
|
pss.DialComplete()
|
|
pss.WaitForDialers()
|
|
|
|
err = sub.SetOption(zmq4.OptionSubscribe, "msg")
|
|
if err != nil {
|
|
return fmt.Errorf("could not subscribe to topic: %w", err)
|
|
}
|
|
|
|
pss.SubscriptionComplete()
|
|
pss.WaitForSubscriptions()
|
|
|
|
newMsg, err := sub.Recv()
|
|
if err != nil {
|
|
return fmt.Errorf("could not recv message %v: %w", msg, err)
|
|
}
|
|
if !reflect.DeepEqual(newMsg, msg) {
|
|
return fmt.Errorf("got = %v, want= %v", newMsg, msg)
|
|
}
|
|
return err
|
|
})
|
|
|
|
if err := grp.Wait(); err != nil {
|
|
t.Fatalf("error: %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestTopics(t *testing.T) {
|
|
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
|
|
defer timeout()
|
|
|
|
ep := must(EndPoint("tcp"))
|
|
pub := zmq4.NewPub(ctx)
|
|
sub0 := zmq4.NewSub(ctx, zmq4.WithID(zmq4.SocketIdentity("sub0")))
|
|
sub1 := zmq4.NewSub(ctx, zmq4.WithID(zmq4.SocketIdentity("sub1")))
|
|
sub2 := zmq4.NewSub(ctx, zmq4.WithID(zmq4.SocketIdentity("sub2")))
|
|
sub3 := zmq4.NewSub(ctx, zmq4.WithID(zmq4.SocketIdentity("sub3")))
|
|
sub4 := zmq4.NewSub(ctx, zmq4.WithID(zmq4.SocketIdentity("sub4")))
|
|
sub5 := zmq4.NewSub(ctx, zmq4.WithID(zmq4.SocketIdentity("sub5")))
|
|
|
|
subs := []zmq4.Socket{sub0, sub1, sub2, sub3, sub4, sub5}
|
|
|
|
defer pub.Close()
|
|
defer sub0.Close()
|
|
defer sub1.Close()
|
|
defer sub2.Close()
|
|
defer sub3.Close()
|
|
defer sub4.Close()
|
|
defer sub5.Close()
|
|
|
|
err := pub.Listen(ep)
|
|
if err != nil {
|
|
t.Fatalf("could not listen: %+v", err)
|
|
}
|
|
|
|
for isub, sub := range subs {
|
|
topics := []string{"", "a", "b", "c", "2", "A_2"}
|
|
|
|
err = sub.Dial(ep)
|
|
if err != nil {
|
|
t.Fatalf("could not dial: %+v", err)
|
|
}
|
|
|
|
err = sub.SetOption(zmq4.OptionSubscribe, topics[isub])
|
|
if err != nil {
|
|
t.Fatalf("could not subscribe to topic %q: %+v", topics[isub], err)
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
got := sub.(zmq4.Topics).Topics()
|
|
want := []string{topics[isub]}
|
|
if !reflect.DeepEqual(got, want) {
|
|
t.Fatalf("Missing or wrong topics.\ngot= %q\nwant=%q", got, want)
|
|
}
|
|
|
|
got = pub.(zmq4.Topics).Topics()
|
|
if len(got) != isub+1 {
|
|
t.Fatalf("got %d topics, want %d topics", len(got), isub+1)
|
|
}
|
|
|
|
want = make([]string, isub+1)
|
|
copy(want, topics)
|
|
sort.Strings(want)
|
|
if !reflect.DeepEqual(got, want) {
|
|
t.Fatalf("Missing or wrong topics.\ngot= %q\nwant=%q", got, want)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestPubSubDeadPub ensures that subscribers can proceed even after losing connection to the publisher
|
|
func TestPubSubDeadPub(t *testing.T) {
|
|
ep := must(EndPoint("tcp"))
|
|
topic := "msg"
|
|
msg := zmq4.NewMsgString("msg")
|
|
|
|
bkg := context.Background()
|
|
ctx, timeout := context.WithTimeout(bkg, 20*time.Second)
|
|
defer timeout()
|
|
|
|
pub := zmq4.NewPub(ctx)
|
|
sub := zmq4.NewSub(ctx)
|
|
|
|
defer sub.Close()
|
|
|
|
subReady := make(chan struct{})
|
|
subDoneReading := make(chan struct{})
|
|
pubClosed := make(chan struct{})
|
|
|
|
const nmsgs = 4 // the number of messages do not matter
|
|
|
|
grp, _ := errgroup.WithContext(ctx)
|
|
grp.Go(func() error {
|
|
defer close(pubClosed)
|
|
defer pub.Close()
|
|
|
|
err := pub.Listen(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not listen on end point: %+v", err)
|
|
}
|
|
|
|
<-subReady
|
|
time.Sleep(time.Second * 1)
|
|
|
|
for i := 0; i < nmsgs; i++ {
|
|
if err := pub.Send(msg); err != nil {
|
|
return fmt.Errorf("could not send message %v: %w", msg, err)
|
|
}
|
|
}
|
|
<-subDoneReading
|
|
|
|
return err
|
|
})
|
|
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = sub.Dial(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not dial: %w", err)
|
|
}
|
|
|
|
err = sub.SetOption(zmq4.OptionSubscribe, topic)
|
|
if err != nil {
|
|
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
|
|
}
|
|
|
|
close(subReady)
|
|
|
|
for i := 0; i < nmsgs; i++ {
|
|
rmsg, err := sub.Recv()
|
|
if err != nil {
|
|
return fmt.Errorf("could not recv message: %w", err)
|
|
}
|
|
if !reflect.DeepEqual(rmsg, msg) {
|
|
return fmt.Errorf("sub: got = %v, want= %v", rmsg, msg)
|
|
}
|
|
}
|
|
|
|
close(subDoneReading)
|
|
<-pubClosed
|
|
|
|
_, err = sub.Recv() // make sure we aren't deadlocked
|
|
if err == nil {
|
|
return errors.New("expected an error")
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err := grp.Wait(); err != nil {
|
|
t.Fatalf("error: %+v", err)
|
|
}
|
|
}
|
|
|
|
func TestPubOptionHWM(t *testing.T) {
|
|
topic := "msg"
|
|
pub := zmq4.NewPub(bkg)
|
|
|
|
subCtx, subCancel := context.WithCancel(bkg)
|
|
sub := zmq4.NewSub(subCtx)
|
|
|
|
defer pub.Close()
|
|
defer sub.Close()
|
|
|
|
msgCount := 100
|
|
hwm := 10
|
|
if err := pub.SetOption(zmq4.OptionHWM, hwm); err != nil {
|
|
t.Fatalf("unable to set HWM")
|
|
}
|
|
|
|
ep := must(EndPoint("tcp"))
|
|
cleanUp(ep)
|
|
|
|
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
|
|
defer timeout()
|
|
|
|
grp, _ := errgroup.WithContext(ctx)
|
|
pss := newPubSubSync(1)
|
|
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = pub.Listen(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not listen on end point: %+v", err)
|
|
}
|
|
|
|
pss.WaitForSubscriptions()
|
|
|
|
for i := 1; i <= msgCount; i++ {
|
|
msg := zmq4.NewMsgFrom([]byte("msg"), []byte(string(rune(i))))
|
|
err = pub.Send(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("error sending message. [%d] got: %v", i, err)
|
|
}
|
|
}
|
|
|
|
// give the subscriber time to receive the last message
|
|
time.Sleep(time.Second * 2)
|
|
// Inform the subscriber that there are no more messages, otherwise it'll wait indefinitely while trying to receive dropped messages
|
|
subCancel()
|
|
return nil
|
|
})
|
|
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = sub.Dial(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not dial end point: %+v", err)
|
|
}
|
|
|
|
pss.DialComplete()
|
|
pss.WaitForDialers()
|
|
|
|
err = sub.SetOption(zmq4.OptionSubscribe, topic)
|
|
if err != nil {
|
|
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
|
|
}
|
|
|
|
pss.SubscriptionComplete()
|
|
pss.WaitForSubscriptions()
|
|
|
|
time.Sleep(time.Second * 1) // slow down for a bit
|
|
|
|
nmsgs := 0
|
|
|
|
for i := 1; i <= msgCount; i++ {
|
|
_, err := sub.Recv()
|
|
if subCtx.Err() != nil {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("could not recv message: %v", err)
|
|
}
|
|
nmsgs++
|
|
}
|
|
|
|
if nmsgs >= msgCount {
|
|
return fmt.Errorf("Expected dropped messages")
|
|
}
|
|
|
|
return err
|
|
})
|
|
|
|
if err := grp.Wait(); err != nil {
|
|
t.Fatalf("error: %+v", err)
|
|
}
|
|
}
|
|
|
|
func BenchmarkPubSub(b *testing.B) {
|
|
topic := "msg"
|
|
msgs := make([][]byte, 10)
|
|
for i := range msgs {
|
|
msgs[i] = []byte("msg")
|
|
}
|
|
msg := zmq4.NewMsgFrom(msgs...)
|
|
pub := zmq4.NewPub(bkg)
|
|
sub := zmq4.NewSub(bkg)
|
|
|
|
defer pub.Close()
|
|
defer sub.Close()
|
|
|
|
ep := must(EndPoint("tcp"))
|
|
cleanUp(ep)
|
|
|
|
ctx, timeout := context.WithTimeout(context.Background(), 20*time.Second)
|
|
defer timeout()
|
|
|
|
grp, _ := errgroup.WithContext(ctx)
|
|
|
|
msgCount := 1 << 18
|
|
pss := newPubSubSync(1)
|
|
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = pub.Listen(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not listen on end point: %+v", err)
|
|
}
|
|
|
|
pss.WaitForSubscriptions()
|
|
time.Sleep(1 * time.Second)
|
|
|
|
for i := 0; i < msgCount; i++ {
|
|
err = pub.SendMulti(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("error sending message: %v\n", err)
|
|
}
|
|
}
|
|
|
|
return err
|
|
})
|
|
|
|
grp.Go(func() error {
|
|
var err error
|
|
err = sub.Dial(ep)
|
|
if err != nil {
|
|
return fmt.Errorf("could not dial end point: %+v", err)
|
|
}
|
|
|
|
pss.DialComplete()
|
|
pss.WaitForDialers()
|
|
|
|
err = sub.SetOption(zmq4.OptionSubscribe, topic)
|
|
if err != nil {
|
|
return fmt.Errorf("could not subscribe to topic %q: %w", topic, err)
|
|
}
|
|
|
|
pss.SubscriptionComplete()
|
|
pss.WaitForSubscriptions()
|
|
|
|
var siz int
|
|
for i := 0; i < msgCount; i++ {
|
|
msg, err := sub.Recv()
|
|
if err != nil {
|
|
return fmt.Errorf("could not recv message: %v", err)
|
|
}
|
|
for _, frame := range msg.Frames {
|
|
siz += len(frame)
|
|
}
|
|
}
|
|
|
|
return err
|
|
})
|
|
|
|
if err := grp.Wait(); err != nil {
|
|
b.Fatalf("error: %+v", err)
|
|
}
|
|
}
|
|
|
|
type pubSubSync struct {
|
|
wg1 sync.WaitGroup
|
|
wg2 sync.WaitGroup
|
|
}
|
|
|
|
func newPubSubSync(nrSubs int) *pubSubSync {
|
|
p := &pubSubSync{}
|
|
p.wg1.Add(nrSubs)
|
|
p.wg2.Add(nrSubs)
|
|
return p
|
|
}
|
|
|
|
func (p *pubSubSync) DialComplete() {
|
|
p.wg1.Done()
|
|
}
|
|
|
|
func (p *pubSubSync) WaitForDialers() {
|
|
p.wg1.Wait()
|
|
}
|
|
|
|
func (p *pubSubSync) SubscriptionComplete() {
|
|
p.wg2.Done()
|
|
}
|
|
|
|
func (p *pubSubSync) WaitForSubscriptions() {
|
|
p.wg2.Wait()
|
|
}
|