libs: Use go-statemachine

This commit is contained in:
Łukasz Magiera 2020-03-06 19:59:08 +01:00
parent 79237a9309
commit 3d1a5f4bf3
13 changed files with 12 additions and 635 deletions

View File

@ -9,7 +9,6 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/blocksync"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/statemachine"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/paychmgr"
"github.com/filecoin-project/lotus/storage/sealing"
@ -83,13 +82,4 @@ func main() {
fmt.Println(err)
os.Exit(1)
}
err = gen.WriteMapEncodersToFile("./lib/statemachine/cbor_gen.go", "statemachine",
statemachine.TestState{},
statemachine.TestEvent{},
)
if err != nil {
fmt.Printf("%+v\n", err)
os.Exit(1)
}
}

1
go.mod
View File

@ -22,6 +22,7 @@ require (
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200306043753-5cdbe369b47d
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/specs-actors v0.0.0-20200306043603-709a3ce21094
github.com/filecoin-project/specs-storage v0.0.0-20200303233430-1a5a408f7513

View File

@ -1,241 +0,0 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
package statemachine
import (
"fmt"
"io"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
var _ = xerrors.Errorf
func (t *TestState) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{162}); err != nil {
return err
}
// t.A (uint64) (uint64)
if len("A") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"A\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("A")))); err != nil {
return err
}
if _, err := w.Write([]byte("A")); err != nil {
return err
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.A))); err != nil {
return err
}
// t.B (uint64) (uint64)
if len("B") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"B\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("B")))); err != nil {
return err
}
if _, err := w.Write([]byte("B")); err != nil {
return err
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.B))); err != nil {
return err
}
return nil
}
func (t *TestState) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("TestState: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.A (uint64) (uint64)
case "A":
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.A = uint64(extra)
}
// t.B (uint64) (uint64)
case "B":
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.B = uint64(extra)
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
}
return nil
}
func (t *TestEvent) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{162}); err != nil {
return err
}
// t.A (string) (string)
if len("A") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"A\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("A")))); err != nil {
return err
}
if _, err := w.Write([]byte("A")); err != nil {
return err
}
if len(t.A) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.A was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.A)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.A)); err != nil {
return err
}
// t.Val (uint64) (uint64)
if len("Val") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Val\" was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Val")))); err != nil {
return err
}
if _, err := w.Write([]byte("Val")); err != nil {
return err
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Val))); err != nil {
return err
}
return nil
}
func (t *TestEvent) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("TestEvent: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.A (string) (string)
case "A":
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.A = string(sval)
}
// t.Val (uint64) (uint64)
case "Val":
{
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Val = uint64(extra)
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
}
return nil
}

View File

@ -1,16 +0,0 @@
package statemachine
import "context"
type Context struct {
ctx context.Context
send func(evt interface{}) error
}
func (ctx *Context) Context() context.Context {
return ctx.ctx
}
func (ctx *Context) Send(evt interface{}) error {
return ctx.send(evt)
}

View File

@ -1,116 +0,0 @@
package statemachine
import (
"context"
"reflect"
"sync"
"github.com/filecoin-project/go-statestore"
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
)
type StateHandler interface {
// returns
Plan(events []Event, user interface{}) (interface{}, error)
}
// StateGroup manages a group of state machines sharing the same logic
type StateGroup struct {
sts *statestore.StateStore
hnd StateHandler
stateType reflect.Type
lk sync.Mutex
sms map[datastore.Key]*StateMachine
}
// stateType: T - (MyStateStruct{})
func New(ds datastore.Datastore, hnd StateHandler, stateType interface{}) *StateGroup {
return &StateGroup{
sts: statestore.New(ds),
hnd: hnd,
stateType: reflect.TypeOf(stateType),
sms: map[datastore.Key]*StateMachine{},
}
}
// Send sends an event to machine identified by `id`.
// `evt` is going to be passed into StateHandler.Planner, in the events[].User param
//
// If a state machine with the specified id doesn't exits, it's created, and it's
// state is set to zero-value of stateType provided in group constructor
func (s *StateGroup) Send(id interface{}, evt interface{}) (err error) {
s.lk.Lock()
defer s.lk.Unlock()
sm, exist := s.sms[statestore.ToKey(id)]
if !exist {
sm, err = s.loadOrCreate(id)
if err != nil {
return xerrors.Errorf("loadOrCreate state: %w", err)
}
s.sms[statestore.ToKey(id)] = sm
}
return sm.send(Event{User: evt})
}
func (s *StateGroup) loadOrCreate(name interface{}) (*StateMachine, error) {
exists, err := s.sts.Has(name)
if err != nil {
return nil, xerrors.Errorf("failed to check if state for %v exists: %w", name, err)
}
if !exists {
userState := reflect.New(s.stateType).Interface()
err = s.sts.Begin(name, userState)
if err != nil {
return nil, xerrors.Errorf("saving initial state: %w", err)
}
}
res := &StateMachine{
planner: s.hnd.Plan,
eventsIn: make(chan Event),
name: name,
st: s.sts.Get(name),
stateType: s.stateType,
stageDone: make(chan struct{}),
closing: make(chan struct{}),
closed: make(chan struct{}),
}
go res.run()
return res, nil
}
// Stop stops all state machines in this group
func (s *StateGroup) Stop(ctx context.Context) error {
s.lk.Lock()
defer s.lk.Unlock()
for _, sm := range s.sms {
if err := sm.stop(ctx); err != nil {
return err
}
}
return nil
}
// List outputs states of all state machines in this group
// out: *[]StateT
func (s *StateGroup) List(out interface{}) error {
return s.sts.List(out)
}
// Get gets state for a single state machine
func (s *StateGroup) Get(id interface{}) *statestore.StoredState {
return s.sts.Get(id)
}

View File

@ -1,127 +0,0 @@
package statemachine
import (
"context"
"reflect"
"sync/atomic"
"github.com/filecoin-project/go-statestore"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("evtsm")
type Event struct {
User interface{}
}
// TODO: This probably should be returning an int indicating partial event processing
// (or something like errPartial(nEvents))
// returns func(ctx Context, st <T>) (func(*<T>), error), where <T> is the typeOf(User) param
type Planner func(events []Event, user interface{}) (interface{}, error)
type StateMachine struct {
planner Planner
eventsIn chan Event
name interface{}
st *statestore.StoredState
stateType reflect.Type
stageDone chan struct{}
closing chan struct{}
closed chan struct{}
busy int32
}
func (fsm *StateMachine) run() {
defer close(fsm.closed)
var pendingEvents []Event
for {
// NOTE: This requires at least one event to be sent to trigger a stage
// This means that after restarting the state machine users of this
// code must send a 'restart' event
select {
case evt := <-fsm.eventsIn:
pendingEvents = append(pendingEvents, evt)
case <-fsm.stageDone:
if len(pendingEvents) == 0 {
continue
}
case <-fsm.closing:
return
}
if atomic.CompareAndSwapInt32(&fsm.busy, 0, 1) {
var nextStep interface{}
var ustate interface{}
err := fsm.mutateUser(func(user interface{}) (err error) {
nextStep, err = fsm.planner(pendingEvents, user)
ustate = user
return err
})
if err != nil {
log.Errorf("Executing event planner failed: %+v", err)
return
}
pendingEvents = nil
if nextStep == nil {
continue
}
ctx := Context{
ctx: context.TODO(),
send: func(evt interface{}) error {
return fsm.send(Event{User: evt})
},
}
go func() {
res := reflect.ValueOf(nextStep).Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(ustate).Elem()})
if res[0].Interface() != nil {
log.Errorf("executing step: %+v", res[0].Interface().(error)) // TODO: propagate top level
return
}
atomic.StoreInt32(&fsm.busy, 0)
fsm.stageDone <- struct{}{}
}()
}
}
}
func (fsm *StateMachine) mutateUser(cb func(user interface{}) error) error {
mutt := reflect.FuncOf([]reflect.Type{reflect.PtrTo(fsm.stateType)}, []reflect.Type{reflect.TypeOf(new(error)).Elem()}, false)
mutf := reflect.MakeFunc(mutt, func(args []reflect.Value) (results []reflect.Value) {
err := cb(args[0].Interface())
return []reflect.Value{reflect.ValueOf(&err).Elem()}
})
return fsm.st.Mutate(mutf.Interface())
}
func (fsm *StateMachine) send(evt Event) error {
fsm.eventsIn <- evt // TODO: ctx, at least
return nil
}
func (fsm *StateMachine) stop(ctx context.Context) error {
close(fsm.closing)
select {
case <-fsm.closed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -1,105 +0,0 @@
package statemachine
import (
"context"
"testing"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"gotest.tools/assert"
)
func init() {
logging.SetLogLevel("*", "INFO")
}
type testHandler struct {
t *testing.T
proceed chan struct{}
done chan struct{}
}
func (t *testHandler) Plan(events []Event, state interface{}) (interface{}, error) {
return t.plan(events, state.(*TestState))
}
func (t *testHandler) plan(events []Event, state *TestState) (func(Context, TestState) error, error) {
for _, event := range events {
e := event.User.(*TestEvent)
switch e.A {
case "restart":
case "start":
state.A = 1
case "b":
state.A = 2
state.B = e.Val
}
}
switch state.A {
case 1:
return t.step0, nil
case 2:
return t.step1, nil
default:
t.t.Fatal(state.A)
}
panic("how?")
}
func (t *testHandler) step0(ctx Context, st TestState) error {
ctx.Send(&TestEvent{A: "b", Val: 55})
<-t.proceed
return nil
}
func (t *testHandler) step1(ctx Context, st TestState) error {
assert.Equal(t.t, uint64(2), st.A)
close(t.done)
return nil
}
func TestBasic(t *testing.T) {
for i := 0; i < 1000; i++ { // run a few times to expose any races
ds := datastore.NewMapDatastore()
th := &testHandler{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
close(th.proceed)
smm := New(ds, th, TestState{})
if err := smm.Send(uint64(2), &TestEvent{A: "start"}); err != nil {
t.Fatalf("%+v", err)
}
<-th.done
}
}
func TestPersist(t *testing.T) {
for i := 0; i < 1000; i++ { // run a few times to expose any races
ds := datastore.NewMapDatastore()
th := &testHandler{t: t, done: make(chan struct{}), proceed: make(chan struct{})}
smm := New(ds, th, TestState{})
if err := smm.Send(uint64(2), &TestEvent{A: "start"}); err != nil {
t.Fatalf("%+v", err)
}
if err := smm.Stop(context.Background()); err != nil {
t.Fatal(err)
return
}
smm = New(ds, th, TestState{})
if err := smm.Send(uint64(2), &TestEvent{A: "restart"}); err != nil {
t.Fatalf("%+v", err)
}
close(th.proceed)
<-th.done
}
}
var _ StateHandler = &testHandler{}

View File

@ -1,11 +0,0 @@
package statemachine
type TestState struct {
A uint64
B uint64
}
type TestEvent struct {
A string
Val uint64
}

View File

@ -6,17 +6,18 @@ import (
"reflect"
"time"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/statemachine"
)
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, error) {
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
return nil, err
return nil, 0, err
}
return func(ctx statemachine.Context, si SectorInfo) error {
@ -27,7 +28,7 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
}
return nil
}, nil
}, uint64(len(events)), nil // TODO: This processed event count is not very correct
}
var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{

View File

@ -6,8 +6,9 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/statemachine"
)
func init() {

View File

@ -12,6 +12,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
@ -20,7 +21,6 @@ import (
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/statemachine"
"github.com/filecoin-project/lotus/storage/sealmgr"
)

View File

@ -5,6 +5,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -14,7 +15,6 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/statemachine"
)
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"time"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"golang.org/x/xerrors"
@ -12,7 +13,6 @@ import (
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/statemachine"
)
const minRetryTime = 1 * time.Minute