diff --git a/multicast.go b/multicast.go index 23c0d8d..f503008 100644 --- a/multicast.go +++ b/multicast.go @@ -329,10 +329,13 @@ func (b *PubSubMulticast) Sub(channels []*Channel, sub *Sub) error { return finErr } -func (b *PubSubMulticast) Pub(channels []*Channel, pub *Pub) error { +func (b *PubSubMulticast) Pub(matcher ChannelMatcher, pub *Pub) error { var wg sync.WaitGroup var finErr error - for _, ch := range channels { + b.Channels.Range(func(I string, ch *Channel) bool { + if !matcher(ch) { + return true + } wg.Add(1) go func(channel *Channel) { err := b._pub(channel, pub) @@ -347,7 +350,8 @@ func (b *PubSubMulticast) Pub(channels []*Channel, pub *Pub) error { } wg.Done() }(ch) - } + return true + }) wg.Wait() return finErr } diff --git a/pubsub.go b/pubsub.go index a9232a1..c386964 100644 --- a/pubsub.go +++ b/pubsub.go @@ -212,14 +212,16 @@ func (pipe *Pipe) Cleanup() { }) } +type ChannelMatcher func(J *Channel) bool + type PubSub interface { - GetSubs(channels []*Channel) []*Sub - GetPubs(channels []*Channel) []*Pub + GetSubs(channels ChannelMatcher) []*Sub + GetPubs(channels ChannelMatcher) []*Pub GetChannels() []*Channel GetPipes() []*Pipe Pipe(pipes []*Pipe, pipeClient *PipeClient) error - Sub(channels []*Channel, sub *Sub) error - Pub(channel []*Channel, pub *Pub) error + Sub(channels ChannelMatcher, sub *Sub) error + Pub(channels ChannelMatcher, pub *Pub) error } type Cfg struct {