revert #7646 from v1.14.0 - this is not concensus breaking change and needs more testing before landing

This commit is contained in:
Jennifer Wang 2022-01-14 17:28:01 -05:00
parent 5f1ae545e3
commit ca444bb71d
5 changed files with 375 additions and 472 deletions

View File

@ -64,7 +64,7 @@ func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int)
return bs return bs
} }
func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error { func (bs *AutobatchBlockstore) Put(blk block.Block) error {
bs.stateLock.Lock() bs.stateLock.Lock()
defer bs.stateLock.Unlock() defer bs.stateLock.Unlock()
@ -94,19 +94,19 @@ func (bs *AutobatchBlockstore) flushWorker(ctx context.Context) {
case <-bs.flushCh: case <-bs.flushCh:
// TODO: check if we _should_ actually flush. We could get a spurious wakeup // TODO: check if we _should_ actually flush. We could get a spurious wakeup
// here. // here.
putErr := bs.doFlush(ctx, false) putErr := bs.doFlush(false)
for putErr != nil { for putErr != nil {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-time.After(bs.flushRetryDelay): case <-time.After(bs.flushRetryDelay):
autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay) autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay)
putErr = bs.doFlush(ctx, true) putErr = bs.doFlush(true)
} }
} }
case <-ctx.Done(): case <-ctx.Done():
// Do one last flush. // Do one last flush.
_ = bs.doFlush(ctx, false) _ = bs.doFlush(false)
return return
} }
} }
@ -114,13 +114,13 @@ func (bs *AutobatchBlockstore) flushWorker(ctx context.Context) {
// caller must NOT hold stateLock // caller must NOT hold stateLock
// set retryOnly to true to only retry a failed flush and not flush anything new. // set retryOnly to true to only retry a failed flush and not flush anything new.
func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) error { func (bs *AutobatchBlockstore) doFlush(retryOnly bool) error {
bs.doFlushLock.Lock() bs.doFlushLock.Lock()
defer bs.doFlushLock.Unlock() defer bs.doFlushLock.Unlock()
// If we failed to flush last time, try flushing again. // If we failed to flush last time, try flushing again.
if bs.flushErr != nil { if bs.flushErr != nil {
bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList) bs.flushErr = bs.backingBs.PutMany(bs.flushingBatch.blockList)
} }
// If we failed, or we're _only_ retrying, bail. // If we failed, or we're _only_ retrying, bail.
@ -137,7 +137,7 @@ func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) erro
bs.stateLock.Unlock() bs.stateLock.Unlock()
// And try to flush it. // And try to flush it.
bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList) bs.flushErr = bs.backingBs.PutMany(bs.flushingBatch.blockList)
// If we succeeded, reset the batch. Otherwise, we'll try again next time. // If we succeeded, reset the batch. Otherwise, we'll try again next time.
if bs.flushErr == nil { if bs.flushErr == nil {
@ -151,7 +151,7 @@ func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) erro
// caller must NOT hold stateLock // caller must NOT hold stateLock
func (bs *AutobatchBlockstore) Flush(ctx context.Context) error { func (bs *AutobatchBlockstore) Flush(ctx context.Context) error {
return bs.doFlush(ctx, false) return bs.doFlush(false)
} }
func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error { func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
@ -169,9 +169,9 @@ func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
return bs.flushErr return bs.flushErr
} }
func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) { func (bs *AutobatchBlockstore) Get(c cid.Cid) (block.Block, error) {
// may seem backward to check the backingBs first, but that is the likeliest case // may seem backward to check the backingBs first, but that is the likeliest case
blk, err := bs.backingBs.Get(ctx, c) blk, err := bs.backingBs.Get(c)
if err == nil { if err == nil {
return blk, nil return blk, nil
} }
@ -192,10 +192,10 @@ func (bs *AutobatchBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block,
return v, nil return v, nil
} }
return bs.Get(ctx, c) return bs.Get(c)
} }
func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error { func (bs *AutobatchBlockstore) DeleteBlock(cid.Cid) error {
// if we wanted to support this, we would have to: // if we wanted to support this, we would have to:
// - flush // - flush
// - delete from the backingBs (if present) // - delete from the backingBs (if present)
@ -204,13 +204,13 @@ func (bs *AutobatchBlockstore) DeleteBlock(context.Context, cid.Cid) error {
return xerrors.New("deletion is unsupported") return xerrors.New("deletion is unsupported")
} }
func (bs *AutobatchBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { func (bs *AutobatchBlockstore) DeleteMany(cids []cid.Cid) error {
// see note in DeleteBlock() // see note in DeleteBlock()
return xerrors.New("deletion is unsupported") return xerrors.New("deletion is unsupported")
} }
func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { func (bs *AutobatchBlockstore) Has(c cid.Cid) (bool, error) {
_, err := bs.Get(ctx, c) _, err := bs.Get(c)
if err == nil { if err == nil {
return true, nil return true, nil
} }
@ -221,8 +221,8 @@ func (bs *AutobatchBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error)
return false, err return false, err
} }
func (bs *AutobatchBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { func (bs *AutobatchBlockstore) GetSize(c cid.Cid) (int, error) {
blk, err := bs.Get(ctx, c) blk, err := bs.Get(c)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -230,9 +230,9 @@ func (bs *AutobatchBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, err
return len(blk.RawData()), nil return len(blk.RawData()), nil
} }
func (bs *AutobatchBlockstore) PutMany(ctx context.Context, blks []block.Block) error { func (bs *AutobatchBlockstore) PutMany(blks []block.Block) error {
for _, blk := range blks { for _, blk := range blks {
if err := bs.Put(ctx, blk); err != nil { if err := bs.Put(blk); err != nil {
return err return err
} }
} }
@ -252,8 +252,8 @@ func (bs *AutobatchBlockstore) HashOnRead(enabled bool) {
bs.backingBs.HashOnRead(enabled) bs.backingBs.HashOnRead(enabled)
} }
func (bs *AutobatchBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { func (bs *AutobatchBlockstore) View(cid cid.Cid, callback func([]byte) error) error {
blk, err := bs.Get(ctx, cid) blk, err := bs.Get(cid)
if err != nil { if err != nil {
return err return err
} }

View File

@ -82,12 +82,12 @@ var migrationsCmd = &cli.Command{
cache := nv15.NewMemMigrationCache() cache := nv15.NewMemMigrationCache()
blk, err := cs.GetBlock(ctx, blkCid) blk, err := cs.GetBlock(blkCid)
if err != nil { if err != nil {
return err return err
} }
migrationTs, err := cs.LoadTipSet(ctx, types.NewTipSetKey(blk.Parents...)) migrationTs, err := cs.LoadTipSet(types.NewTipSetKey(blk.Parents...))
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,181 @@
package main
import (
"bytes"
"context"
"fmt"
"io"
"strconv"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/node/repo"
miner2 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/urfave/cli/v2"
)
var terminationsCmd = &cli.Command{
Name: "terminations",
Description: "Lists terminated deals from the past 2 days",
ArgsUsage: "[block to look back from] [lookback period (epochs)]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
},
Action: func(cctx *cli.Context) error {
ctx := context.TODO()
if cctx.NArg() != 2 {
return fmt.Errorf("must pass block cid && lookback period")
}
blkCid, err := cid.Decode(cctx.Args().First())
if err != nil {
return fmt.Errorf("failed to parse input: %w", err)
}
fsrepo, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
if err != nil {
return err
}
defer lkrepo.Close() //nolint:errcheck
bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
mds, err := lkrepo.Datastore(context.Background(), "/metadata")
if err != nil {
return err
}
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
blk, err := cs.GetBlock(blkCid)
if err != nil {
return err
}
lbp, err := strconv.Atoi(cctx.Args().Get(1))
if err != nil {
return fmt.Errorf("failed to parse input: %w", err)
}
cutoff := blk.Height - abi.ChainEpoch(lbp)
for blk.Height > cutoff {
pts, err := cs.LoadTipSet(types.NewTipSetKey(blk.Parents...))
if err != nil {
return err
}
blk = pts.Blocks()[0]
msgs, err := cs.MessagesForTipset(pts)
if err != nil {
return err
}
for _, v := range msgs {
msg := v.VMMessage()
if msg.Method != miner.Methods.TerminateSectors {
continue
}
tree, err := state.LoadStateTree(cst, blk.ParentStateRoot)
if err != nil {
return err
}
minerAct, err := tree.GetActor(msg.To)
if err != nil {
return err
}
if !builtin.IsStorageMinerActor(minerAct.Code) {
continue
}
minerSt, err := miner.Load(store, minerAct)
if err != nil {
return err
}
marketAct, err := tree.GetActor(market.Address)
if err != nil {
return err
}
marketSt, err := market.Load(store, marketAct)
if err != nil {
return err
}
proposals, err := marketSt.Proposals()
if err != nil {
return err
}
var termParams miner2.TerminateSectorsParams
err = termParams.UnmarshalCBOR(bytes.NewBuffer(msg.Params))
if err != nil {
return err
}
for _, t := range termParams.Terminations {
sectors, err := minerSt.LoadSectors(&t.Sectors)
if err != nil {
return err
}
for _, sector := range sectors {
for _, deal := range sector.DealIDs {
prop, find, err := proposals.Get(deal)
if err != nil {
return err
}
if find {
fmt.Printf("%s, %d, %d, %s, %s, %s\n", msg.To, sector.SectorNumber, deal, prop.Client, prop.PieceCID, prop.Label)
}
}
}
}
}
}
return nil
},
}

View File

@ -9,7 +9,7 @@ require (
github.com/drand/drand v1.2.1 github.com/drand/drand v1.2.1
github.com/filecoin-project/go-address v0.0.6 github.com/filecoin-project/go-address v0.0.6
github.com/filecoin-project/go-data-transfer v1.11.4 github.com/filecoin-project/go-data-transfer v1.11.4
github.com/filecoin-project/go-fil-markets v1.13.3 github.com/filecoin-project/go-fil-markets v1.13.4
github.com/filecoin-project/go-jsonrpc v0.1.5 github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-state-types v0.1.3 github.com/filecoin-project/go-state-types v0.1.3
github.com/filecoin-project/go-storedcounter v0.1.0 github.com/filecoin-project/go-storedcounter v0.1.0
@ -20,7 +20,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-multierror v1.1.1
github.com/influxdata/influxdb v1.9.4 // indirect github.com/influxdata/influxdb v1.9.4 // indirect
github.com/ipfs/go-cid v0.1.0 github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.4.6 github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ipfs-files v0.0.9 github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.3.0 github.com/ipfs/go-log/v2 v2.3.0

File diff suppressed because it is too large Load Diff