1. 7
    1. 10

      I think I have a pretty good one.

      Basically,

      type Broker[T any] struct { ... }
      func (b *Broker[T]) Subscribe(c chan<- T, allow func(T) bool) error
      func (b *Broker[T]) Unsubscribe(c chan<- T) (Stats, error)
      func (b *Broker[T]) Publish(v T) Stats
      type Stats struct { Skips, Sends, Drops uint64 }
      

      That API is the result of many iterations over many years. Particularly critical is how subscribers are modeled as send-only chans of type T (chan<- T, so the broker can only send on them and the subscriber keeps the receiving end), which callers are responsible for constructing and providing as input.

      1. 2

        Why the allow filter?

        1. 1

          Essentially just a first-pass filter to avoid sending unnecessary values. Definitely relies on some trust re: the caller.

          1. 1

            I think it allows for much nicer statistics and a little bit faster processing (the allow function is called synchronously, which avoids the goroutine communication overhead; note that the caller can mess this up if the allow function takes a while).
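
For anyone who wants to poke at the shape of that API, here is a minimal sketch of how such a broker might be put together. It is not the original poster's implementation: the struct fields, the `NewBroker` constructor, the mutex, the non-blocking send, and the reading of `Stats` (Skips = rejected by `allow`, Sends = delivered, Drops = subscriber channel was full) are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Stats counts publish outcomes. The meaning of each field is assumed:
// Skips = rejected by allow, Sends = delivered, Drops = channel not ready.
type Stats struct{ Skips, Sends, Drops uint64 }

// subscriber is a hypothetical per-subscription record.
type subscriber[T any] struct {
	allow func(T) bool
	stats Stats
}

// Broker fans values out to caller-provided send-only channels.
type Broker[T any] struct {
	mu   sync.Mutex
	subs map[chan<- T]*subscriber[T]
}

// NewBroker is a hypothetical constructor, not part of the quoted API.
func NewBroker[T any]() *Broker[T] {
	return &Broker[T]{subs: make(map[chan<- T]*subscriber[T])}
}

func (b *Broker[T]) Subscribe(c chan<- T, allow func(T) bool) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.subs[c]; ok {
		return errors.New("already subscribed")
	}
	b.subs[c] = &subscriber[T]{allow: allow}
	return nil
}

func (b *Broker[T]) Unsubscribe(c chan<- T) (Stats, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	s, ok := b.subs[c]
	if !ok {
		return Stats{}, errors.New("not subscribed")
	}
	delete(b.subs, c)
	return s.stats, nil
}

// Publish calls allow synchronously, then attempts a non-blocking send,
// so a slow allow function stalls every publisher (the caveat noted above).
func (b *Broker[T]) Publish(v T) Stats {
	b.mu.Lock()
	defer b.mu.Unlock()
	var total Stats
	for c, s := range b.subs {
		if s.allow != nil && !s.allow(v) {
			s.stats.Skips++
			total.Skips++
			continue
		}
		select {
		case c <- v:
			s.stats.Sends++
			total.Sends++
		default: // subscriber is not ready: drop rather than block
			s.stats.Drops++
			total.Drops++
		}
	}
	return total
}

func main() {
	b := NewBroker[int]()
	evens := make(chan int, 1) // the caller owns and sizes the channel
	_ = b.Subscribe(evens, func(v int) bool { return v%2 == 0 })
	fmt.Println(b.Publish(1)) // odd value is filtered out by allow
	fmt.Println(b.Publish(2)) // fits in the one-slot buffer
	fmt.Println(b.Publish(4)) // buffer is full, so the value is dropped
	fmt.Println(<-evens)
	stats, _ := b.Unsubscribe(evens)
	fmt.Println(stats)
}
```

Because the caller constructs the channel, the caller also picks the buffer size, and in this sketch that is what decides how often a slow subscriber sees drops instead of sends.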