Merge pull request #1282 from filecoin-project/fix/chainwatch-memory
chainwatch: reduce memory usage during large chain gaps
This commit is contained in:
commit
7d499df001
@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
@ -59,6 +60,10 @@ var runCmd = &cli.Command{
|
||||
Name: "front",
|
||||
Value: "127.0.0.1:8418",
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "max-batch",
|
||||
Value: 1000,
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||
@ -75,13 +80,15 @@ var runCmd = &cli.Command{
|
||||
|
||||
log.Info("Remote version: %s", v.Version)
|
||||
|
||||
maxBatch := cctx.Int("max-batch")
|
||||
|
||||
st, err := openStorage(cctx.String("db"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer st.close()
|
||||
|
||||
runSyncer(ctx, api, st)
|
||||
runSyncer(ctx, api, st, maxBatch)
|
||||
|
||||
h, err := newHandler(api, st)
|
||||
if err != nil {
|
||||
|
@ -18,9 +18,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
const maxBatch = 3000
|
||||
|
||||
func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
|
||||
func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) {
|
||||
notifs, err := api.ChainNotify(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@ -32,7 +30,7 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
|
||||
case store.HCCurrent:
|
||||
fallthrough
|
||||
case store.HCApply:
|
||||
syncHead(ctx, api, st, change.Val)
|
||||
syncHead(ctx, api, st, change.Val, maxBatch)
|
||||
case store.HCRevert:
|
||||
log.Warnf("revert todo")
|
||||
}
|
||||
@ -65,9 +63,7 @@ type actorInfo struct {
|
||||
state string
|
||||
}
|
||||
|
||||
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
|
||||
addresses := map[address.Address]address.Address{}
|
||||
actors := map[address.Address]map[types.Actor]actorInfo{}
|
||||
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet, maxBatch int) {
|
||||
var alk sync.Mutex
|
||||
|
||||
log.Infof("Getting synced block list")
|
||||
@ -92,7 +88,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
||||
}
|
||||
|
||||
allToSync[bh.Cid()] = bh
|
||||
addresses[bh.Miner] = address.Undef
|
||||
|
||||
if len(allToSync)%500 == 10 {
|
||||
log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
|
||||
@ -114,6 +109,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
||||
}
|
||||
|
||||
for len(allToSync) > 0 {
|
||||
actors := map[address.Address]map[types.Actor]actorInfo{}
|
||||
addresses := map[address.Address]address.Address{}
|
||||
minH := uint64(math.MaxUint64)
|
||||
|
||||
for _, header := range allToSync {
|
||||
@ -124,8 +121,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
||||
|
||||
toSync := map[cid.Cid]*types.BlockHeader{}
|
||||
for c, header := range allToSync {
|
||||
if header.Height < minH+maxBatch {
|
||||
if header.Height < minH+uint64(maxBatch) {
|
||||
toSync[c] = header
|
||||
addresses[header.Miner] = address.Undef
|
||||
}
|
||||
}
|
||||
for c := range toSync {
|
||||
@ -200,7 +198,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
ast, err := api.StateReadState(ctx, &act, ts.Key())
|
||||
ast, err := api.StateReadState(ctx, &act, pts.Key())
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user