lotus/cmd/lotus-chainwatch/sync.go

450 lines
9.0 KiB
Go
Raw Normal View History

2019-11-15 16:38:56 +00:00
package main
import (
"bytes"
2019-11-15 16:38:56 +00:00
"container/list"
"context"
2020-01-19 16:18:47 +00:00
"encoding/json"
2020-01-08 16:29:46 +00:00
"math"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
actors2 "github.com/filecoin-project/lotus/chain/actors"
2019-11-15 16:38:56 +00:00
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) {
2019-11-15 16:38:56 +00:00
notifs, err := api.ChainNotify(ctx)
if err != nil {
panic(err)
}
go func() {
for notif := range notifs {
for _, change := range notif {
switch change.Type {
case store.HCCurrent:
fallthrough
case store.HCApply:
syncHead(ctx, api, st, change.Val, maxBatch)
2019-11-15 16:38:56 +00:00
case store.HCRevert:
log.Warnf("revert todo")
}
if change.Type == store.HCCurrent {
go subMpool(ctx, api, st)
go subBlocks(ctx, api, st)
}
2019-11-15 16:38:56 +00:00
}
}
}()
}
// minerKey identifies one observed version of a storage miner actor. It is
// used as the key of the miners map that syncHead builds and hands to
// storeMiners.
type minerKey struct {
// addr is the address the actor was found under.
addr address.Address
// act is the actor record itself (syncHead filters on act.Code and reads act.Head).
act types.Actor
// stateroot is copied from the matching actorInfo.stateroot.
stateroot cid.Cid
}
type minerInfo struct {
state actors2.StorageMinerActorState
info actors2.MinerInfo
2019-12-11 23:31:59 +00:00
ssize uint64
psize uint64
}
2020-01-19 16:18:47 +00:00
// actorInfo is the per-actor record that syncHead accumulates and passes to
// storeActors.
type actorInfo struct {
// stateroot is the ParentStateRoot of the block header the actor was observed through.
stateroot cid.Cid
// state is the JSON encoding of the State returned by StateReadState for the actor.
state string
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet, maxBatch int) {
2019-11-15 18:37:57 +00:00
var alk sync.Mutex
2019-12-11 22:17:44 +00:00
log.Infof("Getting synced block list")
hazlist := st.hasList()
2019-11-15 18:37:57 +00:00
log.Infof("Getting headers / actors")
2020-01-08 16:29:46 +00:00
allToSync := map[cid.Cid]*types.BlockHeader{}
2019-11-15 16:38:56 +00:00
toVisit := list.New()
for _, header := range ts.Blocks() {
toVisit.PushBack(header)
}
for toVisit.Len() > 0 {
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
2019-12-11 22:17:44 +00:00
_, has := hazlist[bh.Cid()]
2020-01-08 16:29:46 +00:00
if _, seen := allToSync[bh.Cid()]; seen || has {
2019-11-15 18:37:57 +00:00
continue
2019-11-15 16:38:56 +00:00
}
2019-11-15 18:37:57 +00:00
2020-01-08 16:29:46 +00:00
allToSync[bh.Cid()] = bh
2019-11-15 18:37:57 +00:00
2020-01-08 16:29:46 +00:00
if len(allToSync)%500 == 10 {
log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
2019-11-15 16:38:56 +00:00
}
if len(bh.Parents) == 0 {
2019-11-15 18:37:57 +00:00
continue
2019-11-15 16:38:56 +00:00
}
pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
if err != nil {
log.Error(err)
continue
}
for _, header := range pts.Blocks() {
toVisit.PushBack(header)
}
}
2020-01-08 16:29:46 +00:00
for len(allToSync) > 0 {
actors := map[address.Address]map[types.Actor]actorInfo{}
addresses := map[address.Address]address.Address{}
minH := abi.ChainEpoch(math.MaxUint64)
2019-11-15 16:38:56 +00:00
2020-01-08 16:29:46 +00:00
for _, header := range allToSync {
if header.Height < minH {
minH = header.Height
}
2019-11-15 18:37:57 +00:00
}
2020-01-08 16:29:46 +00:00
toSync := map[cid.Cid]*types.BlockHeader{}
for c, header := range allToSync {
if header.Height < minH+uint64(maxBatch) {
2020-01-08 16:29:46 +00:00
toSync[c] = header
addresses[header.Miner] = address.Undef
2020-01-08 16:29:46 +00:00
}
}
for c := range toSync {
delete(allToSync, c)
}
log.Infof("Syncing %d blocks", len(toSync))
paDone := 0
par(50, maparr(toSync), func(bh *types.BlockHeader) {
paDone++
if paDone%100 == 0 {
log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync))
}
if len(bh.Parents) == 0 { // genesis case
ts, err := types.NewTipSet([]*types.BlockHeader{bh})
aadrs, err := api.StateListActors(ctx, ts.Key())
2020-01-08 16:29:46 +00:00
if err != nil {
log.Error(err)
return
}
par(50, aadrs, func(addr address.Address) {
act, err := api.StateGetActor(ctx, addr, ts.Key())
2020-01-08 16:29:46 +00:00
if err != nil {
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, act, ts.Key())
2020-01-19 16:18:47 +00:00
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
alk.Lock()
_, ok := actors[addr]
if !ok {
2020-01-19 16:18:47 +00:00
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
2020-01-08 16:29:46 +00:00
}
addresses[addr] = address.Undef
alk.Unlock()
})
return
}
pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
if err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
if err != nil {
log.Error(err)
return
}
for a, act := range changes {
addr, err := address.NewFromString(a)
if err != nil {
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, &act, pts.Key())
2020-01-19 16:18:47 +00:00
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
alk.Lock()
_, ok := actors[addr]
if !ok {
2020-01-19 16:18:47 +00:00
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
}
addresses[addr] = address.Undef
alk.Unlock()
2020-01-08 16:29:46 +00:00
}
})
2020-01-08 16:29:46 +00:00
log.Infof("Getting messages")
2020-01-08 16:29:46 +00:00
msgs, incls := fetchMessages(ctx, api, toSync)
2020-01-08 16:29:46 +00:00
log.Infof("Resolving addresses")
for _, message := range msgs {
addresses[message.To] = address.Undef
addresses[message.From] = address.Undef
2019-11-15 18:37:57 +00:00
}
2020-01-08 16:29:46 +00:00
par(50, kmaparr(addresses), func(addr address.Address) {
raddr, err := api.StateLookupID(ctx, addr, types.EmptyTSK)
2019-11-15 18:37:57 +00:00
if err != nil {
2020-01-08 16:29:46 +00:00
log.Warn(err)
2019-11-15 18:37:57 +00:00
return
}
alk.Lock()
2020-01-08 16:29:46 +00:00
addresses[addr] = raddr
2019-11-15 18:37:57 +00:00
alk.Unlock()
2020-01-08 16:29:46 +00:00
})
2019-12-12 18:34:28 +00:00
2020-01-08 16:29:46 +00:00
log.Infof("Getting miner info")
2019-12-12 18:34:28 +00:00
2020-01-08 16:29:46 +00:00
miners := map[minerKey]*minerInfo{}
2019-12-12 18:34:28 +00:00
2020-01-08 16:29:46 +00:00
for addr, m := range actors {
for actor, c := range m {
if actor.Code != actors2.StorageMinerCodeCid {
2020-01-08 16:29:46 +00:00
continue
}
2019-11-15 18:37:57 +00:00
2020-01-08 16:29:46 +00:00
miners[minerKey{
addr: addr,
act: actor,
2020-01-19 16:18:47 +00:00
stateroot: c.stateroot,
2020-01-08 16:29:46 +00:00
}] = &minerInfo{}
}
2019-12-12 18:34:28 +00:00
}
2020-01-08 16:29:46 +00:00
par(50, kvmaparr(miners), func(it func() (minerKey, *minerInfo)) {
k, info := it()
sszs, err := api.StateMinerSectorCount(ctx, k.addr, types.EmptyTSK)
2020-01-08 16:29:46 +00:00
if err != nil {
log.Error(err)
return
}
info.psize = sszs.Pset
info.ssize = sszs.Sset
2020-01-08 16:29:46 +00:00
astb, err := api.ChainReadObj(ctx, k.act.Head)
if err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
ib, err := api.ChainReadObj(ctx, info.state.Info)
if err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
if err := info.info.UnmarshalCBOR(bytes.NewReader(ib)); err != nil {
log.Error(err)
return
}
})
log.Info("Getting receipts")
receipts := fetchParentReceipts(ctx, api, toSync)
log.Info("Storing headers")
if err := st.storeHeaders(toSync, true); err != nil {
log.Errorf("%+v", err)
2019-12-11 23:31:59 +00:00
return
}
2020-01-08 16:29:46 +00:00
log.Info("Storing address mapping")
if err := st.storeAddressMap(addresses); err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
log.Info("Storing actors")
if err := st.storeActors(actors); err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
log.Info("Storing miners")
if err := st.storeMiners(miners); err != nil {
log.Error(err)
return
}
2020-01-08 16:29:46 +00:00
log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil {
log.Error(err)
return
}
2019-12-12 18:34:28 +00:00
2020-01-08 16:29:46 +00:00
log.Info("Storing message inclusions")
2019-12-12 18:34:28 +00:00
2020-01-08 16:29:46 +00:00
if err := st.storeMsgInclusions(incls); err != nil {
log.Error(err)
return
}
2019-12-10 23:42:36 +00:00
2020-01-08 16:29:46 +00:00
log.Infof("Storing parent receipts")
2019-12-10 23:42:36 +00:00
2020-01-08 16:29:46 +00:00
if err := st.storeReceipts(receipts); err != nil {
log.Error(err)
return
}
log.Infof("Sync stage done")
2019-12-10 23:42:36 +00:00
}
2020-01-20 00:49:52 +00:00
log.Infof("Get deals")
// TODO: incremental, gather expired
deals, err := api.StateMarketDeals(ctx, ts.Key())
2020-01-20 00:49:52 +00:00
if err != nil {
log.Error(err)
return
}
log.Infof("Store deals")
if err := st.storeDeals(deals); err != nil {
log.Error(err)
return
}
2020-01-22 15:10:22 +00:00
log.Infof("Refresh views")
if err := st.refreshViews(); err != nil {
log.Error(err)
return
}
2019-11-15 16:38:56 +00:00
log.Infof("Sync done")
}
2019-11-15 18:37:57 +00:00
func fetchMessages(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) {
2019-11-15 16:38:56 +00:00
var lk sync.Mutex
messages := map[cid.Cid]*types.Message{}
inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs
2019-11-15 18:37:57 +00:00
par(50, maparr(toSync), func(header *types.BlockHeader) {
msgs, err := api.ChainGetBlockMessages(ctx, header.Cid())
if err != nil {
log.Error(err)
return
2019-11-15 16:38:56 +00:00
}
2019-11-15 18:37:57 +00:00
vmm := make([]*types.Message, 0, len(msgs.Cids))
for _, m := range msgs.BlsMessages {
vmm = append(vmm, m)
}
2019-11-15 16:38:56 +00:00
2019-11-15 18:37:57 +00:00
for _, m := range msgs.SecpkMessages {
vmm = append(vmm, &m.Message)
}
2019-11-15 16:38:56 +00:00
2019-11-15 18:37:57 +00:00
lk.Lock()
for _, message := range vmm {
messages[message.Cid()] = message
inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid())
}
lk.Unlock()
})
2019-11-15 16:38:56 +00:00
return messages, inclusions
}
2019-12-03 11:05:12 +00:00
// mrec is the key under which fetchParentReceipts files a message receipt.
type mrec struct {
// msg is the CID of the parent message the receipt belongs to.
msg cid.Cid
// state is the ParentStateRoot of the block header the receipt was fetched for.
state cid.Cid
// idx is the position of the receipt in the list returned for that block.
idx int
}
// fetchParentReceipts fetches, for every block in toSync, the receipts of its
// parent messages, using up to 50 concurrent workers.
//
// Each receipt is keyed by the CID of the parent message at the same index,
// the block's ParentStateRoot and that index. Blocks for which the receipts
// or messages cannot be fetched, or for which fewer messages than receipts
// are returned, are logged and skipped.
func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt {
	var lk sync.Mutex
	out := map[mrec]*types.MessageReceipt{}

	par(50, maparr(toSync), func(header *types.BlockHeader) {
		recs, err := api.ChainGetParentReceipts(ctx, header.Cid())
		if err != nil {
			log.Error(err)
			return
		}

		msgs, err := api.ChainGetParentMessages(ctx, header.Cid())
		if err != nil {
			log.Error(err)
			return
		}

		// Receipts are paired with messages by index; without this guard a
		// short message list would panic with an out-of-range index while
		// the lock is held.
		if len(recs) > len(msgs) {
			log.Errorf("block %s: got %d parent receipts but only %d parent messages", header.Cid(), len(recs), len(msgs))
			return
		}

		lk.Lock()
		defer lk.Unlock()
		for i, r := range recs {
			out[mrec{
				msg:   msgs[i].Cid,
				state: header.ParentStateRoot,
				idx:   i,
			}] = r
		}
	})

	return out
}