pubsub_iface_v2.diff

· erock's pastes · raw

expires: 2024-12-24

 1diff --git a/multicast.go b/multicast.go
 2index 23c0d8d..f503008 100644
 3--- a/multicast.go
 4+++ b/multicast.go
 5@@ -329,10 +329,13 @@ func (b *PubSubMulticast) Sub(channels []*Channel, sub *Sub) error {
 6 	return finErr
 7 }
 8 
 9-func (b *PubSubMulticast) Pub(channels []*Channel, pub *Pub) error {
10+func (b *PubSubMulticast) Pub(matcher ChannelMatcher, pub *Pub) error {
11 	var wg sync.WaitGroup
12 	var finErr error
13-	for _, ch := range channels {
14+	b.Channels.Range(func(I string, ch *Channel) bool {
15+		if !matcher(ch) {
16+			return true
17+		}
18 		wg.Add(1)
19 		go func(channel *Channel) {
20 			err := b._pub(channel, pub)
21@@ -347,7 +350,8 @@ func (b *PubSubMulticast) Pub(channels []*Channel, pub *Pub) error {
22 			}
23 			wg.Done()
24 		}(ch)
25-	}
26+		return true
27+	})
28 	wg.Wait()
29 	return finErr
30 }
31diff --git a/pubsub.go b/pubsub.go
32index a9232a1..c386964 100644
33--- a/pubsub.go
34+++ b/pubsub.go
35@@ -212,14 +212,16 @@ func (pipe *Pipe) Cleanup() {
36 	})
37 }
38 
39+type ChannelMatcher func(J *Channel) bool
40+
41 type PubSub interface {
42-	GetSubs(channels []*Channel) []*Sub
43-	GetPubs(channels []*Channel) []*Pub
44+	GetSubs(channels ChannelMatcher) []*Sub
45+	GetPubs(channels ChannelMatcher) []*Pub
46 	GetChannels() []*Channel
47 	GetPipes() []*Pipe
48 	Pipe(pipes []*Pipe, pipeClient *PipeClient) error
49-	Sub(channels []*Channel, sub *Sub) error
50-	Pub(channel []*Channel, pub *Pub) error
51+	Sub(channels ChannelMatcher, sub *Sub) error
52+	Pub(channels ChannelMatcher, pub *Pub) error
53 }
54 
55 type Cfg struct {