diff --git a/gen/main.go b/gen/main.go index 03b82ad5b..6e464e79a 100644 --- a/gen/main.go +++ b/gen/main.go @@ -9,7 +9,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/statemachine" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/paychmgr" "github.com/filecoin-project/lotus/storage/sealing" @@ -83,13 +82,4 @@ func main() { fmt.Println(err) os.Exit(1) } - - err = gen.WriteMapEncodersToFile("./lib/statemachine/cbor_gen.go", "statemachine", - statemachine.TestState{}, - statemachine.TestEvent{}, - ) - if err != nil { - fmt.Printf("%+v\n", err) - os.Exit(1) - } } diff --git a/go.mod b/go.mod index c22687f74..1e7953a48 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200306043753-5cdbe369b47d + github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/specs-actors v0.0.0-20200306043603-709a3ce21094 github.com/filecoin-project/specs-storage v0.0.0-20200303233430-1a5a408f7513 diff --git a/lib/statemachine/cbor_gen.go b/lib/statemachine/cbor_gen.go deleted file mode 100644 index 4bebe01d0..000000000 --- a/lib/statemachine/cbor_gen.go +++ /dev/null @@ -1,241 +0,0 @@ -// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. - -package statemachine - -import ( - "fmt" - "io" - - cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" -) - -var _ = xerrors.Errorf - -func (t *TestState) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{162}); err != nil { - return err - } - - // t.A (uint64) (uint64) - if len("A") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"A\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("A")))); err != nil { - return err - } - if _, err := w.Write([]byte("A")); err != nil { - return err - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.A))); err != nil { - return err - } - - // t.B (uint64) (uint64) - if len("B") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"B\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("B")))); err != nil { - return err - } - if _, err := w.Write([]byte("B")); err != nil { - return err - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.B))); err != nil { - return err - } - - return nil -} - -func (t *TestState) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("TestState: map struct too large (%d)", extra) - } - - var name string - n := extra - - for i := uint64(0); i < n; i++ { - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - name = string(sval) - } - - switch name { - // t.A (uint64) (uint64) - case "A": - - { - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.A = uint64(extra) - - } - // t.B (uint64) (uint64) - case "B": - - { - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.B = uint64(extra) - - } - - default: - return fmt.Errorf("unknown struct field %d: '%s'", i, name) - } - } - - return nil -} -func (t *TestEvent) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{162}); err != nil { - return err - } - - // t.A (string) (string) - if len("A") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"A\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("A")))); err != nil { - return err - } - if _, err := w.Write([]byte("A")); err != nil { - return err - } - - if len(t.A) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.A was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.A)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.A)); err != nil { - return err - } - - // t.Val (uint64) (uint64) - if len("Val") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"Val\" was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len("Val")))); err != nil { - return err - } - if _, err := w.Write([]byte("Val")); err != nil { - return err - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Val))); err != nil { - return err - } - - return nil -} - -func (t *TestEvent) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("TestEvent: map struct too large (%d)", extra) - } - - var name string - n := extra - - for i := uint64(0); i < n; i++ { - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - name = string(sval) - } - - switch name { - // t.A (string) (string) - case "A": - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.A = string(sval) - } - // t.Val (uint64) (uint64) - case "Val": - - { - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Val = uint64(extra) - - } - - default: - return fmt.Errorf("unknown struct field %d: '%s'", i, name) - } - } - - return nil -} diff --git a/lib/statemachine/context.go b/lib/statemachine/context.go deleted file mode 100644 index 2fb9b8f00..000000000 --- a/lib/statemachine/context.go +++ /dev/null @@ -1,16 +0,0 @@ -package statemachine - -import "context" - -type Context struct { - ctx context.Context - send func(evt interface{}) error -} - -func (ctx *Context) Context() context.Context { - return ctx.ctx -} - -func (ctx *Context) Send(evt interface{}) error { - return ctx.send(evt) -} diff --git a/lib/statemachine/group.go b/lib/statemachine/group.go deleted file mode 100644 index 2c8562955..000000000 --- a/lib/statemachine/group.go +++ /dev/null @@ -1,116 +0,0 @@ -package statemachine - -import ( - "context" - "reflect" - "sync" - - "github.com/filecoin-project/go-statestore" - "github.com/ipfs/go-datastore" - "golang.org/x/xerrors" -) - -type StateHandler interface { - // returns - Plan(events []Event, user interface{}) (interface{}, error) -} - -// StateGroup manages a group of state machines sharing the same logic -type StateGroup struct { - sts *statestore.StateStore - hnd StateHandler - stateType reflect.Type - - lk sync.Mutex - sms map[datastore.Key]*StateMachine -} - -// stateType: T - (MyStateStruct{}) -func New(ds datastore.Datastore, hnd StateHandler, stateType interface{}) *StateGroup { - return &StateGroup{ - sts: statestore.New(ds), - hnd: hnd, - stateType: reflect.TypeOf(stateType), - - sms: map[datastore.Key]*StateMachine{}, - } -} - -// Send sends an event to machine identified by `id`. -// `evt` is going to be passed into StateHandler.Planner, in the events[].User param -// -// If a state machine with the specified id doesn't exits, it's created, and it's -// state is set to zero-value of stateType provided in group constructor -func (s *StateGroup) Send(id interface{}, evt interface{}) (err error) { - s.lk.Lock() - defer s.lk.Unlock() - - sm, exist := s.sms[statestore.ToKey(id)] - if !exist { - sm, err = s.loadOrCreate(id) - if err != nil { - return xerrors.Errorf("loadOrCreate state: %w", err) - } - s.sms[statestore.ToKey(id)] = sm - } - - return sm.send(Event{User: evt}) -} - -func (s *StateGroup) loadOrCreate(name interface{}) (*StateMachine, error) { - exists, err := s.sts.Has(name) - if err != nil { - return nil, xerrors.Errorf("failed to check if state for %v exists: %w", name, err) - } - - if !exists { - userState := reflect.New(s.stateType).Interface() - - err = s.sts.Begin(name, userState) - if err != nil { - return nil, xerrors.Errorf("saving initial state: %w", err) - } - } - - res := &StateMachine{ - planner: s.hnd.Plan, - eventsIn: make(chan Event), - - name: name, - st: s.sts.Get(name), - stateType: s.stateType, - - stageDone: make(chan struct{}), - closing: make(chan struct{}), - closed: make(chan struct{}), - } - - go res.run() - - return res, nil -} - -// Stop stops all state machines in this group -func (s *StateGroup) Stop(ctx context.Context) error { - s.lk.Lock() - defer s.lk.Unlock() - - for _, sm := range s.sms { - if err := sm.stop(ctx); err != nil { - return err - } - } - - return nil -} - -// List outputs states of all state machines in this group -// out: *[]StateT -func (s *StateGroup) List(out interface{}) error { - return s.sts.List(out) -} - -// Get gets state for a single state machine -func (s *StateGroup) Get(id interface{}) *statestore.StoredState { - return s.sts.Get(id) -} diff --git a/lib/statemachine/machine.go b/lib/statemachine/machine.go deleted file mode 100644 index d30c9e7d4..000000000 --- a/lib/statemachine/machine.go +++ /dev/null @@ -1,127 +0,0 @@ -package statemachine - -import ( - "context" - "reflect" - "sync/atomic" - - "github.com/filecoin-project/go-statestore" - - logging "github.com/ipfs/go-log" -) - -var log = logging.Logger("evtsm") - -type Event struct { - User interface{} -} - -// TODO: This probably should be returning an int indicating partial event processing -// (or something like errPartial(nEvents)) -// returns func(ctx Context, st ) (func(*), error), where is the typeOf(User) param -type Planner func(events []Event, user interface{}) (interface{}, error) - -type StateMachine struct { - planner Planner - eventsIn chan Event - - name interface{} - st *statestore.StoredState - stateType reflect.Type - - stageDone chan struct{} - closing chan struct{} - closed chan struct{} - - busy int32 -} - -func (fsm *StateMachine) run() { - defer close(fsm.closed) - - var pendingEvents []Event - - for { - // NOTE: This requires at least one event to be sent to trigger a stage - // This means that after restarting the state machine users of this - // code must send a 'restart' event - select { - case evt := <-fsm.eventsIn: - pendingEvents = append(pendingEvents, evt) - case <-fsm.stageDone: - if len(pendingEvents) == 0 { - continue - } - case <-fsm.closing: - return - } - - if atomic.CompareAndSwapInt32(&fsm.busy, 0, 1) { - var nextStep interface{} - var ustate interface{} - - err := fsm.mutateUser(func(user interface{}) (err error) { - nextStep, err = fsm.planner(pendingEvents, user) - ustate = user - return err - }) - if err != nil { - log.Errorf("Executing event planner failed: %+v", err) - return - } - - pendingEvents = nil - - if nextStep == nil { - continue - } - - ctx := Context{ - ctx: context.TODO(), - send: func(evt interface{}) error { - return fsm.send(Event{User: evt}) - }, - } - - go func() { - res := reflect.ValueOf(nextStep).Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(ustate).Elem()}) - - if res[0].Interface() != nil { - log.Errorf("executing step: %+v", res[0].Interface().(error)) // TODO: propagate top level - return - } - - atomic.StoreInt32(&fsm.busy, 0) - fsm.stageDone <- struct{}{} - }() - - } - } -} - -func (fsm *StateMachine) mutateUser(cb func(user interface{}) error) error { - mutt := reflect.FuncOf([]reflect.Type{reflect.PtrTo(fsm.stateType)}, []reflect.Type{reflect.TypeOf(new(error)).Elem()}, false) - - mutf := reflect.MakeFunc(mutt, func(args []reflect.Value) (results []reflect.Value) { - err := cb(args[0].Interface()) - return []reflect.Value{reflect.ValueOf(&err).Elem()} - }) - - return fsm.st.Mutate(mutf.Interface()) -} - -func (fsm *StateMachine) send(evt Event) error { - fsm.eventsIn <- evt // TODO: ctx, at least - return nil -} - -func (fsm *StateMachine) stop(ctx context.Context) error { - close(fsm.closing) - - select { - case <-fsm.closed: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} diff --git a/lib/statemachine/machine_test.go b/lib/statemachine/machine_test.go deleted file mode 100644 index 49b8324ce..000000000 --- a/lib/statemachine/machine_test.go +++ /dev/null @@ -1,105 +0,0 @@ -package statemachine - -import ( - "context" - "testing" - - "github.com/ipfs/go-datastore" - logging "github.com/ipfs/go-log" - "gotest.tools/assert" -) - -func init() { - logging.SetLogLevel("*", "INFO") -} - -type testHandler struct { - t *testing.T - proceed chan struct{} - done chan struct{} -} - -func (t *testHandler) Plan(events []Event, state interface{}) (interface{}, error) { - return t.plan(events, state.(*TestState)) -} - -func (t *testHandler) plan(events []Event, state *TestState) (func(Context, TestState) error, error) { - for _, event := range events { - e := event.User.(*TestEvent) - switch e.A { - case "restart": - case "start": - state.A = 1 - case "b": - state.A = 2 - state.B = e.Val - } - } - - switch state.A { - case 1: - return t.step0, nil - case 2: - return t.step1, nil - default: - t.t.Fatal(state.A) - } - panic("how?") -} - -func (t *testHandler) step0(ctx Context, st TestState) error { - ctx.Send(&TestEvent{A: "b", Val: 55}) - <-t.proceed - return nil -} - -func (t *testHandler) step1(ctx Context, st TestState) error { - assert.Equal(t.t, uint64(2), st.A) - - close(t.done) - return nil -} - -func TestBasic(t *testing.T) { - for i := 0; i < 1000; i++ { // run a few times to expose any races - ds := datastore.NewMapDatastore() - - th := &testHandler{t: t, done: make(chan struct{}), proceed: make(chan struct{})} - close(th.proceed) - smm := New(ds, th, TestState{}) - - if err := smm.Send(uint64(2), &TestEvent{A: "start"}); err != nil { - t.Fatalf("%+v", err) - } - - <-th.done - } -} - -func TestPersist(t *testing.T) { - for i := 0; i < 1000; i++ { // run a few times to expose any races - ds := datastore.NewMapDatastore() - - th := &testHandler{t: t, done: make(chan struct{}), proceed: make(chan struct{})} - smm := New(ds, th, TestState{}) - - if err := smm.Send(uint64(2), &TestEvent{A: "start"}); err != nil { - t.Fatalf("%+v", err) - } - - if err := smm.Stop(context.Background()); err != nil { - t.Fatal(err) - return - } - - smm = New(ds, th, TestState{}) - if err := smm.Send(uint64(2), &TestEvent{A: "restart"}); err != nil { - t.Fatalf("%+v", err) - } - close(th.proceed) - - <-th.done - } -} - -var _ StateHandler = &testHandler{} diff --git a/lib/statemachine/testing.go b/lib/statemachine/testing.go deleted file mode 100644 index 8a2c7e8c6..000000000 --- a/lib/statemachine/testing.go +++ /dev/null @@ -1,11 +0,0 @@ -package statemachine - -type TestState struct { - A uint64 - B uint64 -} - -type TestEvent struct { - A string - Val uint64 -} diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 06ba11354..43be0077c 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -6,17 +6,18 @@ import ( "reflect" "time" - "github.com/filecoin-project/specs-actors/actors/abi" "golang.org/x/xerrors" + "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/statemachine" ) -func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, error) { +func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) { next, err := m.plan(events, user.(*SectorInfo)) if err != nil || next == nil { - return nil, err + return nil, 0, err } return func(ctx statemachine.Context, si SectorInfo) error { @@ -27,7 +28,7 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface } return nil - }, nil + }, uint64(len(events)), nil // TODO: This processed event count is not very correct } var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ diff --git a/storage/sealing/fsm_test.go b/storage/sealing/fsm_test.go index a4d53d018..41becc4f3 100644 --- a/storage/sealing/fsm_test.go +++ b/storage/sealing/fsm_test.go @@ -6,8 +6,9 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/statemachine" ) func init() { diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index a3652485e..89294eb91 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -12,6 +12,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -20,7 +21,6 @@ import ( "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/statemachine" "github.com/filecoin-project/lotus/storage/sealmgr" ) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index b083096e9..e12860447 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -5,6 +5,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" @@ -14,7 +15,6 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/statemachine" ) func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { diff --git a/storage/sealing/states_failed.go b/storage/sealing/states_failed.go index a7dfe36a9..d5667f57e 100644 --- a/storage/sealing/states_failed.go +++ b/storage/sealing/states_failed.go @@ -4,6 +4,7 @@ import ( "bytes" "time" + "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/util/adt" "golang.org/x/xerrors" @@ -12,7 +13,6 @@ import ( "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/statemachine" ) const minRetryTime = 1 * time.Minute