Merge remote-tracking branch 'origin/nonsense/split-market-miner-processes' into feat/replace-multistore-carv2
This commit is contained in:
commit
b40985a57c
@ -811,6 +811,11 @@ workflows:
|
|||||||
suite: itest-multisig
|
suite: itest-multisig
|
||||||
target: "./itests/multisig_test.go"
|
target: "./itests/multisig_test.go"
|
||||||
|
|
||||||
|
- test:
|
||||||
|
name: test-itest-nonce
|
||||||
|
suite: itest-nonce
|
||||||
|
target: "./itests/nonce_test.go"
|
||||||
|
|
||||||
- test:
|
- test:
|
||||||
name: test-itest-paych_api
|
name: test-itest-paych_api
|
||||||
suite: itest-paych_api
|
suite: itest-paych_api
|
||||||
@ -826,6 +831,11 @@ workflows:
|
|||||||
suite: itest-sdr_upgrade
|
suite: itest-sdr_upgrade
|
||||||
target: "./itests/sdr_upgrade_test.go"
|
target: "./itests/sdr_upgrade_test.go"
|
||||||
|
|
||||||
|
- test:
|
||||||
|
name: test-itest-sector_finalize_early
|
||||||
|
suite: itest-sector_finalize_early
|
||||||
|
target: "./itests/sector_finalize_early_test.go"
|
||||||
|
|
||||||
- test:
|
- test:
|
||||||
name: test-itest-sector_pledge
|
name: test-itest-sector_pledge
|
||||||
suite: itest-sector_pledge
|
suite: itest-sector_pledge
|
||||||
|
@ -547,7 +547,7 @@ func (st *StateTree) Version() types.StateTreeVersion {
|
|||||||
return st.version
|
return st.version
|
||||||
}
|
}
|
||||||
|
|
||||||
func Diff(oldTree, newTree *StateTree) (map[string]types.Actor, error) {
|
func Diff(ctx context.Context, oldTree, newTree *StateTree) (map[string]types.Actor, error) {
|
||||||
out := map[string]types.Actor{}
|
out := map[string]types.Actor{}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -555,6 +555,10 @@ func Diff(oldTree, newTree *StateTree) (map[string]types.Actor, error) {
|
|||||||
buf = bytes.NewReader(nil)
|
buf = bytes.NewReader(nil)
|
||||||
)
|
)
|
||||||
if err := newTree.root.ForEach(&ncval, func(k string) error {
|
if err := newTree.root.ForEach(&ncval, func(k string) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
var act types.Actor
|
var act types.Actor
|
||||||
|
|
||||||
addr, err := address.NewFromBytes([]byte(k))
|
addr, err := address.NewFromBytes([]byte(k))
|
||||||
@ -582,6 +586,7 @@ func Diff(oldTree, newTree *StateTree) (map[string]types.Actor, error) {
|
|||||||
out[addr.String()] = act
|
out[addr.String()] = act
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,11 @@ func (f FIL) Unitless() string {
|
|||||||
return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
|
return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var AttoFil = NewInt(1)
|
||||||
|
var FemtoFil = BigMul(AttoFil, NewInt(1000))
|
||||||
|
var PicoFil = BigMul(FemtoFil, NewInt(1000))
|
||||||
|
var NanoFil = BigMul(PicoFil, NewInt(1000))
|
||||||
|
|
||||||
var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"}
|
var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"}
|
||||||
|
|
||||||
func (f FIL) Short() string {
|
func (f FIL) Short() string {
|
||||||
|
@ -1031,6 +1031,8 @@ var ChainExportCmd = &cli.Command{
|
|||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
&cli.StringFlag{
|
&cli.StringFlag{
|
||||||
Name: "tipset",
|
Name: "tipset",
|
||||||
|
Usage: "specify tipset to start the export from",
|
||||||
|
Value: "@head",
|
||||||
},
|
},
|
||||||
&cli.Int64Flag{
|
&cli.Int64Flag{
|
||||||
Name: "recent-stateroots",
|
Name: "recent-stateroots",
|
||||||
|
@ -210,7 +210,7 @@ var filplusCheckClientCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if dcap == nil {
|
if dcap == nil {
|
||||||
return xerrors.Errorf("client %s is not a verified client", err)
|
return xerrors.Errorf("client %s is not a verified client", caddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println(*dcap)
|
fmt.Println(*dcap)
|
||||||
|
@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !(cfg.CanStore || cfg.CanSeal) {
|
if !(cfg.CanStore || cfg.CanSeal) {
|
||||||
return xerrors.Errorf("must specify at least one of --store of --seal")
|
return xerrors.Errorf("must specify at least one of --store or --seal")
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := json.MarshalIndent(cfg, "", " ")
|
b, err := json.MarshalIndent(cfg, "", " ")
|
||||||
|
@ -470,7 +470,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
|||||||
AllowPreCommit2: true,
|
AllowPreCommit2: true,
|
||||||
AllowCommit: true,
|
AllowCommit: true,
|
||||||
AllowUnseal: true,
|
AllowUnseal: true,
|
||||||
}, sa, wsts, smsts)
|
}, wsts, smsts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ctx context.Context, api lapi.FullNode, addr address.Address, peerid peer.ID
|
|
||||||
func restore(ctx context.Context, cctx *cli.Context, manageConfig func(*config.StorageMiner) error, after func(api lapi.FullNode, addr address.Address, peerid peer.ID, mi miner.MinerInfo) error) error {
|
func restore(ctx context.Context, cctx *cli.Context, manageConfig func(*config.StorageMiner) error, after func(api lapi.FullNode, addr address.Address, peerid peer.ID, mi miner.MinerInfo) error) error {
|
||||||
if cctx.Args().Len() != 1 {
|
if cctx.Args().Len() != 1 {
|
||||||
return xerrors.Errorf("expected 1 argument")
|
return xerrors.Errorf("expected 1 argument")
|
||||||
|
@ -175,8 +175,6 @@ var runCmd = &cli.Command{
|
|||||||
if err := minerapi.NetConnect(ctx, remoteAddrs); err != nil {
|
if err := minerapi.NetConnect(ctx, remoteAddrs); err != nil {
|
||||||
return xerrors.Errorf("connecting to full node (libp2p): %w", err)
|
return xerrors.Errorf("connecting to full node (libp2p): %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.Infof("No markets subsystem enabled, so no libp2p network bootstrapping")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Remote version %s", v)
|
log.Infof("Remote version %s", v)
|
||||||
|
@ -145,7 +145,7 @@ over time
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !(cfg.CanStore || cfg.CanSeal) {
|
if !(cfg.CanStore || cfg.CanSeal) {
|
||||||
return xerrors.Errorf("must specify at least one of --store of --seal")
|
return xerrors.Errorf("must specify at least one of --store or --seal")
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := json.MarshalIndent(cfg, "", " ")
|
b, err := json.MarshalIndent(cfg, "", " ")
|
||||||
|
@ -2140,7 +2140,7 @@ USAGE:
|
|||||||
lotus chain export [command options] [outputPath]
|
lotus chain export [command options] [outputPath]
|
||||||
|
|
||||||
OPTIONS:
|
OPTIONS:
|
||||||
--tipset value
|
--tipset value specify tipset to start the export from (default: "@head")
|
||||||
--recent-stateroots value specify the number of recent state roots to include in the export (default: 0)
|
--recent-stateroots value specify the number of recent state roots to include in the export (default: 0)
|
||||||
--skip-old-msgs (default: false)
|
--skip-old-msgs (default: false)
|
||||||
--help, -h show help (default: false)
|
--help, -h show help (default: false)
|
||||||
|
19
extern/sector-storage/manager.go
vendored
19
extern/sector-storage/manager.go
vendored
@ -120,7 +120,7 @@ type StorageAuth http.Header
|
|||||||
type WorkerStateStore *statestore.StateStore
|
type WorkerStateStore *statestore.StateStore
|
||||||
type ManagerStateStore *statestore.StateStore
|
type ManagerStateStore *statestore.StateStore
|
||||||
|
|
||||||
func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
|
func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
|
||||||
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
|
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
||||||
@ -526,10 +526,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pathType := storiface.PathStorage
|
||||||
|
{
|
||||||
|
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("finding sealed sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, store := range sealedStores {
|
||||||
|
if store.CanSeal {
|
||||||
|
pathType = storiface.PathSealing
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
||||||
return err
|
return err
|
||||||
|
2
extern/sector-storage/piece_provider_test.go
vendored
2
extern/sector-storage/piece_provider_test.go
vendored
@ -216,7 +216,7 @@ func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorPro
|
|||||||
wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
|
wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
|
||||||
smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
||||||
|
|
||||||
mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, nil, wsts, smsts)
|
mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, wsts, smsts)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// start a http server on the manager to serve sector file requests.
|
// start a http server on the manager to serve sector file requests.
|
||||||
|
1
extern/sector-storage/stores/remote.go
vendored
1
extern/sector-storage/stores/remote.go
vendored
@ -53,7 +53,6 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storifa
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler partialFileHandler) *Remote {
|
func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler partialFileHandler) *Remote {
|
||||||
fmt.Printf("Creating NewRemote: %#v \n", auth)
|
|
||||||
return &Remote{
|
return &Remote{
|
||||||
local: local,
|
local: local,
|
||||||
index: index,
|
index: index,
|
||||||
|
2
extern/storage-sealing/checks.go
vendored
2
extern/storage-sealing/checks.go
vendored
@ -62,7 +62,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
|
|||||||
}
|
}
|
||||||
|
|
||||||
if proposal.PieceCID != p.Piece.PieceCID {
|
if proposal.PieceCID != p.Piece.PieceCID {
|
||||||
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)}
|
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)}
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Piece.Size != proposal.PieceSize {
|
if p.Piece.Size != proposal.PieceSize {
|
||||||
|
47
extern/storage-sealing/commit_batch.go
vendored
47
extern/storage-sealing/commit_batch.go
vendored
@ -106,6 +106,7 @@ func (b *CommitBatcher) run() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
|
||||||
for {
|
for {
|
||||||
if forceRes != nil {
|
if forceRes != nil {
|
||||||
forceRes <- lastMsg
|
forceRes <- lastMsg
|
||||||
@ -121,7 +122,7 @@ func (b *CommitBatcher) run() {
|
|||||||
return
|
return
|
||||||
case <-b.notify:
|
case <-b.notify:
|
||||||
sendAboveMax = true
|
sendAboveMax = true
|
||||||
case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
|
case <-timer.C:
|
||||||
// do nothing
|
// do nothing
|
||||||
case fr := <-b.force: // user triggered
|
case fr := <-b.force: // user triggered
|
||||||
forceRes = fr
|
forceRes = fr
|
||||||
@ -132,17 +133,26 @@ func (b *CommitBatcher) run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("CommitBatcher processBatch error", "error", err)
|
log.Warnw("CommitBatcher processBatch error", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !timer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
|
timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
b.lk.Lock()
|
b.lk.Lock()
|
||||||
defer b.lk.Unlock()
|
defer b.lk.Unlock()
|
||||||
|
|
||||||
if len(b.todo) == 0 {
|
if len(b.todo) == 0 {
|
||||||
return nil
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
var cutoff time.Time
|
var cutoff time.Time
|
||||||
@ -160,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cutoff.IsZero() {
|
if cutoff.IsZero() {
|
||||||
return time.After(maxWait)
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
cutoff = cutoff.Add(-slack)
|
cutoff = cutoff.Add(-slack)
|
||||||
if cutoff.Before(now) {
|
if cutoff.Before(now) {
|
||||||
return time.After(time.Nanosecond) // can't return 0
|
return time.Nanosecond // can't return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
wait := cutoff.Sub(now)
|
wait := cutoff.Sub(now)
|
||||||
@ -173,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
|
|||||||
wait = maxWait
|
wait = maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
return time.After(wait)
|
return wait
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
|
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
|
||||||
@ -196,7 +206,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
|
|||||||
|
|
||||||
var res []sealiface.CommitBatchRes
|
var res []sealiface.CommitBatchRes
|
||||||
|
|
||||||
if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {
|
individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors)
|
||||||
|
|
||||||
|
if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
|
||||||
|
tok, _, err := b.api.ChainHead(b.mctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bf, err := b.api.ChainBaseFee(b.mctx, tok)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("couldn't get base fee: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if bf.LessThan(cfg.AggregateAboveBaseFee) {
|
||||||
|
individual = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if individual {
|
||||||
res, err = b.processIndividually()
|
res, err = b.processIndividually()
|
||||||
} else {
|
} else {
|
||||||
res, err = b.processBatch(cfg)
|
res, err = b.processBatch(cfg)
|
||||||
@ -232,7 +260,9 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
|
|||||||
|
|
||||||
total := len(b.todo)
|
total := len(b.todo)
|
||||||
|
|
||||||
var res sealiface.CommitBatchRes
|
res := sealiface.CommitBatchRes{
|
||||||
|
FailedSectors: map[abi.SectorNumber]string{},
|
||||||
|
}
|
||||||
|
|
||||||
params := miner5.ProveCommitAggregateParams{
|
params := miner5.ProveCommitAggregateParams{
|
||||||
SectorNumbers: bitfield.New(),
|
SectorNumbers: bitfield.New(),
|
||||||
@ -347,6 +377,7 @@ func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error
|
|||||||
for sn, info := range b.todo {
|
for sn, info := range b.todo {
|
||||||
r := sealiface.CommitBatchRes{
|
r := sealiface.CommitBatchRes{
|
||||||
Sectors: []abi.SectorNumber{sn},
|
Sectors: []abi.SectorNumber{sn},
|
||||||
|
FailedSectors: map[abi.SectorNumber]string{},
|
||||||
}
|
}
|
||||||
|
|
||||||
mcid, err := b.processSingle(mi, sn, info, tok)
|
mcid, err := b.processSingle(mi, sn, info, tok)
|
||||||
|
115
extern/storage-sealing/commit_batch_test.go
vendored
115
extern/storage-sealing/commit_batch_test.go
vendored
@ -20,6 +20,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
|
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
|
||||||
@ -58,6 +59,8 @@ func TestCommitBatcher(t *testing.T) {
|
|||||||
CommitBatchWait: 24 * time.Hour,
|
CommitBatchWait: 24 * time.Hour,
|
||||||
CommitBatchSlack: 1 * time.Hour,
|
CommitBatchSlack: 1 * time.Hour,
|
||||||
|
|
||||||
|
AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL
|
||||||
|
|
||||||
TerminateBatchMin: 1,
|
TerminateBatchMin: 1,
|
||||||
TerminateBatchMax: 100,
|
TerminateBatchMax: 100,
|
||||||
TerminateBatchWait: 5 * time.Minute,
|
TerminateBatchWait: 5 * time.Minute,
|
||||||
@ -143,7 +146,7 @@ func TestCommitBatcher(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
expectSend := func(expect []abi.SectorNumber) action {
|
expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
|
||||||
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
|
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
|
||||||
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
|
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
|
||||||
|
|
||||||
@ -153,14 +156,40 @@ func TestCommitBatcher(t *testing.T) {
|
|||||||
batch = true
|
batch = true
|
||||||
ti = 1
|
ti = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
basefee := types.PicoFil
|
||||||
|
if aboveBalancer {
|
||||||
|
basefee = types.NanoFil
|
||||||
|
}
|
||||||
|
|
||||||
|
if batch {
|
||||||
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
|
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
|
||||||
|
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !aboveBalancer {
|
||||||
|
batch = false
|
||||||
|
ti = len(expect)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
|
||||||
|
|
||||||
|
pciC := len(expect)
|
||||||
|
if failOnePCI {
|
||||||
|
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found
|
||||||
|
pciC = len(expect) - 1
|
||||||
|
if !batch {
|
||||||
|
ti--
|
||||||
|
}
|
||||||
|
}
|
||||||
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
|
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
|
||||||
PreCommitDeposit: big.Zero(),
|
PreCommitDeposit: big.Zero(),
|
||||||
}, nil).Times(len(expect))
|
}, nil).Times(pciC)
|
||||||
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
|
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC)
|
||||||
|
|
||||||
if batch {
|
if batch {
|
||||||
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
|
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
|
||||||
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
|
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
|
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
|
||||||
@ -183,11 +212,11 @@ func TestCommitBatcher(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
flush := func(expect []abi.SectorNumber) action {
|
flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
|
||||||
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
|
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
|
||||||
_ = expectSend(expect)(t, s, pcb)
|
_ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb)
|
||||||
|
|
||||||
batch := len(expect) >= minBatch
|
batch := len(expect) >= minBatch && aboveBalancer
|
||||||
|
|
||||||
r, err := pcb.Flush(ctx)
|
r, err := pcb.Flush(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -198,6 +227,13 @@ func TestCommitBatcher(t *testing.T) {
|
|||||||
return r[0].Sectors[i] < r[0].Sectors[j]
|
return r[0].Sectors[i] < r[0].Sectors[j]
|
||||||
})
|
})
|
||||||
require.Equal(t, expect, r[0].Sectors)
|
require.Equal(t, expect, r[0].Sectors)
|
||||||
|
if !failOnePCI {
|
||||||
|
require.Len(t, r[0].FailedSectors, 0)
|
||||||
|
} else {
|
||||||
|
require.Len(t, r[0].FailedSectors, 1)
|
||||||
|
_, found := r[0].FailedSectors[1]
|
||||||
|
require.True(t, found)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
require.Len(t, r, len(expect))
|
require.Len(t, r, len(expect))
|
||||||
for _, res := range r {
|
for _, res := range r {
|
||||||
@ -209,6 +245,13 @@ func TestCommitBatcher(t *testing.T) {
|
|||||||
})
|
})
|
||||||
for i, res := range r {
|
for i, res := range r {
|
||||||
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
|
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
|
||||||
|
if failOnePCI && res.Sectors[0] == 1 {
|
||||||
|
require.Len(t, res.FailedSectors, 1)
|
||||||
|
_, found := res.FailedSectors[1]
|
||||||
|
require.True(t, found)
|
||||||
|
} else {
|
||||||
|
require.Empty(t, res.FailedSectors)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -227,33 +270,75 @@ func TestCommitBatcher(t *testing.T) {
|
|||||||
tcs := map[string]struct {
|
tcs := map[string]struct {
|
||||||
actions []action
|
actions []action
|
||||||
}{
|
}{
|
||||||
"addSingle": {
|
"addSingle-aboveBalancer": {
|
||||||
actions: []action{
|
actions: []action{
|
||||||
addSector(0),
|
addSector(0),
|
||||||
waitPending(1),
|
waitPending(1),
|
||||||
flush([]abi.SectorNumber{0}),
|
flush([]abi.SectorNumber{0}, true, false),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"addTwo": {
|
"addTwo-aboveBalancer": {
|
||||||
actions: []action{
|
actions: []action{
|
||||||
addSectors(getSectors(2)),
|
addSectors(getSectors(2)),
|
||||||
waitPending(2),
|
waitPending(2),
|
||||||
flush(getSectors(2)),
|
flush(getSectors(2), true, false),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"addAte": {
|
"addAte-aboveBalancer": {
|
||||||
actions: []action{
|
actions: []action{
|
||||||
addSectors(getSectors(8)),
|
addSectors(getSectors(8)),
|
||||||
waitPending(8),
|
waitPending(8),
|
||||||
flush(getSectors(8)),
|
flush(getSectors(8), true, false),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"addMax": {
|
"addMax-aboveBalancer": {
|
||||||
actions: []action{
|
actions: []action{
|
||||||
expectSend(getSectors(maxBatch)),
|
expectSend(getSectors(maxBatch), true, false),
|
||||||
addSectors(getSectors(maxBatch)),
|
addSectors(getSectors(maxBatch)),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"addSingle-belowBalancer": {
|
||||||
|
actions: []action{
|
||||||
|
addSector(0),
|
||||||
|
waitPending(1),
|
||||||
|
flush([]abi.SectorNumber{0}, false, false),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"addTwo-belowBalancer": {
|
||||||
|
actions: []action{
|
||||||
|
addSectors(getSectors(2)),
|
||||||
|
waitPending(2),
|
||||||
|
flush(getSectors(2), false, false),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"addAte-belowBalancer": {
|
||||||
|
actions: []action{
|
||||||
|
addSectors(getSectors(8)),
|
||||||
|
waitPending(8),
|
||||||
|
flush(getSectors(8), false, false),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"addMax-belowBalancer": {
|
||||||
|
actions: []action{
|
||||||
|
expectSend(getSectors(maxBatch), false, false),
|
||||||
|
addSectors(getSectors(maxBatch)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
"addAte-aboveBalancer-failOne": {
|
||||||
|
actions: []action{
|
||||||
|
addSectors(getSectors(8)),
|
||||||
|
waitPending(8),
|
||||||
|
flush(getSectors(8), true, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"addAte-belowBalancer-failOne": {
|
||||||
|
actions: []action{
|
||||||
|
addSectors(getSectors(8)),
|
||||||
|
waitPending(8),
|
||||||
|
flush(getSectors(8), false, true),
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, tc := range tcs {
|
for name, tc := range tcs {
|
||||||
|
1
extern/storage-sealing/fsm.go
vendored
1
extern/storage-sealing/fsm.go
vendored
@ -115,6 +115,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
SubmitCommitAggregate: planOne(
|
SubmitCommitAggregate: planOne(
|
||||||
on(SectorCommitAggregateSent{}, CommitWait),
|
on(SectorCommitAggregateSent{}, CommitWait),
|
||||||
on(SectorCommitFailed{}, CommitFailed),
|
on(SectorCommitFailed{}, CommitFailed),
|
||||||
|
on(SectorRetrySubmitCommit{}, SubmitCommit),
|
||||||
),
|
),
|
||||||
CommitWait: planOne(
|
CommitWait: planOne(
|
||||||
on(SectorProving{}, FinalizeSector),
|
on(SectorProving{}, FinalizeSector),
|
||||||
|
22
extern/storage-sealing/precommit_batch.go
vendored
22
extern/storage-sealing/precommit_batch.go
vendored
@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
|
||||||
for {
|
for {
|
||||||
if forceRes != nil {
|
if forceRes != nil {
|
||||||
forceRes <- lastRes
|
forceRes <- lastRes
|
||||||
@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
|
|||||||
return
|
return
|
||||||
case <-b.notify:
|
case <-b.notify:
|
||||||
sendAboveMax = true
|
sendAboveMax = true
|
||||||
case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
|
case <-timer.C:
|
||||||
// do nothing
|
// do nothing
|
||||||
case fr := <-b.force: // user triggered
|
case fr := <-b.force: // user triggered
|
||||||
forceRes = fr
|
forceRes = fr
|
||||||
@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !timer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
|
timer.Reset(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
b.lk.Lock()
|
b.lk.Lock()
|
||||||
defer b.lk.Unlock()
|
defer b.lk.Unlock()
|
||||||
|
|
||||||
if len(b.todo) == 0 {
|
if len(b.todo) == 0 {
|
||||||
return nil
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
var cutoff time.Time
|
var cutoff time.Time
|
||||||
@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cutoff.IsZero() {
|
if cutoff.IsZero() {
|
||||||
return time.After(maxWait)
|
return maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
cutoff = cutoff.Add(-slack)
|
cutoff = cutoff.Add(-slack)
|
||||||
if cutoff.Before(now) {
|
if cutoff.Before(now) {
|
||||||
return time.After(time.Nanosecond) // can't return 0
|
return time.Nanosecond // can't return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
wait := cutoff.Sub(now)
|
wait := cutoff.Sub(now)
|
||||||
@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
|
|||||||
wait = maxWait
|
wait = maxWait
|
||||||
}
|
}
|
||||||
|
|
||||||
return time.After(wait)
|
return wait
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {
|
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
|
8
extern/storage-sealing/sealiface/config.go
vendored
8
extern/storage-sealing/sealiface/config.go
vendored
@ -1,6 +1,10 @@
|
|||||||
package sealiface
|
package sealiface
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
)
|
||||||
|
|
||||||
// this has to be in a separate package to not make lotus API depend on filecoin-ffi
|
// this has to be in a separate package to not make lotus API depend on filecoin-ffi
|
||||||
|
|
||||||
@ -31,6 +35,8 @@ type Config struct {
|
|||||||
CommitBatchWait time.Duration
|
CommitBatchWait time.Duration
|
||||||
CommitBatchSlack time.Duration
|
CommitBatchSlack time.Duration
|
||||||
|
|
||||||
|
AggregateAboveBaseFee abi.TokenAmount
|
||||||
|
|
||||||
TerminateBatchMax uint64
|
TerminateBatchMax uint64
|
||||||
TerminateBatchMin uint64
|
TerminateBatchMin uint64
|
||||||
TerminateBatchWait time.Duration
|
TerminateBatchWait time.Duration
|
||||||
|
33
extern/storage-sealing/states_failed.go
vendored
33
extern/storage-sealing/states_failed.go
vendored
@ -142,7 +142,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
|
|||||||
}
|
}
|
||||||
|
|
||||||
if pci.Info.SealedCID != *sector.CommR {
|
if pci.Info.SealedCID != *sector.CommR {
|
||||||
log.Warnf("sector %d is precommitted on chain, with different CommR: %x != %x", sector.SectorNumber, pci.Info.SealedCID, sector.CommR)
|
log.Warnf("sector %d is precommitted on chain, with different CommR: %s != %s", sector.SectorNumber, pci.Info.SealedCID, sector.CommR)
|
||||||
return nil // TODO: remove when the actor allows re-precommit
|
return nil // TODO: remove when the actor allows re-precommit
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,7 +182,7 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
tok, height, err := m.api.ChainHead(ctx.Context())
|
tok, _, err := m.api.ChainHead(ctx.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
|
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
|
||||||
return nil
|
return nil
|
||||||
@ -216,33 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil {
|
|
||||||
switch err.(type) {
|
|
||||||
case *ErrApi:
|
|
||||||
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
|
|
||||||
return nil
|
|
||||||
case *ErrBadCommD:
|
|
||||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)})
|
|
||||||
case *ErrExpiredTicket:
|
|
||||||
return ctx.Send(SectorTicketExpired{xerrors.Errorf("ticket expired error, removing sector: %w", err)})
|
|
||||||
case *ErrBadTicket:
|
|
||||||
return ctx.Send(SectorTicketExpired{xerrors.Errorf("expired ticket, removing sector: %w", err)})
|
|
||||||
case *ErrInvalidDeals:
|
|
||||||
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
|
|
||||||
return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed})
|
|
||||||
case *ErrExpiredDeals:
|
|
||||||
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
|
|
||||||
case nil:
|
|
||||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
|
|
||||||
case *ErrPrecommitOnChain:
|
|
||||||
// noop, this is expected
|
|
||||||
case *ErrSectorNumberAllocated:
|
|
||||||
// noop, already committed?
|
|
||||||
default:
|
|
||||||
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
|
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
case *ErrApi:
|
case *ErrApi:
|
||||||
@ -381,7 +354,7 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn
|
|||||||
}
|
}
|
||||||
|
|
||||||
if proposal.PieceCID != p.Piece.PieceCID {
|
if proposal.PieceCID != p.Piece.PieceCID {
|
||||||
log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)
|
log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)
|
||||||
toFix = append(toFix, i)
|
toFix = append(toFix, i)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
87
extern/storage-sealing/states_sealing.go
vendored
87
extern/storage-sealing/states_sealing.go
vendored
@ -105,48 +105,66 @@ func checkTicketExpired(ticket, head abi.ChainEpoch) bool {
|
|||||||
return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations
|
return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) {
|
func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool {
|
||||||
|
return currEpoch > preCommitEpoch+msd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) {
|
||||||
tok, epoch, err := m.api.ChainHead(ctx.Context())
|
tok, epoch, err := m.api.ChainHead(ctx.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
|
log.Errorf("getTicket: api error, not proceeding: %+v", err)
|
||||||
return nil, 0, nil
|
return nil, 0, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// the reason why the StateMinerSectorAllocated function is placed here, if it is outside,
|
||||||
|
// if the MarshalCBOR function and StateSectorPreCommitInfo function return err, it will be executed
|
||||||
|
allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
||||||
|
if aerr != nil {
|
||||||
|
log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr)
|
||||||
|
return nil, 0, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ticketEpoch := epoch - policy.SealRandomnessLookback
|
ticketEpoch := epoch - policy.SealRandomnessLookback
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
if err := m.maddr.MarshalCBOR(buf); err != nil {
|
if err := m.maddr.MarshalCBOR(buf); err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, allocated, err
|
||||||
}
|
}
|
||||||
|
|
||||||
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
|
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, xerrors.Errorf("getting precommit info: %w", err)
|
return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if pci != nil {
|
if pci != nil {
|
||||||
ticketEpoch = pci.Info.SealRandEpoch
|
ticketEpoch = pci.Info.SealRandEpoch
|
||||||
|
|
||||||
if checkTicketExpired(ticketEpoch, epoch) {
|
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
|
||||||
return nil, 0, xerrors.Errorf("ticket expired for precommitted sector")
|
if err != nil {
|
||||||
|
return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
|
||||||
|
|
||||||
|
if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) {
|
||||||
|
return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if pci == nil && allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove
|
||||||
|
return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes())
|
rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, allocated, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return abi.SealRandomness(rand), ticketEpoch, nil
|
return abi.SealRandomness(rand), ticketEpoch, allocated, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
ticketValue, ticketEpoch, err := m.getTicket(ctx, sector)
|
ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
|
||||||
if aerr != nil {
|
|
||||||
log.Errorf("error checking if sector is allocated: %+v", aerr)
|
|
||||||
}
|
|
||||||
|
|
||||||
if allocated {
|
if allocated {
|
||||||
if sector.CommitMessage != nil {
|
if sector.CommitMessage != nil {
|
||||||
// Some recovery paths with unfortunate timing lead here
|
// Some recovery paths with unfortunate timing lead here
|
||||||
@ -182,16 +200,37 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, height, err := m.api.ChainHead(ctx.Context())
|
tok, height, err := m.api.ChainHead(ctx.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
|
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if checkTicketExpired(sector.TicketEpoch, height) {
|
if checkTicketExpired(sector.TicketEpoch, height) {
|
||||||
|
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if pci == nil {
|
||||||
return ctx.Send(SectorOldTicket{}) // go get new ticket
|
return ctx.Send(SectorOldTicket{}) // go get new ticket
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
|
||||||
|
|
||||||
|
// if height > PreCommitEpoch + msd, there is no need to recalculate
|
||||||
|
if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) {
|
||||||
|
return ctx.Send(SectorOldTicket{}) // will be removed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
|
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
|
||||||
@ -485,7 +524,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
|
|||||||
|
|
||||||
log.Info("scheduling seal proof computation...")
|
log.Info("scheduling seal proof computation...")
|
||||||
|
|
||||||
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
|
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%s; d:%s", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
|
||||||
|
|
||||||
if sector.CommD == nil || sector.CommR == nil {
|
if sector.CommD == nil || sector.CommR == nil {
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
||||||
@ -624,11 +663,21 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
|
|||||||
Spt: sector.SectorType,
|
Spt: sector.SectorType,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
|
return ctx.Send(SectorRetrySubmitCommit{})
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.Error != "" {
|
if res.Error != "" {
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)})
|
tok, _, err := m.api.ChainHead(ctx.Context())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorRetrySubmitCommit{})
|
||||||
}
|
}
|
||||||
|
|
||||||
if e, found := res.FailedSectors[sector.SectorNumber]; found {
|
if e, found := res.FailedSectors[sector.SectorNumber]; found {
|
||||||
|
2
go.mod
2
go.mod
@ -163,8 +163,6 @@ replace github.com/libp2p/go-libp2p-yamux => github.com/libp2p/go-libp2p-yamux v
|
|||||||
|
|
||||||
replace github.com/filecoin-project/lotus => ./
|
replace github.com/filecoin-project/lotus => ./
|
||||||
|
|
||||||
replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v1.18.0
|
|
||||||
|
|
||||||
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
|
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
|
||||||
|
|
||||||
replace github.com/filecoin-project/test-vectors => ./extern/test-vectors
|
replace github.com/filecoin-project/test-vectors => ./extern/test-vectors
|
||||||
|
@ -21,7 +21,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
|
|||||||
policy.SetPreCommitChallengeDelay(oldDelay)
|
policy.SetPreCommitChallengeDelay(oldDelay)
|
||||||
})
|
})
|
||||||
|
|
||||||
var blockTime = 1 * time.Second
|
var blockTime = 50 * time.Millisecond
|
||||||
|
|
||||||
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.WithAllSubsystems()) // no mock proofs.
|
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.WithAllSubsystems()) // no mock proofs.
|
||||||
ens.InterconnectAll().BeginMining(blockTime)
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -270,7 +271,10 @@ func startNodes(
|
|||||||
handler, err := gateway.Handler(gwapi)
|
handler, err := gateway.Handler(gwapi)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
srv, _ := kit.CreateRPCServer(t, handler, nil)
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
srv, _ := kit.CreateRPCServer(t, handler, l)
|
||||||
|
|
||||||
// Create a gateway client API that connects to the gateway server
|
// Create a gateway client API that connects to the gateway server
|
||||||
var gapi api.Gateway
|
var gapi api.Gateway
|
||||||
|
@ -363,9 +363,6 @@ func (n *Ensemble) Start() *Ensemble {
|
|||||||
|
|
||||||
Method: power.Methods.CreateMiner,
|
Method: power.Methods.CreateMiner,
|
||||||
Params: params,
|
Params: params,
|
||||||
|
|
||||||
GasLimit: 0,
|
|
||||||
GasPremium: big.NewInt(5252),
|
|
||||||
}
|
}
|
||||||
signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil)
|
signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil)
|
||||||
require.NoError(n.t, err)
|
require.NoError(n.t, err)
|
||||||
|
@ -37,7 +37,6 @@ func EnsembleWithMinerAndMarketNodes(t *testing.T, opts ...interface{}) (*TestFu
|
|||||||
blockTime := 100 * time.Millisecond
|
blockTime := 100 * time.Millisecond
|
||||||
ens := NewEnsemble(t, eopts...).FullNode(&fullnode, nopts...).Miner(&main, &fullnode, mainNodeOpts...).Start()
|
ens := NewEnsemble(t, eopts...).FullNode(&fullnode, nopts...).Miner(&main, &fullnode, mainNodeOpts...).Start()
|
||||||
ens.BeginMining(blockTime)
|
ens.BeginMining(blockTime)
|
||||||
//ens.InterconnectAll().BeginMining(blockTime)
|
|
||||||
|
|
||||||
marketNodeOpts := []NodeOpt{OwnerAddr(fullnode.DefaultKey), MainMiner(&main), WithSubsystem(SStorageMarket)}
|
marketNodeOpts := []NodeOpt{OwnerAddr(fullnode.DefaultKey), MainMiner(&main), WithSubsystem(SStorageMarket)}
|
||||||
marketNodeOpts = append(marketNodeOpts, nopts...)
|
marketNodeOpts = append(marketNodeOpts, nopts...)
|
||||||
|
@ -2,23 +2,30 @@ package kit
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
"github.com/filecoin-project/lotus/miner"
|
"github.com/filecoin-project/lotus/miner"
|
||||||
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
|
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type MinerSubsystem int
|
type MinerSubsystem int
|
||||||
@ -151,3 +158,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
|
|||||||
fmt.Printf("COMMIT BATCH: %+v\n", cb)
|
fmt.Printf("COMMIT BATCH: %+v\n", cb)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const metaFile = "sectorstore.json"
|
||||||
|
|
||||||
|
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
|
||||||
|
p, err := ioutil.TempDir("", "lotus-testsectors-")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
if err := os.MkdirAll(p, 0755); err != nil {
|
||||||
|
if !os.IsExist(err) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = os.Stat(filepath.Join(p, metaFile))
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &stores.LocalStorageMeta{
|
||||||
|
ID: stores.ID(uuid.New().String()),
|
||||||
|
Weight: weight,
|
||||||
|
CanSeal: seal,
|
||||||
|
CanStore: store,
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(cfg.CanStore || cfg.CanSeal) {
|
||||||
|
t.Fatal("must specify at least one of CanStore or cfg.CanSeal")
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := json.MarshalIndent(cfg, "", " ")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = tm.StorageAddLocal(ctx, p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
@ -16,10 +16,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr) {
|
func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr) {
|
||||||
testServ := httptest.NewServer(handler)
|
testServ := &httptest.Server{
|
||||||
if listener != nil {
|
Listener: listener,
|
||||||
testServ.Listener = listener
|
Config: &http.Server{Handler: handler},
|
||||||
}
|
}
|
||||||
|
testServ.Start()
|
||||||
|
|
||||||
t.Cleanup(testServ.Close)
|
t.Cleanup(testServ.Close)
|
||||||
t.Cleanup(testServ.CloseClientConnections)
|
t.Cleanup(testServ.CloseClientConnections)
|
||||||
|
|
||||||
@ -33,7 +35,10 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
|
|||||||
handler, err := node.FullNodeHandler(f.FullNode, false)
|
handler, err := node.FullNodeHandler(f.FullNode, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
srv, maddr := CreateRPCServer(t, handler, nil)
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
srv, maddr := CreateRPCServer(t, handler, l)
|
||||||
|
|
||||||
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
|
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
57
itests/nonce_test.go
Normal file
57
itests/nonce_test.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package itests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNonceIncremental(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
kit.QuietMiningLogs()
|
||||||
|
|
||||||
|
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
|
||||||
|
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
|
||||||
|
|
||||||
|
// create a new address where to send funds.
|
||||||
|
addr, err := client.WalletNew(ctx, types.KTBLS)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// get the existing balance from the default wallet to then split it.
|
||||||
|
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
const iterations = 100
|
||||||
|
|
||||||
|
// we'll send half our balance (saving the other half for gas),
|
||||||
|
// in `iterations` increments.
|
||||||
|
toSend := big.Div(bal, big.NewInt(2))
|
||||||
|
each := big.Div(toSend, big.NewInt(iterations))
|
||||||
|
|
||||||
|
var sms []*types.SignedMessage
|
||||||
|
for i := 0; i < iterations; i++ {
|
||||||
|
msg := &types.Message{
|
||||||
|
From: client.DefaultKey.Address,
|
||||||
|
To: addr,
|
||||||
|
Value: each,
|
||||||
|
}
|
||||||
|
|
||||||
|
sm, err := client.MpoolPushMessage(ctx, msg, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, i, sm.Message.Nonce)
|
||||||
|
|
||||||
|
sms = append(sms, sm)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sm := range sms {
|
||||||
|
_, err := client.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
66
itests/sector_finalize_early_test.go
Normal file
66
itests/sector_finalize_early_test.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package itests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/config"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDealsWithFinalizeEarly(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
kit.QuietMiningLogs()
|
||||||
|
|
||||||
|
var blockTime = 50 * time.Millisecond
|
||||||
|
|
||||||
|
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
|
||||||
|
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
|
||||||
|
return func() (sealiface.Config, error) {
|
||||||
|
cf := config.DefaultStorageMiner()
|
||||||
|
cf.Sealing.FinalizeEarly = true
|
||||||
|
return modules.ToSealingConfig(cf), nil
|
||||||
|
}, nil
|
||||||
|
})))) // no mock proofs.
|
||||||
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
dh := kit.NewDealHarness(t, client, miner, miner)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
miner.AddStorage(ctx, t, 1000000000, true, false)
|
||||||
|
miner.AddStorage(ctx, t, 1000000000, false, true)
|
||||||
|
|
||||||
|
sl, err := miner.StorageList(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for si, d := range sl {
|
||||||
|
i, err := miner.StorageInfo(ctx, si)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fmt.Printf("stor d:%d %+v\n", len(d), i)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("single", func(t *testing.T) {
|
||||||
|
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
|
||||||
|
})
|
||||||
|
|
||||||
|
sl, err = miner.StorageList(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for si, d := range sl {
|
||||||
|
i, err := miner.StorageInfo(ctx, si)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fmt.Printf("stor d:%d %+v\n", len(d), i)
|
||||||
|
}
|
||||||
|
}
|
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api/v1api"
|
"github.com/filecoin-project/lotus/api/v1api"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -104,8 +105,7 @@ func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipS
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
|
func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
|
||||||
//TODO(anteva): maybe true? show on chain info??
|
si, err := rpn.sectorsStatus(ctx, sectorID, true)
|
||||||
si, err := rpn.secb.SectorsStatus(ctx, sectorID, false)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("failed to get sector info: %w", err)
|
return false, xerrors.Errorf("failed to get sector info: %w", err)
|
||||||
}
|
}
|
||||||
@ -120,7 +120,7 @@ func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.S
|
|||||||
Miner: abi.ActorID(mid),
|
Miner: abi.ActorID(mid),
|
||||||
Number: sectorID,
|
Number: sectorID,
|
||||||
},
|
},
|
||||||
ProofType: si.SealProof, //TODO: confirm this is correct
|
ProofType: si.SealProof,
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length)
|
log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length)
|
||||||
@ -139,10 +139,14 @@ func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context,
|
|||||||
}
|
}
|
||||||
tsk := head.Key()
|
tsk := head.Key()
|
||||||
|
|
||||||
|
var mErr error
|
||||||
|
|
||||||
for _, dealID := range storageDeals {
|
for _, dealID := range storageDeals {
|
||||||
ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk)
|
ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return resp, xerrors.Errorf("failed to look up deal %d on chain: err=%w", dealID, err)
|
log.Warnf("failed to look up deal %d on chain: err=%w", dealID, err)
|
||||||
|
mErr = multierror.Append(mErr, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
if ds.Proposal.VerifiedDeal {
|
if ds.Proposal.VerifiedDeal {
|
||||||
resp.VerifiedDeal = true
|
resp.VerifiedDeal = true
|
||||||
@ -162,9 +166,13 @@ func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context,
|
|||||||
// Note: The piece size can never actually be zero. We only use it to here
|
// Note: The piece size can never actually be zero. We only use it to here
|
||||||
// to assert that we didn't find a matching piece.
|
// to assert that we didn't find a matching piece.
|
||||||
if resp.PieceSize == 0 {
|
if resp.PieceSize == 0 {
|
||||||
|
if mErr == nil {
|
||||||
return resp, xerrors.New("failed to find matching piece")
|
return resp, xerrors.New("failed to find matching piece")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return resp, xerrors.Errorf("failed to fetch storage deal state: %w", mErr)
|
||||||
|
}
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,6 +66,31 @@ func TestGetPricingInput(t *testing.T) {
|
|||||||
expectedErrorStr: "failed to find matching piece",
|
expectedErrorStr: "failed to find matching piece",
|
||||||
},
|
},
|
||||||
|
|
||||||
|
"error when fails to fetch deal state": {
|
||||||
|
fFnc: func(n *mocks.MockFullNode) {
|
||||||
|
out1 := &api.MarketDeal{
|
||||||
|
Proposal: market.DealProposal{
|
||||||
|
PieceCID: pcid,
|
||||||
|
PieceSize: paddedSize,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
out2 := &api.MarketDeal{
|
||||||
|
Proposal: market.DealProposal{
|
||||||
|
PieceCID: testnet.GenerateCids(1)[0],
|
||||||
|
VerifiedDeal: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
|
||||||
|
gomock.InOrder(
|
||||||
|
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, xerrors.New("error 1")),
|
||||||
|
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, xerrors.New("error 2")),
|
||||||
|
)
|
||||||
|
|
||||||
|
},
|
||||||
|
expectedErrorStr: "failed to fetch storage deal state",
|
||||||
|
},
|
||||||
|
|
||||||
"verified is true even if one deal is verified and we get the correct piecesize": {
|
"verified is true even if one deal is verified and we get the correct piecesize": {
|
||||||
fFnc: func(n *mocks.MockFullNode) {
|
fFnc: func(n *mocks.MockFullNode) {
|
||||||
out1 := &api.MarketDeal{
|
out1 := &api.MarketDeal{
|
||||||
@ -92,6 +117,32 @@ func TestGetPricingInput(t *testing.T) {
|
|||||||
expectedVerified: true,
|
expectedVerified: true,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
"success even if one deal state fetch errors out but the other deal is verified and has the required piececid": {
|
||||||
|
fFnc: func(n *mocks.MockFullNode) {
|
||||||
|
out1 := &api.MarketDeal{
|
||||||
|
Proposal: market.DealProposal{
|
||||||
|
PieceCID: testnet.GenerateCids(1)[0],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
out2 := &api.MarketDeal{
|
||||||
|
Proposal: market.DealProposal{
|
||||||
|
PieceCID: pcid,
|
||||||
|
PieceSize: paddedSize,
|
||||||
|
VerifiedDeal: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1)
|
||||||
|
gomock.InOrder(
|
||||||
|
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, xerrors.New("some error")),
|
||||||
|
n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil),
|
||||||
|
)
|
||||||
|
|
||||||
|
},
|
||||||
|
expectedPieceSize: unpaddedSize,
|
||||||
|
expectedVerified: true,
|
||||||
|
},
|
||||||
|
|
||||||
"verified is false if both deals are unverified and we get the correct piece size": {
|
"verified is false if both deals are unverified and we get the correct piece size": {
|
||||||
fFnc: func(n *mocks.MockFullNode) {
|
fFnc: func(n *mocks.MockFullNode) {
|
||||||
out1 := &api.MarketDeal{
|
out1 := &api.MarketDeal{
|
||||||
|
@ -155,6 +155,10 @@ type SealingConfig struct {
|
|||||||
// time buffer for forceful batch submission before sectors/deals in batch would start expiring
|
// time buffer for forceful batch submission before sectors/deals in batch would start expiring
|
||||||
CommitBatchSlack Duration
|
CommitBatchSlack Duration
|
||||||
|
|
||||||
|
// network BaseFee below which to stop doing commit aggregation, instead
|
||||||
|
// submitting proofs to the chain individually
|
||||||
|
AggregateAboveBaseFee types.FIL
|
||||||
|
|
||||||
TerminateBatchMax uint64
|
TerminateBatchMax uint64
|
||||||
TerminateBatchMin uint64
|
TerminateBatchMin uint64
|
||||||
TerminateBatchWait Duration
|
TerminateBatchWait Duration
|
||||||
@ -341,6 +345,8 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days
|
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days
|
||||||
CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
|
CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
|
||||||
|
|
||||||
|
AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL
|
||||||
|
|
||||||
TerminateBatchMin: 1,
|
TerminateBatchMin: 1,
|
||||||
TerminateBatchMax: 100,
|
TerminateBatchMax: 100,
|
||||||
TerminateBatchWait: Duration(5 * time.Minute),
|
TerminateBatchWait: Duration(5 * time.Minute),
|
||||||
|
@ -705,7 +705,7 @@ func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid.
|
|||||||
return nil, xerrors.Errorf("failed to load new state tree: %w", err)
|
return nil, xerrors.Errorf("failed to load new state tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return state.Diff(oldTree, newTree)
|
return state.Diff(ctx, oldTree, newTree)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *StateAPI) StateMinerSectorCount(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MinerSectors, error) {
|
func (a *StateAPI) StateMinerSectorCount(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MinerSectors, error) {
|
||||||
|
@ -212,8 +212,6 @@ type StorageMinerParams struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) {
|
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) {
|
||||||
fmt.Printf("setting up storage miner with %#v \n", fc)
|
|
||||||
|
|
||||||
return func(params StorageMinerParams) (*storage.Miner, error) {
|
return func(params StorageMinerParams) (*storage.Miner, error) {
|
||||||
var (
|
var (
|
||||||
ds = params.MetadataDS
|
ds = params.MetadataDS
|
||||||
@ -689,13 +687,13 @@ func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.
|
|||||||
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
|
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
|
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
|
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
|
||||||
smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix))
|
smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix))
|
||||||
|
|
||||||
sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, sa, wsts, smsts)
|
sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, wsts, smsts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -719,7 +717,6 @@ func StorageAuth(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.Storage
|
|||||||
}
|
}
|
||||||
|
|
||||||
func StorageAuthWithURL(url string) func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) {
|
func StorageAuthWithURL(url string) func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) {
|
||||||
log.Infow("Setting auth token based on URL", "url", url)
|
|
||||||
return func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) {
|
return func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) {
|
||||||
s := strings.Split(url, ":")
|
s := strings.Split(url, ":")
|
||||||
if len(s) != 2 {
|
if len(s) != 2 {
|
||||||
@ -878,6 +875,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
|
|||||||
MaxCommitBatch: cfg.MaxCommitBatch,
|
MaxCommitBatch: cfg.MaxCommitBatch,
|
||||||
CommitBatchWait: config.Duration(cfg.CommitBatchWait),
|
CommitBatchWait: config.Duration(cfg.CommitBatchWait),
|
||||||
CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
|
CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
|
||||||
|
AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
|
||||||
|
|
||||||
TerminateBatchMax: cfg.TerminateBatchMax,
|
TerminateBatchMax: cfg.TerminateBatchMax,
|
||||||
TerminateBatchMin: cfg.TerminateBatchMin,
|
TerminateBatchMin: cfg.TerminateBatchMin,
|
||||||
@ -888,10 +886,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
|
func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
|
||||||
return func() (out sealiface.Config, err error) {
|
return sealiface.Config{
|
||||||
err = readCfg(r, func(cfg *config.StorageMiner) {
|
|
||||||
out = sealiface.Config{
|
|
||||||
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
|
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
|
||||||
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
||||||
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
||||||
@ -909,11 +905,18 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
|
|||||||
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
|
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
|
||||||
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
|
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
|
||||||
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
|
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
|
||||||
|
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
|
||||||
|
|
||||||
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
|
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
|
||||||
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
|
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
|
||||||
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
|
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
|
||||||
|
return func() (out sealiface.Config, err error) {
|
||||||
|
err = readCfg(r, func(cfg *config.StorageMiner) {
|
||||||
|
out = ToSealingConfig(cfg)
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -118,7 +118,6 @@ func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) {
|
|||||||
mapi = api.PermissionedStorMinerAPI(mapi)
|
mapi = api.PermissionedStorMinerAPI(mapi)
|
||||||
}
|
}
|
||||||
|
|
||||||
//_, _ = a.ActorAddress(context.Background())
|
|
||||||
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
|
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
|
||||||
rpcServer := jsonrpc.NewServer(readerServerOpt)
|
rpcServer := jsonrpc.NewServer(readerServerOpt)
|
||||||
rpcServer.Register("Filecoin", mapi)
|
rpcServer.Register("Filecoin", mapi)
|
||||||
|
@ -47,7 +47,7 @@ func DsKeyToDealID(key datastore.Key) (uint64, error) {
|
|||||||
return dealID, nil
|
return dealID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorBuilder interface { // todo: apify, make work remote
|
type SectorBuilder interface {
|
||||||
SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d api.PieceDealInfo) (api.SectorOffset, error)
|
SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d api.PieceDealInfo) (api.SectorOffset, error)
|
||||||
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error)
|
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user