fix: use tipset corresponding to stateroot
- Use te tipsetkey corresponding to the stateroot when fetching actor data from the lotus api.
This commit is contained in:
parent
7d9f9ba756
commit
a98c4038f4
@ -53,6 +53,7 @@ type minerKey struct {
|
|||||||
addr address.Address
|
addr address.Address
|
||||||
act types.Actor
|
act types.Actor
|
||||||
stateroot cid.Cid
|
stateroot cid.Cid
|
||||||
|
tsKey types.TipSetKey
|
||||||
}
|
}
|
||||||
|
|
||||||
type minerInfo struct {
|
type minerInfo struct {
|
||||||
@ -66,10 +67,11 @@ type minerInfo struct {
|
|||||||
|
|
||||||
type actorInfo struct {
|
type actorInfo struct {
|
||||||
stateroot cid.Cid
|
stateroot cid.Cid
|
||||||
|
tsKey types.TipSetKey
|
||||||
state string
|
state string
|
||||||
}
|
}
|
||||||
|
|
||||||
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet, maxBatch int) {
|
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
|
||||||
var alk sync.Mutex
|
var alk sync.Mutex
|
||||||
|
|
||||||
log.Infof("Getting synced block list")
|
log.Infof("Getting synced block list")
|
||||||
@ -81,7 +83,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
allToSync := map[cid.Cid]*types.BlockHeader{}
|
allToSync := map[cid.Cid]*types.BlockHeader{}
|
||||||
toVisit := list.New()
|
toVisit := list.New()
|
||||||
|
|
||||||
for _, header := range ts.Blocks() {
|
for _, header := range headTs.Blocks() {
|
||||||
toVisit.PushBack(header)
|
toVisit.PushBack(header)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,7 +118,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
|
|
||||||
for len(allToSync) > 0 {
|
for len(allToSync) > 0 {
|
||||||
actors := map[address.Address]map[types.Actor]actorInfo{}
|
actors := map[address.Address]map[types.Actor]actorInfo{}
|
||||||
addresses := map[address.Address]address.Address{}
|
addressToID := map[address.Address]address.Address{}
|
||||||
minH := abi.ChainEpoch(math.MaxInt64)
|
minH := abi.ChainEpoch(math.MaxInt64)
|
||||||
|
|
||||||
for _, header := range allToSync {
|
for _, header := range allToSync {
|
||||||
@ -129,7 +131,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
for c, header := range allToSync {
|
for c, header := range allToSync {
|
||||||
if header.Height < minH+abi.ChainEpoch(maxBatch) {
|
if header.Height < minH+abi.ChainEpoch(maxBatch) {
|
||||||
toSync[c] = header
|
toSync[c] = header
|
||||||
addresses[header.Miner] = address.Undef
|
addressToID[header.Miner] = address.Undef
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for c := range toSync {
|
for c := range toSync {
|
||||||
@ -146,20 +148,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(bh.Parents) == 0 { // genesis case
|
if len(bh.Parents) == 0 { // genesis case
|
||||||
ts, _ := types.NewTipSet([]*types.BlockHeader{bh})
|
genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh})
|
||||||
aadrs, err := api.StateListActors(ctx, ts.Key())
|
aadrs, err := api.StateListActors(ctx, genesisTs.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
parmap.Par(50, aadrs, func(addr address.Address) {
|
parmap.Par(50, aadrs, func(addr address.Address) {
|
||||||
act, err := api.StateGetActor(ctx, addr, ts.Key())
|
act, err := api.StateGetActor(ctx, addr, genesisTs.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ast, err := api.StateReadState(ctx, act, ts.Key())
|
ast, err := api.StateReadState(ctx, act, genesisTs.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
@ -177,9 +179,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
}
|
}
|
||||||
actors[addr][*act] = actorInfo{
|
actors[addr][*act] = actorInfo{
|
||||||
stateroot: bh.ParentStateRoot,
|
stateroot: bh.ParentStateRoot,
|
||||||
|
tsKey: genesisTs.Key(),
|
||||||
state: string(state),
|
state: string(state),
|
||||||
}
|
}
|
||||||
addresses[addr] = address.Undef
|
addressToID[addr] = address.Undef
|
||||||
alk.Unlock()
|
alk.Unlock()
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -206,11 +209,13 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ast, err := api.StateReadState(ctx, &act, pts.Key())
|
ast, err := api.StateReadState(ctx, &act, pts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
state, err := json.Marshal(ast.State)
|
state, err := json.Marshal(ast.State)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
@ -225,8 +230,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
actors[addr][act] = actorInfo{
|
actors[addr][act] = actorInfo{
|
||||||
stateroot: bh.ParentStateRoot,
|
stateroot: bh.ParentStateRoot,
|
||||||
state: string(state),
|
state: string(state),
|
||||||
|
tsKey: pts.Key(),
|
||||||
}
|
}
|
||||||
addresses[addr] = address.Undef
|
addressToID[addr] = address.Undef
|
||||||
alk.Unlock()
|
alk.Unlock()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -238,18 +244,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
log.Infof("Resolving addresses")
|
log.Infof("Resolving addresses")
|
||||||
|
|
||||||
for _, message := range msgs {
|
for _, message := range msgs {
|
||||||
addresses[message.To] = address.Undef
|
addressToID[message.To] = address.Undef
|
||||||
addresses[message.From] = address.Undef
|
addressToID[message.From] = address.Undef
|
||||||
}
|
}
|
||||||
|
|
||||||
parmap.Par(50, parmap.KMapArr(addresses), func(addr address.Address) {
|
parmap.Par(50, parmap.KMapArr(addressToID), func(addr address.Address) {
|
||||||
|
// FIXME: cannot use EmptyTSK here since actorID's can change during reorgs, need to use the corresponding tipset.
|
||||||
|
// TODO: figure out a way to get the corresponding tipset...
|
||||||
raddr, err := api.StateLookupID(ctx, addr, types.EmptyTSK)
|
raddr, err := api.StateLookupID(ctx, addr, types.EmptyTSK)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(err)
|
log.Warn(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
alk.Lock()
|
alk.Lock()
|
||||||
addresses[addr] = raddr
|
addressToID[addr] = raddr
|
||||||
alk.Unlock()
|
alk.Unlock()
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -267,6 +275,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
addr: addr,
|
addr: addr,
|
||||||
act: actor,
|
act: actor,
|
||||||
stateroot: c.stateroot,
|
stateroot: c.stateroot,
|
||||||
|
tsKey: c.tsKey,
|
||||||
}] = &minerInfo{}
|
}] = &minerInfo{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -274,14 +283,17 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) {
|
parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) {
|
||||||
k, info := it()
|
k, info := it()
|
||||||
|
|
||||||
pow, err := api.StateMinerPower(ctx, k.addr, types.EmptyTSK)
|
// TODO: get the storage power actors state and and pull the miner power from there, currently this hits the
|
||||||
|
// storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset
|
||||||
|
// and reading each miner power from the result.
|
||||||
|
pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
// Not sure why this would fail, but its probably worth continuing
|
// Not sure why this would fail, but its probably worth continuing
|
||||||
}
|
}
|
||||||
info.power = pow.MinerPower.QualityAdjPower
|
info.power = pow.MinerPower.QualityAdjPower
|
||||||
|
|
||||||
sszs, err := api.StateMinerSectorCount(ctx, k.addr, types.EmptyTSK)
|
sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
@ -316,7 +328,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
|
|
||||||
log.Info("Storing address mapping")
|
log.Info("Storing address mapping")
|
||||||
|
|
||||||
if err := st.storeAddressMap(addresses); err != nil {
|
if err := st.storeAddressMap(addressToID); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -361,7 +373,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
log.Infof("Get deals")
|
log.Infof("Get deals")
|
||||||
|
|
||||||
// TODO: incremental, gather expired
|
// TODO: incremental, gather expired
|
||||||
deals, err := api.StateMarketDeals(ctx, ts.Key())
|
deals, err := api.StateMarketDeals(ctx, headTs.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user