From 5fa3021a79d8c337816ac3e049b1e2cf4180ac5d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 1 Jul 2020 17:29:11 -0700 Subject: [PATCH 01/25] set proper bitwidth for verifreg command hamts --- cmd/lotus-shed/verifreg.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/cmd/lotus-shed/verifreg.go b/cmd/lotus-shed/verifreg.go index a448fcaf9..b8fd3fe9c 100644 --- a/cmd/lotus-shed/verifreg.go +++ b/cmd/lotus-shed/verifreg.go @@ -3,7 +3,9 @@ package main import ( "bytes" "fmt" + "github.com/filecoin-project/lotus/build" + "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/urfave/cli/v2" @@ -199,7 +201,7 @@ var verifRegListVerifiersCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) + vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5)) if err != nil { return err } @@ -251,11 +253,12 @@ var verifRegListClientsCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients) + vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients, hamt.UseTreeBitWidth(5)) if err != nil { return err } + var keys []string if err := vh.ForEach(ctx, func(k string, val interface{}) error { addr, err := address.NewFromBytes([]byte(k)) if err != nil { @@ -268,13 +271,22 @@ var verifRegListClientsCmd = &cli.Command{ return err } - fmt.Printf("%s: %s\n", addr, dcap) + fmt.Printf("%s: %s %v\n", addr, dcap, []byte(k)) + + keys = append(keys, k) return nil }); err != nil { return err } + for _, k := range keys { + _, err := vh.FindRaw(ctx, k) + if err != nil { + fmt.Println("failed to find key: ", []byte(k), err) + } + } + return nil }, } @@ -312,14 +324,15 @@ var verifRegCheckClientCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients) + vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients, hamt.UseTreeBitWidth(5)) if err != nil { return err } + fmt.Println("key: ", caddr.Bytes()) var dcap verifreg.DataCap if err := vh.Find(ctx, string(caddr.Bytes()), &dcap); err != nil { - return err + return xerrors.Errorf("failed to lookup address: %w", err) } fmt.Println(dcap) @@ -361,7 +374,7 @@ var verifRegCheckVerifierCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) + vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5)) if err != nil { return err } From 0285a9741cd01a30405069588f7db3b7b0df2195 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 7 Jul 2020 16:43:30 -0700 Subject: [PATCH 02/25] remove debug logs --- cmd/lotus-shed/verifreg.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/cmd/lotus-shed/verifreg.go b/cmd/lotus-shed/verifreg.go index b8fd3fe9c..e7187f029 100644 --- a/cmd/lotus-shed/verifreg.go +++ b/cmd/lotus-shed/verifreg.go @@ -258,7 +258,6 @@ var verifRegListClientsCmd = &cli.Command{ return err } - var keys []string if err := vh.ForEach(ctx, func(k string, val interface{}) error { addr, err := address.NewFromBytes([]byte(k)) if err != nil { @@ -271,22 +270,13 @@ var verifRegListClientsCmd = &cli.Command{ return err } - fmt.Printf("%s: %s %v\n", addr, dcap, []byte(k)) - - keys = append(keys, k) + fmt.Printf("%s: %s\n", addr, dcap) return nil }); err != nil { return err } - for _, k := range keys { - _, err := vh.FindRaw(ctx, k) - if err != nil { - fmt.Println("failed to find key: ", []byte(k), err) - } - } - return nil }, } @@ -329,7 +319,6 @@ var verifRegCheckClientCmd = &cli.Command{ return err } - fmt.Println("key: ", caddr.Bytes()) var dcap verifreg.DataCap if err := vh.Find(ctx, string(caddr.Bytes()), &dcap); err != nil { return xerrors.Errorf("failed to lookup address: %w", err) From 8fbeebb86d0b64fe95e3fd835c0f74e8df080037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jul 2020 22:02:28 +0200 Subject: [PATCH 03/25] rename sync variables. --- chain/sync.go | 59 ++++++++++++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index ba4b73567..0de624a99 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1069,17 +1069,14 @@ func extractSyncState(ctx context.Context) *SyncerState { // collectHeaders collects the headers from the blocks between any two tipsets. // -// `from` is the heaviest/projected/target tipset we have learned about, and -// `to` is usually an anchor tipset we already have in our view of the chain +// `incoming` is the heaviest/projected/target tipset we have learned about, and +// `known` is usually an anchor tipset we already have in our view of the chain // (which could be the genesis). // // collectHeaders checks if portions of the chain are in our ChainStore; falling // down to the network to retrieve the missing parts. If during the process, any // portion we receive is in our denylist (bad list), we short-circuit. // -// {hint/naming}: `from` and `to` is in inverse order. `from` is the highest, -// and `to` is the lowest. This method traverses the chain backwards. -// // {hint/usage}: This is used by collectChain, which is in turn called from the // main Sync method (Syncer#Sync), so it's a pretty central method. // @@ -1089,7 +1086,7 @@ func extractSyncState(ctx context.Context) *SyncerState { // bad. // 2. Check the consistency of beacon entries in the from tipset. We check // total equality of the BeaconEntries in each block. -// 3. Travers the chain backwards, for each tipset: +// 3. Traverse the chain backwards, for each tipset: // 3a. Load it from the chainstore; if found, it move on to its parent. // 3b. Query our peers via BlockSync in batches, requesting up to a // maximum of 500 tipsets every time. @@ -1100,40 +1097,40 @@ func extractSyncState(ctx context.Context) *SyncerState { // // All throughout the process, we keep checking if the received blocks are in // the deny list, and short-circuit the process if so. -func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { +func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "collectHeaders") defer span.End() ss := extractSyncState(ctx) span.AddAttributes( - trace.Int64Attribute("fromHeight", int64(from.Height())), - trace.Int64Attribute("toHeight", int64(to.Height())), + trace.Int64Attribute("fromHeight", int64(incoming.Height())), + trace.Int64Attribute("toHeight", int64(known.Height())), ) // Check if the parents of the from block are in the denylist. // i.e. if a fork of the chain has been requested that we know to be bad. - for _, pcid := range from.Parents().Cids() { + for _, pcid := range incoming.Parents().Cids() { if reason, ok := syncer.bad.Has(pcid); ok { newReason := reason.Linked("linked to %s", pcid) - for _, b := range from.Cids() { + for _, b := range incoming.Cids() { syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), pcid, reason) + return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), pcid, reason) } } { // ensure consistency of beacon entires - targetBE := from.Blocks()[0].BeaconEntries + targetBE := incoming.Blocks()[0].BeaconEntries sorted := sort.SliceIsSorted(targetBE, func(i, j int) bool { return targetBE[i].Round < targetBE[j].Round }) if !sorted { - syncer.bad.Add(from.Cids()[0], NewBadBlockReason(from.Cids(), "wrong order of beacon entires")) + syncer.bad.Add(incoming.Cids()[0], NewBadBlockReason(incoming.Cids(), "wrong order of beacon entires")) return nil, xerrors.Errorf("wrong order of beacon entires") } - for _, bh := range from.Blocks()[1:] { + for _, bh := range incoming.Blocks()[1:] { if len(targetBE) != len(bh.BeaconEntries) { // cannot mark bad, I think @Kubuxu return nil, xerrors.Errorf("tipset contained different number for beacon entires") @@ -1148,12 +1145,12 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to } } - blockSet := []*types.TipSet{from} + blockSet := []*types.TipSet{incoming} - at := from.Parents() + at := incoming.Parents() // we want to sync all the blocks until the height above the block we have - untilHeight := to.Height() + 1 + untilHeight := known.Height() + 1 ss.SetHeight(blockSet[len(blockSet)-1].Height()) @@ -1168,7 +1165,7 @@ loop: syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) + return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason) } } @@ -1217,7 +1214,7 @@ loop: syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) + return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason) } } blockSet = append(blockSet, b) @@ -1229,23 +1226,23 @@ loop: at = blks[len(blks)-1].Parents() } - if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) { - last := blockSet[len(blockSet)-1] - if last.Parents() == to.Parents() { + // base is the tipset in the candidate chain at the height equal to our known tipset height. + if base := blockSet[len(blockSet)-1]; !types.CidArrsEqual(base.Parents().Cids(), known.Cids()) { + if base.Parents() == known.Parents() { // common case: receiving a block thats potentially part of the same tipset as our best block return blockSet, nil } // We have now ascertained that this is *not* a 'fast forward' - log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) - fork, err := syncer.syncFork(ctx, last, to) + log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height()) + fork, err := syncer.syncFork(ctx, base, known) if err != nil { if xerrors.Is(err, ErrForkTooLong) { // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? log.Warn("adding forked chain to our bad tipset cache") - for _, b := range from.Blocks() { - syncer.bad.Add(b.Cid(), NewBadBlockReason(from.Cids(), "fork past finality")) + for _, b := range incoming.Blocks() { + syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality")) } } return nil, xerrors.Errorf("failed to sync fork: %w", err) @@ -1265,13 +1262,13 @@ var ErrForkTooLong = fmt.Errorf("fork longer than threshold") // If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the // denylist. Else, we find the common ancestor, and add the missing chain // fragment until the fork point to the returned []TipSet. -func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { - tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold)) +func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { + tips, err := syncer.Bsync.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold)) if err != nil { return nil, err } - nts, err := syncer.store.LoadTipSet(to.Parents()) + nts, err := syncer.store.LoadTipSet(known.Parents()) if err != nil { return nil, xerrors.Errorf("failed to load next local tipset: %w", err) } @@ -1281,7 +1278,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type if !syncer.Genesis.Equals(nts) { return nil, xerrors.Errorf("somehow synced chain that linked back to a different genesis (bad genesis: %s)", nts.Key()) } - return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync") + return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync; incoming: %s") } if nts.Equals(tips[cur]) { From b9effac437816baace9a79bedba273798f52d598 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Wed, 8 Jul 2020 18:30:24 -0400 Subject: [PATCH 04/25] feat(chainwatch): Capture baseline power in chain_power --- cmd/lotus-chainwatch/storage.go | 51 ++++++++++++++++ cmd/lotus-chainwatch/sync.go | 103 +++++++++++++++++++++++--------- 2 files changed, 126 insertions(+), 28 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index db2b5abfd..9647c9f47 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -294,6 +294,17 @@ create table if not exists miner_info primary key (miner_id) ); +/* +* captures chain-specific power state for any given stateroot +*/ +create table if not exists chain_power +( + stateroot text not null + constraint chain_power_pk + primary key, + baseline_power text not null +); + /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( @@ -500,6 +511,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI return nil } +// storeChainPower captures reward actor state as it relates to power captured on-chain +func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return xerrors.Errorf("begin chain_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy cp (stateroot, baseline_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp chain_power: %w", err) + } + + for _, rewardState := range rewardTips { + if _, err := stmt.Exec( + rewardState.stateroot.String(), + rewardState.baselinePower.String(), + ); err != nil { + return xerrors.Errorf("exec prepared chain_power: %w", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared chain_power: %w", err) + } + + if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit chain_power tx: %w", err) + } + + return nil +} + type storeSectorsAPI interface { StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index bbefb3e2c..fe4970ffb 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -13,12 +13,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/lotus/api" @@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) }() } +type rewardStateInfo struct { + stateroot cid.Cid + baselinePower big.Int +} + type minerStateInfo struct { // common addr address.Address @@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } }) + // map of tipset to reward state + rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes)) // map of tipset to all miners that had a head-change at that tipset. minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes)) // heads we've seen, im being paranoid @@ -302,40 +311,74 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. alk.Unlock() }) - log.Infof("Getting miner info") + log.Infof("Getting actor change info") minerChanges := 0 for addr, m := range actors { for actor, c := range m { + // reward actor + if actor.Code != builtin.RewardActorCodeID { + rewardTips[c.tsKey] = &rewardStateInfo{ + stateroot: c.stateroot, + baselinePower: big.Zero(), + } + continue + } + + // miner actors with head change events if actor.Code != builtin.StorageMinerActorCodeID { - continue + if _, found := headsSeen[actor.Head]; found { + continue + } + minerChanges++ + + minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ + addr: addr, + act: actor, + stateroot: c.stateroot, + + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + + state: miner.State{}, + info: miner.MinerInfo{}, + + rawPower: big.Zero(), + qalPower: big.Zero(), + }) + + headsSeen[actor.Head] = struct{}{} } - - // only want miner actors with head change events - if _, found := headsSeen[actor.Head]; found { - continue - } - minerChanges++ - - minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ - addr: addr, - act: actor, - stateroot: c.stateroot, - - tsKey: c.tsKey, - parentTsKey: c.parentTsKey, - - state: miner.State{}, - info: miner.MinerInfo{}, - - rawPower: big.Zero(), - qalPower: big.Zero(), - }) - - headsSeen[actor.Head] = struct{}{} + continue } } + rewardProcessingStartedAt := time.Now() + parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) { + tsKey, rewardInfo := it() + // get reward actor states at each tipset once for all updates + rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey) + if err != nil { + log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head) + if err != nil { + log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + var rewardActorState reward.State + if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { + log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardInfo.baselinePower = rewardActorState.BaselinePower + }) + log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips)) + minerProcessingStartedAt := time.Now() log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) // extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their @@ -411,25 +454,29 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } log.Info("Storing actors") - if err := st.storeActors(actors); err != nil { log.Error(err) return } + chainPowerStartedAt := time.Now() + if err := st.storeChainPower(rewardTips); err != nil { + log.Error(err) + } + log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String()) + log.Info("Storing miners") if err := st.storeMiners(minerTips); err != nil { log.Error(err) return } - log.Info("Storing miner sectors") sectorStart := time.Now() if err := st.storeSectors(minerTips, api); err != nil { log.Error(err) return } - log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) + log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String()) log.Info("Storing miner sectors heads") if err := st.storeMinerSectorsHeads(minerTips, api); err != nil { From 108167936075dfac41b26444274b0dcb3c530c29 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Wed, 8 Jul 2020 18:30:24 -0400 Subject: [PATCH 05/25] feat(chainwatch): Capture miner_power data --- cmd/lotus-chainwatch/storage.go | 59 ++++++++++++++++++++++++++++++++- cmd/lotus-chainwatch/sync.go | 6 ++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 9647c9f47..c48435829 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -305,6 +305,19 @@ create table if not exists chain_power baseline_power text not null ); +/* +* captures miner-specific power state for any given stateroot +*/ +create table if not exists miner_power +( + miner_id text not null, + stateroot text not null, + raw_bytes_power text not null, + quality_adjusted_power text not null, + constraint miner_power_pk + primary key (miner_id, stateroot) +); + /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( @@ -532,7 +545,7 @@ func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateIn rewardState.stateroot.String(), rewardState.baselinePower.String(), ); err != nil { - return xerrors.Errorf("exec prepared chain_power: %w", err) + log.Errorw("failed to store chain power", "stateroot", rewardState.stateroot, "error", err) } } @@ -658,6 +671,50 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) return tx.Commit() } +// storeMinerPower captures miner actor state as it relates to power per miner captured on-chain +func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return xerrors.Errorf("begin miner_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep miner_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy cp (miner_id, stateroot, raw_bytes_power, quality_adjusted_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp miner_power: %w", err) + } + + for _, miners := range minerTips { + for _, minerInfo := range miners { + if _, err := stmt.Exec( + minerInfo.addr.String(), + minerInfo.stateroot.String(), + minerInfo.rawPower.String(), + minerInfo.qalPower.String(), + ); err != nil { + log.Errorw("failed to store miner power", "miner", minerInfo.addr, "stateroot", minerInfo.stateroot, "error", err) + } + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared miner_power: %w", err) + } + + if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert miner_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit miner_power tx: %w", err) + } + + return nil +} + func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index fe4970ffb..4ba4ea33c 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -471,6 +471,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } + minerPowerStartedAt := time.Now() + if err := st.storeMinerPower(minerTips); err != nil { + log.Error(err) + } + log.Infow("Stored miner power", "duration", time.Since(minerPowerStartedAt).String()) + sectorStart := time.Now() if err := st.storeSectors(minerTips, api); err != nil { log.Error(err) From a79b31c23021860b5d5223ca622a7c4d7ed7121a Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Jul 2020 16:16:45 -0700 Subject: [PATCH 06/25] produce and parse FIL suffix on FIL strings --- chain/types/fil.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/chain/types/fil.go b/chain/types/fil.go index 527078e0f..941cfbaa9 100644 --- a/chain/types/fil.go +++ b/chain/types/fil.go @@ -15,7 +15,7 @@ func (f FIL) String() string { if r.Sign() == 0 { return "0" } - return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + " FIL" } func (f FIL) Format(s fmt.State, ch rune) { @@ -28,14 +28,34 @@ func (f FIL) Format(s fmt.State, ch rune) { } func ParseFIL(s string) (FIL, error) { + suffix := strings.TrimLeft(s, ".1234567890") + var attofil bool + if suffix != "" { + norm := strings.ToLower(strings.TrimSpace(suffix)) + switch norm { + case "", "fil": + case "attofil", "afil": + attofil = true + default: + return FIL{}, fmt.Errorf("unrecognized suffix: %q", suffix) + } + } + r, ok := new(big.Rat).SetString(s) if !ok { return FIL{}, fmt.Errorf("failed to parse %q as a decimal number", s) } - r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1)) + if !attofil { + r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1)) + } + if !r.IsInt() { - return FIL{}, fmt.Errorf("invalid FIL value: %q", s) + var pref string + if attofil { + pref = "atto" + } + return FIL{}, fmt.Errorf("invalid %sFIL value: %q", pref, s) } return FIL{r.Num()}, nil From 57a1418ff87d8ddded0b163e96b4b74bb8280747 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Thu, 9 Jul 2020 01:05:27 -0400 Subject: [PATCH 07/25] Correct initial setup of sector id counter --- cmd/lotus-storage-miner/init.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 14972c69a..208c17d03 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -359,10 +359,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string, }*/ } - log.Infof("Setting next sector ID to %d", maxSectorID+1) - buf := make([]byte, binary.MaxVarintLen64) - size := binary.PutUvarint(buf, uint64(maxSectorID+1)) + size := binary.PutUvarint(buf, uint64(maxSectorID)) return mds.Put(datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size]) } From b0cf3d322896ee085bc072527f2105cf36841fcb Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 9 Jul 2020 16:19:44 +0200 Subject: [PATCH 08/25] Mute RtRefreshManager logs as they break terminal emulation RtRefreshManager logs random binary strings, which somtimes might be partial multi-byte Unicode or ASCI escape codes. Signed-off-by: Jakub Sztandera --- lib/lotuslog/levels.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/lotuslog/levels.go b/lib/lotuslog/levels.go index ae3959568..d8b4678b7 100644 --- a/lib/lotuslog/levels.go +++ b/lib/lotuslog/levels.go @@ -18,4 +18,6 @@ func SetupLogLevels() { _ = logging.SetLogLevel("stores", "DEBUG") _ = logging.SetLogLevel("nat", "INFO") } + // Always mute RtRefreshManager because it breaks terminals + _ = logging.SetLogLevel("dht/RtRefreshManager", "FATAL") } From 770ee8b54d71c8f1aea71530f5b390d115a80597 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 9 Jul 2020 16:35:43 -0400 Subject: [PATCH 09/25] test: payment channel manager voucher and channel info tests --- paychmgr/paych.go | 44 +++- paychmgr/paych_test.go | 564 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 607 insertions(+), 1 deletion(-) create mode 100644 paychmgr/paych_test.go diff --git a/paychmgr/paych.go b/paychmgr/paych.go index 763c448f9..a02630ac7 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -6,6 +6,8 @@ import ( "fmt" "math" + "github.com/filecoin-project/lotus/api" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/account" @@ -34,9 +36,23 @@ type ManagerApi struct { full.StateAPI } +type StateManagerApi interface { + LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) + Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) +} + +//type StateApi interface { +// StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) +//} +// +//type MpoolApi interface { +// MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) +//} + type Manager struct { store *Store - sm *stmgr.StateManager + //sm *stmgr.StateManager + sm StateManagerApi mpool full.MpoolAPI wallet full.WalletAPI @@ -54,6 +70,18 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manage } } +// Used by the tests to supply mocks +func newManager(sm StateManagerApi, pchstore *Store) *Manager { + return &Manager{ + store: pchstore, + sm: sm, + + //mpool: api.MpoolAPI, + //wallet: api.WalletAPI, + //state: api.StateAPI, + } +} + func maxLaneFromState(st *paych.State) (uint64, error) { maxLane := uint64(math.MaxInt64) for _, state := range st.LaneStates { @@ -186,6 +214,7 @@ func (pm *Manager) CheckVoucherValid(ctx context.Context, ch address.Address, sv return fmt.Errorf("nonce too low") } + // TODO: return error if ls.Redeemed > vs.Amount sendAmount = types.BigSub(sv.Amount, ls.Redeemed) } @@ -284,10 +313,19 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych return types.NewInt(0), err } + // TODO: I believe this check is redundant because + // CheckVoucherValid() already returns an error if laneState.Nonce >= sv.Nonce if minDelta.GreaterThan(types.NewInt(0)) && laneState.Nonce > sv.Nonce { return types.NewInt(0), xerrors.Errorf("already storing voucher with higher nonce; %d > %d", laneState.Nonce, sv.Nonce) } + // TODO: + // It's possible to repeatedly add a voucher with the same proof: + // 1. add a voucher with proof P1 + // 2. add a voucher with proof P2 + // 3. add a voucher with proof P2 (again) + // Voucher with proof P2 has been added twice + // // look for duplicates for i, v := range ci.Vouchers { eq, err := cborutil.Equals(sv, v.Voucher) @@ -297,6 +335,8 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych if !eq { continue } + // TODO: CBOR encoding / decoding changes nil into []byte{}, so instead of + // checking v.Proof against nil we should check len(v.Proof) == 0 if v.Proof != nil { if !bytes.Equal(v.Proof, proof) { log.Warnf("AddVoucher: multiple proofs for single voucher, storing both") @@ -333,6 +373,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych } func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) { + // TODO: should this take into account lane state? return pm.store.AllocateLane(ch) } @@ -355,6 +396,7 @@ func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, er } func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) { + // TODO: should this take into account lane state? vouchers, err := pm.store.VouchersForPaych(ch) if err != nil { return 0, err diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go new file mode 100644 index 000000000..50e3503f5 --- /dev/null +++ b/paychmgr/paych_test.go @@ -0,0 +1,564 @@ +package paychmgr + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/lib/sigs" + "github.com/filecoin-project/specs-actors/actors/crypto" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/specs-actors/actors/abi/big" + + "github.com/filecoin-project/specs-actors/actors/abi" + tutils "github.com/filecoin-project/specs-actors/support/testing" + + "github.com/filecoin-project/specs-actors/actors/builtin/paych" + + "github.com/filecoin-project/specs-actors/actors/builtin/account" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" +) + +type testPchState struct { + actor *types.Actor + state paych.State +} + +type mockStateManager struct { + lk sync.Mutex + accountState map[address.Address]account.State + paychState map[address.Address]testPchState + response *api.InvocResult +} + +func newMockStateManager() *mockStateManager { + return &mockStateManager{ + accountState: make(map[address.Address]account.State), + paychState: make(map[address.Address]testPchState), + } +} + +func (sm *mockStateManager) setAccountState(a address.Address, state account.State) { + sm.lk.Lock() + defer sm.lk.Unlock() + sm.accountState[a] = state +} + +func (sm *mockStateManager) setPaychState(a address.Address, actor *types.Actor, state paych.State) { + sm.lk.Lock() + defer sm.lk.Unlock() + sm.paychState[a] = testPchState{actor, state} +} + +func (sm *mockStateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) { + sm.lk.Lock() + defer sm.lk.Unlock() + + if outState, ok := out.(*account.State); ok { + *outState = sm.accountState[a] + return nil, nil + } + if outState, ok := out.(*paych.State); ok { + info := sm.paychState[a] + *outState = info.state + return info.actor, nil + } + panic(fmt.Sprintf("unexpected state type %v", out)) +} + +func (sm *mockStateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) { + return sm.response, nil +} + +func TestPaychOutbound(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + fromAcct := tutils.NewIDAddr(t, 201) + toAcct := tutils.NewIDAddr(t, 202) + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{from}) + sm.setAccountState(toAcct, account.State{to}) + sm.setPaychState(ch, nil, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: big.NewInt(0), + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: []*paych.LaneState{}, + }) + + mgr := newManager(sm, store) + err := mgr.TrackOutboundChannel(ctx, ch) + require.NoError(t, err) + + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Equal(t, ci.Channel, ch) + require.Equal(t, ci.Control, from) + require.Equal(t, ci.Target, to) + require.EqualValues(t, ci.Direction, DirOutbound) + require.EqualValues(t, ci.NextLane, 0) + require.Len(t, ci.Vouchers, 0) +} + +func TestPaychInbound(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + fromAcct := tutils.NewIDAddr(t, 201) + toAcct := tutils.NewIDAddr(t, 202) + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{from}) + sm.setAccountState(toAcct, account.State{to}) + sm.setPaychState(ch, nil, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: big.NewInt(0), + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: []*paych.LaneState{}, + }) + + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Equal(t, ci.Channel, ch) + require.Equal(t, ci.Control, to) + require.Equal(t, ci.Target, from) + require.EqualValues(t, ci.Direction, DirInbound) + require.EqualValues(t, ci.NextLane, 0) + require.Len(t, ci.Vouchers, 0) +} + +func TestCheckVoucherValid(t *testing.T) { + ctx := context.Background() + + fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) + randKeyPrivate, _ := testGenerateKeyPair(t) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) + to := tutils.NewSECP256K1Addr(t, "secpTo") + fromAcct := tutils.NewActorAddr(t, "fromAct") + toAcct := tutils.NewActorAddr(t, "toAct") + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{from}) + sm.setAccountState(toAcct, account.State{to}) + + tcases := []struct { + name string + expectError bool + key []byte + actorBalance big.Int + toSend big.Int + voucherAmount big.Int + voucherLane uint64 + voucherNonce uint64 + laneStates []*paych.LaneState + }{{ + name: "passes when voucher amount < balance", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + }, { + name: "fails when funds too low", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(5), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(10), + }, { + name: "fails when invalid signature", + expectError: true, + key: randKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + }, { + name: "fails when nonce too low", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 1, + voucherNonce: 2, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(2), + Nonce: 3, + }}, + }, { + name: "passes when nonce higher", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 1, + voucherNonce: 3, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(2), + Nonce: 2, + }}, + }, { + name: "passes when nonce for different lane", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 2, + voucherNonce: 2, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(2), + Nonce: 3, + }}, + }, { + name: "fails when voucher has higher nonce but lower value than lane state", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(5), + voucherLane: 1, + voucherNonce: 3, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(6), + Nonce: 2, + }}, + }, { + name: "fails when voucher + ToSend > balance", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(9), + voucherAmount: big.NewInt(2), + }, { + // required balance = toSend + (voucher - redeemed) + // = 0 + (11 - 2) + // = 9 + // So required balance: 9 < actor balance: 10 + name: "passes when voucher - redeemed < balance", + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(0), + voucherAmount: big.NewInt(11), + voucherLane: 1, + voucherNonce: 3, + laneStates: []*paych.LaneState{{ + ID: 1, + Redeemed: big.NewInt(2), + Nonce: 2, + }}, + }} + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: tcase.actorBalance, + } + sm.setPaychState(ch, act, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: tcase.toSend, + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: tcase.laneStates, + }) + + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + + sv := testCreateVoucher(t, tcase.voucherLane, tcase.voucherNonce, tcase.voucherAmount, tcase.key) + + err = mgr.CheckVoucherValid(ctx, ch, sv) + if tcase.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestAddVoucherDelta(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(t, ctx) + + voucherLane := uint64(1) + + // Expect error when adding a voucher whose amount is less than minDelta + minDelta := big.NewInt(2) + nonce := uint64(1) + voucherAmount := big.NewInt(1) + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.Error(t, err) + + // Expect success when adding a voucher whose amount is equal to minDelta + nonce++ + voucherAmount = big.NewInt(2) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + delta, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + require.EqualValues(t, delta.Int64(), 2) + + // Check that delta is correct when there's an existing voucher + nonce++ + voucherAmount = big.NewInt(5) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + delta, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + require.EqualValues(t, delta.Int64(), 3) + + // Check that delta is correct when voucher added to a different lane + nonce = uint64(1) + voucherAmount = big.NewInt(6) + voucherLane = uint64(2) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + delta, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + require.EqualValues(t, delta.Int64(), 6) +} + +func TestAddVoucherNextLane(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(t, ctx) + + minDelta := big.NewInt(0) + voucherAmount := big.NewInt(2) + + // Add a voucher in lane: 2 + nonce := uint64(1) + voucherLane := uint64(2) + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.EqualValues(t, ci.NextLane, 3) + + // Add a voucher in lane: 1 + nonce++ + voucherLane = uint64(1) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.EqualValues(t, ci.NextLane, 3) + + // Add a voucher in lane: 5 + nonce++ + voucherLane = uint64(1) + sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.EqualValues(t, ci.NextLane, 6) +} + +func TestAddVoucherProof(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(t, ctx) + + nonce := uint64(1) + voucherAmount := big.NewInt(1) + minDelta := big.NewInt(0) + voucherAmount = big.NewInt(2) + voucherLane := uint64(1) + + // Add a voucher with no proof + var proof []byte + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + // Expect one voucher with no proof + ci, err := mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Len(t, ci.Vouchers, 1) + require.Len(t, ci.Vouchers[0].Proof, 0) + + // Add same voucher with no proof + voucherLane = uint64(1) + _, err = mgr.AddVoucher(ctx, ch, sv, proof, minDelta) + require.NoError(t, err) + + // Expect one voucher with no proof + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Len(t, ci.Vouchers, 1) + require.Len(t, ci.Vouchers[0].Proof, 0) + + // Add same voucher with proof + proof = []byte{1} + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + // Should add proof to existing voucher + ci, err = mgr.GetChannelInfo(ch) + require.NoError(t, err) + require.Len(t, ci.Vouchers, 1) + require.Len(t, ci.Vouchers[0].Proof, 1) +} + +func TestAllocateLane(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, _ := testSetupMgrWithChannel(t, ctx) + + lane, err := mgr.AllocateLane(ch) + require.NoError(t, err) + require.EqualValues(t, lane, 1) + + lane, err = mgr.AllocateLane(ch) + require.NoError(t, err) + require.EqualValues(t, lane, 2) +} + +func TestNextNonceForLane(t *testing.T) { + ctx := context.Background() + + // Set up a manager with a single payment channel + mgr, ch, key := testSetupMgrWithChannel(t, ctx) + + // Expect next nonce for non-existent lane to be 1 + next, err := mgr.NextNonceForLane(ctx, ch, 1) + require.NoError(t, err) + require.EqualValues(t, next, 1) + + voucherAmount := big.NewInt(1) + minDelta := big.NewInt(0) + voucherAmount = big.NewInt(2) + + // Add vouchers such that we have + // lane 1: nonce 3 + // lane 1: nonce 2 + // lane 2: nonce 5 + voucherLane := uint64(1) + for _, nonce := range []uint64{3, 2} { + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key) + _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + } + + voucherLane = uint64(2) + nonce := uint64(5) + sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key) + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + // Expect next nonce for lane 1 to be 4 + next, err = mgr.NextNonceForLane(ctx, ch, 1) + require.NoError(t, err) + require.EqualValues(t, next, 4) + + // Expect next nonce for lane 2 to be 6 + next, err = mgr.NextNonceForLane(ctx, ch, 2) + require.NoError(t, err) + require.EqualValues(t, next, 6) +} + +func testSetupMgrWithChannel(t *testing.T, ctx context.Context) (*Manager, address.Address, []byte) { + fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) + to := tutils.NewSECP256K1Addr(t, "secpTo") + fromAcct := tutils.NewActorAddr(t, "fromAct") + toAcct := tutils.NewActorAddr(t, "toAct") + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{from}) + sm.setAccountState(toAcct, account.State{to}) + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: big.NewInt(10), + } + sm.setPaychState(ch, act, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: big.NewInt(0), + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: []*paych.LaneState{}, + }) + + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + return mgr, ch, fromKeyPrivate +} + +func testGenerateKeyPair(t *testing.T) ([]byte, []byte) { + priv, err := sigs.Generate(crypto.SigTypeSecp256k1) + require.NoError(t, err) + pub, err := sigs.ToPublic(crypto.SigTypeSecp256k1, priv) + require.NoError(t, err) + return priv, pub +} + +func testCreateVoucher(t *testing.T, voucherLane uint64, nonce uint64, voucherAmount big.Int, key []byte) *paych.SignedVoucher { + sv := &paych.SignedVoucher{ + Lane: voucherLane, + Nonce: nonce, + Amount: voucherAmount, + } + + signingBytes, err := sv.SigningBytes() + require.NoError(t, err) + sig, err := sigs.Sign(crypto.SigTypeSecp256k1, key, signingBytes) + require.NoError(t, err) + sv.Signature = sig + return sv +} From 1cdb008bd508dd5ca8eceeaeb50cf027d1f4102f Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 9 Jul 2020 17:20:17 -0400 Subject: [PATCH 10/25] fix: lane allocation --- paychmgr/paych.go | 29 +++++++++++------------------ paychmgr/paych_test.go | 16 ++++++++-------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/paychmgr/paych.go b/paychmgr/paych.go index a02630ac7..a715107e8 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "math" "github.com/filecoin-project/lotus/api" @@ -82,14 +81,18 @@ func newManager(sm StateManagerApi, pchstore *Store) *Manager { } } -func maxLaneFromState(st *paych.State) (uint64, error) { - maxLane := uint64(math.MaxInt64) +func nextLaneFromState(st *paych.State) uint64 { + if len(st.LaneStates) == 0 { + return 0 + } + + maxLane := st.LaneStates[0].ID for _, state := range st.LaneStates { - if (state.ID)+1 > maxLane+1 { + if state.ID > maxLane { maxLane = state.ID } } - return maxLane, nil + return maxLane + 1 } func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error { @@ -110,18 +113,13 @@ func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) } to := account.Address - maxLane, err := maxLaneFromState(st) - if err != nil { - return err - } - return pm.store.TrackChannel(&ChannelInfo{ Channel: ch, Control: to, Target: from, Direction: DirInbound, - NextLane: maxLane + 1, + NextLane: nextLaneFromState(st), }) } @@ -131,11 +129,6 @@ func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Addre return nil, err } - maxLane, err := maxLaneFromState(st) - if err != nil { - return nil, err - } - var account account.State _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) if err != nil { @@ -154,7 +147,7 @@ func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Addre Target: to, Direction: DirOutbound, - NextLane: maxLane + 1, + NextLane: nextLaneFromState(st), }, nil } @@ -365,7 +358,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych Proof: proof, }) - if ci.NextLane <= (sv.Lane) { + if ci.NextLane <= sv.Lane { ci.NextLane = sv.Lane + 1 } diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go index 50e3503f5..b735a22b1 100644 --- a/paychmgr/paych_test.go +++ b/paychmgr/paych_test.go @@ -366,7 +366,7 @@ func TestAddVoucherNextLane(t *testing.T) { minDelta := big.NewInt(0) voucherAmount := big.NewInt(2) - // Add a voucher in lane: 2 + // Add a voucher in lane 2 nonce := uint64(1) voucherLane := uint64(2) sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) @@ -377,8 +377,7 @@ func TestAddVoucherNextLane(t *testing.T) { require.NoError(t, err) require.EqualValues(t, ci.NextLane, 3) - // Add a voucher in lane: 1 - nonce++ + // Add a voucher in lane 1 voucherLane = uint64(1) sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) @@ -388,9 +387,8 @@ func TestAddVoucherNextLane(t *testing.T) { require.NoError(t, err) require.EqualValues(t, ci.NextLane, 3) - // Add a voucher in lane: 5 - nonce++ - voucherLane = uint64(1) + // Add a voucher in lane 5 + voucherLane = uint64(5) sv = testCreateVoucher(t, voucherLane, nonce, voucherAmount, fromKeyPrivate) _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) require.NoError(t, err) @@ -453,13 +451,15 @@ func TestAllocateLane(t *testing.T) { // Set up a manager with a single payment channel mgr, ch, _ := testSetupMgrWithChannel(t, ctx) + // First lane should be 0 lane, err := mgr.AllocateLane(ch) require.NoError(t, err) - require.EqualValues(t, lane, 1) + require.EqualValues(t, lane, 0) + // Next lane should be 1 lane, err = mgr.AllocateLane(ch) require.NoError(t, err) - require.EqualValues(t, lane, 2) + require.EqualValues(t, lane, 1) } func TestNextNonceForLane(t *testing.T) { From bf6b76a4fb02ef13e8c2630b2071e990393c6ade Mon Sep 17 00:00:00 2001 From: frrist Date: Tue, 7 Jul 2020 16:39:32 -0700 Subject: [PATCH 11/25] feat: add market deal state & proposal predicates - detects changes in the market deal proposal and market deal state amts. --- chain/events/state/predicates.go | 190 ++++++++++++++-- chain/events/state/predicates_test.go | 302 ++++++++++++++++++-------- 2 files changed, 392 insertions(+), 100 deletions(-) diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 793a00806..16abc4928 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -4,13 +4,11 @@ import ( "bytes" "context" + "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" typegen "github.com/whyrusleeping/cbor-gen" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-amt-ipld/v2" - "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -87,20 +85,25 @@ func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState Di }) } -type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) +type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error) -// OnDealStateChanged calls diffDealStates when the market state changes -func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { +// OnDealStateChanged calls diffDealStates when the market deal state changes +func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffAdtArraysFunc) DiffStorageMarketStateFunc { return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { if oldState.States.Equals(newState.States) { return false, nil, nil } - oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States) + ctxStore := &contextStore{ + ctx: ctx, + cst: sp.cst, + } + + oldRoot, err := adt.AsArray(ctxStore, oldState.States) if err != nil { return false, nil, err } - newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States) + newRoot, err := adt.AsArray(ctxStore, newState.States) if err != nil { return false, nil, err } @@ -109,31 +112,188 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) } } +// OnDealProposalChanged calls diffDealProps when the market proposal state changes +func (sp *StatePredicates) OnDealProposalChanged(diffDealProps DiffAdtArraysFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { + if oldState.Proposals.Equals(newState.Proposals) { + return false, nil, nil + } + + ctxStore := &contextStore{ + ctx: ctx, + cst: sp.cst, + } + + oldRoot, err := adt.AsArray(ctxStore, oldState.Proposals) + if err != nil { + return false, nil, err + } + newRoot, err := adt.AsArray(ctxStore, newState.Proposals) + if err != nil { + return false, nil, err + } + + return diffDealProps(ctx, oldRoot, newRoot) + } +} + +var _ AdtArrayDiff = &MarketDealProposalChanges{} + +type MarketDealProposalChanges struct { + Added []ProposalIDState + Removed []ProposalIDState +} + +type ProposalIDState struct { + ID abi.DealID + Proposal market.DealProposal +} + +func (m *MarketDealProposalChanges) Add(key uint64, val *typegen.Deferred) error { + dp := new(market.DealProposal) + err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Added = append(m.Added, ProposalIDState{abi.DealID(key), *dp}) + return nil +} + +func (m *MarketDealProposalChanges) Modify(key uint64, from, to *typegen.Deferred) error { + // short circuit, DealProposals are static + return nil +} + +func (m *MarketDealProposalChanges) Remove(key uint64, val *typegen.Deferred) error { + dp := new(market.DealProposal) + err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Removed = append(m.Removed, ProposalIDState{abi.DealID(key), *dp}) + return nil +} + +// OnDealProposalAmtChanged detects changes in the deal proposal AMT for all deal proposals and returns a MarketProposalsChanges structure containing: +// - Added Proposals +// - Modified Proposals +// - Removed Proposals +func (sp *StatePredicates) OnDealProposalAmtChanged() DiffAdtArraysFunc { + return func(ctx context.Context, oldDealProps, newDealProps *adt.Array) (changed bool, user UserData, err error) { + proposalChanges := new(MarketDealProposalChanges) + if err := DiffAdtArray(oldDealProps, newDealProps, proposalChanges); err != nil { + return false, nil, err + } + + if len(proposalChanges.Added)+len(proposalChanges.Removed) == 0 { + return false, nil, nil + } + + return true, proposalChanges, nil + } +} + +var _ AdtArrayDiff = &MarketDealStateChanges{} + +type MarketDealStateChanges struct { + Added []DealIDState + Modified []DealStateChange + Removed []DealIDState +} + +type DealIDState struct { + ID abi.DealID + Deal market.DealState +} + +func (m *MarketDealStateChanges) Add(key uint64, val *typegen.Deferred) error { + ds := new(market.DealState) + err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Added = append(m.Added, DealIDState{abi.DealID(key), *ds}) + return nil +} + +func (m *MarketDealStateChanges) Modify(key uint64, from, to *typegen.Deferred) error { + dsFrom := new(market.DealState) + if err := dsFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil { + return err + } + + dsTo := new(market.DealState) + if err := dsTo.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil { + return err + } + + if *dsFrom != *dsTo { + m.Modified = append(m.Modified, DealStateChange{abi.DealID(key), dsFrom, dsTo}) + } + return nil +} + +func (m *MarketDealStateChanges) Remove(key uint64, val *typegen.Deferred) error { + ds := new(market.DealState) + err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Removed = append(m.Removed, DealIDState{abi.DealID(key), *ds}) + return nil +} + +// OnDealStateAmtChanged detects changes in the deal state AMT for all deal states and returns a MarketDealStateChanges structure containing: +// - Added Deals +// - Modified Deals +// - Removed Deals +func (sp *StatePredicates) OnDealStateAmtChanged() DiffAdtArraysFunc { + return func(ctx context.Context, oldDealStates, newDealStates *adt.Array) (changed bool, user UserData, err error) { + dealStateChanges := new(MarketDealStateChanges) + if err := DiffAdtArray(oldDealStates, newDealStates, dealStateChanges); err != nil { + return false, nil, err + } + + if len(dealStateChanges.Added)+len(dealStateChanges.Modified)+len(dealStateChanges.Removed) == 0 { + return false, nil, nil + } + + return true, dealStateChanges, nil + } +} + // ChangedDeals is a set of changes to deal state type ChangedDeals map[abi.DealID]DealStateChange // DealStateChange is a change in deal state from -> to type DealStateChange struct { - From market.DealState - To market.DealState + ID abi.DealID + From *market.DealState + To *market.DealState } // DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs -func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { - return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { +func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffAdtArraysFunc { + return func(ctx context.Context, oldDealStateArray, newDealStateArray *adt.Array) (changed bool, user UserData, err error) { changedDeals := make(ChangedDeals) for _, dealID := range dealIds { + var oldDealPtr, newDealPtr *market.DealState var oldDeal, newDeal market.DealState - err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal) + + _, err := oldDealStateArray.Get(uint64(dealID), &oldDeal) if err != nil { return false, nil, err } - err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal) + oldDealPtr = &oldDeal + + _, err = newDealStateArray.Get(uint64(dealID), &newDeal) if err != nil { return false, nil, err } + newDealPtr = &newDeal + if oldDeal != newDeal { - changedDeals[dealID] = DealStateChange{oldDeal, newDeal} + changedDeals[dealID] = DealStateChange{dealID, oldDealPtr, newDealPtr} } } if len(changedDeals) > 0 { diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 7ab3dd074..072b2946e 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/filecoin-project/specs-actors/actors/util/adt" tutils "github.com/filecoin-project/specs-actors/support/testing" "github.com/filecoin-project/lotus/chain/types" @@ -65,99 +66,216 @@ func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) { m.ts[tsk] = act } -func TestPredicates(t *testing.T) { +func TestMarketPredicates(t *testing.T) { ctx := context.Background() bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) store := cbornode.NewCborStore(bs) + oldDeal1 := &market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 0, + } + oldDeal2 := &market.DealState{ + SectorStartEpoch: 4, + LastUpdatedEpoch: 5, + SlashEpoch: 0, + } oldDeals := map[abi.DealID]*market.DealState{ - abi.DealID(1): { - SectorStartEpoch: 1, - LastUpdatedEpoch: 2, - SlashEpoch: 0, - }, - abi.DealID(2): { - SectorStartEpoch: 4, - LastUpdatedEpoch: 5, - SlashEpoch: 0, - }, + abi.DealID(1): oldDeal1, + abi.DealID(2): oldDeal2, } - oldStateC := createMarketState(ctx, t, store, oldDeals) + oldProp1 := &market.DealProposal{ + PieceCID: dummyCid, + PieceSize: 0, + VerifiedDeal: false, + Client: tutils.NewIDAddr(t, 1), + Provider: tutils.NewIDAddr(t, 1), + StartEpoch: 1, + EndEpoch: 2, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: big.Zero(), + ClientCollateral: big.Zero(), + } + oldProp2 := &market.DealProposal{ + PieceCID: dummyCid, + PieceSize: 0, + VerifiedDeal: false, + Client: tutils.NewIDAddr(t, 1), + Provider: tutils.NewIDAddr(t, 1), + StartEpoch: 2, + EndEpoch: 3, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: big.Zero(), + ClientCollateral: big.Zero(), + } + oldProps := map[abi.DealID]*market.DealProposal{ + abi.DealID(1): oldProp1, + abi.DealID(2): oldProp2, + } + + oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps) + + newDeal1 := &market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 3, + SlashEpoch: 0, + } + newDeal2 := &market.DealState{ + SectorStartEpoch: 4, + LastUpdatedEpoch: 6, + SlashEpoch: 6, + } + // added + newDeal3 := &market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 3, + } newDeals := map[abi.DealID]*market.DealState{ - abi.DealID(1): { - SectorStartEpoch: 1, - LastUpdatedEpoch: 3, - SlashEpoch: 0, - }, - abi.DealID(2): { - SectorStartEpoch: 4, - LastUpdatedEpoch: 6, - SlashEpoch: 6, - }, + abi.DealID(1): newDeal1, + abi.DealID(2): newDeal2, + abi.DealID(3): newDeal3, } - newStateC := createMarketState(ctx, t, store, newDeals) - miner, err := address.NewFromString("t00") + // added + newProp3 := &market.DealProposal{ + PieceCID: dummyCid, + PieceSize: 0, + VerifiedDeal: false, + Client: tutils.NewIDAddr(t, 1), + Provider: tutils.NewIDAddr(t, 1), + StartEpoch: 4, + EndEpoch: 4, + StoragePricePerEpoch: big.Zero(), + ProviderCollateral: big.Zero(), + ClientCollateral: big.Zero(), + } + newProps := map[abi.DealID]*market.DealProposal{ + abi.DealID(1): oldProp1, // 1 was persisted + // prop 2 was removed + abi.DealID(3): newProp3, // new + // NB: DealProposals cannot be modified, so don't test that case. + } + newStateC := createMarketState(ctx, t, store, newDeals, newProps) + + minerAddr, err := address.NewFromString("t00") require.NoError(t, err) - oldState, err := mockTipset(miner, 1) + oldState, err := mockTipset(minerAddr, 1) require.NoError(t, err) - newState, err := mockTipset(miner, 2) + newState, err := mockTipset(minerAddr, 2) require.NoError(t, err) api := newMockAPI(bs) api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) api.setActor(newState.Key(), &types.Actor{Head: newStateC}) - preds := NewStatePredicates(api) + t.Run("deal ID predicate", func(t *testing.T) { + preds := NewStatePredicates(api) - dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} - diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) + dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} + diffIDFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) - // Diff a state against itself: expect no change - changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key()) - require.NoError(t, err) - require.False(t, changed) + // Diff a state against itself: expect no change + changed, _, err := diffIDFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) - // Diff old state against new state - changed, val, err := diffFn(ctx, oldState.Key(), newState.Key()) - require.NoError(t, err) - require.True(t, changed) + // Diff old state against new state + changed, valIDs, err := diffIDFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, changed) - changedDeals, ok := val.(ChangedDeals) - require.True(t, ok) - require.Len(t, changedDeals, 2) - require.Contains(t, changedDeals, abi.DealID(1)) - require.Contains(t, changedDeals, abi.DealID(2)) - deal1 := changedDeals[abi.DealID(1)] - if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { - t.Fatal("Unexpected change to LastUpdatedEpoch") - } - deal2 := changedDeals[abi.DealID(2)] - if deal2.From.SlashEpoch != 0 || deal2.To.SlashEpoch != 6 { - t.Fatal("Unexpected change to SlashEpoch") - } + changedDealIDs, ok := valIDs.(ChangedDeals) + require.True(t, ok) + require.Len(t, changedDealIDs, 2) + require.Contains(t, changedDealIDs, abi.DealID(1)) + require.Contains(t, changedDealIDs, abi.DealID(2)) + deal1 := changedDealIDs[abi.DealID(1)] + if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } + deal2 := changedDealIDs[abi.DealID(2)] + if deal2.From.SlashEpoch != 0 || deal2.To.SlashEpoch != 6 { + t.Fatal("Unexpected change to SlashEpoch") + } - // Test that OnActorStateChanged does not call the callback if the state has not changed - mockAddr, err := address.NewFromString("t01") - require.NoError(t, err) - actorDiffFn := preds.OnActorStateChanged(mockAddr, func(context.Context, cid.Cid, cid.Cid) (bool, UserData, error) { - t.Fatal("No state change so this should not be called") - return false, nil, nil + // Test that OnActorStateChanged does not call the callback if the state has not changed + mockAddr, err := address.NewFromString("t01") + require.NoError(t, err) + actorDiffFn := preds.OnActorStateChanged(mockAddr, func(context.Context, cid.Cid, cid.Cid) (bool, UserData, error) { + t.Fatal("No state change so this should not be called") + return false, nil, nil + }) + changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) + + // Test that OnDealStateChanged does not call the callback if the state has not changed + diffDealStateFn := preds.OnDealStateChanged(func(context.Context, *adt.Array, *adt.Array) (bool, UserData, error) { + t.Fatal("No state change so this should not be called") + return false, nil, nil + }) + marketState := createEmptyMarketState(t, store) + changed, _, err = diffDealStateFn(ctx, marketState, marketState) + require.NoError(t, err) + require.False(t, changed) }) - changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key()) - require.NoError(t, err) - require.False(t, changed) - // Test that OnDealStateChanged does not call the callback if the state has not changed - diffDealStateFn := preds.OnDealStateChanged(func(context.Context, *amt.Root, *amt.Root) (bool, UserData, error) { - t.Fatal("No state change so this should not be called") - return false, nil, nil + t.Run("deal state array predicate", func(t *testing.T) { + preds := NewStatePredicates(api) + diffArrFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.OnDealStateAmtChanged())) + + changed, _, err := diffArrFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) + + changed, valArr, err := diffArrFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, changed) + + changedDeals, ok := valArr.(*MarketDealStateChanges) + require.True(t, ok) + require.Len(t, changedDeals.Added, 1) + require.Equal(t, abi.DealID(3), changedDeals.Added[0].ID) + require.Equal(t, *newDeal3, changedDeals.Added[0].Deal) + + require.Len(t, changedDeals.Removed, 0) + + require.Len(t, changedDeals.Modified, 2) + require.Equal(t, abi.DealID(1), changedDeals.Modified[0].ID) + require.Equal(t, newDeal1, changedDeals.Modified[0].To) + require.Equal(t, oldDeal1, changedDeals.Modified[0].From) + + require.Equal(t, abi.DealID(2), changedDeals.Modified[1].ID) + require.Equal(t, oldDeal2, changedDeals.Modified[1].From) + require.Equal(t, newDeal2, changedDeals.Modified[1].To) + }) + + t.Run("deal proposal array predicate", func(t *testing.T) { + preds := NewStatePredicates(api) + diffArrFn := preds.OnStorageMarketActorChanged(preds.OnDealProposalChanged(preds.OnDealProposalAmtChanged())) + changed, _, err := diffArrFn(ctx, oldState.Key(), oldState.Key()) + require.NoError(t, err) + require.False(t, changed) + + changed, valArr, err := diffArrFn(ctx, oldState.Key(), newState.Key()) + require.NoError(t, err) + require.True(t, changed) + + changedProps, ok := valArr.(*MarketDealProposalChanges) + require.True(t, ok) + require.Len(t, changedProps.Added, 1) + require.Equal(t, abi.DealID(3), changedProps.Added[0].ID) + require.Equal(t, *newProp3, changedProps.Added[0].Proposal) + + // proposals cannot be modified -- no modified testing + + require.Len(t, changedProps.Removed, 1) + require.Equal(t, abi.DealID(2), changedProps.Removed[0].ID) + require.Equal(t, *oldProp2, changedProps.Removed[0].Proposal) }) - marketState := createEmptyMarketState(t, store) - changed, _, err = diffDealStateFn(ctx, marketState, marketState) - require.NoError(t, err) - require.False(t, changed) } func TestMinerSectorChange(t *testing.T) { @@ -208,14 +326,15 @@ func TestMinerSectorChange(t *testing.T) { require.True(t, ok) require.Equal(t, len(sectorChanges.Added), 1) - require.Equal(t, sectorChanges.Added[0], si3) + require.Equal(t, 1, len(sectorChanges.Added)) + require.Equal(t, si3, sectorChanges.Added[0]) - require.Equal(t, len(sectorChanges.Removed), 1) - require.Equal(t, sectorChanges.Removed[0], si0) + require.Equal(t, 1, len(sectorChanges.Removed)) + require.Equal(t, si0, sectorChanges.Removed[0]) - require.Equal(t, len(sectorChanges.Extended), 1) - require.Equal(t, sectorChanges.Extended[0].From, si1) - require.Equal(t, sectorChanges.Extended[0].To, si1Ext) + require.Equal(t, 1, len(sectorChanges.Extended)) + require.Equal(t, si1, sectorChanges.Extended[0].From) + require.Equal(t, si1Ext, sectorChanges.Extended[0].To) change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key()) require.NoError(t, err) @@ -230,20 +349,20 @@ func TestMinerSectorChange(t *testing.T) { sectorChanges, ok = val.(*MinerSectorChanges) require.True(t, ok) - require.Equal(t, len(sectorChanges.Added), 1) - require.Equal(t, sectorChanges.Added[0], si0) + require.Equal(t, 1, len(sectorChanges.Added)) + require.Equal(t, si0, sectorChanges.Added[0]) - require.Equal(t, len(sectorChanges.Removed), 1) - require.Equal(t, sectorChanges.Removed[0], si3) + require.Equal(t, 1, len(sectorChanges.Removed)) + require.Equal(t, si3, sectorChanges.Removed[0]) - require.Equal(t, len(sectorChanges.Extended), 1) - require.Equal(t, sectorChanges.Extended[0].To, si1) - require.Equal(t, sectorChanges.Extended[0].From, si1Ext) + require.Equal(t, 1, len(sectorChanges.Extended)) + require.Equal(t, si1, sectorChanges.Extended[0].To) + require.Equal(t, si1Ext, sectorChanges.Extended[0].From) } -func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { +func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) { return types.NewTipSet([]*types.BlockHeader{{ - Miner: miner, + Miner: minerAddr, Height: 5, ParentStateRoot: dummyCid, Messages: dummyCid, @@ -254,11 +373,13 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) }}) } -func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { - rootCid := createDealAMT(ctx, t, store, deals) +func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal) cid.Cid { + dealRootCid := createDealAMT(ctx, t, store, deals) + propRootCid := createProposalAMT(ctx, t, store, props) state := createEmptyMarketState(t, store) - state.States = rootCid + state.States = dealRootCid + state.Proposals = propRootCid stateC, err := store.Put(ctx, state) require.NoError(t, err) @@ -284,6 +405,17 @@ func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldS return rootCid } +func createProposalAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, props map[abi.DealID]*market.DealProposal) cid.Cid { + root := amt.NewAMT(store) + for dealID, prop := range props { + err := root.Set(ctx, uint64(dealID), prop) + require.NoError(t, err) + } + rootCid, err := root.Flush(ctx) + require.NoError(t, err) + return rootCid +} + func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid { rootCid := createSectorsAMT(ctx, t, store, sectors) From 6c70ef7c7d2fa1188d5cbd307da903ae91433301 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 9 Jul 2020 18:27:39 -0400 Subject: [PATCH 12/25] refactor: simplify state management --- paychmgr/paych.go | 206 +++++++++++++---------------------------- paychmgr/paych_test.go | 19 ++-- paychmgr/simple.go | 2 +- paychmgr/state.go | 60 ++++++++++-- 4 files changed, 130 insertions(+), 157 deletions(-) diff --git a/paychmgr/paych.go b/paychmgr/paych.go index a715107e8..12ab70bfc 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -40,18 +40,9 @@ type StateManagerApi interface { Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) } -//type StateApi interface { -// StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) -//} -// -//type MpoolApi interface { -// MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) -//} - type Manager struct { store *Store - //sm *stmgr.StateManager - sm StateManagerApi + sm StateManagerApi mpool full.MpoolAPI wallet full.WalletAPI @@ -74,85 +65,19 @@ func newManager(sm StateManagerApi, pchstore *Store) *Manager { return &Manager{ store: pchstore, sm: sm, - - //mpool: api.MpoolAPI, - //wallet: api.WalletAPI, - //state: api.StateAPI, } } -func nextLaneFromState(st *paych.State) uint64 { - if len(st.LaneStates) == 0 { - return 0 - } - - maxLane := st.LaneStates[0].ID - for _, state := range st.LaneStates { - if state.ID > maxLane { - maxLane = state.ID - } - } - return maxLane + 1 -} - -func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error { - _, st, err := pm.loadPaychState(ctx, ch) - if err != nil { - return err - } - - var account account.State - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) - if err != nil { - return err - } - from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) - if err != nil { - return err - } - to := account.Address - - return pm.store.TrackChannel(&ChannelInfo{ - Channel: ch, - Control: to, - Target: from, - - Direction: DirInbound, - NextLane: nextLaneFromState(st), - }) -} - -func (pm *Manager) loadOutboundChannelInfo(ctx context.Context, ch address.Address) (*ChannelInfo, error) { - _, st, err := pm.loadPaychState(ctx, ch) - if err != nil { - return nil, err - } - - var account account.State - _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) - if err != nil { - return nil, err - } - from := account.Address - _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) - if err != nil { - return nil, err - } - to := account.Address - - return &ChannelInfo{ - Channel: ch, - Control: from, - Target: to, - - Direction: DirOutbound, - NextLane: nextLaneFromState(st), - }, nil -} - func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address) error { - ci, err := pm.loadOutboundChannelInfo(ctx, ch) + return pm.trackChannel(ctx, ch, DirOutbound) +} + +func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error { + return pm.trackChannel(ctx, ch, DirInbound) +} + +func (pm *Manager) trackChannel(ctx context.Context, ch address.Address, dir uint64) error { + ci, err := pm.loadStateChannelInfo(ctx, ch, dir) if err != nil { return err } @@ -170,58 +95,68 @@ func (pm *Manager) GetChannelInfo(addr address.Address) (*ChannelInfo, error) { // checks if the given voucher is valid (is or could become spendable at some point) func (pm *Manager) CheckVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) error { + _, err := pm.checkVoucherValid(ctx, ch, sv) + return err +} + +func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) (*paych.State, error) { act, pca, err := pm.loadPaychState(ctx, ch) if err != nil { - return err + return nil, err } var account account.State _, err = pm.sm.LoadActorState(ctx, pca.From, &account, nil) if err != nil { - return err + return nil, err } from := account.Address // verify signature vb, err := sv.SigningBytes() if err != nil { - return err + return nil, err } // TODO: technically, either party may create and sign a voucher. // However, for now, we only accept them from the channel creator. // More complex handling logic can be added later if err := sigs.Verify(sv.Signature, from, vb); err != nil { - return err + return nil, err } sendAmount := sv.Amount - // now check the lane state - // TODO: should check against vouchers in our local store too - // there might be something conflicting - ls := findLane(pca.LaneStates, uint64(sv.Lane)) - if ls == nil { - } else { - if (ls.Nonce) >= sv.Nonce { - return fmt.Errorf("nonce too low") - } - - // TODO: return error if ls.Redeemed > vs.Amount - sendAmount = types.BigSub(sv.Amount, ls.Redeemed) + // Check the voucher against the highest known voucher nonce / value + ls, err := pm.laneState(pca, ch, sv.Lane) + if err != nil { + return nil, err } + // If there has been at least once voucher redeemed, and the voucher + // nonce value is less than the highest known nonce + if ls.Redeemed.Int64() > 0 && sv.Nonce <= ls.Nonce { + return nil, fmt.Errorf("nonce too low") + } + // If the voucher amount is less than the highest known voucher amount + if sv.Amount.LessThanEqual(ls.Redeemed) { + return nil, fmt.Errorf("voucher amount is lower than amount for voucher with lower nonce") + } + + // Only send the difference between the voucher amount and what has already + // been redeemed + sendAmount = types.BigSub(sv.Amount, ls.Redeemed) // TODO: also account for vouchers on other lanes we've received newTotal := types.BigAdd(sendAmount, pca.ToSend) if act.Balance.LessThan(newTotal) { - return fmt.Errorf("not enough funds in channel to cover voucher") + return nil, fmt.Errorf("not enough funds in channel to cover voucher") } if len(sv.Merges) != 0 { - return fmt.Errorf("dont currently support paych lane merges") + return nil, fmt.Errorf("dont currently support paych lane merges") } - return nil + return pca, nil } // checks if the given voucher is currently spendable @@ -289,10 +224,6 @@ func (pm *Manager) getPaychOwner(ctx context.Context, ch address.Address) (addre } func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error) { - if err := pm.CheckVoucherValid(ctx, ch, sv); err != nil { - return types.NewInt(0), err - } - pm.store.lk.Lock() defer pm.store.lk.Unlock() @@ -301,25 +232,7 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych return types.NewInt(0), err } - laneState, err := pm.laneState(ctx, ch, uint64(sv.Lane)) - if err != nil { - return types.NewInt(0), err - } - - // TODO: I believe this check is redundant because - // CheckVoucherValid() already returns an error if laneState.Nonce >= sv.Nonce - if minDelta.GreaterThan(types.NewInt(0)) && laneState.Nonce > sv.Nonce { - return types.NewInt(0), xerrors.Errorf("already storing voucher with higher nonce; %d > %d", laneState.Nonce, sv.Nonce) - } - - // TODO: - // It's possible to repeatedly add a voucher with the same proof: - // 1. add a voucher with proof P1 - // 2. add a voucher with proof P2 - // 3. add a voucher with proof P2 (again) - // Voucher with proof P2 has been added twice - // - // look for duplicates + // Check if the voucher has already been added for i, v := range ci.Vouchers { eq, err := cborutil.Equals(sv, v.Voucher) if err != nil { @@ -328,24 +241,35 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych if !eq { continue } - // TODO: CBOR encoding / decoding changes nil into []byte{}, so instead of - // checking v.Proof against nil we should check len(v.Proof) == 0 - if v.Proof != nil { - if !bytes.Equal(v.Proof, proof) { - log.Warnf("AddVoucher: multiple proofs for single voucher, storing both") - break + + // This is a duplicate voucher. + // Update the proof on the existing voucher + if len(proof) > 0 && !bytes.Equal(v.Proof, proof) { + log.Warnf("AddVoucher: adding proof to stored voucher") + ci.Vouchers[i] = &VoucherInfo{ + Voucher: v.Voucher, + Proof: proof, } - log.Warnf("AddVoucher: voucher re-added with matching proof") - return types.NewInt(0), nil + + return types.NewInt(0), pm.store.putChannelInfo(ci) } - log.Warnf("AddVoucher: adding proof to stored voucher") - ci.Vouchers[i] = &VoucherInfo{ - Voucher: v.Voucher, - Proof: proof, - } + // Otherwise just ignore the duplicate voucher + log.Warnf("AddVoucher: voucher re-added with matching proof") + return types.NewInt(0), nil + } - return types.NewInt(0), pm.store.putChannelInfo(ci) + // Check voucher validity + pchState, err := pm.checkVoucherValid(ctx, ch, sv) + if err != nil { + return types.NewInt(0), err + } + + // The change in value is the delta between the voucher amount and + // the highest previous voucher amount + laneState, err := pm.laneState(pchState, ch, sv.Lane) + if err != nil { + return types.NewInt(0), err } delta := types.BigSub(sv.Amount, laneState.Redeemed) diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go index b735a22b1..301621d7b 100644 --- a/paychmgr/paych_test.go +++ b/paychmgr/paych_test.go @@ -435,7 +435,7 @@ func TestAddVoucherProof(t *testing.T) { // Add same voucher with proof proof = []byte{1} - _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + _, err = mgr.AddVoucher(ctx, ch, sv, proof, minDelta) require.NoError(t, err) // Should add proof to existing voucher @@ -478,31 +478,32 @@ func TestNextNonceForLane(t *testing.T) { voucherAmount = big.NewInt(2) // Add vouchers such that we have - // lane 1: nonce 3 // lane 1: nonce 2 - // lane 2: nonce 5 + // lane 1: nonce 4 + // lane 2: nonce 7 voucherLane := uint64(1) - for _, nonce := range []uint64{3, 2} { + for _, nonce := range []uint64{2, 4} { + voucherAmount = big.Add(voucherAmount, big.NewInt(1)) sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key) _, err := mgr.AddVoucher(ctx, ch, sv, nil, minDelta) require.NoError(t, err) } voucherLane = uint64(2) - nonce := uint64(5) + nonce := uint64(7) sv := testCreateVoucher(t, voucherLane, nonce, voucherAmount, key) _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) require.NoError(t, err) - // Expect next nonce for lane 1 to be 4 + // Expect next nonce for lane 1 to be 5 next, err = mgr.NextNonceForLane(ctx, ch, 1) require.NoError(t, err) - require.EqualValues(t, next, 4) + require.EqualValues(t, next, 5) - // Expect next nonce for lane 2 to be 6 + // Expect next nonce for lane 2 to be 8 next, err = mgr.NextNonceForLane(ctx, ch, 2) require.NoError(t, err) - require.EqualValues(t, next, 6) + require.EqualValues(t, next, 8) } func testSetupMgrWithChannel(t *testing.T, ctx context.Context) (*Manager, address.Address, []byte) { diff --git a/paychmgr/simple.go b/paychmgr/simple.go index d0dee5e19..4d275a1a7 100644 --- a/paychmgr/simple.go +++ b/paychmgr/simple.go @@ -75,7 +75,7 @@ func (pm *Manager) waitForPaychCreateMsg(ctx context.Context, mcid cid.Cid) { } paychaddr := decodedReturn.RobustAddress - ci, err := pm.loadOutboundChannelInfo(ctx, paychaddr) + ci, err := pm.loadStateChannelInfo(ctx, paychaddr, DirOutbound) if err != nil { log.Errorf("loading channel info: %w", err) return diff --git a/paychmgr/state.go b/paychmgr/state.go index 6aff6bd9e..dd67ee84b 100644 --- a/paychmgr/state.go +++ b/paychmgr/state.go @@ -3,6 +3,8 @@ package paychmgr import ( "context" + "github.com/filecoin-project/specs-actors/actors/builtin/account" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/builtin/paych" xerrors "golang.org/x/xerrors" @@ -20,6 +22,55 @@ func (pm *Manager) loadPaychState(ctx context.Context, ch address.Address) (*typ return act, &pcast, nil } +func (pm *Manager) loadStateChannelInfo(ctx context.Context, ch address.Address, dir uint64) (*ChannelInfo, error) { + _, st, err := pm.loadPaychState(ctx, ch) + if err != nil { + return nil, err + } + + var account account.State + _, err = pm.sm.LoadActorState(ctx, st.From, &account, nil) + if err != nil { + return nil, err + } + from := account.Address + _, err = pm.sm.LoadActorState(ctx, st.To, &account, nil) + if err != nil { + return nil, err + } + to := account.Address + + ci := &ChannelInfo{ + Channel: ch, + Direction: dir, + NextLane: nextLaneFromState(st), + } + + if dir == DirOutbound { + ci.Control = from + ci.Target = to + } else { + ci.Control = to + ci.Target = from + } + + return ci, nil +} + +func nextLaneFromState(st *paych.State) uint64 { + if len(st.LaneStates) == 0 { + return 0 + } + + maxLane := st.LaneStates[0].ID + for _, state := range st.LaneStates { + if state.ID > maxLane { + maxLane = state.ID + } + } + return maxLane + 1 +} + func findLane(states []*paych.LaneState, lane uint64) *paych.LaneState { var ls *paych.LaneState for _, laneState := range states { @@ -31,16 +82,12 @@ func findLane(states []*paych.LaneState, lane uint64) *paych.LaneState { return ls } -func (pm *Manager) laneState(ctx context.Context, ch address.Address, lane uint64) (paych.LaneState, error) { - _, state, err := pm.loadPaychState(ctx, ch) - if err != nil { - return paych.LaneState{}, err - } - +func (pm *Manager) laneState(state *paych.State, ch address.Address, lane uint64) (paych.LaneState, error) { // TODO: we probably want to call UpdateChannelState with all vouchers to be fully correct // (but technically dont't need to) // TODO: make sure this is correct + // Get the lane state from the chain ls := findLane(state.LaneStates, lane) if ls == nil { ls = &paych.LaneState{ @@ -50,6 +97,7 @@ func (pm *Manager) laneState(ctx context.Context, ch address.Address, lane uint6 } } + // Apply locally stored vouchers vouchers, err := pm.store.VouchersForPaych(ch) if err != nil { if err == ErrChannelNotTracked { From b888385ba13e1318b33830e6aad09ae552ed89f9 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 9 Jul 2020 18:49:43 -0400 Subject: [PATCH 13/25] refactor: lint fixes --- paychmgr/paych.go | 10 +++++----- paychmgr/paych_test.go | 29 +++++++++++++++-------------- paychmgr/state.go | 2 +- paychmgr/store_test.go | 4 ++-- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/paychmgr/paych.go b/paychmgr/paych.go index 12ab70bfc..740a97103 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -93,7 +93,7 @@ func (pm *Manager) GetChannelInfo(addr address.Address) (*ChannelInfo, error) { return pm.store.getChannelInfo(addr) } -// checks if the given voucher is valid (is or could become spendable at some point) +// CheckVoucherValid checks if the given voucher is valid (is or could become spendable at some point) func (pm *Manager) CheckVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) error { _, err := pm.checkVoucherValid(ctx, ch, sv) return err @@ -159,7 +159,7 @@ func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv return pca, nil } -// checks if the given voucher is currently spendable +// CheckVoucherSpendable checks if the given voucher is currently spendable func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address, sv *paych.SignedVoucher, secret []byte, proof []byte) (bool, error) { owner, err := pm.getPaychOwner(ctx, ch) if err != nil { @@ -321,9 +321,9 @@ func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lan var maxnonce uint64 for _, v := range vouchers { - if uint64(v.Voucher.Lane) == lane { - if uint64(v.Voucher.Nonce) > maxnonce { - maxnonce = uint64(v.Voucher.Nonce) + if v.Voucher.Lane == lane { + if v.Voucher.Nonce > maxnonce { + maxnonce = v.Voucher.Nonce } } } diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go index 301621d7b..e39f43335 100644 --- a/paychmgr/paych_test.go +++ b/paychmgr/paych_test.go @@ -93,8 +93,8 @@ func TestPaychOutbound(t *testing.T) { toAcct := tutils.NewIDAddr(t, 202) sm := newMockStateManager() - sm.setAccountState(fromAcct, account.State{from}) - sm.setAccountState(toAcct, account.State{to}) + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) sm.setPaychState(ch, nil, paych.State{ From: fromAcct, To: toAcct, @@ -129,8 +129,8 @@ func TestPaychInbound(t *testing.T) { toAcct := tutils.NewIDAddr(t, 202) sm := newMockStateManager() - sm.setAccountState(fromAcct, account.State{from}) - sm.setAccountState(toAcct, account.State{to}) + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) sm.setPaychState(ch, nil, paych.State{ From: fromAcct, To: toAcct, @@ -167,8 +167,8 @@ func TestCheckVoucherValid(t *testing.T) { toAcct := tutils.NewActorAddr(t, "toAct") sm := newMockStateManager() - sm.setAccountState(fromAcct, account.State{from}) - sm.setAccountState(toAcct, account.State{to}) + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) tcases := []struct { name string @@ -281,6 +281,7 @@ func TestCheckVoucherValid(t *testing.T) { }} for _, tcase := range tcases { + tcase := tcase t.Run(tcase.name, func(t *testing.T) { store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) @@ -319,7 +320,7 @@ func TestAddVoucherDelta(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(t, ctx) + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) voucherLane := uint64(1) @@ -361,7 +362,7 @@ func TestAddVoucherNextLane(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(t, ctx) + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) minDelta := big.NewInt(0) voucherAmount := big.NewInt(2) @@ -402,7 +403,7 @@ func TestAddVoucherProof(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(t, ctx) + mgr, ch, fromKeyPrivate := testSetupMgrWithChannel(ctx, t) nonce := uint64(1) voucherAmount := big.NewInt(1) @@ -449,7 +450,7 @@ func TestAllocateLane(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - mgr, ch, _ := testSetupMgrWithChannel(t, ctx) + mgr, ch, _ := testSetupMgrWithChannel(ctx, t) // First lane should be 0 lane, err := mgr.AllocateLane(ch) @@ -466,7 +467,7 @@ func TestNextNonceForLane(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - mgr, ch, key := testSetupMgrWithChannel(t, ctx) + mgr, ch, key := testSetupMgrWithChannel(ctx, t) // Expect next nonce for non-existent lane to be 1 next, err := mgr.NextNonceForLane(ctx, ch, 1) @@ -506,7 +507,7 @@ func TestNextNonceForLane(t *testing.T) { require.EqualValues(t, next, 8) } -func testSetupMgrWithChannel(t *testing.T, ctx context.Context) (*Manager, address.Address, []byte) { +func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, address.Address, []byte) { fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) ch := tutils.NewIDAddr(t, 100) @@ -516,8 +517,8 @@ func testSetupMgrWithChannel(t *testing.T, ctx context.Context) (*Manager, addre toAcct := tutils.NewActorAddr(t, "toAct") sm := newMockStateManager() - sm.setAccountState(fromAcct, account.State{from}) - sm.setAccountState(toAcct, account.State{to}) + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) act := &types.Actor{ Code: builtin.AccountActorCodeID, diff --git a/paychmgr/state.go b/paychmgr/state.go index dd67ee84b..3012be9cc 100644 --- a/paychmgr/state.go +++ b/paychmgr/state.go @@ -74,7 +74,7 @@ func nextLaneFromState(st *paych.State) uint64 { func findLane(states []*paych.LaneState, lane uint64) *paych.LaneState { var ls *paych.LaneState for _, laneState := range states { - if uint64(laneState.ID) == lane { + if laneState.ID == lane { ls = laneState break } diff --git a/paychmgr/store_test.go b/paychmgr/store_test.go index 6ef407f4f..094226464 100644 --- a/paychmgr/store_test.go +++ b/paychmgr/store_test.go @@ -60,7 +60,7 @@ func TestStore(t *testing.T) { require.Len(t, vouchers, 1) // Requesting voucher for non-existent channel should error - vouchers, err = store.VouchersForPaych(tutils.NewIDAddr(t, 300)) + _, err = store.VouchersForPaych(tutils.NewIDAddr(t, 300)) require.Equal(t, err, ErrChannelNotTracked) // Allocate lane for channel @@ -74,7 +74,7 @@ func TestStore(t *testing.T) { require.Equal(t, lane, uint64(1)) // Allocate next lane for non-existent channel should error - lane, err = store.AllocateLane(tutils.NewIDAddr(t, 300)) + _, err = store.AllocateLane(tutils.NewIDAddr(t, 300)) require.Equal(t, err, ErrChannelNotTracked) } From a3602d4fcf31a49ccf6069b34b7931b5456d41a7 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 9 Jul 2020 17:26:06 -0700 Subject: [PATCH 14/25] fix tests, handle parsing with suffixes properly --- chain/types/bigint_test.go | 2 +- chain/types/fil.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/chain/types/bigint_test.go b/chain/types/bigint_test.go index 43e5633b2..60130277e 100644 --- a/chain/types/bigint_test.go +++ b/chain/types/bigint_test.go @@ -43,7 +43,7 @@ func TestBigIntSerializationRoundTrip(t *testing.T) { func TestFilRoundTrip(t *testing.T) { testValues := []string{ - "0", "1", "1.001", "100.10001", "101100", "5000.01", "5000", + "0 FIL", "1 FIL", "1.001 FIL", "100.10001 FIL", "101100 FIL", "5000.01 FIL", "5000 FIL", } for _, v := range testValues { diff --git a/chain/types/fil.go b/chain/types/fil.go index 941cfbaa9..1d912d9c0 100644 --- a/chain/types/fil.go +++ b/chain/types/fil.go @@ -13,7 +13,7 @@ type FIL BigInt func (f FIL) String() string { r := new(big.Rat).SetFrac(f.Int, big.NewInt(int64(build.FilecoinPrecision))) if r.Sign() == 0 { - return "0" + return "0 FIL" } return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + " FIL" } @@ -29,6 +29,7 @@ func (f FIL) Format(s fmt.State, ch rune) { func ParseFIL(s string) (FIL, error) { suffix := strings.TrimLeft(s, ".1234567890") + s = s[:len(s)-len(suffix)] var attofil bool if suffix != "" { norm := strings.ToLower(strings.TrimSpace(suffix)) From 65648e6469a928aee578a76007397e211aa9438c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Jul 2020 16:13:35 +0200 Subject: [PATCH 15/25] Fix seal-worker init --- cmd/lotus-seal-worker/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index ff45687f8..2ec186eaa 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -236,7 +236,7 @@ var runCmd = &cli.Command{ { // init datastore for r.Exists - _, err := lr.Datastore("/") + _, err := lr.Datastore("/metadata") if err != nil { return err } From 0dbca1377aa0c34a81024a0b95b94198773de3e7 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Fri, 10 Jul 2020 17:52:44 +0200 Subject: [PATCH 16/25] disable call to StatePledgeCollateral --- tools/stats/metrics.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tools/stats/metrics.go b/tools/stats/metrics.go index 626363731..d0778ba1a 100644 --- a/tools/stats/metrics.go +++ b/tools/stats/metrics.go @@ -176,17 +176,18 @@ func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) } func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { - pc, err := api.StatePledgeCollateral(ctx, tipset.Key()) - if err != nil { - return err - } - attoFil := types.NewInt(build.FilecoinPrecision).Int - pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) - pcFilFloat, _ := pcFil.Float64() - p := NewPoint("chain.pledge_collateral", pcFilFloat) - pl.AddPoint(p) + //TODO: StatePledgeCollateral API is not implemented and is commented out - re-enable this block once the API is implemented again. + //pc, err := api.StatePledgeCollateral(ctx, tipset.Key()) + //if err != nil { + //return err + //} + + //pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) + //pcFilFloat, _ := pcFil.Float64() + //p := NewPoint("chain.pledge_collateral", pcFilFloat) + //pl.AddPoint(p) netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr) if err != nil { @@ -195,7 +196,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil) netBalFilFloat, _ := netBalFil.Float64() - p = NewPoint("network.balance", netBalFilFloat) + p := NewPoint("network.balance", netBalFilFloat) pl.AddPoint(p) totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key()) From 33322906265f8756a1cb8ea9b15fdadad8851492 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Fri, 10 Jul 2020 11:54:11 -0400 Subject: [PATCH 17/25] fix: Logic bug; Schema column update stateroot -> state_root --- cmd/lotus-chainwatch/storage.go | 12 ++++++------ cmd/lotus-chainwatch/sync.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index c48435829..8a4215bd2 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -299,7 +299,7 @@ create table if not exists miner_info */ create table if not exists chain_power ( - stateroot text not null + state_root text not null constraint chain_power_pk primary key, baseline_power text not null @@ -311,11 +311,11 @@ create table if not exists chain_power create table if not exists miner_power ( miner_id text not null, - stateroot text not null, + state_root text not null, raw_bytes_power text not null, quality_adjusted_power text not null, constraint miner_power_pk - primary key (miner_id, stateroot) + primary key (miner_id, state_root) ); /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ @@ -535,7 +535,7 @@ func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateIn return xerrors.Errorf("prep chain_power temp: %w", err) } - stmt, err := tx.Prepare(`copy cp (stateroot, baseline_power) from STDIN`) + stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`) if err != nil { return xerrors.Errorf("prepare tmp chain_power: %w", err) } @@ -545,7 +545,7 @@ func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateIn rewardState.stateroot.String(), rewardState.baselinePower.String(), ); err != nil { - log.Errorw("failed to store chain power", "stateroot", rewardState.stateroot, "error", err) + log.Errorw("failed to store chain power", "state_root", rewardState.stateroot, "error", err) } } @@ -682,7 +682,7 @@ func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateIn return xerrors.Errorf("prep miner_power temp: %w", err) } - stmt, err := tx.Prepare(`copy cp (miner_id, stateroot, raw_bytes_power, quality_adjusted_power) from STDIN`) + stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`) if err != nil { return xerrors.Errorf("prepare tmp miner_power: %w", err) } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 4ba4ea33c..442fd9c0c 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -317,7 +317,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. for addr, m := range actors { for actor, c := range m { // reward actor - if actor.Code != builtin.RewardActorCodeID { + if actor.Code == builtin.RewardActorCodeID { rewardTips[c.tsKey] = &rewardStateInfo{ stateroot: c.stateroot, baselinePower: big.Zero(), @@ -326,7 +326,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } // miner actors with head change events - if actor.Code != builtin.StorageMinerActorCodeID { + if actor.Code == builtin.StorageMinerActorCodeID { if _, found := headsSeen[actor.Head]; found { continue } From ebd0e93a9943e450bd757c698355df913e31e9b4 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Fri, 10 Jul 2020 11:58:41 -0400 Subject: [PATCH 18/25] chainwatch: app exit with non-zero on error --- cmd/lotus-chainwatch/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index b5ceb7348..d3c5a570b 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -51,7 +51,7 @@ func main() { if err := app.Run(os.Args); err != nil { log.Warnf("%+v", err) - return + os.Exit(1) } } From ede57c0b12da22790eec9cf54e444dbf3e491ccd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Jul 2020 18:33:26 +0200 Subject: [PATCH 19/25] Fix retrieval datastore-key-not-found error --- node/repo/importmgr/mbstore.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/node/repo/importmgr/mbstore.go b/node/repo/importmgr/mbstore.go index 889752cf2..3b6058bee 100644 --- a/node/repo/importmgr/mbstore.go +++ b/node/repo/importmgr/mbstore.go @@ -8,7 +8,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" ) @@ -63,7 +62,7 @@ func (m *multiReadBs) Get(cid cid.Cid) (blocks.Block, error) { } if merr == nil { - return nil, datastore.ErrNotFound + return nil, blockstore.ErrNotFound } return nil, merr From 5411ebb806908c8c989fd7d32fe91bc6916fa5f1 Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Thu, 9 Jul 2020 11:47:12 -0700 Subject: [PATCH 20/25] Add maxPrice option to client retrieval - Use best offer if multiple offers returned --- cli/client.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cli/client.go b/cli/client.go index 8c2ad7eb8..954ec0af5 100644 --- a/cli/client.go +++ b/cli/client.go @@ -502,6 +502,10 @@ var clientRetrieveCmd = &cli.Command{ Name: "miner", Usage: "miner address for retrieval, if not present it'll use local discovery", }, + &cli.StringFlag{ + Name: "maxPrice", + Usage: "maximum price the client is willing to consider", + }, &cli.StringFlag{ Name: "pieceCid", Usage: "require data to be retrieved from a specific Piece CID", @@ -560,6 +564,11 @@ var clientRetrieveCmd = &cli.Command{ minerStrAddr := cctx.String("miner") if minerStrAddr == "" { // Local discovery offers, err := fapi.ClientFindData(ctx, file, pieceCid) + + // sort by price low to high + sort.Slice(offers, func(i, j int) bool { + return offers[i].MinPrice.LessThan(offers[j].MinPrice) + }) if err != nil { return err } @@ -584,6 +593,17 @@ var clientRetrieveCmd = &cli.Command{ return fmt.Errorf("The received offer errored: %s", offer.Err) } + if cctx.String("maxPrice") != "" { + maxPrice, err := types.ParseFIL(cctx.String("maxPrice")) + if err != nil { + return err + } + + if offer.MinPrice.GreaterThan(types.BigInt(maxPrice)) { + return xerrors.Errorf("Failed to find offer satisfying maxPrice: %w", maxPrice) + } + } + ref := &lapi.FileRef{ Path: cctx.Args().Get(1), IsCAR: cctx.Bool("car"), From 7929a335c3cd4fafd9960e78f3f3c9fbf7ec772f Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Fri, 10 Jul 2020 10:04:41 -0700 Subject: [PATCH 21/25] Add default max price, fix error messaging --- cli/client.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/cli/client.go b/cli/client.go index 954ec0af5..db48e7c9a 100644 --- a/cli/client.go +++ b/cli/client.go @@ -485,6 +485,8 @@ var clientFindCmd = &cli.Command{ }, } +const DefaultMaxRetrievePrice = 1 + var clientRetrieveCmd = &cli.Command{ Name: "retrieve", Usage: "retrieve data from network", @@ -504,7 +506,7 @@ var clientRetrieveCmd = &cli.Command{ }, &cli.StringFlag{ Name: "maxPrice", - Usage: "maximum price the client is willing to consider", + Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %d FIL)", DefaultMaxRetrievePrice), }, &cli.StringFlag{ Name: "pieceCid", @@ -593,15 +595,19 @@ var clientRetrieveCmd = &cli.Command{ return fmt.Errorf("The received offer errored: %s", offer.Err) } + maxPrice := types.NewInt(DefaultMaxRetrievePrice) + if cctx.String("maxPrice") != "" { - maxPrice, err := types.ParseFIL(cctx.String("maxPrice")) + maxPriceFil, err := types.ParseFIL(cctx.String("maxPrice")) if err != nil { - return err + return xerrors.Errorf("parsing maxPrice: %w", err) } - if offer.MinPrice.GreaterThan(types.BigInt(maxPrice)) { - return xerrors.Errorf("Failed to find offer satisfying maxPrice: %w", maxPrice) - } + maxPrice = types.BigInt(maxPriceFil) + } + + if offer.MinPrice.GreaterThan(maxPrice) { + return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice) } ref := &lapi.FileRef{ From f07c7377b6ea3fe4d065137b7122f9765ab6fcb7 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 10 Jul 2020 14:06:52 -0400 Subject: [PATCH 22/25] feat: account for other vouchers when calculating voucher validity --- paychmgr/paych.go | 67 ++++++++++------ paychmgr/paych_test.go | 178 ++++++++++++++++++++++++++++++++++++++--- paychmgr/state.go | 78 +++++++++++------- 3 files changed, 258 insertions(+), 65 deletions(-) diff --git a/paychmgr/paych.go b/paychmgr/paych.go index 740a97103..64ce6e3a7 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -5,6 +5,8 @@ import ( "context" "fmt" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/lotus/api" cborutil "github.com/filecoin-project/go-cbor-util" @@ -99,14 +101,14 @@ func (pm *Manager) CheckVoucherValid(ctx context.Context, ch address.Address, sv return err } -func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) (*paych.State, error) { - act, pca, err := pm.loadPaychState(ctx, ch) +func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv *paych.SignedVoucher) (map[uint64]*paych.LaneState, error) { + act, pchState, err := pm.loadPaychState(ctx, ch) if err != nil { return nil, err } var account account.State - _, err = pm.sm.LoadActorState(ctx, pca.From, &account, nil) + _, err = pm.sm.LoadActorState(ctx, pchState.From, &account, nil) if err != nil { return nil, err } @@ -125,29 +127,47 @@ func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv return nil, err } - sendAmount := sv.Amount - // Check the voucher against the highest known voucher nonce / value - ls, err := pm.laneState(pca, ch, sv.Lane) + laneStates, err := pm.laneState(pchState, ch) if err != nil { return nil, err } - // If there has been at least once voucher redeemed, and the voucher - // nonce value is less than the highest known nonce - if ls.Redeemed.Int64() > 0 && sv.Nonce <= ls.Nonce { + + // If the new voucher nonce value is less than the highest known + // nonce for the lane + ls, lsExists := laneStates[sv.Lane] + if lsExists && sv.Nonce <= ls.Nonce { return nil, fmt.Errorf("nonce too low") } + // If the voucher amount is less than the highest known voucher amount - if sv.Amount.LessThanEqual(ls.Redeemed) { + if lsExists && sv.Amount.LessThanEqual(ls.Redeemed) { return nil, fmt.Errorf("voucher amount is lower than amount for voucher with lower nonce") } - // Only send the difference between the voucher amount and what has already - // been redeemed - sendAmount = types.BigSub(sv.Amount, ls.Redeemed) + // Total redeemed is the total redeemed amount for all lanes, including + // the new voucher + // eg + // + // lane 1 redeemed: 3 + // lane 2 redeemed: 2 + // voucher for lane 1: 5 + // + // Voucher supersedes lane 1 redeemed, therefore + // effective lane 1 redeemed: 5 + // + // lane 1: 5 + // lane 2: 2 + // - + // total: 7 + totalRedeemed, err := pm.totalRedeemedWithVoucher(laneStates, sv) + if err != nil { + return nil, err + } - // TODO: also account for vouchers on other lanes we've received - newTotal := types.BigAdd(sendAmount, pca.ToSend) + // Total required balance = total redeemed + toSend + // Must not exceed actor balance + newTotal := types.BigAdd(totalRedeemed, pchState.ToSend) if act.Balance.LessThan(newTotal) { return nil, fmt.Errorf("not enough funds in channel to cover voucher") } @@ -156,7 +176,7 @@ func (pm *Manager) checkVoucherValid(ctx context.Context, ch address.Address, sv return nil, fmt.Errorf("dont currently support paych lane merges") } - return pca, nil + return laneStates, nil } // CheckVoucherSpendable checks if the given voucher is currently spendable @@ -260,21 +280,22 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *paych } // Check voucher validity - pchState, err := pm.checkVoucherValid(ctx, ch, sv) + laneStates, err := pm.checkVoucherValid(ctx, ch, sv) if err != nil { return types.NewInt(0), err } // The change in value is the delta between the voucher amount and - // the highest previous voucher amount - laneState, err := pm.laneState(pchState, ch, sv.Lane) - if err != nil { - return types.NewInt(0), err + // the highest previous voucher amount for the lane + laneState, exists := laneStates[sv.Lane] + redeemed := big.NewInt(0) + if exists { + redeemed = laneState.Redeemed } - delta := types.BigSub(sv.Amount, laneState.Redeemed) + delta := types.BigSub(sv.Amount, redeemed) if minDelta.GreaterThan(delta) { - return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; laneAmt=%s; v.Amt=%s", minDelta, delta, laneState.Redeemed, sv.Amount) + return delta, xerrors.Errorf("addVoucher: supplied token amount too low; minD=%s, D=%s; laneAmt=%s; v.Amt=%s", minDelta, delta, redeemed, sv.Amount) } ci.Vouchers = append(ci.Vouchers, &VoucherInfo{ diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go index e39f43335..64e344ea7 100644 --- a/paychmgr/paych_test.go +++ b/paychmgr/paych_test.go @@ -262,21 +262,42 @@ func TestCheckVoucherValid(t *testing.T) { toSend: big.NewInt(9), voucherAmount: big.NewInt(2), }, { - // required balance = toSend + (voucher - redeemed) - // = 0 + (11 - 2) - // = 9 - // So required balance: 9 < actor balance: 10 - name: "passes when voucher - redeemed < balance", + // voucher supersedes lane 1 redeemed so + // lane 1 effective redeemed = voucher amount + // + // required balance = toSend + total redeemed + // = 1 + 6 (lane1) + // = 7 + // So required balance: 7 < actor balance: 10 + name: "passes when voucher + total redeemed <= balance", key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), - voucherAmount: big.NewInt(11), + toSend: big.NewInt(1), + voucherAmount: big.NewInt(6), voucherLane: 1, - voucherNonce: 3, + voucherNonce: 2, laneStates: []*paych.LaneState{{ - ID: 1, - Redeemed: big.NewInt(2), - Nonce: 2, + ID: 1, // Lane 1 (same as voucher lane 1) + Redeemed: big.NewInt(4), + Nonce: 1, + }}, + }, { + // required balance = toSend + total redeemed + // = 1 + 4 (lane 2) + 6 (voucher lane 1) + // = 11 + // So required balance: 11 > actor balance: 10 + name: "fails when voucher + total redeemed > balance", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + toSend: big.NewInt(1), + voucherAmount: big.NewInt(6), + voucherLane: 1, + voucherNonce: 1, + laneStates: []*paych.LaneState{{ + ID: 2, // Lane 2 (different from voucher lane 1) + Redeemed: big.NewInt(4), + Nonce: 1, }}, }} @@ -316,6 +337,139 @@ func TestCheckVoucherValid(t *testing.T) { } } +func TestCheckVoucherValidCountingAllLanes(t *testing.T) { + ctx := context.Background() + + fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) + to := tutils.NewSECP256K1Addr(t, "secpTo") + fromAcct := tutils.NewActorAddr(t, "fromAct") + toAcct := tutils.NewActorAddr(t, "toAct") + minDelta := big.NewInt(0) + + sm := newMockStateManager() + sm.setAccountState(fromAcct, account.State{Address: from}) + sm.setAccountState(toAcct, account.State{Address: to}) + + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + actorBalance := big.NewInt(10) + toSend := big.NewInt(1) + laneStates := []*paych.LaneState{{ + ID: 1, + Nonce: 1, + Redeemed: big.NewInt(3), + }, { + ID: 2, + Nonce: 1, + Redeemed: big.NewInt(4), + }} + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: actorBalance, + } + sm.setPaychState(ch, act, paych.State{ + From: fromAcct, + To: toAcct, + ToSend: toSend, + SettlingAt: abi.ChainEpoch(0), + MinSettleHeight: abi.ChainEpoch(0), + LaneStates: laneStates, + }) + + mgr := newManager(sm, store) + err := mgr.TrackInboundChannel(ctx, ch) + require.NoError(t, err) + + // + // Should not be possible to add a voucher with a value such that + // + toSend > + // + // lane 1 redeemed: 3 + // voucher amount (lane 1): 6 + // lane 1 redeemed (with voucher): 6 + // + // Lane 1: 6 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 11 + // + // actor balance is 10 so total is too high. + // + voucherLane := uint64(1) + voucherNonce := uint64(2) + voucherAmount := big.NewInt(6) + sv := testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.Error(t, err) + + // + // lane 1 redeemed: 3 + // voucher amount (lane 1): 4 + // lane 1 redeemed (with voucher): 4 + // + // Lane 1: 4 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 9 + // + // actor balance is 10 so total is ok. + // + voucherAmount = big.NewInt(4) + sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.NoError(t, err) + + // Add voucher to lane 1, so Lane 1 effective redeemed + // (with first voucher) is now 4 + _, err = mgr.AddVoucher(ctx, ch, sv, nil, minDelta) + require.NoError(t, err) + + // + // lane 1 redeemed: 4 + // voucher amount (lane 1): 6 + // lane 1 redeemed (with voucher): 6 + // + // Lane 1: 6 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 11 + // + // actor balance is 10 so total is too high. + // + voucherNonce++ + voucherAmount = big.NewInt(6) + sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.Error(t, err) + + // + // lane 1 redeemed: 4 + // voucher amount (lane 1): 5 + // lane 1 redeemed (with voucher): 5 + // + // Lane 1: 5 + // Lane 2: 4 + // toSend: 1 + // -- + // total: 10 + // + // actor balance is 10 so total is ok. + // + voucherAmount = big.NewInt(5) + sv = testCreateVoucher(t, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) + err = mgr.CheckVoucherValid(ctx, ch, sv) + require.NoError(t, err) +} + func TestAddVoucherDelta(t *testing.T) { ctx := context.Background() @@ -524,7 +678,7 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, addre Code: builtin.AccountActorCodeID, Head: cid.Cid{}, Nonce: 0, - Balance: big.NewInt(10), + Balance: big.NewInt(20), } sm.setPaychState(ch, act, paych.State{ From: fromAcct, diff --git a/paychmgr/state.go b/paychmgr/state.go index 3012be9cc..2c7ca73dc 100644 --- a/paychmgr/state.go +++ b/paychmgr/state.go @@ -3,6 +3,8 @@ package paychmgr import ( "context" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin/account" "github.com/filecoin-project/go-address" @@ -71,52 +73,42 @@ func nextLaneFromState(st *paych.State) uint64 { return maxLane + 1 } -func findLane(states []*paych.LaneState, lane uint64) *paych.LaneState { - var ls *paych.LaneState - for _, laneState := range states { - if laneState.ID == lane { - ls = laneState - break - } - } - return ls -} - -func (pm *Manager) laneState(state *paych.State, ch address.Address, lane uint64) (paych.LaneState, error) { +// laneState gets the LaneStates from chain, then applies all vouchers in +// the data store over the chain state +func (pm *Manager) laneState(state *paych.State, ch address.Address) (map[uint64]*paych.LaneState, error) { // TODO: we probably want to call UpdateChannelState with all vouchers to be fully correct // (but technically dont't need to) - // TODO: make sure this is correct + laneStates := make(map[uint64]*paych.LaneState, len(state.LaneStates)) // Get the lane state from the chain - ls := findLane(state.LaneStates, lane) - if ls == nil { - ls = &paych.LaneState{ - ID: lane, - Redeemed: types.NewInt(0), - Nonce: 0, - } + for _, laneState := range state.LaneStates { + laneStates[laneState.ID] = laneState } // Apply locally stored vouchers vouchers, err := pm.store.VouchersForPaych(ch) - if err != nil { - if err == ErrChannelNotTracked { - return *ls, nil - } - return paych.LaneState{}, err + if err != nil && err != ErrChannelNotTracked { + return nil, err } for _, v := range vouchers { for range v.Voucher.Merges { - return paych.LaneState{}, xerrors.Errorf("paych merges not handled yet") + return nil, xerrors.Errorf("paych merges not handled yet") } - if v.Voucher.Lane != lane { - continue + // If there's a voucher for a lane that isn't in chain state just + // create it + ls, ok := laneStates[v.Voucher.Lane] + if !ok { + ls = &paych.LaneState{ + ID: v.Voucher.Lane, + Redeemed: types.NewInt(0), + Nonce: 0, + } + laneStates[v.Voucher.Lane] = ls } if v.Voucher.Nonce < ls.Nonce { - log.Warnf("Found outdated voucher: ch=%s, lane=%d, v.nonce=%d lane.nonce=%d", ch, lane, v.Voucher.Nonce, ls.Nonce) continue } @@ -124,5 +116,31 @@ func (pm *Manager) laneState(state *paych.State, ch address.Address, lane uint64 ls.Redeemed = v.Voucher.Amount } - return *ls, nil + return laneStates, nil +} + +// Get the total redeemed amount across all lanes, after applying the voucher +func (pm *Manager) totalRedeemedWithVoucher(laneStates map[uint64]*paych.LaneState, sv *paych.SignedVoucher) (big.Int, error) { + total := big.NewInt(0) + for _, ls := range laneStates { + total = big.Add(total, ls.Redeemed) + } + + lane, ok := laneStates[sv.Lane] + if ok { + // If the voucher is for an existing lane, and the voucher nonce + // and is higher than the lane nonce + if sv.Nonce > lane.Nonce { + // Add the delta between the redeemed amount and the voucher + // amount to the total + delta := big.Sub(sv.Amount, lane.Redeemed) + total = big.Add(total, delta) + } + } else { + // If the voucher is *not* for an existing lane, just add its + // value (implicitly a new lane will be created for the voucher) + total = big.Add(total, sv.Amount) + } + + return total, nil } From 3fcf7a0344935667ec48d1c400a7971d3f20e27a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Jul 2020 21:13:52 +0200 Subject: [PATCH 23/25] client: Fix an off-by-10^18 error setting retrieval maxPrice --- cli/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/client.go b/cli/client.go index db48e7c9a..f80ed99ec 100644 --- a/cli/client.go +++ b/cli/client.go @@ -595,7 +595,7 @@ var clientRetrieveCmd = &cli.Command{ return fmt.Errorf("The received offer errored: %s", offer.Err) } - maxPrice := types.NewInt(DefaultMaxRetrievePrice) + maxPrice := types.FromFil(DefaultMaxRetrievePrice) if cctx.String("maxPrice") != "" { maxPriceFil, err := types.ParseFIL(cctx.String("maxPrice")) From a12e5884abe3bb2f7c7ca283452d25e934f8c6ea Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 10 Jul 2020 15:24:13 -0400 Subject: [PATCH 24/25] fix: add TODO about lane merging --- paychmgr/state.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/paychmgr/state.go b/paychmgr/state.go index 2c7ca73dc..7d06a35a4 100644 --- a/paychmgr/state.go +++ b/paychmgr/state.go @@ -121,6 +121,11 @@ func (pm *Manager) laneState(state *paych.State, ch address.Address) (map[uint64 // Get the total redeemed amount across all lanes, after applying the voucher func (pm *Manager) totalRedeemedWithVoucher(laneStates map[uint64]*paych.LaneState, sv *paych.SignedVoucher) (big.Int, error) { + // TODO: merges + if len(sv.Merges) != 0 { + return big.Int{}, xerrors.Errorf("dont currently support paych lane merges") + } + total := big.NewInt(0) for _, ls := range laneStates { total = big.Add(total, ls.Redeemed) From dc9df10f5a0b5adb5d981587badb06c820c79d3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Jul 2020 21:31:58 +0200 Subject: [PATCH 25/25] sync: Also rename span attributes --- chain/sync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 0de624a99..ccc52cf57 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1103,8 +1103,8 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet ss := extractSyncState(ctx) span.AddAttributes( - trace.Int64Attribute("fromHeight", int64(incoming.Height())), - trace.Int64Attribute("toHeight", int64(known.Height())), + trace.Int64Attribute("incomingHeight", int64(incoming.Height())), + trace.Int64Attribute("knownHeight", int64(known.Height())), ) // Check if the parents of the from block are in the denylist.