diff --git a/chain/events/events.go b/chain/events/events.go index 408c8845e..c1ef07a4e 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -74,7 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), + hcEvents: *newHCEvents(ctx, api, tsc, uint64(gcConfidence)), } e.ready.Add(1) diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 1306b26ec..3d8e05c02 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -93,7 +93,7 @@ type hcEvents struct { watcherEvents } -func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) hcEvents { +func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents { e := hcEvents{ ctx: ctx, cs: cs, @@ -109,7 +109,7 @@ func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidenc e.messageEvents = newMessageEvents(ctx, &e, cs) e.watcherEvents = newWatcherEvents(ctx, &e, cs) - return e + return &e } // Called when there is a change to the head with tipsets to be @@ -297,7 +297,7 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) { } // Listen for an event -// - CheckFunc: immediately checks if the event already occured +// - CheckFunc: immediately checks if the event already occurred // - EventHandler: called when the event has occurred, after confidence tipsets // - RevertHandler: called if the chain head changes causing the event to revert // - confidence: wait this many tipsets before calling EventHandler @@ -351,17 +351,17 @@ type headChangeAPI interface { type watcherEvents struct { ctx context.Context cs eventAPI - hcApi headChangeAPI + hcAPI headChangeAPI lk sync.RWMutex matchers map[triggerID]StateMatchFunc } -func newWatcherEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) watcherEvents { +func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) watcherEvents { return watcherEvents{ ctx: ctx, cs: cs, - hcApi: hcApi, + hcAPI: hcAPI, matchers: make(map[triggerID]StateMatchFunc), } } @@ -387,7 +387,7 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map return res } -// A change in state +// StateChange represents a change in state type StateChange interface{} // StateChangeHandler arguments: @@ -437,7 +437,7 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, return scHnd(prevTs, ts, states, height) } - id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout) + id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) if err != nil { return err } @@ -453,17 +453,17 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, type messageEvents struct { ctx context.Context cs eventAPI - hcApi headChangeAPI + hcAPI headChangeAPI lk sync.RWMutex matchers map[triggerID][]MsgMatchFunc } -func newMessageEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) messageEvents { +func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) messageEvents { return messageEvents{ ctx: ctx, cs: cs, - hcApi: hcApi, + hcAPI: hcAPI, matchers: map[triggerID][]MsgMatchFunc{}, } } @@ -592,7 +592,7 @@ func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHa return msgHnd(msg, rec, ts, height) } - id, err := me.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout) + id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) if err != nil { return err } diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 2bb5b0916..5798fb75c 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1068,7 +1068,7 @@ func TestRemoveTriggersOnMessage(t *testing.T) { type testStateChange struct { from string - to string + to string } func TestStateChanged(t *testing.T) { diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 21d43adab..3245d5c03 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -2,41 +2,41 @@ package state import ( "context" - cbor "github.com/ipfs/go-ipld-cbor" - "github.com/ipfs/go-cid" - "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" ) -// Data returned from the DiffFunc +// UserData is the data returned from the DiffFunc type UserData interface{} -// The calls made by this class external APIs -type ChainApi interface { +// ChainAPI abstracts out calls made by this class to external APIs +type ChainAPI interface { apibstore.ChainIO StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) } -// Use StatePredicates to respond to state changes +// StatePredicates has common predicates for responding to state changes type StatePredicates struct { - api ChainApi + api ChainAPI cst *cbor.BasicIpldStore } -func NewStatePredicates(api ChainApi) *StatePredicates { +func NewStatePredicates(api ChainAPI) *StatePredicates { return &StatePredicates{ api: api, cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)), } } -// Check if there's a change form oldState to newState, and return +// DiffFunc check if there's a change form oldState to newState, and returns // - changed: was there a change // - user: user-defined data representing the state change // - err @@ -44,7 +44,7 @@ type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (chang type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) -// Calls diffStateFunc when the state changes for the given actor +// OnActorStateChanged calls diffStateFunc when the state changes for the given actor func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) @@ -52,6 +52,10 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu return false, nil, err } newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key()) + if err != nil { + return false, nil, err + } + if oldActor.Head.Equals(newActor.Head) { return false, nil, nil } @@ -61,7 +65,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) -// Calls diffStorageMarketState when the state changes for the market actor +// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { var oldState market.State @@ -78,7 +82,7 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) -// Calls diffDealStates when the market state changes +// OnDealStateChanged calls diffDealStates when the market state changes func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { if oldState.States.Equals(newState.States) { @@ -98,31 +102,31 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) } } -// A set of changes to deal state +// ChangedDeals is a set of changes to deal state type ChangedDeals map[abi.DealID]DealStateChange -// Change in deal state from -> to +// DealStateChange is a change in deal state from -> to type DealStateChange struct { From market.DealState - To market.DealState + To market.DealState } -// Detect changes in the deal state AMT for the given deal IDs +// DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { changedDeals := make(ChangedDeals) - for _, dealId := range dealIds { + for _, dealID := range dealIds { var oldDeal, newDeal market.DealState - err := oldDealStateRoot.Get(ctx, uint64(dealId), &oldDeal) + err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal) if err != nil { return false, nil, err } - err = newDealStateRoot.Get(ctx, uint64(dealId), &newDeal) + err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal) if err != nil { return false, nil, err } if oldDeal != newDeal { - changedDeals[dealId] = DealStateChange{oldDeal, newDeal} + changedDeals[dealID] = DealStateChange{oldDeal, newDeal} } } if len(changedDeals) > 0 { diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 190f5bd2a..1c10209a8 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -2,9 +2,10 @@ package state import ( "context" + "testing" + "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/ipfs/go-hamt-ipld" - "testing" "github.com/filecoin-project/go-amt-ipld/v2" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -28,23 +29,23 @@ func init() { dummyCid, _ = cid.Parse("bafkqaaa") } -type mockApi struct { +type mockAPI struct { ts map[types.TipSetKey]*types.Actor bs bstore.Blockstore } -func newMockApi(bs bstore.Blockstore) *mockApi { - return &mockApi{ +func newMockAPI(bs bstore.Blockstore) *mockAPI { + return &mockAPI{ bs: bs, ts: make(map[types.TipSetKey]*types.Actor), } } -func (m mockApi) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { +func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { return m.bs.Has(c) } -func (m mockApi) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { +func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { blk, err := m.bs.Get(c) if err != nil { return nil, xerrors.Errorf("blockstore get: %w", err) @@ -53,11 +54,11 @@ func (m mockApi) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { return blk.RawData(), nil } -func (m mockApi) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { +func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { return m.ts[tsk], nil } -func (m mockApi) setActor(tsk types.TipSetKey, act *types.Actor) { +func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) { m.ts[tsk] = act } @@ -79,7 +80,7 @@ func TestPredicates(t *testing.T) { SlashEpoch: 0, }, } - oldStateC := createMarketState(t, store, oldDeals, ctx) + oldStateC := createMarketState(ctx, t, store, oldDeals) newDeals := map[abi.DealID]*market.DealState{ abi.DealID(1): { @@ -93,7 +94,7 @@ func TestPredicates(t *testing.T) { SlashEpoch: 6, }, } - newStateC := createMarketState(t, store, newDeals, ctx) + newStateC := createMarketState(ctx, t, store, newDeals) miner, err := address.NewFromString("t00") require.NoError(t, err) @@ -102,7 +103,7 @@ func TestPredicates(t *testing.T) { newState, err := mockTipset(miner, 2) require.NoError(t, err) - api := newMockApi(bs) + api := newMockAPI(bs) api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) api.setActor(newState.Key(), &types.Actor{Head: newStateC}) @@ -149,8 +150,8 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) }}) } -func createMarketState(t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, ctx context.Context) cid.Cid { - rootCid := createAMT(t, store, deals, ctx) +func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { + rootCid := createAMT(ctx, t, store, deals) emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO()) require.NoError(t, err) @@ -164,10 +165,10 @@ func createMarketState(t *testing.T, store *cbornode.BasicIpldStore, deals map[a return stateC } -func createAMT(t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, ctx context.Context) cid.Cid { +func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { root := amt.NewAMT(store) - for dealId, dealState := range deals { - err := root.Set(ctx, uint64(dealId), dealState) + for dealID, dealState := range deals { + err := root.Set(ctx, uint64(dealID), dealState) require.NoError(t, err) } rootCid, err := root.Flush(ctx)