lotus/lib/statemachine/group.go

117 lines
2.6 KiB
Go
Raw Normal View History

2020-01-13 17:44:59 +00:00
package statemachine
2020-01-06 21:01:49 +00:00
import (
"context"
"reflect"
"sync"
"github.com/filecoin-project/go-statestore"
2020-01-06 21:01:49 +00:00
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
)
// StateHandler provides the planning callback used by the state machines of a
// StateGroup (loadOrCreate wires Plan in as each machine's planner).
type StateHandler interface {
// Plan takes a batch of events and a user value and returns a result or an
// error. NOTE(review): the exact meaning of `user` and of the returned value
// is decided by the StateMachine code, which is not visible here — confirm.
Plan(events []Event, user interface{}) (interface{}, error)
}
2020-01-13 17:44:59 +00:00
// StateGroup manages a group of state machines sharing the same logic
type StateGroup struct {
2020-01-06 21:01:49 +00:00
sts *statestore.StateStore
hnd StateHandler
stateType reflect.Type
lk sync.Mutex
2020-01-13 17:44:59 +00:00
sms map[datastore.Key]*StateMachine
2020-01-06 21:01:49 +00:00
}
// stateType: T - (reflect.TypeOf(MyStateStruct{}))
2020-01-13 17:44:59 +00:00
func New(ds datastore.Datastore, hnd StateHandler, stateType reflect.Type) *StateGroup {
return &StateGroup{
2020-01-06 21:01:49 +00:00
sts: statestore.New(ds),
hnd: hnd,
stateType: stateType,
2020-01-13 17:44:59 +00:00
sms: map[datastore.Key]*StateMachine{},
2020-01-06 21:01:49 +00:00
}
}
2020-01-13 17:44:59 +00:00
// Send sends an event to machine identified by `id`.
// `evt` is going to be passed into StateHandler.Planner, in the events[].User param
//
// If a state machine with the specified id doesn't exits, it's created, and it's
// state is set to zero-value of stateType provided in group constructor
func (s *StateGroup) Send(id interface{}, evt interface{}) (err error) {
2020-01-06 21:01:49 +00:00
s.lk.Lock()
defer s.lk.Unlock()
2020-01-13 17:44:59 +00:00
sm, exist := s.sms[statestore.ToKey(id)]
2020-01-06 21:01:49 +00:00
if !exist {
2020-01-13 17:44:59 +00:00
sm, err = s.loadOrCreate(id)
2020-01-06 21:01:49 +00:00
if err != nil {
return xerrors.Errorf("loadOrCreate state: %w", err)
}
2020-01-13 17:44:59 +00:00
s.sms[statestore.ToKey(id)] = sm
2020-01-06 21:01:49 +00:00
}
return sm.send(Event{User: evt})
}
2020-01-13 17:44:59 +00:00
func (s *StateGroup) loadOrCreate(name interface{}) (*StateMachine, error) {
2020-01-06 21:01:49 +00:00
exists, err := s.sts.Has(name)
if err != nil {
return nil, xerrors.Errorf("failed to check if state for %v exists: %w", name, err)
}
if !exists {
userState := reflect.New(s.stateType).Interface()
err = s.sts.Begin(name, userState)
if err != nil {
return nil, xerrors.Errorf("saving initial state: %w", err)
}
}
2020-01-13 17:44:59 +00:00
res := &StateMachine{
2020-01-06 21:01:49 +00:00
planner: s.hnd.Plan,
eventsIn: make(chan Event),
name: name,
st: s.sts.Get(name),
stateType: s.stateType,
stageDone: make(chan struct{}),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
go res.run()
return res, nil
}
2020-01-13 17:44:59 +00:00
// Stop stops all state machines in this group
func (s *StateGroup) Stop(ctx context.Context) error {
2020-01-06 21:01:49 +00:00
s.lk.Lock()
defer s.lk.Unlock()
for _, sm := range s.sms {
if err := sm.stop(ctx); err != nil {
return err
}
}
return nil
}
2020-01-10 02:11:00 +00:00
2020-01-13 17:44:59 +00:00
// List outputs states of all state machines in this group
// out: *[]StateT
func (s *StateGroup) List(out interface{}) error {
2020-01-10 02:11:00 +00:00
return s.sts.List(out)
}
2020-01-13 17:44:59 +00:00
// Get gets state for a single state machine
func (s *StateGroup) Get(id interface{}) *statestore.StoredState {
return s.sts.Get(id)
2020-01-10 02:11:00 +00:00
}