Merge pull request #2014 from filecoin-project/fix/chain-reorgs-race

fix a potential race with chain reorgs notifees.
This commit is contained in:
Łukasz Magiera 2020-06-12 20:35:16 +02:00 committed by GitHub
commit 8e025db656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 21 deletions

View File

@ -45,7 +45,7 @@ commands:
- 'v25-2k-lotus-params' - 'v25-2k-lotus-params'
paths: paths:
- /var/tmp/filecoin-proof-parameters/ - /var/tmp/filecoin-proof-parameters/
- run: ./lotus fetch-params --proving-params 2048 - run: ./lotus fetch-params 2048
- save_cache: - save_cache:
name: Save parameters cache name: Save parameters cache
key: 'v25-2k-lotus-params' key: 'v25-2k-lotus-params'

View File

@ -48,6 +48,9 @@ var log = logging.Logger("chainstore")
var chainHeadKey = dstore.NewKey("head") var chainHeadKey = dstore.NewKey("head")
// ReorgNotifee represents a callback that gets called upon reorgs.
type ReorgNotifee func(rev, app []*types.TipSet) error
type ChainStore struct { type ChainStore struct {
bs bstore.Blockstore bs bstore.Blockstore
ds dstore.Datastore ds dstore.Datastore
@ -63,8 +66,8 @@ type ChainStore struct {
cindex *ChainIndex cindex *ChainIndex
reorgCh chan<- reorg reorgCh chan<- reorg
headChangeNotifs []func(rev, app []*types.TipSet) error reorgNotifeeCh chan ReorgNotifee
mmCache *lru.ARCCache mmCache *lru.ARCCache
tsCache *lru.ARCCache tsCache *lru.ARCCache
@ -89,8 +92,6 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys
cs.cindex = ci cs.cindex = ci
cs.reorgCh = cs.reorgWorker(context.TODO())
hcnf := func(rev, app []*types.TipSet) error { hcnf := func(rev, app []*types.TipSet) error {
cs.pubLk.Lock() cs.pubLk.Lock()
defer cs.pubLk.Unlock() defer cs.pubLk.Unlock()
@ -122,7 +123,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys
return nil return nil
} }
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric) cs.reorgNotifeeCh = make(chan ReorgNotifee)
cs.reorgCh = cs.reorgWorker(context.TODO(), []ReorgNotifee{hcnf, hcmetric})
return cs return cs
} }
@ -211,8 +213,8 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*api.HeadChange
return out return out
} }
func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) { func (cs *ChainStore) SubscribeHeadChanges(f ReorgNotifee) {
cs.headChangeNotifs = append(cs.headChangeNotifs, f) cs.reorgNotifeeCh <- f
} }
func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error { func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
@ -273,13 +275,19 @@ type reorg struct {
new *types.TipSet new *types.TipSet
} }
func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg { func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNotifee) chan<- reorg {
out := make(chan reorg, 32) out := make(chan reorg, 32)
notifees := make([]ReorgNotifee, len(initialNotifees))
copy(notifees, initialNotifees)
go func() { go func() {
defer log.Warn("reorgWorker quit") defer log.Warn("reorgWorker quit")
for { for {
select { select {
case n := <-cs.reorgNotifeeCh:
notifees = append(notifees, n)
case r := <-out: case r := <-out:
revert, apply, err := cs.ReorgOps(r.old, r.new) revert, apply, err := cs.ReorgOps(r.old, r.new)
if err != nil { if err != nil {
@ -293,7 +301,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
apply[i], apply[opp] = apply[opp], apply[i] apply[i], apply[opp] = apply[opp], apply[i]
} }
for _, hcf := range cs.headChangeNotifs { for _, hcf := range notifees {
if err := hcf(revert, apply); err != nil { if err := hcf(revert, apply); err != nil {
log.Error("head change func errored (BAD): ", err) log.Error("head change func errored (BAD): ", err)
} }

View File

@ -10,18 +10,16 @@ import (
) )
var fetchParamCmd = &cli.Command{ var fetchParamCmd = &cli.Command{
Name: "fetch-params", Name: "fetch-params",
Usage: "Fetch proving parameters", Usage: "Fetch proving parameters",
Flags: []cli.Flag{ ArgsUsage: "[sectorSize]",
&cli.StringFlag{
Name: "proving-params",
Usage: "download params used creating proofs for given size, i.e. 32GiB",
},
},
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
sectorSizeInt, err := units.RAMInBytes(cctx.String("proving-params")) if !cctx.Args().Present() {
return xerrors.Errorf("must pass sector size to fetch params for (specify as \"32GiB\", for instance)")
}
sectorSizeInt, err := units.RAMInBytes(cctx.Args().First())
if err != nil { if err != nil {
return err return xerrors.Errorf("error parsing sector size (specify as \"32GiB\", for instance): %w", err)
} }
sectorSize := uint64(sectorSizeInt) sectorSize := uint64(sectorSizeInt)

View File

@ -8,7 +8,7 @@ make 2k
Download the 2048 byte parameters: Download the 2048 byte parameters:
```sh ```sh
./lotus fetch-params --proving-params 2048 ./lotus fetch-params 2048
``` ```
Pre-seal some sectors: Pre-seal some sectors: