276 lines
6.9 KiB
Go
276 lines
6.9 KiB
Go
|
// Copyright 2016 The go-ethereum Authors
|
||
|
// This file is part of the go-ethereum library.
|
||
|
//
|
||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||
|
// the Free Software Foundation, either version 3 of the License, or
|
||
|
// (at your option) any later version.
|
||
|
//
|
||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
// GNU Lesser General Public License for more details.
|
||
|
//
|
||
|
// You should have received a copy of the GNU Lesser General Public License
|
||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
package event
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/ethereum/go-ethereum/common/mclock"
|
||
|
)
|
||
|
|
||
|
// Subscription is a handle on an established stream of events. Events are usually
// delivered over a channel, but that carrier is intentionally not part of this
// interface.
//
// An established subscription can fail. A failure is reported by delivering a value on
// the error channel (for example when the network connection carrying the events has
// been closed). At most one value is ever sent.
//
// When the subscription ends successfully (i.e. the source of events is closed), the
// error channel is closed. It is also closed when Unsubscribe is called.
//
// Unsubscribe cancels the sending of events. It must be called in all cases so that
// the resources held by the subscription are released, and it may be called any number
// of times.
type Subscription interface {
	// Err returns the error channel.
	Err() <-chan error

	// Unsubscribe cancels the sending of events and closes the error channel.
	Unsubscribe()
}
|
||
|
|
||
|
// NewSubscription runs a producer function as a subscription in a new goroutine. The
|
||
|
// channel given to the producer is closed when Unsubscribe is called. If fn returns an
|
||
|
// error, it is sent on the subscription's error channel.
|
||
|
func NewSubscription(producer func(<-chan struct{}) error) Subscription {
|
||
|
s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
|
||
|
go func() {
|
||
|
defer close(s.err)
|
||
|
err := producer(s.unsub)
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
if !s.unsubscribed {
|
||
|
if err != nil {
|
||
|
s.err <- err
|
||
|
}
|
||
|
s.unsubscribed = true
|
||
|
}
|
||
|
}()
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// funcSub is the Subscription implementation returned by NewSubscription.
type funcSub struct {
	// unsub is handed to the producer and closed by Unsubscribe to request shutdown.
	unsub chan struct{}

	// err is the subscription's error channel. NewSubscription creates it with a
	// buffer of one and closes it after the producer has returned.
	err chan error

	// mu guards unsubscribed.
	mu sync.Mutex

	// unsubscribed is set once the subscription has ended, either because
	// Unsubscribe was called or because the producer returned.
	unsubscribed bool
}
|
||
|
|
||
|
func (s *funcSub) Unsubscribe() {
|
||
|
s.mu.Lock()
|
||
|
if s.unsubscribed {
|
||
|
s.mu.Unlock()
|
||
|
return
|
||
|
}
|
||
|
s.unsubscribed = true
|
||
|
close(s.unsub)
|
||
|
s.mu.Unlock()
|
||
|
// Wait for producer shutdown.
|
||
|
<-s.err
|
||
|
}
|
||
|
|
||
|
func (s *funcSub) Err() <-chan error {
|
||
|
return s.err
|
||
|
}
|
||
|
|
||
|
// Resubscribe calls fn repeatedly to keep a subscription established. When the
|
||
|
// subscription is established, Resubscribe waits for it to fail and calls fn again. This
|
||
|
// process repeats until Unsubscribe is called or the active subscription ends
|
||
|
// successfully.
|
||
|
//
|
||
|
// Resubscribe applies backoff between calls to fn. The time between calls is adapted
|
||
|
// based on the error rate, but will never exceed backoffMax.
|
||
|
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
|
||
|
s := &resubscribeSub{
|
||
|
waitTime: backoffMax / 10,
|
||
|
backoffMax: backoffMax,
|
||
|
fn: fn,
|
||
|
err: make(chan error),
|
||
|
unsub: make(chan struct{}),
|
||
|
}
|
||
|
go s.loop()
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// A ResubscribeFunc attempts to establish a subscription.
|
||
|
type ResubscribeFunc func(context.Context) (Subscription, error)
|
||
|
|
||
|
type resubscribeSub struct {
|
||
|
fn ResubscribeFunc
|
||
|
err chan error
|
||
|
unsub chan struct{}
|
||
|
unsubOnce sync.Once
|
||
|
lastTry mclock.AbsTime
|
||
|
waitTime, backoffMax time.Duration
|
||
|
}
|
||
|
|
||
|
func (s *resubscribeSub) Unsubscribe() {
|
||
|
s.unsubOnce.Do(func() {
|
||
|
s.unsub <- struct{}{}
|
||
|
<-s.err
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (s *resubscribeSub) Err() <-chan error {
|
||
|
return s.err
|
||
|
}
|
||
|
|
||
|
func (s *resubscribeSub) loop() {
|
||
|
defer close(s.err)
|
||
|
var done bool
|
||
|
for !done {
|
||
|
sub := s.subscribe()
|
||
|
if sub == nil {
|
||
|
break
|
||
|
}
|
||
|
done = s.waitForError(sub)
|
||
|
sub.Unsubscribe()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *resubscribeSub) subscribe() Subscription {
|
||
|
subscribed := make(chan error)
|
||
|
var sub Subscription
|
||
|
retry:
|
||
|
for {
|
||
|
s.lastTry = mclock.Now()
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
go func() {
|
||
|
rsub, err := s.fn(ctx)
|
||
|
sub = rsub
|
||
|
subscribed <- err
|
||
|
}()
|
||
|
select {
|
||
|
case err := <-subscribed:
|
||
|
cancel()
|
||
|
if err != nil {
|
||
|
// Subscribing failed, wait before launching the next try.
|
||
|
if s.backoffWait() {
|
||
|
return nil
|
||
|
}
|
||
|
continue retry
|
||
|
}
|
||
|
if sub == nil {
|
||
|
panic("event: ResubscribeFunc returned nil subscription and no error")
|
||
|
}
|
||
|
return sub
|
||
|
case <-s.unsub:
|
||
|
cancel()
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *resubscribeSub) waitForError(sub Subscription) bool {
|
||
|
defer sub.Unsubscribe()
|
||
|
select {
|
||
|
case err := <-sub.Err():
|
||
|
return err == nil
|
||
|
case <-s.unsub:
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *resubscribeSub) backoffWait() bool {
|
||
|
if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
|
||
|
s.waitTime = s.backoffMax / 10
|
||
|
} else {
|
||
|
s.waitTime *= 2
|
||
|
if s.waitTime > s.backoffMax {
|
||
|
s.waitTime = s.backoffMax
|
||
|
}
|
||
|
}
|
||
|
|
||
|
t := time.NewTimer(s.waitTime)
|
||
|
defer t.Stop()
|
||
|
select {
|
||
|
case <-t.C:
|
||
|
return false
|
||
|
case <-s.unsub:
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
|
||
|
//
|
||
|
// For code that handle more than one subscription, a scope can be used to conveniently
|
||
|
// unsubscribe all of them with a single call. The example demonstrates a typical use in a
|
||
|
// larger program.
|
||
|
//
|
||
|
// The zero value is ready to use.
|
||
|
type SubscriptionScope struct {
|
||
|
mu sync.Mutex
|
||
|
subs map[*scopeSub]struct{}
|
||
|
closed bool
|
||
|
}
|
||
|
|
||
|
type scopeSub struct {
|
||
|
sc *SubscriptionScope
|
||
|
s Subscription
|
||
|
}
|
||
|
|
||
|
// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
|
||
|
// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
|
||
|
// scope.
|
||
|
func (sc *SubscriptionScope) Track(s Subscription) Subscription {
|
||
|
sc.mu.Lock()
|
||
|
defer sc.mu.Unlock()
|
||
|
if sc.closed {
|
||
|
return nil
|
||
|
}
|
||
|
if sc.subs == nil {
|
||
|
sc.subs = make(map[*scopeSub]struct{})
|
||
|
}
|
||
|
ss := &scopeSub{sc, s}
|
||
|
sc.subs[ss] = struct{}{}
|
||
|
return ss
|
||
|
}
|
||
|
|
||
|
// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
|
||
|
// the tracked set. Calls to Track after Close return nil.
|
||
|
func (sc *SubscriptionScope) Close() {
|
||
|
sc.mu.Lock()
|
||
|
defer sc.mu.Unlock()
|
||
|
if sc.closed {
|
||
|
return
|
||
|
}
|
||
|
sc.closed = true
|
||
|
for s := range sc.subs {
|
||
|
s.s.Unsubscribe()
|
||
|
}
|
||
|
sc.subs = nil
|
||
|
}
|
||
|
|
||
|
// Count returns the number of tracked subscriptions.
|
||
|
// It is meant to be used for debugging.
|
||
|
func (sc *SubscriptionScope) Count() int {
|
||
|
sc.mu.Lock()
|
||
|
defer sc.mu.Unlock()
|
||
|
return len(sc.subs)
|
||
|
}
|
||
|
|
||
|
func (s *scopeSub) Unsubscribe() {
|
||
|
s.s.Unsubscribe()
|
||
|
s.sc.mu.Lock()
|
||
|
defer s.sc.mu.Unlock()
|
||
|
delete(s.sc.subs, s)
|
||
|
}
|
||
|
|
||
|
func (s *scopeSub) Err() <-chan error {
|
||
|
return s.s.Err()
|
||
|
}
|