chainsync: make batch size configurable
This commit is contained in:
parent
ec50048a29
commit
837dd0d241
@ -60,6 +60,10 @@ var runCmd = &cli.Command{
|
|||||||
Name: "front",
|
Name: "front",
|
||||||
Value: "127.0.0.1:8418",
|
Value: "127.0.0.1:8418",
|
||||||
},
|
},
|
||||||
|
&cli.IntFlag{
|
||||||
|
Name: "max-batch",
|
||||||
|
Value: 1000,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||||
@ -76,13 +80,15 @@ var runCmd = &cli.Command{
|
|||||||
|
|
||||||
log.Info("Remote version: %s", v.Version)
|
log.Info("Remote version: %s", v.Version)
|
||||||
|
|
||||||
|
maxBatch := cctx.Int("max-batch")
|
||||||
|
|
||||||
st, err := openStorage(cctx.String("db"))
|
st, err := openStorage(cctx.String("db"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer st.close()
|
defer st.close()
|
||||||
|
|
||||||
runSyncer(ctx, api, st)
|
runSyncer(ctx, api, st, maxBatch)
|
||||||
|
|
||||||
h, err := newHandler(api, st)
|
h, err := newHandler(api, st)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -18,9 +18,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxBatch = 3000
|
func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) {
|
||||||
|
|
||||||
func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
|
|
||||||
notifs, err := api.ChainNotify(ctx)
|
notifs, err := api.ChainNotify(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -32,7 +30,7 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
|
|||||||
case store.HCCurrent:
|
case store.HCCurrent:
|
||||||
fallthrough
|
fallthrough
|
||||||
case store.HCApply:
|
case store.HCApply:
|
||||||
syncHead(ctx, api, st, change.Val)
|
syncHead(ctx, api, st, change.Val, maxBatch)
|
||||||
case store.HCRevert:
|
case store.HCRevert:
|
||||||
log.Warnf("revert todo")
|
log.Warnf("revert todo")
|
||||||
}
|
}
|
||||||
@ -65,7 +63,7 @@ type actorInfo struct {
|
|||||||
state string
|
state string
|
||||||
}
|
}
|
||||||
|
|
||||||
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
|
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet, maxBatch int) {
|
||||||
var alk sync.Mutex
|
var alk sync.Mutex
|
||||||
|
|
||||||
log.Infof("Getting synced block list")
|
log.Infof("Getting synced block list")
|
||||||
@ -123,7 +121,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
|
|||||||
|
|
||||||
toSync := map[cid.Cid]*types.BlockHeader{}
|
toSync := map[cid.Cid]*types.BlockHeader{}
|
||||||
for c, header := range allToSync {
|
for c, header := range allToSync {
|
||||||
if header.Height < minH+maxBatch {
|
if header.Height < minH+uint64(maxBatch) {
|
||||||
toSync[c] = header
|
toSync[c] = header
|
||||||
addresses[header.Miner] = address.Undef
|
addresses[header.Miner] = address.Undef
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user