From a98c4038f40256074d5a4a44ce3e206306fd65b3 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 24 Jun 2020 10:39:45 -0700 Subject: [PATCH] fix: use tipset corresponding to stateroot - Use te tipsetkey corresponding to the stateroot when fetching actor data from the lotus api. --- cmd/lotus-chainwatch/sync.go | 48 ++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index d42a72b9b..88afb647e 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -53,6 +53,7 @@ type minerKey struct { addr address.Address act types.Actor stateroot cid.Cid + tsKey types.TipSetKey } type minerInfo struct { @@ -66,10 +67,11 @@ type minerInfo struct { type actorInfo struct { stateroot cid.Cid + tsKey types.TipSetKey state string } -func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet, maxBatch int) { +func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) { var alk sync.Mutex log.Infof("Getting synced block list") @@ -81,7 +83,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS allToSync := map[cid.Cid]*types.BlockHeader{} toVisit := list.New() - for _, header := range ts.Blocks() { + for _, header := range headTs.Blocks() { toVisit.PushBack(header) } @@ -116,7 +118,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS for len(allToSync) > 0 { actors := map[address.Address]map[types.Actor]actorInfo{} - addresses := map[address.Address]address.Address{} + addressToID := map[address.Address]address.Address{} minH := abi.ChainEpoch(math.MaxInt64) for _, header := range allToSync { @@ -129,7 +131,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS for c, header := range allToSync { if header.Height < minH+abi.ChainEpoch(maxBatch) { toSync[c] = header - addresses[header.Miner] = address.Undef + addressToID[header.Miner] = address.Undef } } for c := range toSync { @@ -146,20 +148,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } if len(bh.Parents) == 0 { // genesis case - ts, _ := types.NewTipSet([]*types.BlockHeader{bh}) - aadrs, err := api.StateListActors(ctx, ts.Key()) + genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh}) + aadrs, err := api.StateListActors(ctx, genesisTs.Key()) if err != nil { log.Error(err) return } parmap.Par(50, aadrs, func(addr address.Address) { - act, err := api.StateGetActor(ctx, addr, ts.Key()) + act, err := api.StateGetActor(ctx, addr, genesisTs.Key()) if err != nil { log.Error(err) return } - ast, err := api.StateReadState(ctx, act, ts.Key()) + ast, err := api.StateReadState(ctx, act, genesisTs.Key()) if err != nil { log.Error(err) return @@ -177,9 +179,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } actors[addr][*act] = actorInfo{ stateroot: bh.ParentStateRoot, + tsKey: genesisTs.Key(), state: string(state), } - addresses[addr] = address.Undef + addressToID[addr] = address.Undef alk.Unlock() }) @@ -206,11 +209,13 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Error(err) return } + ast, err := api.StateReadState(ctx, &act, pts.Key()) if err != nil { log.Error(err) return } + state, err := json.Marshal(ast.State) if err != nil { log.Error(err) @@ -225,8 +230,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS actors[addr][act] = actorInfo{ stateroot: bh.ParentStateRoot, state: string(state), + tsKey: pts.Key(), } - addresses[addr] = address.Undef + addressToID[addr] = address.Undef alk.Unlock() } }) @@ -238,18 +244,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Resolving addresses") for _, message := range msgs { - addresses[message.To] = address.Undef - addresses[message.From] = address.Undef + addressToID[message.To] = address.Undef + addressToID[message.From] = address.Undef } - parmap.Par(50, parmap.KMapArr(addresses), func(addr address.Address) { + parmap.Par(50, parmap.KMapArr(addressToID), func(addr address.Address) { + // FIXME: cannot use EmptyTSK here since actorID's can change during reorgs, need to use the corresponding tipset. + // TODO: figure out a way to get the corresponding tipset... raddr, err := api.StateLookupID(ctx, addr, types.EmptyTSK) if err != nil { log.Warn(err) return } alk.Lock() - addresses[addr] = raddr + addressToID[addr] = raddr alk.Unlock() }) @@ -267,6 +275,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS addr: addr, act: actor, stateroot: c.stateroot, + tsKey: c.tsKey, }] = &minerInfo{} } } @@ -274,14 +283,17 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) { k, info := it() - pow, err := api.StateMinerPower(ctx, k.addr, types.EmptyTSK) + // TODO: get the storage power actors state and and pull the miner power from there, currently this hits the + // storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset + // and reading each miner power from the result. + pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey) if err != nil { log.Error(err) // Not sure why this would fail, but its probably worth continuing } info.power = pow.MinerPower.QualityAdjPower - sszs, err := api.StateMinerSectorCount(ctx, k.addr, types.EmptyTSK) + sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey) if err != nil { log.Error(err) return @@ -316,7 +328,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Info("Storing address mapping") - if err := st.storeAddressMap(addresses); err != nil { + if err := st.storeAddressMap(addressToID); err != nil { log.Error(err) return } @@ -361,7 +373,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Get deals") // TODO: incremental, gather expired - deals, err := api.StateMarketDeals(ctx, ts.Key()) + deals, err := api.StateMarketDeals(ctx, headTs.Key()) if err != nil { log.Error(err) return