package pubsub
import (
"strings"
"sync"
"sync/atomic"
)
// Message is a single published item: the topic it was published under
// together with the caller-supplied payload.
type Message struct {
// Topic is the topic string passed to Publish.
Topic string
// Payload is the value passed to Publish; it is forwarded unchanged.
Payload interface{}
}
// Subscription describes one registered subscriber: its identifier, the
// topic pattern it was registered with, and the channel messages are sent on.
type Subscription struct {
// ID is the key the subscription is stored under; pass it to Unsubscribe.
ID uint64
// Pattern is the dot-separated topic pattern checked against every
// published topic ("*" matches one segment, "**" zero or more).
Pattern string
// Ch receives the messages whose topic matches Pattern. It is closed by
// Unsubscribe and by Close.
Ch chan Message
}
// PubSub is an in-memory publish/subscribe hub. It keeps the set of live
// subscriptions keyed by ID and hands out new IDs from a counter.
type PubSub struct {
	// counter is the most recently issued subscription ID. It is updated
	// with the sync/atomic functions and therefore MUST remain the first
	// field: on 32-bit platforms only the first word of an allocated struct
	// is guaranteed to be 64-bit aligned, and a misaligned 64-bit atomic
	// operation panics.
	counter uint64

	// mu guards subs.
	mu sync.RWMutex

	// subs holds every live subscription, keyed by Subscription.ID.
	subs map[uint64]*Subscription
}
// New returns an empty PubSub whose subscription table is allocated and
// ready for Subscribe.
func New() *PubSub {
	ps := new(PubSub)
	ps.subs = make(map[uint64]*Subscription)
	return ps
}
// Subscribe registers a new subscription for pattern and returns it.
//
// pattern is a dot-separated topic pattern; "*" matches exactly one segment
// and "**" matches zero or more segments. bufSize is the capacity of the
// subscription's channel. A negative bufSize is treated as 0 (unbuffered)
// instead of panicking inside make. Because Publish never blocks, an
// unbuffered subscription only receives a message when a receiver is already
// waiting on the channel.
//
// The returned subscription's ID comes from an atomically incremented
// counter; pass it to Unsubscribe to remove the subscription.
func (ps *PubSub) Subscribe(pattern string, bufSize int) *Subscription {
	if bufSize < 0 {
		bufSize = 0
	}
	sub := &Subscription{
		ID:      atomic.AddUint64(&ps.counter, 1),
		Pattern: pattern,
		Ch:      make(chan Message, bufSize),
	}

	ps.mu.Lock()
	if ps.subs == nil {
		// Allow a zero-value PubSub (one not built by New) to be used:
		// writing to a nil map would panic.
		ps.subs = make(map[uint64]*Subscription)
	}
	ps.subs[sub.ID] = sub
	ps.mu.Unlock()

	return sub
}
// Unsubscribe removes the subscription with the given id and closes its
// channel. An unknown id is a no-op, so calling it twice for the same id is
// safe.
func (ps *PubSub) Unsubscribe(id uint64) {
	ps.mu.Lock()
	// Deferred so the lock is released even if close panics (Ch is an
	// exported field, so a consumer may already have closed it).
	defer ps.mu.Unlock()

	sub, ok := ps.subs[id]
	if !ok {
		return
	}
	// Remove the entry before closing: should close panic, the table no
	// longer references the subscription and a retry is a harmless no-op.
	delete(ps.subs, id)
	close(sub.Ch)
}
// Publish delivers a Message built from topic and payload to every
// subscription whose pattern matches topic.
//
// Delivery never blocks: if a subscription's channel cannot accept the
// message immediately (buffer full, or unbuffered with no waiting receiver),
// the message is dropped for that subscription.
func (ps *PubSub) Publish(topic string, payload interface{}) {
	msg := Message{Topic: topic, Payload: payload}
	// The topic is the same for every subscriber, so split it once here
	// rather than once per subscription inside the loop.
	topicParts := strings.Split(topic, ".")

	ps.mu.RLock()
	defer ps.mu.RUnlock()

	for _, sub := range ps.subs {
		if !matchParts(strings.Split(sub.Pattern, "."), topicParts) {
			continue
		}
		select {
		case sub.Ch <- msg:
		default:
			// Channel full, skip (or log warning)
		}
	}
}
// matches reports whether topic satisfies pattern. Both strings are split on
// "." into segments, which are then compared in order:
//   - "*" matches exactly one segment
//   - "**" matches zero or more segments
//   - any other pattern segment must equal the topic segment exactly
func matches(pattern, topic string) bool {
	return matchParts(strings.Split(pattern, "."), strings.Split(topic, "."))
}
// matchParts reports whether the pattern segments match the topic segments.
//
// A "*" pattern segment matches exactly one topic segment, a "**" pattern
// segment matches zero or more topic segments, and any other pattern segment
// must equal the corresponding topic segment. Neither slice is modified.
//
// The match is iterative: on a mismatch it backtracks only to the most
// recent "**" and lets that "**" absorb one more topic segment. This keeps
// the cost bounded by len(pattern)*len(topic) even for patterns with many
// "**" segments, where a naive recursive search grows exponentially.
func matchParts(pattern, topic []string) bool {
	p, t := 0, 0
	// lastStar is the pattern index of the most recent "**" (-1 if none);
	// resume is the first topic index that "**" has not yet absorbed.
	lastStar, resume := -1, 0

	for t < len(topic) {
		switch {
		case p < len(pattern) && pattern[p] == "**":
			if p+1 == len(pattern) {
				return true // trailing "**" matches whatever remains
			}
			// Start by letting "**" match zero segments.
			lastStar, resume = p, t
			p++
		case p < len(pattern) && (pattern[p] == "*" || pattern[p] == topic[t]):
			p++
			t++
		case lastStar >= 0:
			// Mismatch: extend the last "**" by one segment and retry.
			resume++
			p, t = lastStar+1, resume
		default:
			return false
		}
	}

	// Topic exhausted: only "**" segments (each matching zero segments) may
	// remain in the pattern.
	for p < len(pattern) && pattern[p] == "**" {
		p++
	}
	return p == len(pattern)
}
// Close closes the channel of every registered subscription and then swaps
// in a fresh, empty subscription table. The whole operation runs under the
// write lock.
func (ps *PubSub) Close() {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	for id := range ps.subs {
		close(ps.subs[id].Ch)
	}
	ps.subs = make(map[uint64]*Subscription)
}