chainwatch: sync in batches

Łukasz Magiera 2020-01-08 17:29:46 +01:00
parent 1b5e1c4753
commit 79028397ad


@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"container/list"
 	"context"
+	"math"
 	"sync"
 
 	"github.com/filecoin-project/go-address"
@ -16,6 +17,8 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
const maxBatch = 10000
func runSyncer(ctx context.Context, api api.FullNode, st *storage) { func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
notifs, err := api.ChainNotify(ctx) notifs, err := api.ChainNotify(ctx)
if err != nil { if err != nil {
@@ -68,7 +71,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
 	log.Infof("Getting headers / actors")
 
-	toSync := map[cid.Cid]*types.BlockHeader{}
+	allToSync := map[cid.Cid]*types.BlockHeader{}
 	toVisit := list.New()
 
 	for _, header := range ts.Blocks() {
@@ -79,15 +82,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
 		bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
 
 		_, has := hazlist[bh.Cid()]
-		if _, seen := toSync[bh.Cid()]; seen || has {
+		if _, seen := allToSync[bh.Cid()]; seen || has {
 			continue
 		}
 
-		toSync[bh.Cid()] = bh
+		allToSync[bh.Cid()] = bh
 		addresses[bh.Miner] = address.Undef
 
-		if len(toSync)%500 == 10 {
-			log.Infof("todo: (%d) %s", len(toSync), bh.Cid())
+		if len(allToSync)%500 == 10 {
+			log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
 		}
 
 		if len(bh.Parents) == 0 {
@@ -105,197 +108,218 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
 		}
 	}
 
+	for len(allToSync) > 0 {
+		minH := uint64(math.MaxUint64)
+
+		for _, header := range allToSync {
+			if header.Height < minH {
+				minH = header.Height
+			}
+		}
+
+		toSync := map[cid.Cid]*types.BlockHeader{}
+		for c, header := range allToSync {
+			if header.Height < minH+maxBatch {
+				toSync[c] = header
+			}
+		}
+		for c := range toSync {
+			delete(allToSync, c)
+		}
+
 		log.Infof("Syncing %d blocks", len(toSync))
 
 		paDone := 0
 		par(50, maparr(toSync), func(bh *types.BlockHeader) {
 			paDone++
 			if paDone%100 == 0 {
 				log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync))
 			}
 
 			if len(bh.Parents) == 0 { // genesis case
 				ts, err := types.NewTipSet([]*types.BlockHeader{bh})
 
 				aadrs, err := api.StateListActors(ctx, ts)
 				if err != nil {
 					log.Error(err)
 					return
 				}
 
 				par(50, aadrs, func(addr address.Address) {
 					act, err := api.StateGetActor(ctx, addr, ts)
 					if err != nil {
 						log.Error(err)
 						return
 					}
 
 					alk.Lock()
 					_, ok := actors[addr]
 					if !ok {
 						actors[addr] = map[types.Actor]cid.Cid{}
 					}
 					actors[addr][*act] = bh.ParentStateRoot
 					addresses[addr] = address.Undef
 					alk.Unlock()
 				})
 
 				return
 			}
 
 			pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
 			if err != nil {
 				log.Error(err)
 				return
 			}
 
 			changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
 			if err != nil {
 				log.Error(err)
 				return
 			}
 
 			for a, act := range changes {
 				addr, err := address.NewFromString(a)
 				if err != nil {
 					log.Error(err)
 					return
 				}
 
 				alk.Lock()
 				_, ok := actors[addr]
 				if !ok {
 					actors[addr] = map[types.Actor]cid.Cid{}
 				}
 				actors[addr][act] = bh.ParentStateRoot
 				addresses[addr] = address.Undef
 				alk.Unlock()
 			}
 		})
 
 		log.Infof("Getting messages")
 
 		msgs, incls := fetchMessages(ctx, api, toSync)
 
 		log.Infof("Resolving addresses")
 
 		for _, message := range msgs {
 			addresses[message.To] = address.Undef
 			addresses[message.From] = address.Undef
 		}
 
 		par(50, kmaparr(addresses), func(addr address.Address) {
 			raddr, err := api.StateLookupID(ctx, addr, nil)
 			if err != nil {
 				log.Warn(err)
 				return
 			}
 			alk.Lock()
 			addresses[addr] = raddr
 			alk.Unlock()
 		})
 
 		log.Infof("Getting miner info")
 
 		miners := map[minerKey]*minerInfo{}
 
 		for addr, m := range actors {
 			for actor, c := range m {
 				if actor.Code != actors2.StorageMinerCodeCid {
 					continue
 				}
 
 				miners[minerKey{
 					addr:      addr,
 					act:       actor,
 					stateroot: c,
 				}] = &minerInfo{}
 			}
 		}
 
 		par(50, kvmaparr(miners), func(it func() (minerKey, *minerInfo)) {
 			k, info := it()
 
 			sszs, err := api.StateMinerSectorCount(ctx, k.addr, nil)
 			if err != nil {
 				log.Error(err)
 				return
 			}
 			info.psize = sszs.Pset
 			info.ssize = sszs.Sset
 
 			astb, err := api.ChainReadObj(ctx, k.act.Head)
 			if err != nil {
 				log.Error(err)
 				return
 			}
 
 			if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
 				log.Error(err)
 				return
 			}
 
 			ib, err := api.ChainReadObj(ctx, info.state.Info)
 			if err != nil {
 				log.Error(err)
 				return
 			}
 
 			if err := info.info.UnmarshalCBOR(bytes.NewReader(ib)); err != nil {
 				log.Error(err)
 				return
 			}
 		})
 
 		log.Info("Getting receipts")
 
 		receipts := fetchParentReceipts(ctx, api, toSync)
 
 		log.Info("Storing headers")
 
 		if err := st.storeHeaders(toSync, true); err != nil {
 			log.Errorf("%+v", err)
 			return
 		}
 
 		log.Info("Storing address mapping")
 
 		if err := st.storeAddressMap(addresses); err != nil {
 			log.Error(err)
 			return
 		}
 
 		log.Info("Storing actors")
 
 		if err := st.storeActors(actors); err != nil {
 			log.Error(err)
 			return
 		}
 
 		log.Info("Storing miners")
 
 		if err := st.storeMiners(miners); err != nil {
 			log.Error(err)
 			return
 		}
 
 		log.Infof("Storing messages")
 
 		if err := st.storeMessages(msgs); err != nil {
 			log.Error(err)
 			return
 		}
 
 		log.Info("Storing message inclusions")
 
 		if err := st.storeMsgInclusions(incls); err != nil {
 			log.Error(err)
 			return
 		}
 
 		log.Infof("Storing parent receipts")
 
 		if err := st.storeReceipts(receipts); err != nil {
 			log.Error(err)
 			return
 		}
+
+		log.Infof("Sync stage done")
+	}
 
 	log.Infof("Sync done")