Observer
The observer pattern allows an observable to emit events to observers. The emission can either be done through channels or invoking registered callbacks.
Conventional Approach
First let's define what are Observer
and Observable
as interfaces.
type (
Event struct {
Data string
}
Observer interface {
OnNotify(Event)
}
Observable interface {
Register(Observer)
Unregister(Observer)
Notify(Event)
}
)
Now we can create a Reporter
like a news reporter that broadcast news to people.
type Reporter struct {
observers map[Observer]struct{}
}
func (r *Reporter) Register(ob Observer) {
r.observers[ob] = struct{}{}
}
func (r *Reporter) Unregister(ob Observer) {
delete(r.observers, ob)
}
func (r *Reporter) Notify(e Event) {
for ob := range r.observers {
ob.OnNotify(e)
}
}
People are acting as Audience
to the event that reporter reports.
type Audience struct {
name string
}
func (a *Audience) OnNotify(e Event) {
fmt.Printf("%s received breaking news! %s\n", a.name, e.Data)
}
Concurrent Approach
The problem with above aproach is that it is not concurrent. The observable calls OnNotify
on each observer. What if OnNotify
is an expensive function? Then there will be a significant delay to relay each event. We want to make it concurrent with room for parallelism.
type Observable interface {
Register(interface{})
Unregister(interface{})
Notify(data []byte)
}
Now we can register variable of any data type to this observable. We implement the interface like before with few minor modifications. Notice that Notify
is no longer calling a callback, it is simply passing data to observers through channel. It is up to the observers to handle the data.
func NewReporter() Observable {
return &Reporter{
observers: make(map[interface{}]chan []byte),
}
}
type Reporter struct {
observers map[interface{}]chan []byte
}
func (r *Reporter) Register(ob interface{}) chan []byte {
r.observers[ob] = make(chan []byte)
return r.observers[ob]
}
func (r *Reporter) Unregister(ob interface{}) {
delete(r.observers, ob)
}
func (r *Reporter) Notify(data []byte) {
for _, ch := range r.observers {
select {
case ch <- data:
default:
}
}
}
Finally let's put it to use.
type Person struct {
Name string
}
func main() {
ob := NewReporter() // This is our observable
p := &Person{Name: "Calvin"} // This is our observer
ch := ob.Register(p)
go func() {
for data := range ch {
fmt.Println(p.Name, "received data", string(data))
}
}()
timer := time.NewTimer(10 * time.Second)
ticker := time.NewTicker(200 * time.Millisecond)
for {
select {
case <-timer.C:
return
case <-ticker.C:
ob.Notify([]byte("hello there"))
}
}
}
Last updated