Merge branch 'next' into mock-clock

This commit is contained in:
Raúl Kripalani 2020-07-13 11:29:53 +01:00
commit 973dbd6a13
20 changed files with 1622 additions and 352 deletions

View File

@ -4,13 +4,11 @@ import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
typegen "github.com/whyrusleeping/cbor-gen" typegen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/market"
@ -87,20 +85,25 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di
}) })
} }
type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error)
// OnDealStateChanged calls diffDealStates when the market state changes // OnDealStateChanged calls diffDealStates when the market deal state changes
func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffAdtArraysFunc) DiffStorageMarketStateFunc {
return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) {
if oldState.States.Equals(newState.States) { if oldState.States.Equals(newState.States) {
return false, nil, nil return false, nil, nil
} }
oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States) ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
oldRoot, err := adt.AsArray(ctxStore, oldState.States)
if err != nil { if err != nil {
return false, nil, err return false, nil, err
} }
newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States) newRoot, err := adt.AsArray(ctxStore, newState.States)
if err != nil { if err != nil {
return false, nil, err return false, nil, err
} }
@ -109,39 +112,193 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc)
} }
} }
// OnDealProposalChanged calls diffDealProps when the market proposal state changes
func (sp *StatePredicates) OnDealProposalChanged(diffDealProps DiffAdtArraysFunc) DiffStorageMarketStateFunc {
return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) {
if oldState.Proposals.Equals(newState.Proposals) {
return false, nil, nil
}
ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
oldRoot, err := adt.AsArray(ctxStore, oldState.Proposals)
if err != nil {
return false, nil, err
}
newRoot, err := adt.AsArray(ctxStore, newState.Proposals)
if err != nil {
return false, nil, err
}
return diffDealProps(ctx, oldRoot, newRoot)
}
}
var _ AdtArrayDiff = &MarketDealProposalChanges{}
type MarketDealProposalChanges struct {
Added []ProposalIDState
Removed []ProposalIDState
}
type ProposalIDState struct {
ID abi.DealID
Proposal market.DealProposal
}
func (m *MarketDealProposalChanges) Add(key uint64, val *typegen.Deferred) error {
dp := new(market.DealProposal)
err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, ProposalIDState{abi.DealID(key), *dp})
return nil
}
func (m *MarketDealProposalChanges) Modify(key uint64, from, to *typegen.Deferred) error {
// short circuit, DealProposals are static
return nil
}
func (m *MarketDealProposalChanges) Remove(key uint64, val *typegen.Deferred) error {
dp := new(market.DealProposal)
err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, ProposalIDState{abi.DealID(key), *dp})
return nil
}
// OnDealProposalAmtChanged detects changes in the deal proposal AMT for all deal proposals and returns a MarketProposalsChanges structure containing:
// - Added Proposals
// - Modified Proposals
// - Removed Proposals
func (sp *StatePredicates) OnDealProposalAmtChanged() DiffAdtArraysFunc {
return func(ctx context.Context, oldDealProps, newDealProps *adt.Array) (changed bool, user UserData, err error) {
proposalChanges := new(MarketDealProposalChanges)
if err := DiffAdtArray(oldDealProps, newDealProps, proposalChanges); err != nil {
return false, nil, err
}
if len(proposalChanges.Added)+len(proposalChanges.Removed) == 0 {
return false, nil, nil
}
return true, proposalChanges, nil
}
}
var _ AdtArrayDiff = &MarketDealStateChanges{}
type MarketDealStateChanges struct {
Added []DealIDState
Modified []DealStateChange
Removed []DealIDState
}
type DealIDState struct {
ID abi.DealID
Deal market.DealState
}
func (m *MarketDealStateChanges) Add(key uint64, val *typegen.Deferred) error {
ds := new(market.DealState)
err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, DealIDState{abi.DealID(key), *ds})
return nil
}
func (m *MarketDealStateChanges) Modify(key uint64, from, to *typegen.Deferred) error {
dsFrom := new(market.DealState)
if err := dsFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil {
return err
}
dsTo := new(market.DealState)
if err := dsTo.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil {
return err
}
if *dsFrom != *dsTo {
m.Modified = append(m.Modified, DealStateChange{abi.DealID(key), dsFrom, dsTo})
}
return nil
}
func (m *MarketDealStateChanges) Remove(key uint64, val *typegen.Deferred) error {
ds := new(market.DealState)
err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, DealIDState{abi.DealID(key), *ds})
return nil
}
// OnDealStateAmtChanged detects changes in the deal state AMT for all deal states and returns a MarketDealStateChanges structure containing:
// - Added Deals
// - Modified Deals
// - Removed Deals
func (sp *StatePredicates) OnDealStateAmtChanged() DiffAdtArraysFunc {
return func(ctx context.Context, oldDealStates, newDealStates *adt.Array) (changed bool, user UserData, err error) {
dealStateChanges := new(MarketDealStateChanges)
if err := DiffAdtArray(oldDealStates, newDealStates, dealStateChanges); err != nil {
return false, nil, err
}
if len(dealStateChanges.Added)+len(dealStateChanges.Modified)+len(dealStateChanges.Removed) == 0 {
return false, nil, nil
}
return true, dealStateChanges, nil
}
}
// ChangedDeals is a set of changes to deal state // ChangedDeals is a set of changes to deal state
type ChangedDeals map[abi.DealID]DealStateChange type ChangedDeals map[abi.DealID]DealStateChange
// DealStateChange is a change in deal state from -> to // DealStateChange is a change in deal state from -> to
type DealStateChange struct { type DealStateChange struct {
ID abi.DealID
From *market.DealState From *market.DealState
To *market.DealState To *market.DealState
} }
// DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs // DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs
func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffAdtArraysFunc {
return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { return func(ctx context.Context, oldDealStateArray, newDealStateArray *adt.Array) (changed bool, user UserData, err error) {
changedDeals := make(ChangedDeals) changedDeals := make(ChangedDeals)
for _, dealID := range dealIds { for _, dealID := range dealIds {
var oldDealPtr, newDealPtr *market.DealState var oldDealPtr, newDealPtr *market.DealState
var oldDeal, newDeal market.DealState var oldDeal, newDeal market.DealState
// If the deal has been removed, we just set it to nil // If the deal has been removed, we just set it to nil
err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal) found, err := oldDealStateArray.Get(uint64(dealID), &oldDeal)
if err == nil { if err != nil {
oldDealPtr = &oldDeal
} else if _, ok := err.(*amt.ErrNotFound); !ok {
return false, nil, err return false, nil, err
} }
err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal) if found {
if err == nil { oldDealPtr = &oldDeal
newDealPtr = &newDeal }
} else if _, ok := err.(*amt.ErrNotFound); !ok {
found, err = newDealStateArray.Get(uint64(dealID), &newDeal)
if err != nil {
return false, nil, err return false, nil, err
} }
if found {
newDealPtr = &newDeal
}
if oldDeal != newDeal { if oldDeal != newDeal {
changedDeals[dealID] = DealStateChange{oldDealPtr, newDealPtr} changedDeals[dealID] = DealStateChange{dealID, oldDealPtr, newDealPtr}
} }
} }
if len(changedDeals) > 0 { if len(changedDeals) > 0 {

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-actors/actors/util/adt"
tutils "github.com/filecoin-project/specs-actors/support/testing" tutils "github.com/filecoin-project/specs-actors/support/testing"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -65,73 +66,141 @@ func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) {
m.ts[tsk] = act m.ts[tsk] = act
} }
func TestPredicates(t *testing.T) { func TestMarketPredicates(t *testing.T) {
ctx := context.Background() ctx := context.Background()
bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
store := cbornode.NewCborStore(bs) store := cbornode.NewCborStore(bs)
oldDeals := map[abi.DealID]*market.DealState{ oldDeal1 := &market.DealState{
abi.DealID(1): {
SectorStartEpoch: 1, SectorStartEpoch: 1,
LastUpdatedEpoch: 2, LastUpdatedEpoch: 2,
}, SlashEpoch: 0,
abi.DealID(2): { }
oldDeal2 := &market.DealState{
SectorStartEpoch: 4, SectorStartEpoch: 4,
LastUpdatedEpoch: 5, LastUpdatedEpoch: 5,
}, SlashEpoch: 0,
}
oldDeals := map[abi.DealID]*market.DealState{
abi.DealID(1): oldDeal1,
abi.DealID(2): oldDeal2,
} }
oldStateC := createMarketState(ctx, t, store, oldDeals)
newDeals := map[abi.DealID]*market.DealState{ oldProp1 := &market.DealProposal{
abi.DealID(1): { PieceCID: dummyCid,
PieceSize: 0,
VerifiedDeal: false,
Client: tutils.NewIDAddr(t, 1),
Provider: tutils.NewIDAddr(t, 1),
StartEpoch: 1,
EndEpoch: 2,
StoragePricePerEpoch: big.Zero(),
ProviderCollateral: big.Zero(),
ClientCollateral: big.Zero(),
}
oldProp2 := &market.DealProposal{
PieceCID: dummyCid,
PieceSize: 0,
VerifiedDeal: false,
Client: tutils.NewIDAddr(t, 1),
Provider: tutils.NewIDAddr(t, 1),
StartEpoch: 2,
EndEpoch: 3,
StoragePricePerEpoch: big.Zero(),
ProviderCollateral: big.Zero(),
ClientCollateral: big.Zero(),
}
oldProps := map[abi.DealID]*market.DealProposal{
abi.DealID(1): oldProp1,
abi.DealID(2): oldProp2,
}
oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps)
newDeal1 := &market.DealState{
SectorStartEpoch: 1, SectorStartEpoch: 1,
LastUpdatedEpoch: 3, LastUpdatedEpoch: 3,
}, SlashEpoch: 0,
} }
newStateC := createMarketState(ctx, t, store, newDeals)
miner, err := address.NewFromString("t00") // deal 2 removed
// added
newDeal3 := &market.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
SlashEpoch: 3,
}
newDeals := map[abi.DealID]*market.DealState{
abi.DealID(1): newDeal1,
// deal 2 was removed
abi.DealID(3): newDeal3,
}
// added
newProp3 := &market.DealProposal{
PieceCID: dummyCid,
PieceSize: 0,
VerifiedDeal: false,
Client: tutils.NewIDAddr(t, 1),
Provider: tutils.NewIDAddr(t, 1),
StartEpoch: 4,
EndEpoch: 4,
StoragePricePerEpoch: big.Zero(),
ProviderCollateral: big.Zero(),
ClientCollateral: big.Zero(),
}
newProps := map[abi.DealID]*market.DealProposal{
abi.DealID(1): oldProp1, // 1 was persisted
// prop 2 was removed
abi.DealID(3): newProp3, // new
// NB: DealProposals cannot be modified, so don't test that case.
}
newStateC := createMarketState(ctx, t, store, newDeals, newProps)
minerAddr, err := address.NewFromString("t00")
require.NoError(t, err) require.NoError(t, err)
oldState, err := mockTipset(miner, 1) oldState, err := mockTipset(minerAddr, 1)
require.NoError(t, err) require.NoError(t, err)
newState, err := mockTipset(miner, 2) newState, err := mockTipset(minerAddr, 2)
require.NoError(t, err) require.NoError(t, err)
api := newMockAPI(bs) api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) api.setActor(oldState.Key(), &types.Actor{Head: oldStateC})
api.setActor(newState.Key(), &types.Actor{Head: newStateC}) api.setActor(newState.Key(), &types.Actor{Head: newStateC})
t.Run("deal ID predicate", func(t *testing.T) {
preds := NewStatePredicates(api) preds := NewStatePredicates(api)
dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)}
diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) diffIDFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds)))
// Diff a state against itself: expect no change // Diff a state against itself: expect no change
changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key()) changed, _, err := diffIDFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err) require.NoError(t, err)
require.False(t, changed) require.False(t, changed)
// Diff old state against new state // Diff old state against new state
changed, val, err := diffFn(ctx, oldState.Key(), newState.Key()) changed, valIDs, err := diffIDFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err) require.NoError(t, err)
require.True(t, changed) require.True(t, changed)
changedDeals, ok := val.(ChangedDeals) changedDealIDs, ok := valIDs.(ChangedDeals)
require.True(t, ok) require.True(t, ok)
require.Len(t, changedDeals, 2) require.Len(t, changedDealIDs, 2)
require.Contains(t, changedDeals, abi.DealID(1)) require.Contains(t, changedDealIDs, abi.DealID(1))
require.Contains(t, changedDeals, abi.DealID(2)) require.Contains(t, changedDealIDs, abi.DealID(2))
deal1 := changedDeals[abi.DealID(1)] deal1 := changedDealIDs[abi.DealID(1)]
if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 {
t.Fatal("Unexpected change to LastUpdatedEpoch") t.Fatal("Unexpected change to LastUpdatedEpoch")
} }
deal2 := changedDeals[abi.DealID(2)] deal2 := changedDealIDs[abi.DealID(2)]
if deal2.From.LastUpdatedEpoch != 5 || deal2.To != nil { if deal2.From.LastUpdatedEpoch != 5 || deal2.To != nil {
t.Fatal("Expected To to be nil") t.Fatal("Expected To to be nil")
} }
// Diff with non-existent deal. // Diff with non-existent deal.
noDeal := []abi.DealID{3} noDeal := []abi.DealID{4}
diffNoDealFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(noDeal))) diffNoDealFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(noDeal)))
changed, _, err = diffNoDealFn(ctx, oldState.Key(), newState.Key()) changed, _, err = diffNoDealFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err) require.NoError(t, err)
@ -149,7 +218,7 @@ func TestPredicates(t *testing.T) {
require.False(t, changed) require.False(t, changed)
// Test that OnDealStateChanged does not call the callback if the state has not changed // Test that OnDealStateChanged does not call the callback if the state has not changed
diffDealStateFn := preds.OnDealStateChanged(func(context.Context, *amt.Root, *amt.Root) (bool, UserData, error) { diffDealStateFn := preds.OnDealStateChanged(func(context.Context, *adt.Array, *adt.Array) (bool, UserData, error) {
t.Fatal("No state change so this should not be called") t.Fatal("No state change so this should not be called")
return false, nil, nil return false, nil, nil
}) })
@ -157,6 +226,59 @@ func TestPredicates(t *testing.T) {
changed, _, err = diffDealStateFn(ctx, marketState, marketState) changed, _, err = diffDealStateFn(ctx, marketState, marketState)
require.NoError(t, err) require.NoError(t, err)
require.False(t, changed) require.False(t, changed)
})
t.Run("deal state array predicate", func(t *testing.T) {
preds := NewStatePredicates(api)
diffArrFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.OnDealStateAmtChanged()))
changed, _, err := diffArrFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
changed, valArr, err := diffArrFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, changed)
changedDeals, ok := valArr.(*MarketDealStateChanges)
require.True(t, ok)
require.Len(t, changedDeals.Added, 1)
require.Equal(t, abi.DealID(3), changedDeals.Added[0].ID)
require.Equal(t, *newDeal3, changedDeals.Added[0].Deal)
require.Len(t, changedDeals.Removed, 1)
require.Len(t, changedDeals.Modified, 1)
require.Equal(t, abi.DealID(1), changedDeals.Modified[0].ID)
require.Equal(t, newDeal1, changedDeals.Modified[0].To)
require.Equal(t, oldDeal1, changedDeals.Modified[0].From)
require.Equal(t, abi.DealID(2), changedDeals.Removed[0].ID)
})
t.Run("deal proposal array predicate", func(t *testing.T) {
preds := NewStatePredicates(api)
diffArrFn := preds.OnStorageMarketActorChanged(preds.OnDealProposalChanged(preds.OnDealProposalAmtChanged()))
changed, _, err := diffArrFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
changed, valArr, err := diffArrFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, changed)
changedProps, ok := valArr.(*MarketDealProposalChanges)
require.True(t, ok)
require.Len(t, changedProps.Added, 1)
require.Equal(t, abi.DealID(3), changedProps.Added[0].ID)
require.Equal(t, *newProp3, changedProps.Added[0].Proposal)
// proposals cannot be modified -- no modified testing
require.Len(t, changedProps.Removed, 1)
require.Equal(t, abi.DealID(2), changedProps.Removed[0].ID)
require.Equal(t, *oldProp2, changedProps.Removed[0].Proposal)
})
} }
func TestMinerSectorChange(t *testing.T) { func TestMinerSectorChange(t *testing.T) {
@ -207,14 +329,15 @@ func TestMinerSectorChange(t *testing.T) {
require.True(t, ok) require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1) require.Equal(t, len(sectorChanges.Added), 1)
require.Equal(t, sectorChanges.Added[0], si3) require.Equal(t, 1, len(sectorChanges.Added))
require.Equal(t, si3, sectorChanges.Added[0])
require.Equal(t, len(sectorChanges.Removed), 1) require.Equal(t, 1, len(sectorChanges.Removed))
require.Equal(t, sectorChanges.Removed[0], si0) require.Equal(t, si0, sectorChanges.Removed[0])
require.Equal(t, len(sectorChanges.Extended), 1) require.Equal(t, 1, len(sectorChanges.Extended))
require.Equal(t, sectorChanges.Extended[0].From, si1) require.Equal(t, si1, sectorChanges.Extended[0].From)
require.Equal(t, sectorChanges.Extended[0].To, si1Ext) require.Equal(t, si1Ext, sectorChanges.Extended[0].To)
change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key()) change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err) require.NoError(t, err)
@ -229,20 +352,20 @@ func TestMinerSectorChange(t *testing.T) {
sectorChanges, ok = val.(*MinerSectorChanges) sectorChanges, ok = val.(*MinerSectorChanges)
require.True(t, ok) require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1) require.Equal(t, 1, len(sectorChanges.Added))
require.Equal(t, sectorChanges.Added[0], si0) require.Equal(t, si0, sectorChanges.Added[0])
require.Equal(t, len(sectorChanges.Removed), 1) require.Equal(t, 1, len(sectorChanges.Removed))
require.Equal(t, sectorChanges.Removed[0], si3) require.Equal(t, si3, sectorChanges.Removed[0])
require.Equal(t, len(sectorChanges.Extended), 1) require.Equal(t, 1, len(sectorChanges.Extended))
require.Equal(t, sectorChanges.Extended[0].To, si1) require.Equal(t, si1, sectorChanges.Extended[0].To)
require.Equal(t, sectorChanges.Extended[0].From, si1Ext) require.Equal(t, si1Ext, sectorChanges.Extended[0].From)
} }
func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{ return types.NewTipSet([]*types.BlockHeader{{
Miner: miner, Miner: minerAddr,
Height: 5, Height: 5,
ParentStateRoot: dummyCid, ParentStateRoot: dummyCid,
Messages: dummyCid, Messages: dummyCid,
@ -253,11 +376,13 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error)
}}) }})
} }
func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal) cid.Cid {
rootCid := createDealAMT(ctx, t, store, deals) dealRootCid := createDealAMT(ctx, t, store, deals)
propRootCid := createProposalAMT(ctx, t, store, props)
state := createEmptyMarketState(t, store) state := createEmptyMarketState(t, store)
state.States = rootCid state.States = dealRootCid
state.Proposals = propRootCid
stateC, err := store.Put(ctx, state) stateC, err := store.Put(ctx, state)
require.NoError(t, err) require.NoError(t, err)
@ -283,6 +408,17 @@ func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldS
return rootCid return rootCid
} }
func createProposalAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, props map[abi.DealID]*market.DealProposal) cid.Cid {
root := amt.NewAMT(store)
for dealID, prop := range props {
err := root.Set(ctx, uint64(dealID), prop)
require.NoError(t, err)
}
rootCid, err := root.Flush(ctx)
require.NoError(t, err)
return rootCid
}
func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid { func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid {
rootCid := createSectorsAMT(ctx, t, store, sectors) rootCid := createSectorsAMT(ctx, t, store, sectors)

View File

@ -1073,17 +1073,14 @@ func extractSyncState(ctx context.Context) *SyncerState {
// collectHeaders collects the headers from the blocks between any two tipsets. // collectHeaders collects the headers from the blocks between any two tipsets.
// //
// `from` is the heaviest/projected/target tipset we have learned about, and // `incoming` is the heaviest/projected/target tipset we have learned about, and
// `to` is usually an anchor tipset we already have in our view of the chain // `known` is usually an anchor tipset we already have in our view of the chain
// (which could be the genesis). // (which could be the genesis).
// //
// collectHeaders checks if portions of the chain are in our ChainStore; falling // collectHeaders checks if portions of the chain are in our ChainStore; falling
// down to the network to retrieve the missing parts. If during the process, any // down to the network to retrieve the missing parts. If during the process, any
// portion we receive is in our denylist (bad list), we short-circuit. // portion we receive is in our denylist (bad list), we short-circuit.
// //
// {hint/naming}: `from` and `to` is in inverse order. `from` is the highest,
// and `to` is the lowest. This method traverses the chain backwards.
//
// {hint/usage}: This is used by collectChain, which is in turn called from the // {hint/usage}: This is used by collectChain, which is in turn called from the
// main Sync method (Syncer#Sync), so it's a pretty central method. // main Sync method (Syncer#Sync), so it's a pretty central method.
// //
@ -1093,7 +1090,7 @@ func extractSyncState(ctx context.Context) *SyncerState {
// bad. // bad.
// 2. Check the consistency of beacon entries in the from tipset. We check // 2. Check the consistency of beacon entries in the from tipset. We check
// total equality of the BeaconEntries in each block. // total equality of the BeaconEntries in each block.
// 3. Travers the chain backwards, for each tipset: // 3. Traverse the chain backwards, for each tipset:
// 3a. Load it from the chainstore; if found, it move on to its parent. // 3a. Load it from the chainstore; if found, it move on to its parent.
// 3b. Query our peers via BlockSync in batches, requesting up to a // 3b. Query our peers via BlockSync in batches, requesting up to a
// maximum of 500 tipsets every time. // maximum of 500 tipsets every time.
@ -1104,40 +1101,40 @@ func extractSyncState(ctx context.Context) *SyncerState {
// //
// All throughout the process, we keep checking if the received blocks are in // All throughout the process, we keep checking if the received blocks are in
// the deny list, and short-circuit the process if so. // the deny list, and short-circuit the process if so.
func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "collectHeaders") ctx, span := trace.StartSpan(ctx, "collectHeaders")
defer span.End() defer span.End()
ss := extractSyncState(ctx) ss := extractSyncState(ctx)
span.AddAttributes( span.AddAttributes(
trace.Int64Attribute("fromHeight", int64(from.Height())), trace.Int64Attribute("incomingHeight", int64(incoming.Height())),
trace.Int64Attribute("toHeight", int64(to.Height())), trace.Int64Attribute("knownHeight", int64(known.Height())),
) )
// Check if the parents of the from block are in the denylist. // Check if the parents of the from block are in the denylist.
// i.e. if a fork of the chain has been requested that we know to be bad. // i.e. if a fork of the chain has been requested that we know to be bad.
for _, pcid := range from.Parents().Cids() { for _, pcid := range incoming.Parents().Cids() {
if reason, ok := syncer.bad.Has(pcid); ok { if reason, ok := syncer.bad.Has(pcid); ok {
newReason := reason.Linked("linked to %s", pcid) newReason := reason.Linked("linked to %s", pcid)
for _, b := range from.Cids() { for _, b := range incoming.Cids() {
syncer.bad.Add(b, newReason) syncer.bad.Add(b, newReason)
} }
return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), pcid, reason) return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), pcid, reason)
} }
} }
{ {
// ensure consistency of beacon entires // ensure consistency of beacon entires
targetBE := from.Blocks()[0].BeaconEntries targetBE := incoming.Blocks()[0].BeaconEntries
sorted := sort.SliceIsSorted(targetBE, func(i, j int) bool { sorted := sort.SliceIsSorted(targetBE, func(i, j int) bool {
return targetBE[i].Round < targetBE[j].Round return targetBE[i].Round < targetBE[j].Round
}) })
if !sorted { if !sorted {
syncer.bad.Add(from.Cids()[0], NewBadBlockReason(from.Cids(), "wrong order of beacon entires")) syncer.bad.Add(incoming.Cids()[0], NewBadBlockReason(incoming.Cids(), "wrong order of beacon entires"))
return nil, xerrors.Errorf("wrong order of beacon entires") return nil, xerrors.Errorf("wrong order of beacon entires")
} }
for _, bh := range from.Blocks()[1:] { for _, bh := range incoming.Blocks()[1:] {
if len(targetBE) != len(bh.BeaconEntries) { if len(targetBE) != len(bh.BeaconEntries) {
// cannot mark bad, I think @Kubuxu // cannot mark bad, I think @Kubuxu
return nil, xerrors.Errorf("tipset contained different number for beacon entires") return nil, xerrors.Errorf("tipset contained different number for beacon entires")
@ -1152,12 +1149,12 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
} }
} }
blockSet := []*types.TipSet{from} blockSet := []*types.TipSet{incoming}
at := from.Parents() at := incoming.Parents()
// we want to sync all the blocks until the height above the block we have // we want to sync all the blocks until the height above the block we have
untilHeight := to.Height() + 1 untilHeight := known.Height() + 1
ss.SetHeight(blockSet[len(blockSet)-1].Height()) ss.SetHeight(blockSet[len(blockSet)-1].Height())
@ -1172,7 +1169,7 @@ loop:
syncer.bad.Add(b, newReason) syncer.bad.Add(b, newReason)
} }
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason)
} }
} }
@ -1221,7 +1218,7 @@ loop:
syncer.bad.Add(b, newReason) syncer.bad.Add(b, newReason)
} }
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason)
} }
} }
blockSet = append(blockSet, b) blockSet = append(blockSet, b)
@ -1233,23 +1230,23 @@ loop:
at = blks[len(blks)-1].Parents() at = blks[len(blks)-1].Parents()
} }
if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) { // base is the tipset in the candidate chain at the height equal to our known tipset height.
last := blockSet[len(blockSet)-1] if base := blockSet[len(blockSet)-1]; !types.CidArrsEqual(base.Parents().Cids(), known.Cids()) {
if last.Parents() == to.Parents() { if base.Parents() == known.Parents() {
// common case: receiving a block thats potentially part of the same tipset as our best block // common case: receiving a block thats potentially part of the same tipset as our best block
return blockSet, nil return blockSet, nil
} }
// We have now ascertained that this is *not* a 'fast forward' // We have now ascertained that this is *not* a 'fast forward'
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
fork, err := syncer.syncFork(ctx, last, to) fork, err := syncer.syncFork(ctx, base, known)
if err != nil { if err != nil {
if xerrors.Is(err, ErrForkTooLong) { if xerrors.Is(err, ErrForkTooLong) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log.Warn("adding forked chain to our bad tipset cache") log.Warn("adding forked chain to our bad tipset cache")
for _, b := range from.Blocks() { for _, b := range incoming.Blocks() {
syncer.bad.Add(b.Cid(), NewBadBlockReason(from.Cids(), "fork past finality")) syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality"))
} }
} }
return nil, xerrors.Errorf("failed to sync fork: %w", err) return nil, xerrors.Errorf("failed to sync fork: %w", err)
@ -1269,13 +1266,13 @@ var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the // If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the
// denylist. Else, we find the common ancestor, and add the missing chain // denylist. Else, we find the common ancestor, and add the missing chain
// fragment until the fork point to the returned []TipSet. // fragment until the fork point to the returned []TipSet.
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold)) tips, err := syncer.Bsync.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold))
if err != nil { if err != nil {
return nil, err return nil, err
} }
nts, err := syncer.store.LoadTipSet(to.Parents()) nts, err := syncer.store.LoadTipSet(known.Parents())
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err) return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
} }
@ -1285,7 +1282,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
if !syncer.Genesis.Equals(nts) { if !syncer.Genesis.Equals(nts) {
return nil, xerrors.Errorf("somehow synced chain that linked back to a different genesis (bad genesis: %s)", nts.Key()) return nil, xerrors.Errorf("somehow synced chain that linked back to a different genesis (bad genesis: %s)", nts.Key())
} }
return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync") return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync; incoming: %s")
} }
if nts.Equals(tips[cur]) { if nts.Equals(tips[cur]) {

View File

@ -43,7 +43,7 @@ func TestBigIntSerializationRoundTrip(t *testing.T) {
func TestFilRoundTrip(t *testing.T) { func TestFilRoundTrip(t *testing.T) {
testValues := []string{ testValues := []string{
"0", "1", "1.001", "100.10001", "101100", "5000.01", "5000", "0 FIL", "1 FIL", "1.001 FIL", "100.10001 FIL", "101100 FIL", "5000.01 FIL", "5000 FIL",
} }
for _, v := range testValues { for _, v := range testValues {

View File

@ -13,9 +13,9 @@ type FIL BigInt
func (f FIL) String() string { func (f FIL) String() string {
r := new(big.Rat).SetFrac(f.Int, big.NewInt(int64(build.FilecoinPrecision))) r := new(big.Rat).SetFrac(f.Int, big.NewInt(int64(build.FilecoinPrecision)))
if r.Sign() == 0 { if r.Sign() == 0 {
return "0" return "0 FIL"
} }
return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + " FIL"
} }
func (f FIL) Format(s fmt.State, ch rune) { func (f FIL) Format(s fmt.State, ch rune) {
@ -28,14 +28,35 @@ func (f FIL) Format(s fmt.State, ch rune) {
} }
func ParseFIL(s string) (FIL, error) { func ParseFIL(s string) (FIL, error) {
suffix := strings.TrimLeft(s, ".1234567890")
s = s[:len(s)-len(suffix)]
var attofil bool
if suffix != "" {
norm := strings.ToLower(strings.TrimSpace(suffix))
switch norm {
case "", "fil":
case "attofil", "afil":
attofil = true
default:
return FIL{}, fmt.Errorf("unrecognized suffix: %q", suffix)
}
}
r, ok := new(big.Rat).SetString(s) r, ok := new(big.Rat).SetString(s)
if !ok { if !ok {
return FIL{}, fmt.Errorf("failed to parse %q as a decimal number", s) return FIL{}, fmt.Errorf("failed to parse %q as a decimal number", s)
} }
if !attofil {
r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1)) r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1))
}
if !r.IsInt() { if !r.IsInt() {
return FIL{}, fmt.Errorf("invalid FIL value: %q", s) var pref string
if attofil {
pref = "atto"
}
return FIL{}, fmt.Errorf("invalid %sFIL value: %q", pref, s)
} }
return FIL{r.Num()}, nil return FIL{r.Num()}, nil

View File

@ -485,6 +485,8 @@ var clientFindCmd = &cli.Command{
}, },
} }
const DefaultMaxRetrievePrice = 1
var clientRetrieveCmd = &cli.Command{ var clientRetrieveCmd = &cli.Command{
Name: "retrieve", Name: "retrieve",
Usage: "retrieve data from network", Usage: "retrieve data from network",
@ -502,6 +504,10 @@ var clientRetrieveCmd = &cli.Command{
Name: "miner", Name: "miner",
Usage: "miner address for retrieval, if not present it'll use local discovery", Usage: "miner address for retrieval, if not present it'll use local discovery",
}, },
&cli.StringFlag{
Name: "maxPrice",
Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %d FIL)", DefaultMaxRetrievePrice),
},
&cli.StringFlag{ &cli.StringFlag{
Name: "pieceCid", Name: "pieceCid",
Usage: "require data to be retrieved from a specific Piece CID", Usage: "require data to be retrieved from a specific Piece CID",
@ -560,6 +566,11 @@ var clientRetrieveCmd = &cli.Command{
minerStrAddr := cctx.String("miner") minerStrAddr := cctx.String("miner")
if minerStrAddr == "" { // Local discovery if minerStrAddr == "" { // Local discovery
offers, err := fapi.ClientFindData(ctx, file, pieceCid) offers, err := fapi.ClientFindData(ctx, file, pieceCid)
// sort by price low to high
sort.Slice(offers, func(i, j int) bool {
return offers[i].MinPrice.LessThan(offers[j].MinPrice)
})
if err != nil { if err != nil {
return err return err
} }
@ -584,6 +595,21 @@ var clientRetrieveCmd = &cli.Command{
return fmt.Errorf("The received offer errored: %s", offer.Err) return fmt.Errorf("The received offer errored: %s", offer.Err)
} }
maxPrice := types.FromFil(DefaultMaxRetrievePrice)
if cctx.String("maxPrice") != "" {
maxPriceFil, err := types.ParseFIL(cctx.String("maxPrice"))
if err != nil {
return xerrors.Errorf("parsing maxPrice: %w", err)
}
maxPrice = types.BigInt(maxPriceFil)
}
if offer.MinPrice.GreaterThan(maxPrice) {
return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice)
}
ref := &lapi.FileRef{ ref := &lapi.FileRef{
Path: cctx.Args().Get(1), Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"), IsCAR: cctx.Bool("car"),

View File

@ -51,7 +51,7 @@ func main() {
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err) log.Warnf("%+v", err)
return os.Exit(1)
} }
} }

View File

@ -294,6 +294,30 @@ create table if not exists miner_info
primary key (miner_id) primary key (miner_id)
); );
/*
* captures chain-specific power state for any given stateroot
*/
create table if not exists chain_power
(
state_root text not null
constraint chain_power_pk
primary key,
baseline_power text not null
);
/*
* captures miner-specific power state for any given stateroot
*/
create table if not exists miner_power
(
miner_id text not null,
state_root text not null,
raw_bytes_power text not null,
quality_adjusted_power text not null,
constraint miner_power_pk
primary key (miner_id, state_root)
);
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads create table if not exists miner_sectors_heads
( (
@ -500,6 +524,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
return nil return nil
} }
// storeChainPower captures reward actor state as it relates to power captured on-chain
func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error {
tx, err := st.db.Begin()
if err != nil {
return xerrors.Errorf("begin chain_power tx: %w", err)
}
if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil {
return xerrors.Errorf("prep chain_power temp: %w", err)
}
stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`)
if err != nil {
return xerrors.Errorf("prepare tmp chain_power: %w", err)
}
for _, rewardState := range rewardTips {
if _, err := stmt.Exec(
rewardState.stateroot.String(),
rewardState.baselinePower.String(),
); err != nil {
log.Errorw("failed to store chain power", "state_root", rewardState.stateroot, "error", err)
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared chain_power: %w", err)
}
if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert chain_power from tmp: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit chain_power tx: %w", err)
}
return nil
}
type storeSectorsAPI interface { type storeSectorsAPI interface {
StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
} }
@ -607,6 +671,50 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo)
return tx.Commit() return tx.Commit()
} }
// storeMinerPower captures miner actor state as it relates to power per miner captured on-chain
func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateInfo) error {
tx, err := st.db.Begin()
if err != nil {
return xerrors.Errorf("begin miner_power tx: %w", err)
}
if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil {
return xerrors.Errorf("prep miner_power temp: %w", err)
}
stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`)
if err != nil {
return xerrors.Errorf("prepare tmp miner_power: %w", err)
}
for _, miners := range minerTips {
for _, minerInfo := range miners {
if _, err := stmt.Exec(
minerInfo.addr.String(),
minerInfo.stateroot.String(),
minerInfo.rawPower.String(),
minerInfo.qalPower.String(),
); err != nil {
log.Errorw("failed to store miner power", "miner", minerInfo.addr, "stateroot", minerInfo.stateroot, "error", err)
}
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared miner_power: %w", err)
}
if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert miner_power from tmp: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit miner_power tx: %w", err)
}
return nil
}
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {

View File

@ -13,12 +13,14 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int)
}() }()
} }
type rewardStateInfo struct {
stateroot cid.Cid
baselinePower big.Int
}
type minerStateInfo struct { type minerStateInfo struct {
// common // common
addr address.Address addr address.Address
@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
}) })
// map of tipset to reward state
rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
// map of tipset to all miners that had a head-change at that tipset. // map of tipset to all miners that had a head-change at that tipset.
minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes)) minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes))
// heads we've seen, im being paranoid // heads we've seen, im being paranoid
@ -302,16 +311,22 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
alk.Unlock() alk.Unlock()
}) })
log.Infof("Getting miner info") log.Infof("Getting actor change info")
minerChanges := 0 minerChanges := 0
for addr, m := range actors { for addr, m := range actors {
for actor, c := range m { for actor, c := range m {
if actor.Code != builtin.StorageMinerActorCodeID { // reward actor
if actor.Code == builtin.RewardActorCodeID {
rewardTips[c.tsKey] = &rewardStateInfo{
stateroot: c.stateroot,
baselinePower: big.Zero(),
}
continue continue
} }
// only want miner actors with head change events // miner actors with head change events
if actor.Code == builtin.StorageMinerActorCodeID {
if _, found := headsSeen[actor.Head]; found { if _, found := headsSeen[actor.Head]; found {
continue continue
} }
@ -334,7 +349,35 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
headsSeen[actor.Head] = struct{}{} headsSeen[actor.Head] = struct{}{}
} }
continue
} }
}
rewardProcessingStartedAt := time.Now()
parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) {
tsKey, rewardInfo := it()
// get reward actor states at each tipset once for all updates
rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey)
if err != nil {
log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err))
return
}
rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head)
if err != nil {
log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err))
return
}
var rewardActorState reward.State
if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err))
return
}
rewardInfo.baselinePower = rewardActorState.BaselinePower
})
log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips))
minerProcessingStartedAt := time.Now() minerProcessingStartedAt := time.Now()
log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
@ -415,25 +458,35 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
log.Info("Storing actors") log.Info("Storing actors")
if err := st.storeActors(actors); err != nil { if err := st.storeActors(actors); err != nil {
log.Error(err) log.Error(err)
return return
} }
chainPowerStartedAt := time.Now()
if err := st.storeChainPower(rewardTips); err != nil {
log.Error(err)
}
log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String())
log.Info("Storing miners") log.Info("Storing miners")
if err := st.storeMiners(minerTips); err != nil { if err := st.storeMiners(minerTips); err != nil {
log.Error(err) log.Error(err)
return return
} }
log.Info("Storing miner sectors") minerPowerStartedAt := time.Now()
if err := st.storeMinerPower(minerTips); err != nil {
log.Error(err)
}
log.Infow("Stored miner power", "duration", time.Since(minerPowerStartedAt).String())
sectorStart := time.Now() sectorStart := time.Now()
if err := st.storeSectors(minerTips, api); err != nil { if err := st.storeSectors(minerTips, api); err != nil {
log.Error(err) log.Error(err)
return return
} }
log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String())
log.Info("Storing miner sectors heads") log.Info("Storing miner sectors heads")
if err := st.storeMinerSectorsHeads(minerTips, api); err != nil { if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {

View File

@ -236,7 +236,7 @@ var runCmd = &cli.Command{
{ {
// init datastore for r.Exists // init datastore for r.Exists
_, err := lr.Datastore("/") _, err := lr.Datastore("/metadata")
if err != nil { if err != nil {
return err return err
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -199,7 +200,7 @@ var verifRegListVerifiersCmd = &cli.Command{
return err return err
} }
vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5))
if err != nil { if err != nil {
return err return err
} }
@ -251,7 +252,7 @@ var verifRegListClientsCmd = &cli.Command{
return err return err
} }
vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients) vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients, hamt.UseTreeBitWidth(5))
if err != nil { if err != nil {
return err return err
} }
@ -346,7 +347,7 @@ var verifRegCheckVerifierCmd = &cli.Command{
return err return err
} }
vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5))
if err != nil { if err != nil {
return err return err
} }

View File

@ -359,10 +359,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string,
}*/ }*/
} }
log.Infof("Setting next sector ID to %d", maxSectorID+1)
buf := make([]byte, binary.MaxVarintLen64) buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, uint64(maxSectorID+1)) size := binary.PutUvarint(buf, uint64(maxSectorID))
return mds.Put(datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size]) return mds.Put(datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
} }

View File

@ -18,4 +18,6 @@ func SetupLogLevels() {
_ = logging.SetLogLevel("stores", "DEBUG") _ = logging.SetLogLevel("stores", "DEBUG")
_ = logging.SetLogLevel("nat", "INFO") _ = logging.SetLogLevel("nat", "INFO")
} }
// Always mute RtRefreshManager because it breaks terminals
_ = logging.SetLogLevel("dht/RtRefreshManager", "FATAL")
} }

View File

@ -8,7 +8,6 @@ import (
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
) )
@ -63,7 +62,7 @@ func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) {
} }
if merr == nil { if merr == nil {
return nil, datastore.ErrNotFound return nil, blockstore.ErrNotFound
} }
return nil, merr return nil, merr

View File

@ -4,7 +4,10 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"math"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/lotus/api"
cborutil "github.com/filecoin-project/go-cbor-util" cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
@ -34,9 +37,14 @@ type ManagerApi struct {
full.StateAPI full.StateAPI
} }
type StateManagerApi interface {
LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error)
Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error)
}
type Manager struct { type Manager struct {
store *Store store *Store
sm *stmgr.StateManager sm StateManagerApi
mpool full.MpoolAPI mpool full.MpoolAPI
wallet full.WalletAPI wallet full.WalletAPI
@ -54,84 +62,24 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manage
} }
} }
func maxLaneFromState(st *paych.State) (uint64, error) { // Used by the tests to supply mocks
maxLane := uint64(math.MaxInt64) func newManager(sm StateManagerApi, pchstore *Store) *Manager {
for _, state := range st.LaneStates { return &Manager{
if (state.ID)+1 > maxLane+1 { store: pchstore,
maxLane = state.ID sm: sm,
} }
}
return maxLane, nil
}
func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error {
_, st, err := pm.loadPaychState(ctx, ch)
if err != nil {
return err
}
var account account.State
_, err = pm.sm.LoadActorState(ctx, st.From, &account, nil)
if err != nil {
return err
}
from := account.Address
_, err = pm.sm.LoadActorState(ctx, st.To, &account, nil)
if err != nil {
return err
}
to := account.Address
maxLane, err := maxLaneFromState(st)
if err != nil {
return err
}
return pm.store.TrackChannel(&ChannelInfo{
Channel: ch,
Control: to,
Target: from,
Direction: DirInbound,
NextLane: maxLane + 1,
})
}
func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Address) (*ChannelInfo, error) {
_, st, err := pm.loadPaychState(ctx, ch)
if err != nil {
return nil, err
}
maxLane, err := maxLaneFromState(st)
if err != nil {
return nil, err
}
var account account.State
_, err = pm.sm.LoadActorState(ctx, st.From, &account, nil)
if err != nil {
return nil, err
}
from := account.Address
_, err = pm.sm.LoadActorState(ctx, st.To, &account, nil)
if err != nil {
return nil, err
}
to := account.Address
return &ChannelInfo{
Channel: ch,
Control: from,
Target: to,
Direction: DirOutbound,
NextLane: maxLane + 1,
}, nil
} }
func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address) error { func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address) error {
ci, err := pm.loadOutboundChannelInfo(ctx, ch) return pm.trackChannel(ctx, ch, DirOutbound)
}
func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error {
return pm.trackChannel(ctx, ch, DirInbound)
}
func (pm *Manager) trackChannel(ctx context.Context, ch address.Address, dir uint64) error {
ci, err := pm.loadStateChannelInfo(ctx, ch, dir)
if err != nil { if err != nil {
return err return err
} }
@ -147,62 +95,91 @@ func (pm *Manager) GetChannelInfo(addr address.Address) (*ChannelInfo, error) {
return pm.store.getChannelInfo(addr) return pm.store.getChannelInfo(addr)
} }
// checks if the given voucher is valid (is or could become spendable at some point) // CheckVoucherValid checks if the given voucher is valid (is or could become spendable at some point)
func (pm *Manager) CheckVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) error { func (pm *Manager) CheckVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) error {
act, pca, err := pm.loadPaychState(ctx, ch) _, err := pm.checkVoucherValid(ctx, ch, sv)
if err != nil {
return err return err
}
func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) (map[uint64]*paych.LaneState, error) {
act, pchState, err := pm.loadPaychState(ctx, ch)
if err != nil {
return nil, err
} }
var account account.State var account account.State
_, err = pm.sm.LoadActorState(ctx, pca.From, &account, nil) _, err = pm.sm.LoadActorState(ctx, pchState.From, &account, nil)
if err != nil { if err != nil {
return err return nil, err
} }
from := account.Address from := account.Address
// verify signature // verify signature
vb, err := sv.SigningBytes() vb, err := sv.SigningBytes()
if err != nil { if err != nil {
return err return nil, err
} }
// TODO: technically, either party may create and sign a voucher. // TODO: technically, either party may create and sign a voucher.
// However, for now, we only accept them from the channel creator. // However, for now, we only accept them from the channel creator.
// More complex handling logic can be added later // More complex handling logic can be added later
if err := sigs.Verify(sv.Signature, from, vb); err != nil { if err := sigs.Verify(sv.Signature, from, vb); err != nil {
return err return nil, err
} }
sendAmount := sv.Amount // Check the voucher against the highest known voucher nonce / value
laneStates, err := pm.laneState(pchState, ch)
// now check the lane state if err != nil {
// TODO: should check against vouchers in our local store too return nil, err
// there might be something conflicting
ls := findLane(pca.LaneStates, uint64(sv.Lane))
if ls == nil {
} else {
if (ls.Nonce) >= sv.Nonce {
return fmt.Errorf("nonce too low")
} }
sendAmount = types.BigSub(sv.Amount, ls.Redeemed) // If the new voucher nonce value is less than the highest known
// nonce for the lane
ls, lsExists := laneStates[sv.Lane]
if lsExists && sv.Nonce <= ls.Nonce {
return nil, fmt.Errorf("nonce too low")
} }
// TODO: also account for vouchers on other lanes we've received // If the voucher amount is less than the highest known voucher amount
newTotal := types.BigAdd(sendAmount, pca.ToSend) if lsExists && sv.Amount.LessThanEqual(ls.Redeemed) {
return nil, fmt.Errorf("voucher amount is lower than amount for voucher with lower nonce")
}
// Total redeemed is the total redeemed amount for all lanes, including
// the new voucher
// eg
//
// lane 1 redeemed: 3
// lane 2 redeemed: 2
// voucher for lane 1: 5
//
// Voucher supersedes lane 1 redeemed, therefore
// effective lane 1 redeemed: 5
//
// lane 1: 5
// lane 2: 2
// -
// total: 7
totalRedeemed, err := pm.totalRedeemedWithVoucher(laneStates, sv)
if err != nil {
return nil, err
}
// Total required balance = total redeemed + toSend
// Must not exceed actor balance
newTotal := types.BigAdd(totalRedeemed, pchState.ToSend)
if act.Balance.LessThan(newTotal) { if act.Balance.LessThan(newTotal) {
return fmt.Errorf("not enough funds in channel to cover voucher") return nil, fmt.Errorf("not enough funds in channel to cover voucher")
} }
if len(sv.Merges) != 0 { if len(sv.Merges) != 0 {
return fmt.Errorf("dont currently support paych lane merges") return nil, fmt.Errorf("dont currently support paych lane merges")
} }
return nil return laneStates, nil
} }
// checks if the given voucher is currently spendable // CheckVoucherSpendable checks if the given voucher is currently spendable
func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, secret []byte, proof []byte) (bool, error) { func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, secret []byte, proof []byte) (bool, error) {
owner, err := pm.getPaychOwner(ctx, ch) owner, err := pm.getPaychOwner(ctx, ch)
if err != nil { if err != nil {
@ -267,10 +244,6 @@ func (pm *Manager) getPaychOwner(ctx context.Context, ch address.Address) (addre
} }
func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) { func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) {
if err := pm.CheckVoucherValid(ctx, ch, sv); err != nil {
return types.NewInt(0), err
}
pm.store.lk.Lock() pm.store.lk.Lock()
defer pm.store.lk.Unlock() defer pm.store.lk.Unlock()
@ -279,16 +252,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych
return types.NewInt(0), err return types.NewInt(0), err
} }
laneState, err := pm.laneState(ctx, ch, uint64(sv.Lane)) // Check if the voucher has already been added
if err != nil {
return types.NewInt(0), err
}
if minDelta.GreaterThan(types.NewInt(0)) && laneState.Nonce > sv.Nonce {
return types.NewInt(0), xerrors.Errorf("already storing voucher with higher nonce; %d > %d", laneState.Nonce, sv.Nonce)
}
// look for duplicates
for i, v := range ci.Vouchers { for i, v := range ci.Vouchers {
eq, err := cborutil.Equals(sv, v.Voucher) eq, err := cborutil.Equals(sv, v.Voucher)
if err != nil { if err != nil {
@ -297,15 +261,10 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych
if !eq { if !eq {
continue continue
} }
if v.Proof != nil {
if !bytes.Equal(v.Proof, proof) {
log.Warnf("AddVoucher: multiple proofs for single voucher, storing both")
break
}
log.Warnf("AddVoucher: voucher re-added with matching proof")
return types.NewInt(0), nil
}
// This is a duplicate voucher.
// Update the proof on the existing voucher
if len(proof) > 0 && !bytes.Equal(v.Proof, proof) {
log.Warnf("AddVoucher: adding proof to stored voucher") log.Warnf("AddVoucher: adding proof to stored voucher")
ci.Vouchers[i] = &VoucherInfo{ ci.Vouchers[i] = &VoucherInfo{
Voucher: v.Voucher, Voucher: v.Voucher,
@ -315,9 +274,28 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych
return types.NewInt(0), pm.store.putChannelInfo(ci) return types.NewInt(0), pm.store.putChannelInfo(ci)
} }
delta := types.BigSub(sv.Amount, laneState.Redeemed) // Otherwise just ignore the duplicate voucher
log.Warnf("AddVoucher: voucher re-added with matching proof")
return types.NewInt(0), nil
}
// Check voucher validity
laneStates, err := pm.checkVoucherValid(ctx, ch, sv)
if err != nil {
return types.NewInt(0), err
}
// The change in value is the delta between the voucher amount and
// the highest previous voucher amount for the lane
laneState, exists := laneStates[sv.Lane]
redeemed := big.NewInt(0)
if exists {
redeemed = laneState.Redeemed
}
delta := types.BigSub(sv.Amount, redeemed)
if minDelta.GreaterThan(delta) { if minDelta.GreaterThan(delta) {
return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; laneAmt=%s; v.Amt=%s", minDelta, delta, laneState.Redeemed, sv.Amount) return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; laneAmt=%s; v.Amt=%s", minDelta, delta, redeemed, sv.Amount)
} }
ci.Vouchers = append(ci.Vouchers, &VoucherInfo{ ci.Vouchers = append(ci.Vouchers, &VoucherInfo{
@ -325,7 +303,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych
Proof: proof, Proof: proof,
}) })
if ci.NextLane <= (sv.Lane) { if ci.NextLane <= sv.Lane {
ci.NextLane = sv.Lane + 1 ci.NextLane = sv.Lane + 1
} }
@ -333,6 +311,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych
} }
func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) { func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) {
// TODO: should this take into account lane state?
return pm.store.AllocateLane(ch) return pm.store.AllocateLane(ch)
} }
@ -355,6 +334,7 @@ func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, er
} }
func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) { func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) {
// TODO: should this take into account lane state?
vouchers, err := pm.store.VouchersForPaych(ch) vouchers, err := pm.store.VouchersForPaych(ch)
if err != nil { if err != nil {
return 0, err return 0, err
@ -362,9 +342,9 @@ func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lan
var maxnonce uint64 var maxnonce uint64
for _, v := range vouchers { for _, v := range vouchers {
if uint64(v.Voucher.Lane) == lane { if v.Voucher.Lane == lane {
if uint64(v.Voucher.Nonce) > maxnonce { if v.Voucher.Nonce > maxnonce {
maxnonce = uint64(v.Voucher.Nonce) maxnonce = v.Voucher.Nonce
} }
} }
} }

720
paychmgr/paych_test.go Normal file
View File

@ -0,0 +1,720 @@
package paychmgr
import (
"context"
"fmt"
"sync"
"testing"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/abi"
tutils "github.com/filecoin-project/specs-actors/support/testing"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/builtin/account"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
)
type testPchState struct {
actor *types.Actor
state paych.State
}
type mockStateManager struct {
lk sync.Mutex
accountState map[address.Address]account.State
paychState map[address.Address]testPchState
response *api.InvocResult
}
func newMockStateManager() *mockStateManager {
return &mockStateManager{
accountState: make(map[address.Address]account.State),
paychState: make(map[address.Address]testPchState),
}
}
func (sm *mockStateManager) setAccountState(a address.Address, state account.State) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.accountState[a] = state
}
func (sm *mockStateManager) setPaychState(a address.Address, actor *types.Actor, state paych.State) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.paychState[a] = testPchState{actor, state}
}
func (sm *mockStateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) {
sm.lk.Lock()
defer sm.lk.Unlock()
if outState, ok := out.(*account.State); ok {
*outState = sm.accountState[a]
return nil, nil
}
if outState, ok := out.(*paych.State); ok {
info := sm.paychState[a]
*outState = info.state
return info.actor, nil
}
panic(fmt.Sprintf("unexpected state type %v", out))
}
func (sm *mockStateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) {
return sm.response, nil
}
func TestPaychOutbound(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
fromAcct := tutils.NewIDAddr(t, 201)
toAcct := tutils.NewIDAddr(t, 202)
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
sm.setPaychState(ch, nil, paych.State{
From: fromAcct,
To: toAcct,
ToSend: big.NewInt(0),
SettlingAt: abi.ChainEpoch(0),
MinSettleHeight: abi.ChainEpoch(0),
LaneStates: []*paych.LaneState{},
})
mgr := newManager(sm, store)
err := mgr.TrackOutboundChannel(ctx, ch)
require.NoError(t, err)
ci, err := mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.Equal(t, ci.Channel, ch)
require.Equal(t, ci.Control, from)
require.Equal(t, ci.Target, to)
require.EqualValues(t, ci.Direction, DirOutbound)
require.EqualValues(t, ci.NextLane, 0)
require.Len(t, ci.Vouchers, 0)
}
func TestPaychInbound(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
fromAcct := tutils.NewIDAddr(t, 201)
toAcct := tutils.NewIDAddr(t, 202)
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
sm.setPaychState(ch, nil, paych.State{
From: fromAcct,
To: toAcct,
ToSend: big.NewInt(0),
SettlingAt: abi.ChainEpoch(0),
MinSettleHeight: abi.ChainEpoch(0),
LaneStates: []*paych.LaneState{},
})
mgr := newManager(sm, store)
err := mgr.TrackInboundChannel(ctx, ch)
require.NoError(t, err)
ci, err := mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.Equal(t, ci.Channel, ch)
require.Equal(t, ci.Control, to)
require.Equal(t, ci.Target, from)
require.EqualValues(t, ci.Direction, DirInbound)
require.EqualValues(t, ci.NextLane, 0)
require.Len(t, ci.Vouchers, 0)
}
func TestCheckVoucherValid(t *testing.T) {
ctx := context.Background()
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
randKeyPrivate, _ := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic))
to := tutils.NewSECP256K1Addr(t, "secpTo")
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
tcases := []struct {
name string
expectError bool
key []byte
actorBalance big.Int
toSend big.Int
voucherAmount big.Int
voucherLane uint64
voucherNonce uint64
laneStates []*paych.LaneState
}{{
name: "passes when voucher amount < balance",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
}, {
name: "fails when funds too low",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(5),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(10),
}, {
name: "fails when invalid signature",
expectError: true,
key: randKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
}, {
name: "fails when nonce too low",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 1,
voucherNonce: 2,
laneStates: []*paych.LaneState{{
ID: 1,
Redeemed: big.NewInt(2),
Nonce: 3,
}},
}, {
name: "passes when nonce higher",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 1,
voucherNonce: 3,
laneStates: []*paych.LaneState{{
ID: 1,
Redeemed: big.NewInt(2),
Nonce: 2,
}},
}, {
name: "passes when nonce for different lane",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 2,
voucherNonce: 2,
laneStates: []*paych.LaneState{{
ID: 1,
Redeemed: big.NewInt(2),
Nonce: 3,
}},
}, {
name: "fails when voucher has higher nonce but lower value than lane state",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 1,
voucherNonce: 3,
laneStates: []*paych.LaneState{{
ID: 1,
Redeemed: big.NewInt(6),
Nonce: 2,
}},
}, {
name: "fails when voucher + ToSend > balance",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(9),
voucherAmount: big.NewInt(2),
}, {
// voucher supersedes lane 1 redeemed so
// lane 1 effective redeemed = voucher amount
//
// required balance = toSend + total redeemed
// = 1 + 6 (lane1)
// = 7
// So required balance: 7 < actor balance: 10
name: "passes when voucher + total redeemed <= balance",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(1),
voucherAmount: big.NewInt(6),
voucherLane: 1,
voucherNonce: 2,
laneStates: []*paych.LaneState{{
ID: 1, // Lane 1 (same as voucher lane 1)
Redeemed: big.NewInt(4),
Nonce: 1,
}},
}, {
// required balance = toSend + total redeemed
// = 1 + 4 (lane 2) + 6 (voucher lane 1)
// = 11
// So required balance: 11 > actor balance: 10
name: "fails when voucher + total redeemed > balance",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(1),
voucherAmount: big.NewInt(6),
voucherLane: 1,
voucherNonce: 1,
laneStates: []*paych.LaneState{{
ID: 2, // Lane 2 (different from voucher lane 1)
Redeemed: big.NewInt(4),
Nonce: 1,
}},
}}
for _, tcase := range tcases {
tcase := tcase
t.Run(tcase.name, func(t *testing.T) {
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: tcase.actorBalance,
}
sm.setPaychState(ch, act, paych.State{
From: fromAcct,
To: toAcct,
ToSend: tcase.toSend,
SettlingAt: abi.ChainEpoch(0),
MinSettleHeight: abi.ChainEpoch(0),
LaneStates: tcase.laneStates,
})
mgr := newManager(sm, store)
err := mgr.TrackInboundChannel(ctx, ch)
require.NoError(t, err)
sv := testCreateVoucher(t, tcase.voucherLane, tcase.voucherNonce, tcase.voucherAmount, tcase.key)
err = mgr.CheckVoucherValid(ctx, ch, sv)
if tcase.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
func TestCheckVoucherValidCountingAllLanes(t *testing.T) {
ctx := context.Background()
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic))
to := tutils.NewSECP256K1Addr(t, "secpTo")
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
minDelta := big.NewInt(0)
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
actorBalance := big.NewInt(10)
toSend := big.NewInt(1)
laneStates := []*paych.LaneState{{
ID: 1,
Nonce: 1,
Redeemed: big.NewInt(3),
}, {
ID: 2,
Nonce: 1,
Redeemed: big.NewInt(4),
}}
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: actorBalance,
}
sm.setPaychState(ch, act, paych.State{
From: fromAcct,
To: toAcct,
ToSend: toSend,
SettlingAt: abi.ChainEpoch(0),
MinSettleHeight: abi.ChainEpoch(0),
LaneStates: laneStates,
})
mgr := newManager(sm, store)
err := mgr.TrackInboundChannel(ctx, ch)
require.NoError(t, err)
//
// Should not be possible to add a voucher with a value such that
// <total lane Redeemed> + toSend > <actor balance>
//
// lane 1 redeemed: 3
// voucher amount (lane 1): 6
// lane 1 redeemed (with voucher): 6
//
// Lane 1: 6
// Lane 2: 4
// toSend: 1
// --
// total: 11
//
// actor balance is 10 so total is too high.
//
voucherLane := uint64(1)
voucherNonce := uint64(2)
voucherAmount := big.NewInt(6)
sv := testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.Error(t, err)
//
// lane 1 redeemed: 3
// voucher amount (lane 1): 4
// lane 1 redeemed (with voucher): 4
//
// Lane 1: 4
// Lane 2: 4
// toSend: 1
// --
// total: 9
//
// actor balance is 10 so total is ok.
//
voucherAmount = big.NewInt(4)
sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.NoError(t, err)
// Add voucher to lane 1, so Lane 1 effective redeemed
// (with first voucher) is now 4
_, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
//
// lane 1 redeemed: 4
// voucher amount (lane 1): 6
// lane 1 redeemed (with voucher): 6
//
// Lane 1: 6
// Lane 2: 4
// toSend: 1
// --
// total: 11
//
// actor balance is 10 so total is too high.
//
voucherNonce++
voucherAmount = big.NewInt(6)
sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.Error(t, err)
//
// lane 1 redeemed: 4
// voucher amount (lane 1): 5
// lane 1 redeemed (with voucher): 5
//
// Lane 1: 5
// Lane 2: 4
// toSend: 1
// --
// total: 10
//
// actor balance is 10 so total is ok.
//
voucherAmount = big.NewInt(5)
sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.NoError(t, err)
}
func TestAddVoucherDelta(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t)
voucherLane := uint64(1)
// Expect error when adding a voucher whose amount is less than minDelta
minDelta := big.NewInt(2)
nonce := uint64(1)
voucherAmount := big.NewInt(1)
sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
_, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.Error(t, err)
// Expect success when adding a voucher whose amount is equal to minDelta
nonce++
voucherAmount = big.NewInt(2)
sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
delta, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
require.EqualValues(t, delta.Int64(), 2)
// Check that delta is correct when there's an existing voucher
nonce++
voucherAmount = big.NewInt(5)
sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
delta, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
require.EqualValues(t, delta.Int64(), 3)
// Check that delta is correct when voucher added to a different lane
nonce = uint64(1)
voucherAmount = big.NewInt(6)
voucherLane = uint64(2)
sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
delta, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
require.EqualValues(t, delta.Int64(), 6)
}
func TestAddVoucherNextLane(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t)
minDelta := big.NewInt(0)
voucherAmount := big.NewInt(2)
// Add a voucher in lane 2
nonce := uint64(1)
voucherLane := uint64(2)
sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
_, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
ci, err := mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.EqualValues(t, ci.NextLane, 3)
// Add a voucher in lane 1
voucherLane = uint64(1)
sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
_, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
ci, err = mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.EqualValues(t, ci.NextLane, 3)
// Add a voucher in lane 5
voucherLane = uint64(5)
sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
_, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
ci, err = mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.EqualValues(t, ci.NextLane, 6)
}
func TestAddVoucherProof(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t)
nonce := uint64(1)
voucherAmount := big.NewInt(1)
minDelta := big.NewInt(0)
voucherAmount = big.NewInt(2)
voucherLane := uint64(1)
// Add a voucher with no proof
var proof []byte
sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate)
_, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
// Expect one voucher with no proof
ci, err := mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.Len(t, ci.Vouchers, 1)
require.Len(t, ci.Vouchers[0].Proof, 0)
// Add same voucher with no proof
voucherLane = uint64(1)
_, err = mgr.AddVoucher(ctx, ch, sv, proof, minDelta)
require.NoError(t, err)
// Expect one voucher with no proof
ci, err = mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.Len(t, ci.Vouchers, 1)
require.Len(t, ci.Vouchers[0].Proof, 0)
// Add same voucher with proof
proof = []byte{1}
_, err = mgr.AddVoucher(ctx, ch, sv, proof, minDelta)
require.NoError(t, err)
// Should add proof to existing voucher
ci, err = mgr.GetChannelInfo(ch)
require.NoError(t, err)
require.Len(t, ci.Vouchers, 1)
require.Len(t, ci.Vouchers[0].Proof, 1)
}
func TestAllocateLane(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
mgr, ch, _ := testSetupMgrWithChannel(ctx, t)
// First lane should be 0
lane, err := mgr.AllocateLane(ch)
require.NoError(t, err)
require.EqualValues(t, lane, 0)
// Next lane should be 1
lane, err = mgr.AllocateLane(ch)
require.NoError(t, err)
require.EqualValues(t, lane, 1)
}
func TestNextNonceForLane(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
mgr, ch, key := testSetupMgrWithChannel(ctx, t)
// Expect next nonce for non-existent lane to be 1
next, err := mgr.NextNonceForLane(ctx, ch, 1)
require.NoError(t, err)
require.EqualValues(t, next, 1)
voucherAmount := big.NewInt(1)
minDelta := big.NewInt(0)
voucherAmount = big.NewInt(2)
// Add vouchers such that we have
// lane 1: nonce 2
// lane 1: nonce 4
// lane 2: nonce 7
voucherLane := uint64(1)
for _, nonce := range []uint64{2, 4} {
voucherAmount = big.Add(voucherAmount, big.NewInt(1))
sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key)
_, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
}
voucherLane = uint64(2)
nonce := uint64(7)
sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key)
_, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
// Expect next nonce for lane 1 to be 5
next, err = mgr.NextNonceForLane(ctx, ch, 1)
require.NoError(t, err)
require.EqualValues(t, next, 5)
// Expect next nonce for lane 2 to be 8
next, err = mgr.NextNonceForLane(ctx, ch, 2)
require.NoError(t, err)
require.EqualValues(t, next, 8)
}
func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, address.Address, []byte) {
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic))
to := tutils.NewSECP256K1Addr(t, "secpTo")
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: big.NewInt(20),
}
sm.setPaychState(ch, act, paych.State{
From: fromAcct,
To: toAcct,
ToSend: big.NewInt(0),
SettlingAt: abi.ChainEpoch(0),
MinSettleHeight: abi.ChainEpoch(0),
LaneStates: []*paych.LaneState{},
})
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
mgr := newManager(sm, store)
err := mgr.TrackInboundChannel(ctx, ch)
require.NoError(t, err)
return mgr, ch, fromKeyPrivate
}
func testGenerateKeyPair(t *testing.T) ([]byte, []byte) {
priv, err := sigs.Generate(crypto.SigTypeSecp256k1)
require.NoError(t, err)
pub, err := sigs.ToPublic(crypto.SigTypeSecp256k1, priv)
require.NoError(t, err)
return priv, pub
}
func testCreateVoucher(t *testing.T, voucherLane uint64, nonce uint64, voucherAmount big.Int, key []byte) *paych.SignedVoucher {
sv := &paych.SignedVoucher{
Lane: voucherLane,
Nonce: nonce,
Amount: voucherAmount,
}
signingBytes, err := sv.SigningBytes()
require.NoError(t, err)
sig, err := sigs.Sign(crypto.SigTypeSecp256k1, key, signingBytes)
require.NoError(t, err)
sv.Signature = sig
return sv
}

View File

@ -75,7 +75,7 @@ func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) {
} }
paychaddr := decodedReturn.RobustAddress paychaddr := decodedReturn.RobustAddress
ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr) ci, err := pm.loadStateChannelInfo(ctx, paychaddr, DirOutbound)
if err != nil { if err != nil {
log.Errorf("loading channel info: %w", err) log.Errorf("loading channel info: %w", err)
return return

View File

@ -3,6 +3,10 @@ package paychmgr
import ( import (
"context" "context"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/account"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/specs-actors/actors/builtin/paych"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
@ -20,55 +24,91 @@ func (pm *Manager) loadPaychState(ctx context.Context, ch address.Address) (*typ
return act, &pcast, nil return act, &pcast, nil
} }
func findLane(states []*paych.LaneState, lane uint64) *paych.LaneState { func (pm *Manager) loadStateChannelInfo(ctx context.Context, ch address.Address, dir uint64) (*ChannelInfo, error) {
var ls *paych.LaneState _, st, err := pm.loadPaychState(ctx, ch)
for _, laneState := range states { if err != nil {
if uint64(laneState.ID) == lane { return nil, err
ls = laneState
break
} }
var account account.State
_, err = pm.sm.LoadActorState(ctx, st.From, &account, nil)
if err != nil {
return nil, err
} }
return ls from := account.Address
_, err = pm.sm.LoadActorState(ctx, st.To, &account, nil)
if err != nil {
return nil, err
}
to := account.Address
ci := &ChannelInfo{
Channel: ch,
Direction: dir,
NextLane: nextLaneFromState(st),
}
if dir == DirOutbound {
ci.Control = from
ci.Target = to
} else {
ci.Control = to
ci.Target = from
}
return ci, nil
} }
func (pm *Manager) laneState(ctx context.Context, ch address.Address, lane uint64) (paych.LaneState, error) { func nextLaneFromState(st *paych.State) uint64 {
_, state, err := pm.loadPaychState(ctx, ch) if len(st.LaneStates) == 0 {
if err != nil { return 0
return paych.LaneState{}, err
} }
maxLane := st.LaneStates[0].ID
for _, state := range st.LaneStates {
if state.ID > maxLane {
maxLane = state.ID
}
}
return maxLane + 1
}
// laneState gets the LaneStates from chain, then applies all vouchers in
// the data store over the chain state
func (pm *Manager) laneState(state *paych.State, ch address.Address) (map[uint64]*paych.LaneState, error) {
// TODO: we probably want to call UpdateChannelState with all vouchers to be fully correct // TODO: we probably want to call UpdateChannelState with all vouchers to be fully correct
// (but technically dont't need to) // (but technically dont't need to)
// TODO: make sure this is correct laneStates := make(map[uint64]*paych.LaneState, len(state.LaneStates))
ls := findLane(state.LaneStates, lane) // Get the lane state from the chain
if ls == nil { for _, laneState := range state.LaneStates {
ls = &paych.LaneState{ laneStates[laneState.ID] = laneState
ID: lane,
Redeemed: types.NewInt(0),
Nonce: 0,
}
} }
// Apply locally stored vouchers
vouchers, err := pm.store.VouchersForPaych(ch) vouchers, err := pm.store.VouchersForPaych(ch)
if err != nil { if err != nil && err != ErrChannelNotTracked {
if err == ErrChannelNotTracked { return nil, err
return *ls, nil
}
return paych.LaneState{}, err
} }
for _, v := range vouchers { for _, v := range vouchers {
for range v.Voucher.Merges { for range v.Voucher.Merges {
return paych.LaneState{}, xerrors.Errorf("paych merges not handled yet") return nil, xerrors.Errorf("paych merges not handled yet")
} }
if v.Voucher.Lane != lane { // If there's a voucher for a lane that isn't in chain state just
continue // create it
ls, ok := laneStates[v.Voucher.Lane]
if !ok {
ls = &paych.LaneState{
ID: v.Voucher.Lane,
Redeemed: types.NewInt(0),
Nonce: 0,
}
laneStates[v.Voucher.Lane] = ls
} }
if v.Voucher.Nonce < ls.Nonce { if v.Voucher.Nonce < ls.Nonce {
log.Warnf("Found outdated voucher: ch=%s, lane=%d, v.nonce=%d lane.nonce=%d", ch, lane, v.Voucher.Nonce, ls.Nonce)
continue continue
} }
@ -76,5 +116,36 @@ func (pm *Manager) laneState(ctx context.Context, ch address.Address, lane uint6
ls.Redeemed = v.Voucher.Amount ls.Redeemed = v.Voucher.Amount
} }
return *ls, nil return laneStates, nil
}
// Get the total redeemed amount across all lanes, after applying the voucher
func (pm *Manager) totalRedeemedWithVoucher(laneStates map[uint64]*paych.LaneState, sv *paych.SignedVoucher) (big.Int, error) {
// TODO: merges
if len(sv.Merges) != 0 {
return big.Int{}, xerrors.Errorf("dont currently support paych lane merges")
}
total := big.NewInt(0)
for _, ls := range laneStates {
total = big.Add(total, ls.Redeemed)
}
lane, ok := laneStates[sv.Lane]
if ok {
// If the voucher is for an existing lane, and the voucher nonce
// and is higher than the lane nonce
if sv.Nonce > lane.Nonce {
// Add the delta between the redeemed amount and the voucher
// amount to the total
delta := big.Sub(sv.Amount, lane.Redeemed)
total = big.Add(total, delta)
}
} else {
// If the voucher is *not* for an existing lane, just add its
// value (implicitly a new lane will be created for the voucher)
total = big.Add(total, sv.Amount)
}
return total, nil
} }

View File

@ -60,7 +60,7 @@ func TestStore(t *testing.T) {
require.Len(t, vouchers, 1) require.Len(t, vouchers, 1)
// Requesting voucher for non-existent channel should error // Requesting voucher for non-existent channel should error
vouchers, err = store.VouchersForPaych(tutils.NewIDAddr(t, 300)) _, err = store.VouchersForPaych(tutils.NewIDAddr(t, 300))
require.Equal(t, err, ErrChannelNotTracked) require.Equal(t, err, ErrChannelNotTracked)
// Allocate lane for channel // Allocate lane for channel
@ -74,7 +74,7 @@ func TestStore(t *testing.T) {
require.Equal(t, lane, uint64(1)) require.Equal(t, lane, uint64(1))
// Allocate next lane for non-existent channel should error // Allocate next lane for non-existent channel should error
lane, err = store.AllocateLane(tutils.NewIDAddr(t, 300)) _, err = store.AllocateLane(tutils.NewIDAddr(t, 300))
require.Equal(t, err, ErrChannelNotTracked) require.Equal(t, err, ErrChannelNotTracked)
} }

View File

@ -176,17 +176,18 @@ func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error)
} }
func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
pc, err := api.StatePledgeCollateral(ctx, tipset.Key())
if err != nil {
return err
}
attoFil := types.NewInt(build.FilecoinPrecision).Int attoFil := types.NewInt(build.FilecoinPrecision).Int
pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) //TODO: StatePledgeCollateral API is not implemented and is commented out - re-enable this block once the API is implemented again.
pcFilFloat, _ := pcFil.Float64() //pc, err := api.StatePledgeCollateral(ctx, tipset.Key())
p := NewPoint("chain.pledge_collateral", pcFilFloat) //if err != nil {
pl.AddPoint(p) //return err
//}
//pcFil := new(big.Rat).SetFrac(pc.Int, attoFil)
//pcFilFloat, _ := pcFil.Float64()
//p := NewPoint("chain.pledge_collateral", pcFilFloat)
//pl.AddPoint(p)
netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr) netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr)
if err != nil { if err != nil {
@ -195,7 +196,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis
netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil) netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil)
netBalFilFloat, _ := netBalFil.Float64() netBalFilFloat, _ := netBalFil.Float64()
p = NewPoint("network.balance", netBalFilFloat) p := NewPoint("network.balance", netBalFilFloat)
pl.AddPoint(p) pl.AddPoint(p)
totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key()) totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key())