From 5fa3021a79d8c337816ac3e049b1e2cf4180ac5d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 1 Jul 2020 17:29:11 -0700 Subject: [PATCH 01/13] set proper bitwidth for verifreg command hamts --- cmd/lotus-shed/verifreg.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/cmd/lotus-shed/verifreg.go b/cmd/lotus-shed/verifreg.go index a448fcaf9..b8fd3fe9c 100644 --- a/cmd/lotus-shed/verifreg.go +++ b/cmd/lotus-shed/verifreg.go @@ -3,7 +3,9 @@ package main import ( "bytes" "fmt" + "github.com/filecoin-project/lotus/build" + "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/urfave/cli/v2" @@ -199,7 +201,7 @@ var verifRegListVerifiersCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) + vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5)) if err != nil { return err } @@ -251,11 +253,12 @@ var verifRegListClientsCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients) + vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients, hamt.UseTreeBitWidth(5)) if err != nil { return err } + var keys []string if err := vh.ForEach(ctx, func(k string, val interface{}) error { addr, err := address.NewFromBytes([]byte(k)) if err != nil { @@ -268,13 +271,22 @@ var verifRegListClientsCmd = &cli.Command{ return err } - fmt.Printf("%s: %s\n", addr, dcap) + fmt.Printf("%s: %s %v\n", addr, dcap, []byte(k)) + + keys = append(keys, k) return nil }); err != nil { return err } + for _, k := range keys { + _, err := vh.FindRaw(ctx, k) + if err != nil { + fmt.Println("failed to find key: ", []byte(k), err) + } + } + return nil }, } @@ -312,14 +324,15 @@ var verifRegCheckClientCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients) + vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients, hamt.UseTreeBitWidth(5)) if err != nil { return err } + fmt.Println("key: ", caddr.Bytes()) var dcap verifreg.DataCap if err := vh.Find(ctx, string(caddr.Bytes()), &dcap); err != nil { - return err + return xerrors.Errorf("failed to lookup address: %w", err) } fmt.Println(dcap) @@ -361,7 +374,7 @@ var verifRegCheckVerifierCmd = &cli.Command{ return err } - vh, err := hamt.LoadNode(ctx, cst, st.Verifiers) + vh, err := hamt.LoadNode(ctx, cst, st.Verifiers, hamt.UseTreeBitWidth(5)) if err != nil { return err } From 0285a9741cd01a30405069588f7db3b7b0df2195 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 7 Jul 2020 16:43:30 -0700 Subject: [PATCH 02/13] remove debug logs --- cmd/lotus-shed/verifreg.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/cmd/lotus-shed/verifreg.go b/cmd/lotus-shed/verifreg.go index b8fd3fe9c..e7187f029 100644 --- a/cmd/lotus-shed/verifreg.go +++ b/cmd/lotus-shed/verifreg.go @@ -258,7 +258,6 @@ var verifRegListClientsCmd = &cli.Command{ return err } - var keys []string if err := vh.ForEach(ctx, func(k string, val interface{}) error { addr, err := address.NewFromBytes([]byte(k)) if err != nil { @@ -271,22 +270,13 @@ var verifRegListClientsCmd = &cli.Command{ return err } - fmt.Printf("%s: %s %v\n", addr, dcap, []byte(k)) - - keys = append(keys, k) + fmt.Printf("%s: %s\n", addr, dcap) return nil }); err != nil { return err } - for _, k := range keys { - _, err := vh.FindRaw(ctx, k) - if err != nil { - fmt.Println("failed to find key: ", []byte(k), err) - } - } - return nil }, } @@ -329,7 +319,6 @@ var verifRegCheckClientCmd = &cli.Command{ return err } - fmt.Println("key: ", caddr.Bytes()) var dcap verifreg.DataCap if err := vh.Find(ctx, string(caddr.Bytes()), &dcap); err != nil { return xerrors.Errorf("failed to lookup address: %w", err) From 8fbeebb86d0b64fe95e3fd835c0f74e8df080037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jul 2020 22:02:28 +0200 Subject: [PATCH 03/13] rename sync variables. --- chain/sync.go | 59 ++++++++++++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index ba4b73567..0de624a99 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1069,17 +1069,14 @@ func extractSyncState(ctx context.Context) *SyncerState { // collectHeaders collects the headers from the blocks between any two tipsets. // -// `from` is the heaviest/projected/target tipset we have learned about, and -// `to` is usually an anchor tipset we already have in our view of the chain +// `incoming` is the heaviest/projected/target tipset we have learned about, and +// `known` is usually an anchor tipset we already have in our view of the chain // (which could be the genesis). // // collectHeaders checks if portions of the chain are in our ChainStore; falling // down to the network to retrieve the missing parts. If during the process, any // portion we receive is in our denylist (bad list), we short-circuit. // -// {hint/naming}: `from` and `to` is in inverse order. `from` is the highest, -// and `to` is the lowest. This method traverses the chain backwards. -// // {hint/usage}: This is used by collectChain, which is in turn called from the // main Sync method (Syncer#Sync), so it's a pretty central method. // @@ -1089,7 +1086,7 @@ func extractSyncState(ctx context.Context) *SyncerState { // bad. // 2. Check the consistency of beacon entries in the from tipset. We check // total equality of the BeaconEntries in each block. -// 3. Travers the chain backwards, for each tipset: +// 3. Traverse the chain backwards, for each tipset: // 3a. Load it from the chainstore; if found, it move on to its parent. // 3b. Query our peers via BlockSync in batches, requesting up to a // maximum of 500 tipsets every time. @@ -1100,40 +1097,40 @@ func extractSyncState(ctx context.Context) *SyncerState { // // All throughout the process, we keep checking if the received blocks are in // the deny list, and short-circuit the process if so. -func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { +func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "collectHeaders") defer span.End() ss := extractSyncState(ctx) span.AddAttributes( - trace.Int64Attribute("fromHeight", int64(from.Height())), - trace.Int64Attribute("toHeight", int64(to.Height())), + trace.Int64Attribute("fromHeight", int64(incoming.Height())), + trace.Int64Attribute("toHeight", int64(known.Height())), ) // Check if the parents of the from block are in the denylist. // i.e. if a fork of the chain has been requested that we know to be bad. - for _, pcid := range from.Parents().Cids() { + for _, pcid := range incoming.Parents().Cids() { if reason, ok := syncer.bad.Has(pcid); ok { newReason := reason.Linked("linked to %s", pcid) - for _, b := range from.Cids() { + for _, b := range incoming.Cids() { syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), pcid, reason) + return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), pcid, reason) } } { // ensure consistency of beacon entires - targetBE := from.Blocks()[0].BeaconEntries + targetBE := incoming.Blocks()[0].BeaconEntries sorted := sort.SliceIsSorted(targetBE, func(i, j int) bool { return targetBE[i].Round < targetBE[j].Round }) if !sorted { - syncer.bad.Add(from.Cids()[0], NewBadBlockReason(from.Cids(), "wrong order of beacon entires")) + syncer.bad.Add(incoming.Cids()[0], NewBadBlockReason(incoming.Cids(), "wrong order of beacon entires")) return nil, xerrors.Errorf("wrong order of beacon entires") } - for _, bh := range from.Blocks()[1:] { + for _, bh := range incoming.Blocks()[1:] { if len(targetBE) != len(bh.BeaconEntries) { // cannot mark bad, I think @Kubuxu return nil, xerrors.Errorf("tipset contained different number for beacon entires") @@ -1148,12 +1145,12 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to } } - blockSet := []*types.TipSet{from} + blockSet := []*types.TipSet{incoming} - at := from.Parents() + at := incoming.Parents() // we want to sync all the blocks until the height above the block we have - untilHeight := to.Height() + 1 + untilHeight := known.Height() + 1 ss.SetHeight(blockSet[len(blockSet)-1].Height()) @@ -1168,7 +1165,7 @@ loop: syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) + return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason) } } @@ -1217,7 +1214,7 @@ loop: syncer.bad.Add(b, newReason) } - return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", from.Cids(), bc, reason) + return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason) } } blockSet = append(blockSet, b) @@ -1229,23 +1226,23 @@ loop: at = blks[len(blks)-1].Parents() } - if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) { - last := blockSet[len(blockSet)-1] - if last.Parents() == to.Parents() { + // base is the tipset in the candidate chain at the height equal to our known tipset height. + if base := blockSet[len(blockSet)-1]; !types.CidArrsEqual(base.Parents().Cids(), known.Cids()) { + if base.Parents() == known.Parents() { // common case: receiving a block thats potentially part of the same tipset as our best block return blockSet, nil } // We have now ascertained that this is *not* a 'fast forward' - log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) - fork, err := syncer.syncFork(ctx, last, to) + log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height()) + fork, err := syncer.syncFork(ctx, base, known) if err != nil { if xerrors.Is(err, ErrForkTooLong) { // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? log.Warn("adding forked chain to our bad tipset cache") - for _, b := range from.Blocks() { - syncer.bad.Add(b.Cid(), NewBadBlockReason(from.Cids(), "fork past finality")) + for _, b := range incoming.Blocks() { + syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality")) } } return nil, xerrors.Errorf("failed to sync fork: %w", err) @@ -1265,13 +1262,13 @@ var ErrForkTooLong = fmt.Errorf("fork longer than threshold") // If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the // denylist. Else, we find the common ancestor, and add the missing chain // fragment until the fork point to the returned []TipSet. -func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { - tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold)) +func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { + tips, err := syncer.Bsync.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold)) if err != nil { return nil, err } - nts, err := syncer.store.LoadTipSet(to.Parents()) + nts, err := syncer.store.LoadTipSet(known.Parents()) if err != nil { return nil, xerrors.Errorf("failed to load next local tipset: %w", err) } @@ -1281,7 +1278,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type if !syncer.Genesis.Equals(nts) { return nil, xerrors.Errorf("somehow synced chain that linked back to a different genesis (bad genesis: %s)", nts.Key()) } - return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync") + return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync; incoming: %s") } if nts.Equals(tips[cur]) { From b9effac437816baace9a79bedba273798f52d598 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Wed, 8 Jul 2020 18:30:24 -0400 Subject: [PATCH 04/13] feat(chainwatch): Capture baseline power in chain_power --- cmd/lotus-chainwatch/storage.go | 51 ++++++++++++++++ cmd/lotus-chainwatch/sync.go | 103 +++++++++++++++++++++++--------- 2 files changed, 126 insertions(+), 28 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index db2b5abfd..9647c9f47 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -294,6 +294,17 @@ create table if not exists miner_info primary key (miner_id) ); +/* +* captures chain-specific power state for any given stateroot +*/ +create table if not exists chain_power +( + stateroot text not null + constraint chain_power_pk + primary key, + baseline_power text not null +); + /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( @@ -500,6 +511,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI return nil } +// storeChainPower captures reward actor state as it relates to power captured on-chain +func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return xerrors.Errorf("begin chain_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy cp (stateroot, baseline_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp chain_power: %w", err) + } + + for _, rewardState := range rewardTips { + if _, err := stmt.Exec( + rewardState.stateroot.String(), + rewardState.baselinePower.String(), + ); err != nil { + return xerrors.Errorf("exec prepared chain_power: %w", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared chain_power: %w", err) + } + + if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit chain_power tx: %w", err) + } + + return nil +} + type storeSectorsAPI interface { StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index bbefb3e2c..fe4970ffb 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -13,12 +13,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/lotus/api" @@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) }() } +type rewardStateInfo struct { + stateroot cid.Cid + baselinePower big.Int +} + type minerStateInfo struct { // common addr address.Address @@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } }) + // map of tipset to reward state + rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes)) // map of tipset to all miners that had a head-change at that tipset. minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes)) // heads we've seen, im being paranoid @@ -302,40 +311,74 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. alk.Unlock() }) - log.Infof("Getting miner info") + log.Infof("Getting actor change info") minerChanges := 0 for addr, m := range actors { for actor, c := range m { + // reward actor + if actor.Code != builtin.RewardActorCodeID { + rewardTips[c.tsKey] = &rewardStateInfo{ + stateroot: c.stateroot, + baselinePower: big.Zero(), + } + continue + } + + // miner actors with head change events if actor.Code != builtin.StorageMinerActorCodeID { - continue + if _, found := headsSeen[actor.Head]; found { + continue + } + minerChanges++ + + minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ + addr: addr, + act: actor, + stateroot: c.stateroot, + + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + + state: miner.State{}, + info: miner.MinerInfo{}, + + rawPower: big.Zero(), + qalPower: big.Zero(), + }) + + headsSeen[actor.Head] = struct{}{} } - - // only want miner actors with head change events - if _, found := headsSeen[actor.Head]; found { - continue - } - minerChanges++ - - minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ - addr: addr, - act: actor, - stateroot: c.stateroot, - - tsKey: c.tsKey, - parentTsKey: c.parentTsKey, - - state: miner.State{}, - info: miner.MinerInfo{}, - - rawPower: big.Zero(), - qalPower: big.Zero(), - }) - - headsSeen[actor.Head] = struct{}{} + continue } } + rewardProcessingStartedAt := time.Now() + parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) { + tsKey, rewardInfo := it() + // get reward actor states at each tipset once for all updates + rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey) + if err != nil { + log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head) + if err != nil { + log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + var rewardActorState reward.State + if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { + log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardInfo.baselinePower = rewardActorState.BaselinePower + }) + log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips)) + minerProcessingStartedAt := time.Now() log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) // extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their @@ -411,25 +454,29 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } log.Info("Storing actors") - if err := st.storeActors(actors); err != nil { log.Error(err) return } + chainPowerStartedAt := time.Now() + if err := st.storeChainPower(rewardTips); err != nil { + log.Error(err) + } + log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String()) + log.Info("Storing miners") if err := st.storeMiners(minerTips); err != nil { log.Error(err) return } - log.Info("Storing miner sectors") sectorStart := time.Now() if err := st.storeSectors(minerTips, api); err != nil { log.Error(err) return } - log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) + log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String()) log.Info("Storing miner sectors heads") if err := st.storeMinerSectorsHeads(minerTips, api); err != nil { From 108167936075dfac41b26444274b0dcb3c530c29 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Wed, 8 Jul 2020 18:30:24 -0400 Subject: [PATCH 05/13] feat(chainwatch): Capture miner_power data --- cmd/lotus-chainwatch/storage.go | 59 ++++++++++++++++++++++++++++++++- cmd/lotus-chainwatch/sync.go | 6 ++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 9647c9f47..c48435829 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -305,6 +305,19 @@ create table if not exists chain_power baseline_power text not null ); +/* +* captures miner-specific power state for any given stateroot +*/ +create table if not exists miner_power +( + miner_id text not null, + stateroot text not null, + raw_bytes_power text not null, + quality_adjusted_power text not null, + constraint miner_power_pk + primary key (miner_id, stateroot) +); + /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( @@ -532,7 +545,7 @@ func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateIn rewardState.stateroot.String(), rewardState.baselinePower.String(), ); err != nil { - return xerrors.Errorf("exec prepared chain_power: %w", err) + log.Errorw("failed to store chain power", "stateroot", rewardState.stateroot, "error", err) } } @@ -658,6 +671,50 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) return tx.Commit() } +// storeMinerPower captures miner actor state as it relates to power per miner captured on-chain +func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return xerrors.Errorf("begin miner_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep miner_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy cp (miner_id, stateroot, raw_bytes_power, quality_adjusted_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp miner_power: %w", err) + } + + for _, miners := range minerTips { + for _, minerInfo := range miners { + if _, err := stmt.Exec( + minerInfo.addr.String(), + minerInfo.stateroot.String(), + minerInfo.rawPower.String(), + minerInfo.qalPower.String(), + ); err != nil { + log.Errorw("failed to store miner power", "miner", minerInfo.addr, "stateroot", minerInfo.stateroot, "error", err) + } + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared miner_power: %w", err) + } + + if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert miner_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit miner_power tx: %w", err) + } + + return nil +} + func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index fe4970ffb..4ba4ea33c 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -471,6 +471,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } + minerPowerStartedAt := time.Now() + if err := st.storeMinerPower(minerTips); err != nil { + log.Error(err) + } + log.Infow("Stored miner power", "duration", time.Since(minerPowerStartedAt).String()) + sectorStart := time.Now() if err := st.storeSectors(minerTips, api); err != nil { log.Error(err) From a79b31c23021860b5d5223ca622a7c4d7ed7121a Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 8 Jul 2020 16:16:45 -0700 Subject: [PATCH 06/13] produce and parse FIL suffix on FIL strings --- chain/types/fil.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/chain/types/fil.go b/chain/types/fil.go index 527078e0f..941cfbaa9 100644 --- a/chain/types/fil.go +++ b/chain/types/fil.go @@ -15,7 +15,7 @@ func (f FIL) String() string { if r.Sign() == 0 { return "0" } - return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + " FIL" } func (f FIL) Format(s fmt.State, ch rune) { @@ -28,14 +28,34 @@ func (f FIL) Format(s fmt.State, ch rune) { } func ParseFIL(s string) (FIL, error) { + suffix := strings.TrimLeft(s, ".1234567890") + var attofil bool + if suffix != "" { + norm := strings.ToLower(strings.TrimSpace(suffix)) + switch norm { + case "", "fil": + case "attofil", "afil": + attofil = true + default: + return FIL{}, fmt.Errorf("unrecognized suffix: %q", suffix) + } + } + r, ok := new(big.Rat).SetString(s) if !ok { return FIL{}, fmt.Errorf("failed to parse %q as a decimal number", s) } - r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1)) + if !attofil { + r = r.Mul(r, big.NewRat(int64(build.FilecoinPrecision), 1)) + } + if !r.IsInt() { - return FIL{}, fmt.Errorf("invalid FIL value: %q", s) + var pref string + if attofil { + pref = "atto" + } + return FIL{}, fmt.Errorf("invalid %sFIL value: %q", pref, s) } return FIL{r.Num()}, nil From 57a1418ff87d8ddded0b163e96b4b74bb8280747 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Thu, 9 Jul 2020 01:05:27 -0400 Subject: [PATCH 07/13] Correct initial setup of sector id counter --- cmd/lotus-storage-miner/init.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 14972c69a..208c17d03 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -359,10 +359,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string, }*/ } - log.Infof("Setting next sector ID to %d", maxSectorID+1) - buf := make([]byte, binary.MaxVarintLen64) - size := binary.PutUvarint(buf, uint64(maxSectorID+1)) + size := binary.PutUvarint(buf, uint64(maxSectorID)) return mds.Put(datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size]) } From b0cf3d322896ee085bc072527f2105cf36841fcb Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Thu, 9 Jul 2020 16:19:44 +0200 Subject: [PATCH 08/13] Mute RtRefreshManager logs as they break terminal emulation RtRefreshManager logs random binary strings, which somtimes might be partial multi-byte Unicode or ASCI escape codes. Signed-off-by: Jakub Sztandera --- lib/lotuslog/levels.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/lotuslog/levels.go b/lib/lotuslog/levels.go index ae3959568..d8b4678b7 100644 --- a/lib/lotuslog/levels.go +++ b/lib/lotuslog/levels.go @@ -18,4 +18,6 @@ func SetupLogLevels() { _ = logging.SetLogLevel("stores", "DEBUG") _ = logging.SetLogLevel("nat", "INFO") } + // Always mute RtRefreshManager because it breaks terminals + _ = logging.SetLogLevel("dht/RtRefreshManager", "FATAL") } From a3602d4fcf31a49ccf6069b34b7931b5456d41a7 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 9 Jul 2020 17:26:06 -0700 Subject: [PATCH 09/13] fix tests, handle parsing with suffixes properly --- chain/types/bigint_test.go | 2 +- chain/types/fil.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/chain/types/bigint_test.go b/chain/types/bigint_test.go index 43e5633b2..60130277e 100644 --- a/chain/types/bigint_test.go +++ b/chain/types/bigint_test.go @@ -43,7 +43,7 @@ func TestBigIntSerializationRoundTrip(t *testing.T) { func TestFilRoundTrip(t *testing.T) { testValues := []string{ - "0", "1", "1.001", "100.10001", "101100", "5000.01", "5000", + "0 FIL", "1 FIL", "1.001 FIL", "100.10001 FIL", "101100 FIL", "5000.01 FIL", "5000 FIL", } for _, v := range testValues { diff --git a/chain/types/fil.go b/chain/types/fil.go index 941cfbaa9..1d912d9c0 100644 --- a/chain/types/fil.go +++ b/chain/types/fil.go @@ -13,7 +13,7 @@ type FIL BigInt func (f FIL) String() string { r := new(big.Rat).SetFrac(f.Int, big.NewInt(int64(build.FilecoinPrecision))) if r.Sign() == 0 { - return "0" + return "0 FIL" } return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") + " FIL" } @@ -29,6 +29,7 @@ func (f FIL) Format(s fmt.State, ch rune) { func ParseFIL(s string) (FIL, error) { suffix := strings.TrimLeft(s, ".1234567890") + s = s[:len(s)-len(suffix)] var attofil bool if suffix != "" { norm := strings.ToLower(strings.TrimSpace(suffix)) From 0dbca1377aa0c34a81024a0b95b94198773de3e7 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Fri, 10 Jul 2020 17:52:44 +0200 Subject: [PATCH 10/13] disable call to StatePledgeCollateral --- tools/stats/metrics.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tools/stats/metrics.go b/tools/stats/metrics.go index 626363731..d0778ba1a 100644 --- a/tools/stats/metrics.go +++ b/tools/stats/metrics.go @@ -176,17 +176,18 @@ func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) } func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { - pc, err := api.StatePledgeCollateral(ctx, tipset.Key()) - if err != nil { - return err - } - attoFil := types.NewInt(build.FilecoinPrecision).Int - pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) - pcFilFloat, _ := pcFil.Float64() - p := NewPoint("chain.pledge_collateral", pcFilFloat) - pl.AddPoint(p) + //TODO: StatePledgeCollateral API is not implemented and is commented out - re-enable this block once the API is implemented again. + //pc, err := api.StatePledgeCollateral(ctx, tipset.Key()) + //if err != nil { + //return err + //} + + //pcFil := new(big.Rat).SetFrac(pc.Int, attoFil) + //pcFilFloat, _ := pcFil.Float64() + //p := NewPoint("chain.pledge_collateral", pcFilFloat) + //pl.AddPoint(p) netBal, err := api.WalletBalance(ctx, builtin.RewardActorAddr) if err != nil { @@ -195,7 +196,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis netBalFil := new(big.Rat).SetFrac(netBal.Int, attoFil) netBalFilFloat, _ := netBalFil.Float64() - p = NewPoint("network.balance", netBalFilFloat) + p := NewPoint("network.balance", netBalFilFloat) pl.AddPoint(p) totalPower, err := api.StateMinerPower(ctx, address.Address{}, tipset.Key()) From 33322906265f8756a1cb8ea9b15fdadad8851492 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Fri, 10 Jul 2020 11:54:11 -0400 Subject: [PATCH 11/13] fix: Logic bug; Schema column update stateroot -> state_root --- cmd/lotus-chainwatch/storage.go | 12 ++++++------ cmd/lotus-chainwatch/sync.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index c48435829..8a4215bd2 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -299,7 +299,7 @@ create table if not exists miner_info */ create table if not exists chain_power ( - stateroot text not null + state_root text not null constraint chain_power_pk primary key, baseline_power text not null @@ -311,11 +311,11 @@ create table if not exists chain_power create table if not exists miner_power ( miner_id text not null, - stateroot text not null, + state_root text not null, raw_bytes_power text not null, quality_adjusted_power text not null, constraint miner_power_pk - primary key (miner_id, stateroot) + primary key (miner_id, state_root) ); /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ @@ -535,7 +535,7 @@ func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateIn return xerrors.Errorf("prep chain_power temp: %w", err) } - stmt, err := tx.Prepare(`copy cp (stateroot, baseline_power) from STDIN`) + stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`) if err != nil { return xerrors.Errorf("prepare tmp chain_power: %w", err) } @@ -545,7 +545,7 @@ func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateIn rewardState.stateroot.String(), rewardState.baselinePower.String(), ); err != nil { - log.Errorw("failed to store chain power", "stateroot", rewardState.stateroot, "error", err) + log.Errorw("failed to store chain power", "state_root", rewardState.stateroot, "error", err) } } @@ -682,7 +682,7 @@ func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateIn return xerrors.Errorf("prep miner_power temp: %w", err) } - stmt, err := tx.Prepare(`copy cp (miner_id, stateroot, raw_bytes_power, quality_adjusted_power) from STDIN`) + stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`) if err != nil { return xerrors.Errorf("prepare tmp miner_power: %w", err) } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 4ba4ea33c..442fd9c0c 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -317,7 +317,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. for addr, m := range actors { for actor, c := range m { // reward actor - if actor.Code != builtin.RewardActorCodeID { + if actor.Code == builtin.RewardActorCodeID { rewardTips[c.tsKey] = &rewardStateInfo{ stateroot: c.stateroot, baselinePower: big.Zero(), @@ -326,7 +326,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } // miner actors with head change events - if actor.Code != builtin.StorageMinerActorCodeID { + if actor.Code == builtin.StorageMinerActorCodeID { if _, found := headsSeen[actor.Head]; found { continue } From ebd0e93a9943e450bd757c698355df913e31e9b4 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Fri, 10 Jul 2020 11:58:41 -0400 Subject: [PATCH 12/13] chainwatch: app exit with non-zero on error --- cmd/lotus-chainwatch/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index b5ceb7348..d3c5a570b 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -51,7 +51,7 @@ func main() { if err := app.Run(os.Args); err != nil { log.Warnf("%+v", err) - return + os.Exit(1) } } From dc9df10f5a0b5adb5d981587badb06c820c79d3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 10 Jul 2020 21:31:58 +0200 Subject: [PATCH 13/13] sync: Also rename span attributes --- chain/sync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 0de624a99..ccc52cf57 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1103,8 +1103,8 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet ss := extractSyncState(ctx) span.AddAttributes( - trace.Int64Attribute("fromHeight", int64(incoming.Height())), - trace.Int64Attribute("toHeight", int64(known.Height())), + trace.Int64Attribute("incomingHeight", int64(incoming.Height())), + trace.Int64Attribute("knownHeight", int64(known.Height())), ) // Check if the parents of the from block are in the denylist.