diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 912c6def6..8a45b53fb 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -4,13 +4,11 @@ import ( "bytes" "context" + "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" typegen "github.com/whyrusleeping/cbor-gen" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-amt-ipld/v2" - "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -87,20 +85,25 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di }) } -type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) +type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error) -// OnDealStateChanged calls diffDealStates when the market state changes -func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { +// OnDealStateChanged calls diffDealStates when the market deal state changes +func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffAdtArraysFunc) DiffStorageMarketStateFunc { return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { if oldState.States.Equals(newState.States) { return false, nil, nil } - oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States) + ctxStore := &contextStore{ + ctx: ctx, + cst: sp.cst, + } + + oldRoot, err := adt.AsArray(ctxStore, oldState.States) if err != nil { return false, nil, err } - newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States) + newRoot, err := adt.AsArray(ctxStore, newState.States) if err != nil { return false, nil, err } @@ -109,39 +112,193 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) } } +// OnDealProposalChanged calls diffDealProps when the market proposal state changes +func (sp *StatePredicates) OnDealProposalChanged(diffDealProps DiffAdtArraysFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { + if oldState.Proposals.Equals(newState.Proposals) { + return false, nil, nil + } + + ctxStore := &contextStore{ + ctx: ctx, + cst: sp.cst, + } + + oldRoot, err := adt.AsArray(ctxStore, oldState.Proposals) + if err != nil { + return false, nil, err + } + newRoot, err := adt.AsArray(ctxStore, newState.Proposals) + if err != nil { + return false, nil, err + } + + return diffDealProps(ctx, oldRoot, newRoot) + } +} + +var _ AdtArrayDiff = &MarketDealProposalChanges{} + +type MarketDealProposalChanges struct { + Added []ProposalIDState + Removed []ProposalIDState +} + +type ProposalIDState struct { + ID abi.DealID + Proposal market.DealProposal +} + +func (m *MarketDealProposalChanges) Add(key uint64, val *typegen.Deferred) error { + dp := new(market.DealProposal) + err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Added = append(m.Added, ProposalIDState{abi.DealID(key), *dp}) + return nil +} + +func (m *MarketDealProposalChanges) Modify(key uint64, from, to *typegen.Deferred) error { + // short circuit, DealProposals are static + return nil +} + +func (m *MarketDealProposalChanges) Remove(key uint64, val *typegen.Deferred) error { + dp := new(market.DealProposal) + err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Removed = append(m.Removed, ProposalIDState{abi.DealID(key), *dp}) + return nil +} + +// OnDealProposalAmtChanged detects changes in the deal proposal AMT for all deal proposals and returns a MarketProposalsChanges structure containing: +// - Added Proposals +// - Modified Proposals +// - Removed Proposals +func (sp *StatePredicates) OnDealProposalAmtChanged() DiffAdtArraysFunc { + return func(ctx context.Context, oldDealProps, newDealProps *adt.Array) (changed bool, user UserData, err error) { + proposalChanges := new(MarketDealProposalChanges) + if err := DiffAdtArray(oldDealProps, newDealProps, proposalChanges); err != nil { + return false, nil, err + } + + if len(proposalChanges.Added)+len(proposalChanges.Removed) == 0 { + return false, nil, nil + } + + return true, proposalChanges, nil + } +} + +var _ AdtArrayDiff = &MarketDealStateChanges{} + +type MarketDealStateChanges struct { + Added []DealIDState + Modified []DealStateChange + Removed []DealIDState +} + +type DealIDState struct { + ID abi.DealID + Deal market.DealState +} + +func (m *MarketDealStateChanges) Add(key uint64, val *typegen.Deferred) error { + ds := new(market.DealState) + err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Added = append(m.Added, DealIDState{abi.DealID(key), *ds}) + return nil +} + +func (m *MarketDealStateChanges) Modify(key uint64, from, to *typegen.Deferred) error { + dsFrom := new(market.DealState) + if err := dsFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil { + return err + } + + dsTo := new(market.DealState) + if err := dsTo.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil { + return err + } + + if *dsFrom != *dsTo { + m.Modified = append(m.Modified, DealStateChange{abi.DealID(key), dsFrom, dsTo}) + } + return nil +} + +func (m *MarketDealStateChanges) Remove(key uint64, val *typegen.Deferred) error { + ds := new(market.DealState) + err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Removed = append(m.Removed, DealIDState{abi.DealID(key), *ds}) + return nil +} + +// OnDealStateAmtChanged detects changes in the deal state AMT for all deal states and returns a MarketDealStateChanges structure containing: +// - Added Deals +// - Modified Deals +// - Removed Deals +func (sp *StatePredicates) OnDealStateAmtChanged() DiffAdtArraysFunc { + return func(ctx context.Context, oldDealStates, newDealStates *adt.Array) (changed bool, user UserData, err error) { + dealStateChanges := new(MarketDealStateChanges) + if err := DiffAdtArray(oldDealStates, newDealStates, dealStateChanges); err != nil { + return false, nil, err + } + + if len(dealStateChanges.Added)+len(dealStateChanges.Modified)+len(dealStateChanges.Removed) == 0 { + return false, nil, nil + } + + return true, dealStateChanges, nil + } +} + // ChangedDeals is a set of changes to deal state type ChangedDeals map[abi.DealID]DealStateChange // DealStateChange is a change in deal state from -> to type DealStateChange struct { + ID abi.DealID From *market.DealState To *market.DealState } // DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs -func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { - return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { +func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffAdtArraysFunc { + return func(ctx context.Context, oldDealStateArray, newDealStateArray *adt.Array) (changed bool, user UserData, err error) { changedDeals := make(ChangedDeals) for _, dealID := range dealIds { var oldDealPtr, newDealPtr *market.DealState var oldDeal, newDeal market.DealState // If the deal has been removed, we just set it to nil - err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal) - if err == nil { - oldDealPtr = &oldDeal - } else if _, ok := err.(*amt.ErrNotFound); !ok { + found, err := oldDealStateArray.Get(uint64(dealID), &oldDeal) + if err != nil { return false, nil, err } - err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal) - if err == nil { - newDealPtr = &newDeal - } else if _, ok := err.(*amt.ErrNotFound); !ok { + if found { + oldDealPtr = &oldDeal + } + + found, err = newDealStateArray.Get(uint64(dealID), &newDeal) + if err != nil { return false, nil, err } + if found { + newDealPtr = &newDeal + } if oldDeal != newDeal { - changedDeals[dealID] = DealStateChange{oldDealPtr, newDealPtr} + changedDeals[dealID] = DealStateChange{dealID, oldDealPtr, newDealPtr} } } if len(changedDeals) > 0 { diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 1573b84d8..9b85cf9c0 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-actors/actors/util/adt" tutils "github.com/filecoin-project/specs-actors/support/testing" "github.com/filecoin-project/lotus/chain/types" @@ -65,98 +66,219 @@ func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) { m.ts[tsk] = act } -func TestPredicates(t *testing.T) { +func TestMarketPredicates(t *testing.T) { ctx := context.Background() bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) store := cbornode.NewCborStore(bs) + oldDeal1 := &market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 0, + } + oldDeal2 := &market.DealState{ + SectorStartEpoch: 4, + LastUpdatedEpoch: 5, + SlashEpoch: 0, + } oldDeals := map[abi.DealID]*market.DealState{ - abi.DealID(1): { - SectorStartEpoch: 1, - LastUpdatedEpoch: 2, - }, - abi.DealID(2): { - SectorStartEpoch: 4, - LastUpdatedEpoch: 5, - }, + abi.DealID(1): oldDeal1, + abi.DealID(2): oldDeal2, } - oldStateC := createMarketState(ctx, t, store, oldDeals) + oldProp1 := &market.DealProposal{ + PieceCID: dummyCid, + PieceSize: 0, + VerifiedDeal: false, + Client: tutils.NewIDAddr(t, 1), + Provider: tutils.NewIDAddr(t, 1), + StartEpoch: 1, + EndEpoch: 2, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: big.Zero(), + ClientCollateral: big.Zero(), + } + oldProp2 := &market.DealProposal{ + PieceCID: dummyCid, + PieceSize: 0, + VerifiedDeal: false, + Client: tutils.NewIDAddr(t, 1), + Provider: tutils.NewIDAddr(t, 1), + StartEpoch: 2, + EndEpoch: 3, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: big.Zero(), + ClientCollateral: big.Zero(), + } + oldProps := map[abi.DealID]*market.DealProposal{ + abi.DealID(1): oldProp1, + abi.DealID(2): oldProp2, + } + + oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps) + + newDeal1 := &market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 3, + SlashEpoch: 0, + } + + // deal 2 removed + + // added + newDeal3 := &market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 3, + } newDeals := map[abi.DealID]*market.DealState{ - abi.DealID(1): { - SectorStartEpoch: 1, - LastUpdatedEpoch: 3, - }, + abi.DealID(1): newDeal1, + // deal 2 was removed + abi.DealID(3): newDeal3, } - newStateC := createMarketState(ctx, t, store, newDeals) - miner, err := address.NewFromString("t00") + // added + newProp3 := &market.DealProposal{ + PieceCID: dummyCid, + PieceSize: 0, + VerifiedDeal: false, + Client: tutils.NewIDAddr(t, 1), + Provider: tutils.NewIDAddr(t, 1), + StartEpoch: 4, + EndEpoch: 4, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: big.Zero(), + ClientCollateral: big.Zero(), + } + newProps := map[abi.DealID]*market.DealProposal{ + abi.DealID(1): oldProp1, // 1 was persisted + // prop 2 was removed + abi.DealID(3): newProp3, // new + // NB: DealProposals cannot be modified, so don't test that case. + } + newStateC := createMarketState(ctx, t, store, newDeals, newProps) + + minerAddr, err := address.NewFromString("t00") require.NoError(t, err) - oldState, err := mockTipset(miner, 1) + oldState, err := mockTipset(minerAddr, 1) require.NoError(t, err) - newState, err := mockTipset(miner, 2) + newState, err := mockTipset(minerAddr, 2) require.NoError(t, err) api := newMockAPI(bs) api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) api.setActor(newState.Key(), &types.Actor{Head: newStateC}) - preds := NewStatePredicates(api) + t.Run("deal ID predicate", func(t *testing.T) { + preds := NewStatePredicates(api) - dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} - diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) + dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} + diffIDFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) - // Diff a state against itself: expect no change - changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key()) - require.NoError(t, err) - require.False(t, changed) + // Diff a state against itself: expect no change + changed, _, err := diffIDFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) - // Diff old state against new state - changed, val, err := diffFn(ctx, oldState.Key(), newState.Key()) - require.NoError(t, err) - require.True(t, changed) + // Diff old state against new state + changed, valIDs, err := diffIDFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, changed) - changedDeals, ok := val.(ChangedDeals) - require.True(t, ok) - require.Len(t, changedDeals, 2) - require.Contains(t, changedDeals, abi.DealID(1)) - require.Contains(t, changedDeals, abi.DealID(2)) - deal1 := changedDeals[abi.DealID(1)] - if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { - t.Fatal("Unexpected change to LastUpdatedEpoch") - } - deal2 := changedDeals[abi.DealID(2)] - if deal2.From.LastUpdatedEpoch != 5 || deal2.To != nil { - t.Fatal("Expected To to be nil") - } + changedDealIDs, ok := valIDs.(ChangedDeals) + require.True(t, ok) + require.Len(t, changedDealIDs, 2) + require.Contains(t, changedDealIDs, abi.DealID(1)) + require.Contains(t, changedDealIDs, abi.DealID(2)) + deal1 := changedDealIDs[abi.DealID(1)] + if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } + deal2 := changedDealIDs[abi.DealID(2)] + if deal2.From.LastUpdatedEpoch != 5 || deal2.To != nil { + t.Fatal("Expected To to be nil") + } - // Diff with non-existent deal. - noDeal := []abi.DealID{3} - diffNoDealFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(noDeal))) - changed, _, err = diffNoDealFn(ctx, oldState.Key(), newState.Key()) - require.NoError(t, err) - require.False(t, changed) + // Diff with non-existent deal. + noDeal := []abi.DealID{4} + diffNoDealFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(noDeal))) + changed, _, err = diffNoDealFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.False(t, changed) - // Test that OnActorStateChanged does not call the callback if the state has not changed - mockAddr, err := address.NewFromString("t01") - require.NoError(t, err) - actorDiffFn := preds.OnActorStateChanged(mockAddr, func(context.Context, cid.Cid, cid.Cid) (bool, UserData, error) { - t.Fatal("No state change so this should not be called") - return false, nil, nil + // Test that OnActorStateChanged does not call the callback if the state has not changed + mockAddr, err := address.NewFromString("t01") + require.NoError(t, err) + actorDiffFn := preds.OnActorStateChanged(mockAddr, func(context.Context, cid.Cid, cid.Cid) (bool, UserData, error) { + t.Fatal("No state change so this should not be called") + return false, nil, nil + }) + changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) + + // Test that OnDealStateChanged does not call the callback if the state has not changed + diffDealStateFn := preds.OnDealStateChanged(func(context.Context, *adt.Array, *adt.Array) (bool, UserData, error) { + t.Fatal("No state change so this should not be called") + return false, nil, nil + }) + marketState := createEmptyMarketState(t, store) + changed, _, err = diffDealStateFn(ctx, marketState, marketState) + require.NoError(t, err) + require.False(t, changed) }) - changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key()) - require.NoError(t, err) - require.False(t, changed) - // Test that OnDealStateChanged does not call the callback if the state has not changed - diffDealStateFn := preds.OnDealStateChanged(func(context.Context, *amt.Root, *amt.Root) (bool, UserData, error) { - t.Fatal("No state change so this should not be called") - return false, nil, nil + t.Run("deal state array predicate", func(t *testing.T) { + preds := NewStatePredicates(api) + diffArrFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.OnDealStateAmtChanged())) + + changed, _, err := diffArrFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) + + changed, valArr, err := diffArrFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, changed) + + changedDeals, ok := valArr.(*MarketDealStateChanges) + require.True(t, ok) + require.Len(t, changedDeals.Added, 1) + require.Equal(t, abi.DealID(3), changedDeals.Added[0].ID) + require.Equal(t, *newDeal3, changedDeals.Added[0].Deal) + + require.Len(t, changedDeals.Removed, 1) + + require.Len(t, changedDeals.Modified, 1) + require.Equal(t, abi.DealID(1), changedDeals.Modified[0].ID) + require.Equal(t, newDeal1, changedDeals.Modified[0].To) + require.Equal(t, oldDeal1, changedDeals.Modified[0].From) + + require.Equal(t, abi.DealID(2), changedDeals.Removed[0].ID) + }) + + t.Run("deal proposal array predicate", func(t *testing.T) { + preds := NewStatePredicates(api) + diffArrFn := preds.OnStorageMarketActorChanged(preds.OnDealProposalChanged(preds.OnDealProposalAmtChanged())) + changed, _, err := diffArrFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) + + changed, valArr, err := diffArrFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, changed) + + changedProps, ok := valArr.(*MarketDealProposalChanges) + require.True(t, ok) + require.Len(t, changedProps.Added, 1) + require.Equal(t, abi.DealID(3), changedProps.Added[0].ID) + require.Equal(t, *newProp3, changedProps.Added[0].Proposal) + + // proposals cannot be modified -- no modified testing + + require.Len(t, changedProps.Removed, 1) + require.Equal(t, abi.DealID(2), changedProps.Removed[0].ID) + require.Equal(t, *oldProp2, changedProps.Removed[0].Proposal) }) - marketState := createEmptyMarketState(t, store) - changed, _, err = diffDealStateFn(ctx, marketState, marketState) - require.NoError(t, err) - require.False(t, changed) } func TestMinerSectorChange(t *testing.T) { @@ -207,14 +329,15 @@ func TestMinerSectorChange(t *testing.T) { require.True(t, ok) require.Equal(t, len(sectorChanges.Added), 1) - require.Equal(t, sectorChanges.Added[0], si3) + require.Equal(t, 1, len(sectorChanges.Added)) + require.Equal(t, si3, sectorChanges.Added[0]) - require.Equal(t, len(sectorChanges.Removed), 1) - require.Equal(t, sectorChanges.Removed[0], si0) + require.Equal(t, 1, len(sectorChanges.Removed)) + require.Equal(t, si0, sectorChanges.Removed[0]) - require.Equal(t, len(sectorChanges.Extended), 1) - require.Equal(t, sectorChanges.Extended[0].From, si1) - require.Equal(t, sectorChanges.Extended[0].To, si1Ext) + require.Equal(t, 1, len(sectorChanges.Extended)) + require.Equal(t, si1, sectorChanges.Extended[0].From) + require.Equal(t, si1Ext, sectorChanges.Extended[0].To) change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key()) require.NoError(t, err) @@ -229,20 +352,20 @@ func TestMinerSectorChange(t *testing.T) { sectorChanges, ok = val.(*MinerSectorChanges) require.True(t, ok) - require.Equal(t, len(sectorChanges.Added), 1) - require.Equal(t, sectorChanges.Added[0], si0) + require.Equal(t, 1, len(sectorChanges.Added)) + require.Equal(t, si0, sectorChanges.Added[0]) - require.Equal(t, len(sectorChanges.Removed), 1) - require.Equal(t, sectorChanges.Removed[0], si3) + require.Equal(t, 1, len(sectorChanges.Removed)) + require.Equal(t, si3, sectorChanges.Removed[0]) - require.Equal(t, len(sectorChanges.Extended), 1) - require.Equal(t, sectorChanges.Extended[0].To, si1) - require.Equal(t, sectorChanges.Extended[0].From, si1Ext) + require.Equal(t, 1, len(sectorChanges.Extended)) + require.Equal(t, si1, sectorChanges.Extended[0].To) + require.Equal(t, si1Ext, sectorChanges.Extended[0].From) } -func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { +func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) { return types.NewTipSet([]*types.BlockHeader{{ - Miner: miner, + Miner: minerAddr, Height: 5, ParentStateRoot: dummyCid, Messages: dummyCid, @@ -253,11 +376,13 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) }}) } -func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { - rootCid := createDealAMT(ctx, t, store, deals) +func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal) cid.Cid { + dealRootCid := createDealAMT(ctx, t, store, deals) + propRootCid := createProposalAMT(ctx, t, store, props) state := createEmptyMarketState(t, store) - state.States = rootCid + state.States = dealRootCid + state.Proposals = propRootCid stateC, err := store.Put(ctx, state) require.NoError(t, err) @@ -283,6 +408,17 @@ func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldS return rootCid } +func createProposalAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, props map[abi.DealID]*market.DealProposal) cid.Cid { + root := amt.NewAMT(store) + for dealID, prop := range props { + err := root.Set(ctx, uint64(dealID), prop) + require.NoError(t, err) + } + rootCid, err := root.Flush(ctx) + require.NoError(t, err) + return rootCid +} + func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid { rootCid := createSectorsAMT(ctx, t, store, sectors) diff --git a/chain/sync.go b/chain/sync.go index df067f1e9..99301ec2a 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1073,17 +1073,14 @@ func extractSyncState(ctx context.Context) *SyncerState { // collectHeaders collects the headers from the blocks between any two tipsets. // -// `from` is the heaviest/projected/target tipset we have learned about, and -// `to` is usually an anchor tipset we already have in our view of the chain +// `incoming` is the heaviest/projected/target tipset we have learned about, and +// `known` is usually an anchor tipset we already have in our view of the chain // (which could be the genesis). // // collectHeaders checks if portions of the chain are in our ChainStore; falling // down to the network to retrieve the missing parts. If during the process, any // portion we receive is in our denylist (bad list), we short-circuit. // -// {hint/naming}: `from` and `to` is in inverse order. `from` is the highest, -// and `to` is the lowest. This method traverses the chain backwards. -// // {hint/usage}: This is used by collectChain, which is in turn called from the // main Sync method (Syncer#Sync), so it's a pretty central method. // @@ -1093,7 +1090,7 @@ func extractSyncState(ctx context.Context) *SyncerState { // bad. // 2. Check the consistency of beacon entries in the from tipset. We check // total equality of the BeaconEntries in each block. -// 3. Travers the chain backwards, for each tipset: +// 3. Traverse the chain backwards, for each tipset: // 3a. Load it from the chainstore; if found, it move on to its parent. // 3b. Query our peers via BlockSync in batches, requesting up to a // maximum of 500 tipsets every time. @@ -1104,40 +1101,40 @@ func extractSyncState(ctx context.Context) *SyncerState { // // All throughout the process, we keep checking if the received blocks are in // the deny list, and short-circuit the process if so. -func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { +func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "collectHeaders") defer span.End() ss := extractSyncState(ctx) span.AddAttributes( - trace.Int64Attribute("fromHeight", int64(from.Height())), - trace.Int64Attribute("toHeight", int64(to.Height())), + trace.Int64Attribute("incomingHeight", int64(incoming.Height())), + trace.Int64Attribute("knownHeight", int64(known.Height())), ) // Check if the parents of the from block are in the denylist. // i.e. if a fork of the chain has been requested that we know to be bad. - for _, pcid := range from.Parents().Cids() { + for _, pcid := range incoming.Parents().Cids() { if reason, ok := syncer.bad.Has(pcid); ok { newReason := reason.Linked("linked to %s", pcid) - for _, b := range from.Cids() { + for _, b := range incoming.Cids() { syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), pcid, reason) + return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), pcid, reason) } } { // ensure consistency of beacon entires - targetBE := from.Blocks()[0].BeaconEntries + targetBE := incoming.Blocks()[0].BeaconEntries sorted := sort.SliceIsSorted(targetBE, func(i, j int) bool { return targetBE[i].Round < targetBE[j].Round }) if !sorted { - syncer.bad.Add(from.Cids()[0], NewBadBlockReason(from.Cids(), "wrong order of beacon entires")) + syncer.bad.Add(incoming.Cids()[0], NewBadBlockReason(incoming.Cids(), "wrong order of beacon entires")) return nil, xerrors.Errorf("wrong order of beacon entires") } - for _, bh := range from.Blocks()[1:] { + for _, bh := range incoming.Blocks()[1:] { if len(targetBE) != len(bh.BeaconEntries) { // cannot mark bad, I think @Kubuxu return nil, xerrors.Errorf("tipset contained different number for beacon entires") @@ -1152,12 +1149,12 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to } } - blockSet := []*types.TipSet{from} + blockSet := []*types.TipSet{incoming} - at := from.Parents() + at := incoming.Parents() // we want to sync all the blocks until the height above the block we have - untilHeight := to.Height() + 1 + untilHeight := known.Height() + 1 ss.SetHeight(blockSet[len(blockSet)-1].Height()) @@ -1172,7 +1169,7 @@ loop: syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) + return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason) } } @@ -1221,7 +1218,7 @@ loop: syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) + return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason) } } blockSet = append(blockSet, b) @@ -1233,23 +1230,23 @@ loop: at = blks[len(blks)-1].Parents() } - if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) { - last := blockSet[len(blockSet)-1] - if last.Parents() == to.Parents() { + // base is the tipset in the candidate chain at the height equal to our known tipset height. + if base := blockSet[len(blockSet)-1]; !types.CidArrsEqual(base.Parents().Cids(), known.Cids()) { + if base.Parents() == known.Parents() { // common case: receiving a block thats potentially part of the same tipset as our best block return blockSet, nil } // We have now ascertained that this is *not* a 'fast forward' - log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) - fork, err := syncer.syncFork(ctx, last, to) + log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height()) + fork, err := syncer.syncFork(ctx, base, known) if err != nil { if xerrors.Is(err, ErrForkTooLong) { // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? log.Warn("adding forked chain to our bad tipset cache") - for _, b := range from.Blocks() { - syncer.bad.Add(b.Cid(), NewBadBlockReason(from.Cids(), "fork past finality")) + for _, b := range incoming.Blocks() { + syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality")) } } return nil, xerrors.Errorf("failed to sync fork: %w", err) @@ -1269,13 +1266,13 @@ var ErrForkTooLong = fmt.Errorf("fork longer than threshold") // If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the // denylist. Else, we find the common ancestor, and add the missing chain // fragment until the fork point to the returned []TipSet. -func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { - tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold)) +func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { + tips, err := syncer.Bsync.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold)) if err != nil { return nil, err } - nts, err := syncer.store.LoadTipSet(to.Parents()) + nts, err := syncer.store.LoadTipSet(known.Parents()) if err != nil { return nil, xerrors.Errorf("failed to load next local tipset: %w", err) } @@ -1285,7 +1282,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type if !syncer.Genesis.Equals(nts) { return nil, xerrors.Errorf("somehow synced chain that linked back to a different genesis (bad genesis: %s)", nts.Key()) } - return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync") + return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync; incoming: %s") } if nts.Equals(tips[cur]) { diff --git a/chain/types/bigint_test.go b/chain/types/bigint_test.go index d337bfc85..b66528db3 100644 --- a/chain/types/bigint_test.go +++ b/chain/types/bigint_test.go @@ -43,7 +43,7 @@ func TestBigIntSerializationRoundTrip(t *testing.T) { func TestFilRoundTrip(t *testing.T) { testValues := []string{ - "0", "1", "1.001", "100.10001", "101100", "5000.01", "5000", + "0 FIL", "1 FIL", "1.001 FIL", "100.10001 FIL", "101100 FIL", "5000.01 FIL", "5000 FIL", } for _, v := range testValues { diff --git a/chain/types/fil.go b/chain/types/fil.go index 527078e0f..1d912d9c0 100644 --- a/chain/types/fil.go +++ b/chain/types/fil.go @@ -13,9 +13,9 @@ type FIL BigInt func (f FIL) String() string { r := new(big.Rat).SetFrac(f.Int, big.NewInt(int64(build.FilecoinPrecision))) if r.Sign() == 0 { - return "0" + return "0 FIL" } - return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + " FIL" } func (f FIL) Format(s fmt.State, ch rune) { @@ -28,14 +28,35 @@ func (f FIL) Format(s fmt.State, ch rune) { } func ParseFIL(s string) (FIL, error) { + suffix := strings.TrimLeft(s, ".1234567890") + s = s[:len(s)-len(suffix)] + var attofil bool + if suffix != "" { + norm := strings.ToLower(strings.TrimSpace(suffix)) + switch norm { + case "", "fil": + case "attofil", "afil": + attofil = true + default: + return FIL{}, fmt.Errorf("unrecognized suffix: %q", suffix) + } + } + r, ok := new(big.Rat).SetString(s) if !ok { return FIL{}, fmt.Errorf("failed to parse %q as a decimal number", s) } - r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1)) + if !attofil { + r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1)) + } + if !r.IsInt() { - return FIL{}, fmt.Errorf("invalid FIL value: %q", s) + var pref string + if attofil { + pref = "atto" + } + return FIL{}, fmt.Errorf("invalid %sFIL value: %q", pref, s) } return FIL{r.Num()}, nil diff --git a/cli/client.go b/cli/client.go index 8c2ad7eb8..f80ed99ec 100644 --- a/cli/client.go +++ b/cli/client.go @@ -485,6 +485,8 @@ var clientFindCmd = &cli.Command{ }, } +const DefaultMaxRetrievePrice = 1 + var clientRetrieveCmd = &cli.Command{ Name: "retrieve", Usage: "retrieve data from network", @@ -502,6 +504,10 @@ var clientRetrieveCmd = &cli.Command{ Name: "miner", Usage: "miner address for retrieval, if not present it'll use local discovery", }, + &cli.StringFlag{ + Name: "maxPrice", + Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %d FIL)", DefaultMaxRetrievePrice), + }, &cli.StringFlag{ Name: "pieceCid", Usage: "require data to be retrieved from a specific Piece CID", @@ -560,6 +566,11 @@ var clientRetrieveCmd = &cli.Command{ minerStrAddr := cctx.String("miner") if minerStrAddr == "" { // Local discovery offers, err := fapi.ClientFindData(ctx, file, pieceCid) + + // sort by price low to high + sort.Slice(offers, func(i, j int) bool { + return offers[i].MinPrice.LessThan(offers[j].MinPrice) + }) if err != nil { return err } @@ -584,6 +595,21 @@ var clientRetrieveCmd = &cli.Command{ return fmt.Errorf("The received offer errored: %s", offer.Err) } + maxPrice := types.FromFil(DefaultMaxRetrievePrice) + + if cctx.String("maxPrice") != "" { + maxPriceFil, err := types.ParseFIL(cctx.String("maxPrice")) + if err != nil { + return xerrors.Errorf("parsing maxPrice: %w", err) + } + + maxPrice = types.BigInt(maxPriceFil) + } + + if offer.MinPrice.GreaterThan(maxPrice) { + return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice) + } + ref := &lapi.FileRef{ Path: cctx.Args().Get(1), IsCAR: cctx.Bool("car"), diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index b5ceb7348..d3c5a570b 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -51,7 +51,7 @@ func main() { if err := app.Run(os.Args); err != nil { log.Warnf("%+v", err) - return + os.Exit(1) } } diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 726a7a187..ae94d9dcb 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -294,6 +294,30 @@ create table if not exists miner_info primary key (miner_id) ); +/* +* captures chain-specific power state for any given stateroot +*/ +create table if not exists chain_power +( + state_root text not null + constraint chain_power_pk + primary key, + baseline_power text not null +); + +/* +* captures miner-specific power state for any given stateroot +*/ +create table if not exists miner_power +( + miner_id text not null, + state_root text not null, + raw_bytes_power text not null, + quality_adjusted_power text not null, + constraint miner_power_pk + primary key (miner_id, state_root) +); + /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( @@ -500,6 +524,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI return nil } +// storeChainPower captures reward actor state as it relates to power captured on-chain +func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return xerrors.Errorf("begin chain_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp chain_power: %w", err) + } + + for _, rewardState := range rewardTips { + if _, err := stmt.Exec( + rewardState.stateroot.String(), + rewardState.baselinePower.String(), + ); err != nil { + log.Errorw("failed to store chain power", "state_root", rewardState.stateroot, "error", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared chain_power: %w", err) + } + + if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit chain_power tx: %w", err) + } + + return nil +} + type storeSectorsAPI interface { StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) } @@ -607,6 +671,50 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) return tx.Commit() } +// storeMinerPower captures miner actor state as it relates to power per miner captured on-chain +func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return xerrors.Errorf("begin miner_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep miner_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp miner_power: %w", err) + } + + for _, miners := range minerTips { + for _, minerInfo := range miners { + if _, err := stmt.Exec( + minerInfo.addr.String(), + minerInfo.stateroot.String(), + minerInfo.rawPower.String(), + minerInfo.qalPower.String(), + ); err != nil { + log.Errorw("failed to store miner power", "miner", minerInfo.addr, "stateroot", minerInfo.stateroot, "error", err) + } + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared miner_power: %w", err) + } + + if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert miner_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit miner_power tx: %w", err) + } + + return nil +} + func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 3627072e8..00c5ecac9 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -13,12 +13,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/lotus/api" @@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) }() } +type rewardStateInfo struct { + stateroot cid.Cid + baselinePower big.Int +} + type minerStateInfo struct { // common addr address.Address @@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } }) + // map of tipset to reward state + rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes)) // map of tipset to all miners that had a head-change at that tipset. minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes)) // heads we've seen, im being paranoid @@ -302,40 +311,74 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. alk.Unlock() }) - log.Infof("Getting miner info") + log.Infof("Getting actor change info") minerChanges := 0 for addr, m := range actors { for actor, c := range m { - if actor.Code != builtin.StorageMinerActorCodeID { + // reward actor + if actor.Code == builtin.RewardActorCodeID { + rewardTips[c.tsKey] = &rewardStateInfo{ + stateroot: c.stateroot, + baselinePower: big.Zero(), + } continue } - // only want miner actors with head change events - if _, found := headsSeen[actor.Head]; found { - continue + // miner actors with head change events + if actor.Code == builtin.StorageMinerActorCodeID { + if _, found := headsSeen[actor.Head]; found { + continue + } + minerChanges++ + + minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ + addr: addr, + act: actor, + stateroot: c.stateroot, + + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + + state: miner.State{}, + info: nil, + + rawPower: big.Zero(), + qalPower: big.Zero(), + }) + + headsSeen[actor.Head] = struct{}{} } - minerChanges++ - - minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ - addr: addr, - act: actor, - stateroot: c.stateroot, - - tsKey: c.tsKey, - parentTsKey: c.parentTsKey, - - state: miner.State{}, - info: nil, - - rawPower: big.Zero(), - qalPower: big.Zero(), - }) - - headsSeen[actor.Head] = struct{}{} + continue } } + rewardProcessingStartedAt := time.Now() + parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) { + tsKey, rewardInfo := it() + // get reward actor states at each tipset once for all updates + rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey) + if err != nil { + log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head) + if err != nil { + log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + var rewardActorState reward.State + if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { + log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardInfo.baselinePower = rewardActorState.BaselinePower + }) + log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips)) + minerProcessingStartedAt := time.Now() log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) // extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their @@ -415,25 +458,35 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } log.Info("Storing actors") - if err := st.storeActors(actors); err != nil { log.Error(err) return } + chainPowerStartedAt := time.Now() + if err := st.storeChainPower(rewardTips); err != nil { + log.Error(err) + } + log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String()) + log.Info("Storing miners") if err := st.storeMiners(minerTips); err != nil { log.Error(err) return } - log.Info("Storing miner sectors") + minerPowerStartedAt := time.Now() + if err := st.storeMinerPower(minerTips); err != nil { + log.Error(err) + } + log.Infow("Stored miner power", "duration", time.Since(minerPowerStartedAt).String()) + sectorStart := time.Now() if err := st.storeSectors(minerTips, api); err != nil { log.Error(err) return } - log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) + log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String()) log.Info("Storing miner sectors heads") if err := st.storeMinerSectorsHeads(minerTips, api); err != nil { diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index ff45687f8..2ec186eaa 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -236,7 +236,7 @@ var runCmd = &cli.Command{ { // init datastore for r.Exists - _, err := lr.Datastore("/") + _, err := lr.Datastore("/metadata") if err != nil { return err } diff --git a/cmd/lotus-shed/verifreg.go b/cmd/lotus-shed/verifreg.go index 61701e8c0..747a233a5 100644 --- a/cmd/lotus-shed/verifreg.go +++ b/cmd/lotus-shed/verifreg.go @@ -3,6 +3,7 @@ package main import ( "bytes" "fmt" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/build" "github.com/urfave/cli/v2" @@ -199,7 +200,7 @@ var verifRegListVerifiersCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) + vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5)) if err != nil { return err } @@ -251,7 +252,7 @@ var verifRegListClientsCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients) + vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients, hamt.UseTreeBitWidth(5)) if err != nil { return err } @@ -346,7 +347,7 @@ var verifRegCheckVerifierCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) + vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5)) if err != nil { return err } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 14972c69a..208c17d03 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -359,10 +359,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string, }*/ } - log.Infof("Setting next sector ID to %d", maxSectorID+1) - buf := make([]byte, binary.MaxVarintLen64) - size := binary.PutUvarint(buf, uint64(maxSectorID+1)) + size := binary.PutUvarint(buf, uint64(maxSectorID)) return mds.Put(datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size]) } diff --git a/lib/lotuslog/levels.go b/lib/lotuslog/levels.go index ae3959568..d8b4678b7 100644 --- a/lib/lotuslog/levels.go +++ b/lib/lotuslog/levels.go @@ -18,4 +18,6 @@ func SetupLogLevels() { _ = logging.SetLogLevel("stores", "DEBUG") _ = logging.SetLogLevel("nat", "INFO") } + // Always mute RtRefreshManager because it breaks terminals + _ = logging.SetLogLevel("dht/RtRefreshManager", "FATAL") } diff --git a/node/repo/importmgr/mbstore.go b/node/repo/importmgr/mbstore.go index 889752cf2..3b6058bee 100644 --- a/node/repo/importmgr/mbstore.go +++ b/node/repo/importmgr/mbstore.go @@ -8,7 +8,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" ) @@ -63,7 +62,7 @@ func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) { } if merr == nil { - return nil, datastore.ErrNotFound + return nil, blockstore.ErrNotFound } return nil, merr diff --git a/paychmgr/paych.go b/paychmgr/paych.go index 763c448f9..64ce6e3a7 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -4,7 +4,10 @@ import ( "bytes" "context" "fmt" - "math" + + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/filecoin-project/lotus/api" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/specs-actors/actors/builtin" @@ -34,9 +37,14 @@ type ManagerApi struct { full.StateAPI } +type StateManagerApi interface { + LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) + Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) +} + type Manager struct { store *Store - sm *stmgr.StateManager + sm StateManagerApi mpool full.MpoolAPI wallet full.WalletAPI @@ -54,84 +62,24 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manage } } -func maxLaneFromState(st *paych.State) (uint64, error) { - maxLane := uint64(math.MaxInt64) - for _, state := range st.LaneStates { - if (state.ID)+1 > maxLane+1 { - maxLane = state.ID - } +// Used by the tests to supply mocks +func newManager(sm StateManagerApi, pchstore *Store) *Manager { + return &Manager{ + store: pchstore, + sm: sm, } - return maxLane, nil -} - -func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error { - _, st, err := pm.loadPaychState(ctx, ch) - if err != nil { - return err - } - - var account account.State - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) - if err != nil { - return err - } - from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) - if err != nil { - return err - } - to := account.Address - - maxLane, err := maxLaneFromState(st) - if err != nil { - return err - } - - return pm.store.TrackChannel(&ChannelInfo{ - Channel: ch, - Control: to, - Target: from, - - Direction: DirInbound, - NextLane: maxLane + 1, - }) -} - -func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Address) (*ChannelInfo, error) { - _, st, err := pm.loadPaychState(ctx, ch) - if err != nil { - return nil, err - } - - maxLane, err := maxLaneFromState(st) - if err != nil { - return nil, err - } - - var account account.State - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) - if err != nil { - return nil, err - } - from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) - if err != nil { - return nil, err - } - to := account.Address - - return &ChannelInfo{ - Channel: ch, - Control: from, - Target: to, - - Direction: DirOutbound, - NextLane: maxLane + 1, - }, nil } func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address) error { - ci, err := pm.loadOutboundChannelInfo(ctx, ch) + return pm.trackChannel(ctx, ch, DirOutbound) +} + +func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error { + return pm.trackChannel(ctx, ch, DirInbound) +} + +func (pm *Manager) trackChannel(ctx context.Context, ch address.Address, dir uint64) error { + ci, err := pm.loadStateChannelInfo(ctx, ch, dir) if err != nil { return err } @@ -147,62 +95,91 @@ func (pm *Manager) GetChannelInfo(addr address.Address) (*ChannelInfo, error) { return pm.store.getChannelInfo(addr) } -// checks if the given voucher is valid (is or could become spendable at some point) +// CheckVoucherValid checks if the given voucher is valid (is or could become spendable at some point) func (pm *Manager) CheckVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) error { - act, pca, err := pm.loadPaychState(ctx, ch) + _, err := pm.checkVoucherValid(ctx, ch, sv) + return err +} + +func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) (map[uint64]*paych.LaneState, error) { + act, pchState, err := pm.loadPaychState(ctx, ch) if err != nil { - return err + return nil, err } var account account.State - _, err = pm.sm.LoadActorState(ctx, pca.From, &account, nil) + _, err = pm.sm.LoadActorState(ctx, pchState.From, &account, nil) if err != nil { - return err + return nil, err } from := account.Address // verify signature vb, err := sv.SigningBytes() if err != nil { - return err + return nil, err } // TODO: technically, either party may create and sign a voucher. // However, for now, we only accept them from the channel creator. // More complex handling logic can be added later if err := sigs.Verify(sv.Signature, from, vb); err != nil { - return err + return nil, err } - sendAmount := sv.Amount - - // now check the lane state - // TODO: should check against vouchers in our local store too - // there might be something conflicting - ls := findLane(pca.LaneStates, uint64(sv.Lane)) - if ls == nil { - } else { - if (ls.Nonce) >= sv.Nonce { - return fmt.Errorf("nonce too low") - } - - sendAmount = types.BigSub(sv.Amount, ls.Redeemed) + // Check the voucher against the highest known voucher nonce / value + laneStates, err := pm.laneState(pchState, ch) + if err != nil { + return nil, err } - // TODO: also account for vouchers on other lanes we've received - newTotal := types.BigAdd(sendAmount, pca.ToSend) + // If the new voucher nonce value is less than the highest known + // nonce for the lane + ls, lsExists := laneStates[sv.Lane] + if lsExists && sv.Nonce <= ls.Nonce { + return nil, fmt.Errorf("nonce too low") + } + + // If the voucher amount is less than the highest known voucher amount + if lsExists && sv.Amount.LessThanEqual(ls.Redeemed) { + return nil, fmt.Errorf("voucher amount is lower than amount for voucher with lower nonce") + } + + // Total redeemed is the total redeemed amount for all lanes, including + // the new voucher + // eg + // + // lane 1 redeemed: 3 + // lane 2 redeemed: 2 + // voucher for lane 1: 5 + // + // Voucher supersedes lane 1 redeemed, therefore + // effective lane 1 redeemed: 5 + // + // lane 1: 5 + // lane 2: 2 + // - + // total: 7 + totalRedeemed, err := pm.totalRedeemedWithVoucher(laneStates, sv) + if err != nil { + return nil, err + } + + // Total required balance = total redeemed + toSend + // Must not exceed actor balance + newTotal := types.BigAdd(totalRedeemed, pchState.ToSend) if act.Balance.LessThan(newTotal) { - return fmt.Errorf("not enough funds in channel to cover voucher") + return nil, fmt.Errorf("not enough funds in channel to cover voucher") } if len(sv.Merges) != 0 { - return fmt.Errorf("dont currently support paych lane merges") + return nil, fmt.Errorf("dont currently support paych lane merges") } - return nil + return laneStates, nil } -// checks if the given voucher is currently spendable +// CheckVoucherSpendable checks if the given voucher is currently spendable func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, secret []byte, proof []byte) (bool, error) { owner, err := pm.getPaychOwner(ctx, ch) if err != nil { @@ -267,10 +244,6 @@ func (pm *Manager) getPaychOwner(ctx context.Context, ch address.Address) (addre } func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) { - if err := pm.CheckVoucherValid(ctx, ch, sv); err != nil { - return types.NewInt(0), err - } - pm.store.lk.Lock() defer pm.store.lk.Unlock() @@ -279,16 +252,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych return types.NewInt(0), err } - laneState, err := pm.laneState(ctx, ch, uint64(sv.Lane)) - if err != nil { - return types.NewInt(0), err - } - - if minDelta.GreaterThan(types.NewInt(0)) && laneState.Nonce > sv.Nonce { - return types.NewInt(0), xerrors.Errorf("already storing voucher with higher nonce; %d > %d", laneState.Nonce, sv.Nonce) - } - - // look for duplicates + // Check if the voucher has already been added for i, v := range ci.Vouchers { eq, err := cborutil.Equals(sv, v.Voucher) if err != nil { @@ -297,27 +261,41 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych if !eq { continue } - if v.Proof != nil { - if !bytes.Equal(v.Proof, proof) { - log.Warnf("AddVoucher: multiple proofs for single voucher, storing both") - break + + // This is a duplicate voucher. + // Update the proof on the existing voucher + if len(proof) > 0 && !bytes.Equal(v.Proof, proof) { + log.Warnf("AddVoucher: adding proof to stored voucher") + ci.Vouchers[i] = &VoucherInfo{ + Voucher: v.Voucher, + Proof: proof, } - log.Warnf("AddVoucher: voucher re-added with matching proof") - return types.NewInt(0), nil + + return types.NewInt(0), pm.store.putChannelInfo(ci) } - log.Warnf("AddVoucher: adding proof to stored voucher") - ci.Vouchers[i] = &VoucherInfo{ - Voucher: v.Voucher, - Proof: proof, - } - - return types.NewInt(0), pm.store.putChannelInfo(ci) + // Otherwise just ignore the duplicate voucher + log.Warnf("AddVoucher: voucher re-added with matching proof") + return types.NewInt(0), nil } - delta := types.BigSub(sv.Amount, laneState.Redeemed) + // Check voucher validity + laneStates, err := pm.checkVoucherValid(ctx, ch, sv) + if err != nil { + return types.NewInt(0), err + } + + // The change in value is the delta between the voucher amount and + // the highest previous voucher amount for the lane + laneState, exists := laneStates[sv.Lane] + redeemed := big.NewInt(0) + if exists { + redeemed = laneState.Redeemed + } + + delta := types.BigSub(sv.Amount, redeemed) if minDelta.GreaterThan(delta) { - return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; laneAmt=%s; v.Amt=%s", minDelta, delta, laneState.Redeemed, sv.Amount) + return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; laneAmt=%s; v.Amt=%s", minDelta, delta, redeemed, sv.Amount) } ci.Vouchers = append(ci.Vouchers, &VoucherInfo{ @@ -325,7 +303,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych Proof: proof, }) - if ci.NextLane <= (sv.Lane) { + if ci.NextLane <= sv.Lane { ci.NextLane = sv.Lane + 1 } @@ -333,6 +311,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych } func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) { + // TODO: should this take into account lane state? return pm.store.AllocateLane(ch) } @@ -355,6 +334,7 @@ func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, er } func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) { + // TODO: should this take into account lane state? vouchers, err := pm.store.VouchersForPaych(ch) if err != nil { return 0, err @@ -362,9 +342,9 @@ func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lan var maxnonce uint64 for _, v := range vouchers { - if uint64(v.Voucher.Lane) == lane { - if uint64(v.Voucher.Nonce) > maxnonce { - maxnonce = uint64(v.Voucher.Nonce) + if v.Voucher.Lane == lane { + if v.Voucher.Nonce > maxnonce { + maxnonce = v.Voucher.Nonce } } } diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go new file mode 100644 index 000000000..64e344ea7 --- /dev/null +++ b/paychmgr/paych_test.go @@ -0,0 +1,720 @@ +package paychmgr + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/lib/sigs" + "github.com/filecoin-project/specs-actors/actors/crypto" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/filecoin-project/specs-actors/actors/abi" + tutils "github.com/filecoin-project/specs-actors/support/testing" + + "github.com/filecoin-project/specs-actors/actors/builtin/paych" + + "github.com/filecoin-project/specs-actors/actors/builtin/account" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" +) + +type testPchState struct { + actor *types.Actor + state paych.State +} + +type mockStateManager struct { + lk sync.Mutex + accountState map[address.Address]account.State + paychState map[address.Address]testPchState + response *api.InvocResult +} + +func newMockStateManager() *mockStateManager { + return &mockStateManager{ + accountState: make(map[address.Address]account.State), + paychState: make(map[address.Address]testPchState), + } +} + +func (sm *mockStateManager) setAccountState(a address.Address, state account.State) { + sm.lk.Lock() + defer sm.lk.Unlock() + sm.accountState[a] = state +} + +func (sm *mockStateManager) setPaychState(a address.Address, actor *types.Actor, state paych.State) { + sm.lk.Lock() + defer sm.lk.Unlock() + sm.paychState[a] = testPchState{actor, state} +} + +func (sm *mockStateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) { + sm.lk.Lock() + defer sm.lk.Unlock() + + if outState, ok := out.(*account.State); ok { + *outState = sm.accountState[a] + return nil, nil + } + if outState, ok := out.(*paych.State); ok { + info := sm.paychState[a] + *outState = info.state + return info.actor, nil + } + panic(fmt.Sprintf("unexpected state type %v", out)) +} + +func (sm *mockStateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) { + return sm.response, nil +} + +func TestPaychOutbound(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + fromAcct := tutils.NewIDAddr(t, 201) + toAcct := tutils.NewIDAddr(t, 202) + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) + sm.setPaychState(ch, nil, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: big.NewInt(0), + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: []*paych.LaneState{}, + }) + + mgr := newManager(sm, store) + err := mgr.TrackOutboundChannel(ctx, ch) + require.NoError(t, err) + + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Equal(t, ci.Channel, ch) + require.Equal(t, ci.Control, from) + require.Equal(t, ci.Target, to) + require.EqualValues(t, ci.Direction, DirOutbound) + require.EqualValues(t, ci.NextLane, 0) + require.Len(t, ci.Vouchers, 0) +} + +func TestPaychInbound(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + fromAcct := tutils.NewIDAddr(t, 201) + toAcct := tutils.NewIDAddr(t, 202) + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) + sm.setPaychState(ch, nil, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: big.NewInt(0), + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: []*paych.LaneState{}, + }) + + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Equal(t, ci.Channel, ch) + require.Equal(t, ci.Control, to) + require.Equal(t, ci.Target, from) + require.EqualValues(t, ci.Direction, DirInbound) + require.EqualValues(t, ci.NextLane, 0) + require.Len(t, ci.Vouchers, 0) +} + +func TestCheckVoucherValid(t *testing.T) { + ctx := context.Background() + + fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) + randKeyPrivate, _ := testGenerateKeyPair(t) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) + to := tutils.NewSECP256K1Addr(t, "secpTo") + fromAcct := tutils.NewActorAddr(t, "fromAct") + toAcct := tutils.NewActorAddr(t, "toAct") + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) + + tcases := []struct { + name string + expectError bool + key []byte + actorBalance big.Int + toSend big.Int + voucherAmount big.Int + voucherLane uint64 + voucherNonce uint64 + laneStates []*paych.LaneState + }{{ + name: "passes when voucher amount < balance", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + }, { + name: "fails when funds too low", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(5), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(10), + }, { + name: "fails when invalid signature", + expectError: true, + key: randKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + }, { + name: "fails when nonce too low", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 1, + voucherNonce: 2, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(2), + Nonce: 3, + }}, + }, { + name: "passes when nonce higher", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 1, + voucherNonce: 3, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(2), + Nonce: 2, + }}, + }, { + name: "passes when nonce for different lane", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 2, + voucherNonce: 2, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(2), + Nonce: 3, + }}, + }, { + name: "fails when voucher has higher nonce but lower value than lane state", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 1, + voucherNonce: 3, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(6), + Nonce: 2, + }}, + }, { + name: "fails when voucher + ToSend > balance", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(9), + voucherAmount: big.NewInt(2), + }, { + // voucher supersedes lane 1 redeemed so + // lane 1 effective redeemed = voucher amount + // + // required balance = toSend + total redeemed + // = 1 + 6 (lane1) + // = 7 + // So required balance: 7 < actor balance: 10 + name: "passes when voucher + total redeemed <= balance", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(1), + voucherAmount: big.NewInt(6), + voucherLane: 1, + voucherNonce: 2, + laneStates: []*paych.LaneState{{ + ID: 1, // Lane 1 (same as voucher lane 1) + Redeemed: big.NewInt(4), + Nonce: 1, + }}, + }, { + // required balance = toSend + total redeemed + // = 1 + 4 (lane 2) + 6 (voucher lane 1) + // = 11 + // So required balance: 11 > actor balance: 10 + name: "fails when voucher + total redeemed > balance", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(1), + voucherAmount: big.NewInt(6), + voucherLane: 1, + voucherNonce: 1, + laneStates: []*paych.LaneState{{ + ID: 2, // Lane 2 (different from voucher lane 1) + Redeemed: big.NewInt(4), + Nonce: 1, + }}, + }} + + for _, tcase := range tcases { + tcase := tcase + t.Run(tcase.name, func(t *testing.T) { + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: tcase.actorBalance, + } + sm.setPaychState(ch, act, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: tcase.toSend, + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: tcase.laneStates, + }) + + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + + sv := testCreateVoucher(t, tcase.voucherLane, tcase.voucherNonce, tcase.voucherAmount, tcase.key) + + err = mgr.CheckVoucherValid(ctx, ch, sv) + if tcase.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestCheckVoucherValidCountingAllLanes(t *testing.T) { + ctx := context.Background() + + fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) + to := tutils.NewSECP256K1Addr(t, "secpTo") + fromAcct := tutils.NewActorAddr(t, "fromAct") + toAcct := tutils.NewActorAddr(t, "toAct") + minDelta := big.NewInt(0) + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) + + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + actorBalance := big.NewInt(10) + toSend := big.NewInt(1) + laneStates := []*paych.LaneState{{ + ID: 1, + Nonce: 1, + Redeemed: big.NewInt(3), + }, { + ID: 2, + Nonce: 1, + Redeemed: big.NewInt(4), + }} + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: actorBalance, + } + sm.setPaychState(ch, act, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: toSend, + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: laneStates, + }) + + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + + // + // Should not be possible to add a voucher with a value such that + // + toSend > + // + // lane 1 redeemed: 3 + // voucher amount (lane 1): 6 + // lane 1 redeemed (with voucher): 6 + // + // Lane 1: 6 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 11 + // + // actor balance is 10 so total is too high. + // + voucherLane := uint64(1) + voucherNonce := uint64(2) + voucherAmount := big.NewInt(6) + sv := testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.Error(t, err) + + // + // lane 1 redeemed: 3 + // voucher amount (lane 1): 4 + // lane 1 redeemed (with voucher): 4 + // + // Lane 1: 4 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 9 + // + // actor balance is 10 so total is ok. + // + voucherAmount = big.NewInt(4) + sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.NoError(t, err) + + // Add voucher to lane 1, so Lane 1 effective redeemed + // (with first voucher) is now 4 + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + // + // lane 1 redeemed: 4 + // voucher amount (lane 1): 6 + // lane 1 redeemed (with voucher): 6 + // + // Lane 1: 6 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 11 + // + // actor balance is 10 so total is too high. + // + voucherNonce++ + voucherAmount = big.NewInt(6) + sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.Error(t, err) + + // + // lane 1 redeemed: 4 + // voucher amount (lane 1): 5 + // lane 1 redeemed (with voucher): 5 + // + // Lane 1: 5 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 10 + // + // actor balance is 10 so total is ok. + // + voucherAmount = big.NewInt(5) + sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.NoError(t, err) +} + +func TestAddVoucherDelta(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) + + voucherLane := uint64(1) + + // Expect error when adding a voucher whose amount is less than minDelta + minDelta := big.NewInt(2) + nonce := uint64(1) + voucherAmount := big.NewInt(1) + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.Error(t, err) + + // Expect success when adding a voucher whose amount is equal to minDelta + nonce++ + voucherAmount = big.NewInt(2) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + delta, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + require.EqualValues(t, delta.Int64(), 2) + + // Check that delta is correct when there's an existing voucher + nonce++ + voucherAmount = big.NewInt(5) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + delta, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + require.EqualValues(t, delta.Int64(), 3) + + // Check that delta is correct when voucher added to a different lane + nonce = uint64(1) + voucherAmount = big.NewInt(6) + voucherLane = uint64(2) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + delta, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + require.EqualValues(t, delta.Int64(), 6) +} + +func TestAddVoucherNextLane(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) + + minDelta := big.NewInt(0) + voucherAmount := big.NewInt(2) + + // Add a voucher in lane 2 + nonce := uint64(1) + voucherLane := uint64(2) + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.EqualValues(t, ci.NextLane, 3) + + // Add a voucher in lane 1 + voucherLane = uint64(1) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.EqualValues(t, ci.NextLane, 3) + + // Add a voucher in lane 5 + voucherLane = uint64(5) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.EqualValues(t, ci.NextLane, 6) +} + +func TestAddVoucherProof(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) + + nonce := uint64(1) + voucherAmount := big.NewInt(1) + minDelta := big.NewInt(0) + voucherAmount = big.NewInt(2) + voucherLane := uint64(1) + + // Add a voucher with no proof + var proof []byte + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + // Expect one voucher with no proof + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Len(t, ci.Vouchers, 1) + require.Len(t, ci.Vouchers[0].Proof, 0) + + // Add same voucher with no proof + voucherLane = uint64(1) + _, err = mgr.AddVoucher(ctx, ch, sv, proof, minDelta) + require.NoError(t, err) + + // Expect one voucher with no proof + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Len(t, ci.Vouchers, 1) + require.Len(t, ci.Vouchers[0].Proof, 0) + + // Add same voucher with proof + proof = []byte{1} + _, err = mgr.AddVoucher(ctx, ch, sv, proof, minDelta) + require.NoError(t, err) + + // Should add proof to existing voucher + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Len(t, ci.Vouchers, 1) + require.Len(t, ci.Vouchers[0].Proof, 1) +} + +func TestAllocateLane(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, _ := testSetupMgrWithChannel(ctx, t) + + // First lane should be 0 + lane, err := mgr.AllocateLane(ch) + require.NoError(t, err) + require.EqualValues(t, lane, 0) + + // Next lane should be 1 + lane, err = mgr.AllocateLane(ch) + require.NoError(t, err) + require.EqualValues(t, lane, 1) +} + +func TestNextNonceForLane(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, key := testSetupMgrWithChannel(ctx, t) + + // Expect next nonce for non-existent lane to be 1 + next, err := mgr.NextNonceForLane(ctx, ch, 1) + require.NoError(t, err) + require.EqualValues(t, next, 1) + + voucherAmount := big.NewInt(1) + minDelta := big.NewInt(0) + voucherAmount = big.NewInt(2) + + // Add vouchers such that we have + // lane 1: nonce 2 + // lane 1: nonce 4 + // lane 2: nonce 7 + voucherLane := uint64(1) + for _, nonce := range []uint64{2, 4} { + voucherAmount = big.Add(voucherAmount, big.NewInt(1)) + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + } + + voucherLane = uint64(2) + nonce := uint64(7) + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key) + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + // Expect next nonce for lane 1 to be 5 + next, err = mgr.NextNonceForLane(ctx, ch, 1) + require.NoError(t, err) + require.EqualValues(t, next, 5) + + // Expect next nonce for lane 2 to be 8 + next, err = mgr.NextNonceForLane(ctx, ch, 2) + require.NoError(t, err) + require.EqualValues(t, next, 8) +} + +func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, address.Address, []byte) { + fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) + to := tutils.NewSECP256K1Addr(t, "secpTo") + fromAcct := tutils.NewActorAddr(t, "fromAct") + toAcct := tutils.NewActorAddr(t, "toAct") + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: big.NewInt(20), + } + sm.setPaychState(ch, act, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: big.NewInt(0), + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: []*paych.LaneState{}, + }) + + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + return mgr, ch, fromKeyPrivate +} + +func testGenerateKeyPair(t *testing.T) ([]byte, []byte) { + priv, err := sigs.Generate(crypto.SigTypeSecp256k1) + require.NoError(t, err) + pub, err := sigs.ToPublic(crypto.SigTypeSecp256k1, priv) + require.NoError(t, err) + return priv, pub +} + +func testCreateVoucher(t *testing.T, voucherLane uint64, nonce uint64, voucherAmount big.Int, key []byte) *paych.SignedVoucher { + sv := &paych.SignedVoucher{ + Lane: voucherLane, + Nonce: nonce, + Amount: voucherAmount, + } + + signingBytes, err := sv.SigningBytes() + require.NoError(t, err) + sig, err := sigs.Sign(crypto.SigTypeSecp256k1, key, signingBytes) + require.NoError(t, err) + sv.Signature = sig + return sv +} diff --git a/paychmgr/simple.go b/paychmgr/simple.go index d0dee5e19..4d275a1a7 100644 --- a/paychmgr/simple.go +++ b/paychmgr/simple.go @@ -75,7 +75,7 @@ func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) { } paychaddr := decodedReturn.RobustAddress - ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr) + ci, err := pm.loadStateChannelInfo(ctx, paychaddr, DirOutbound) if err != nil { log.Errorf("loading channel info: %w", err) return diff --git a/paychmgr/state.go b/paychmgr/state.go index 6aff6bd9e..7d06a35a4 100644 --- a/paychmgr/state.go +++ b/paychmgr/state.go @@ -3,6 +3,10 @@ package paychmgr import ( "context" + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/filecoin-project/specs-actors/actors/builtin/account" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/builtin/paych" xerrors "golang.org/x/xerrors" @@ -20,55 +24,91 @@ func (pm *Manager) loadPaychState(ctx context.Context, ch address.Address) (*typ return act, &pcast, nil } -func findLane(states []*paych.LaneState, lane uint64) *paych.LaneState { - var ls *paych.LaneState - for _, laneState := range states { - if uint64(laneState.ID) == lane { - ls = laneState - break - } +func (pm *Manager) loadStateChannelInfo(ctx context.Context, ch address.Address, dir uint64) (*ChannelInfo, error) { + _, st, err := pm.loadPaychState(ctx, ch) + if err != nil { + return nil, err } - return ls + + var account account.State + _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) + if err != nil { + return nil, err + } + from := account.Address + _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) + if err != nil { + return nil, err + } + to := account.Address + + ci := &ChannelInfo{ + Channel: ch, + Direction: dir, + NextLane: nextLaneFromState(st), + } + + if dir == DirOutbound { + ci.Control = from + ci.Target = to + } else { + ci.Control = to + ci.Target = from + } + + return ci, nil } -func (pm *Manager) laneState(ctx context.Context, ch address.Address, lane uint64) (paych.LaneState, error) { - _, state, err := pm.loadPaychState(ctx, ch) - if err != nil { - return paych.LaneState{}, err +func nextLaneFromState(st *paych.State) uint64 { + if len(st.LaneStates) == 0 { + return 0 } + maxLane := st.LaneStates[0].ID + for _, state := range st.LaneStates { + if state.ID > maxLane { + maxLane = state.ID + } + } + return maxLane + 1 +} + +// laneState gets the LaneStates from chain, then applies all vouchers in +// the data store over the chain state +func (pm *Manager) laneState(state *paych.State, ch address.Address) (map[uint64]*paych.LaneState, error) { // TODO: we probably want to call UpdateChannelState with all vouchers to be fully correct // (but technically dont't need to) - // TODO: make sure this is correct + laneStates := make(map[uint64]*paych.LaneState, len(state.LaneStates)) - ls := findLane(state.LaneStates, lane) - if ls == nil { - ls = &paych.LaneState{ - ID: lane, - Redeemed: types.NewInt(0), - Nonce: 0, - } + // Get the lane state from the chain + for _, laneState := range state.LaneStates { + laneStates[laneState.ID] = laneState } + // Apply locally stored vouchers vouchers, err := pm.store.VouchersForPaych(ch) - if err != nil { - if err == ErrChannelNotTracked { - return *ls, nil - } - return paych.LaneState{}, err + if err != nil && err != ErrChannelNotTracked { + return nil, err } for _, v := range vouchers { for range v.Voucher.Merges { - return paych.LaneState{}, xerrors.Errorf("paych merges not handled yet") + return nil, xerrors.Errorf("paych merges not handled yet") } - if v.Voucher.Lane != lane { - continue + // If there's a voucher for a lane that isn't in chain state just + // create it + ls, ok := laneStates[v.Voucher.Lane] + if !ok { + ls = &paych.LaneState{ + ID: v.Voucher.Lane, + Redeemed: types.NewInt(0), + Nonce: 0, + } + laneStates[v.Voucher.Lane] = ls } if v.Voucher.Nonce < ls.Nonce { - log.Warnf("Found outdated voucher: ch=%s, lane=%d, v.nonce=%d lane.nonce=%d", ch, lane, v.Voucher.Nonce, ls.Nonce) continue } @@ -76,5 +116,36 @@ func (pm *Manager) laneState(ctx context.Context, ch address.Address, lane uint6 ls.Redeemed = v.Voucher.Amount } - return *ls, nil + return laneStates, nil +} + +// Get the total redeemed amount across all lanes, after applying the voucher +func (pm *Manager) totalRedeemedWithVoucher(laneStates map[uint64]*paych.LaneState, sv *paych.SignedVoucher) (big.Int, error) { + // TODO: merges + if len(sv.Merges) != 0 { + return big.Int{}, xerrors.Errorf("dont currently support paych lane merges") + } + + total := big.NewInt(0) + for _, ls := range laneStates { + total = big.Add(total, ls.Redeemed) + } + + lane, ok := laneStates[sv.Lane] + if ok { + // If the voucher is for an existing lane, and the voucher nonce + // and is higher than the lane nonce + if sv.Nonce > lane.Nonce { + // Add the delta between the redeemed amount and the voucher + // amount to the total + delta := big.Sub(sv.Amount, lane.Redeemed) + total = big.Add(total, delta) + } + } else { + // If the voucher is *not* for an existing lane, just add its + // value (implicitly a new lane will be created for the voucher) + total = big.Add(total, sv.Amount) + } + + return total, nil } diff --git a/paychmgr/store_test.go b/paychmgr/store_test.go index 6ef407f4f..094226464 100644 --- a/paychmgr/store_test.go +++ b/paychmgr/store_test.go @@ -60,7 +60,7 @@ func TestStore(t *testing.T) { require.Len(t, vouchers, 1) // Requesting voucher for non-existent channel should error - vouchers, err = store.VouchersForPaych(tutils.NewIDAddr(t, 300)) + _, err = store.VouchersForPaych(tutils.NewIDAddr(t, 300)) require.Equal(t, err, ErrChannelNotTracked) // Allocate lane for channel @@ -74,7 +74,7 @@ func TestStore(t *testing.T) { require.Equal(t, lane, uint64(1)) // Allocate next lane for non-existent channel should error - lane, err = store.AllocateLane(tutils.NewIDAddr(t, 300)) + _, err = store.AllocateLane(tutils.NewIDAddr(t, 300)) require.Equal(t, err, ErrChannelNotTracked) } diff --git a/tools/stats/metrics.go b/tools/stats/metrics.go index 3abb4b13b..a6a3db3ad 100644 --- a/tools/stats/metrics.go +++ b/tools/stats/metrics.go @@ -176,17 +176,18 @@ func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) } func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { - pc, err := api.StatePledgeCollateral(ctx, tipset.Key()) - if err != nil { - return err - } - attoFil := types.NewInt(build.FilecoinPrecision).Int - pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) - pcFilFloat, _ := pcFil.Float64() - p := NewPoint("chain.pledge_collateral", pcFilFloat) - pl.AddPoint(p) + //TODO: StatePledgeCollateral API is not implemented and is commented out - re-enable this block once the API is implemented again. + //pc, err := api.StatePledgeCollateral(ctx, tipset.Key()) + //if err != nil { + //return err + //} + + //pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) + //pcFilFloat, _ := pcFil.Float64() + //p := NewPoint("chain.pledge_collateral", pcFilFloat) + //pl.AddPoint(p) netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr) if err != nil { @@ -195,7 +196,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil) netBalFilFloat, _ := netBalFil.Float64() - p = NewPoint("network.balance", netBalFilFloat) + p := NewPoint("network.balance", netBalFilFloat) pl.AddPoint(p) totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key())