diff --git a/.circleci/config.yml b/.circleci/config.yml index 9f8f28b86..9a74a523d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -811,6 +811,11 @@ workflows: suite: itest-multisig target: "./itests/multisig_test.go" + - test: + name: test-itest-nonce + suite: itest-nonce + target: "./itests/nonce_test.go" + - test: name: test-itest-paych_api suite: itest-paych_api @@ -826,6 +831,11 @@ workflows: suite: itest-sdr_upgrade target: "./itests/sdr_upgrade_test.go" + - test: + name: test-itest-sector_finalize_early + suite: itest-sector_finalize_early + target: "./itests/sector_finalize_early_test.go" + - test: name: test-itest-sector_pledge suite: itest-sector_pledge diff --git a/chain/state/statetree.go b/chain/state/statetree.go index dbf150ecd..8705aeff8 100644 --- a/chain/state/statetree.go +++ b/chain/state/statetree.go @@ -547,7 +547,7 @@ func (st *StateTree) Version() types.StateTreeVersion { return st.version } -func Diff(oldTree, newTree *StateTree) (map[string]types.Actor, error) { +func Diff(ctx context.Context, oldTree, newTree *StateTree) (map[string]types.Actor, error) { out := map[string]types.Actor{} var ( @@ -555,33 +555,38 @@ func Diff(oldTree, newTree *StateTree) (map[string]types.Actor, error) { buf = bytes.NewReader(nil) ) if err := newTree.root.ForEach(&ncval, func(k string) error { - var act types.Actor + select { + case <-ctx.Done(): + return ctx.Err() + default: + var act types.Actor - addr, err := address.NewFromBytes([]byte(k)) - if err != nil { - return xerrors.Errorf("address in state tree was not valid: %w", err) + addr, err := address.NewFromBytes([]byte(k)) + if err != nil { + return xerrors.Errorf("address in state tree was not valid: %w", err) + } + + found, err := oldTree.root.Get(abi.AddrKey(addr), &ocval) + if err != nil { + return err + } + + if found && bytes.Equal(ocval.Raw, ncval.Raw) { + return nil // not changed + } + + buf.Reset(ncval.Raw) + err = act.UnmarshalCBOR(buf) + buf.Reset(nil) + + if err != nil { + return err + } + + out[addr.String()] = act + + return nil } - - found, err := oldTree.root.Get(abi.AddrKey(addr), &ocval) - if err != nil { - return err - } - - if found && bytes.Equal(ocval.Raw, ncval.Raw) { - return nil // not changed - } - - buf.Reset(ncval.Raw) - err = act.UnmarshalCBOR(buf) - buf.Reset(nil) - - if err != nil { - return err - } - - out[addr.String()] = act - - return nil }); err != nil { return nil, err } diff --git a/chain/types/fil.go b/chain/types/fil.go index 7438410c8..21125e6d6 100644 --- a/chain/types/fil.go +++ b/chain/types/fil.go @@ -23,6 +23,11 @@ func (f FIL) Unitless() string { return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") } +var AttoFil = NewInt(1) +var FemtoFil = BigMul(AttoFil, NewInt(1000)) +var PicoFil = BigMul(FemtoFil, NewInt(1000)) +var NanoFil = BigMul(PicoFil, NewInt(1000)) + var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"} func (f FIL) Short() string { diff --git a/cli/chain.go b/cli/chain.go index 0fca73406..e30a685dd 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -1030,7 +1030,9 @@ var ChainExportCmd = &cli.Command{ ArgsUsage: "[outputPath]", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "tipset", + Name: "tipset", + Usage: "specify tipset to start the export from", + Value: "@head", }, &cli.Int64Flag{ Name: "recent-stateroots", diff --git a/cli/filplus.go b/cli/filplus.go index 53dc5092b..007071ea2 100644 --- a/cli/filplus.go +++ b/cli/filplus.go @@ -210,7 +210,7 @@ var filplusCheckClientCmd = &cli.Command{ return err } if dcap == nil { - return xerrors.Errorf("client %s is not a verified client", err) + return xerrors.Errorf("client %s is not a verified client", caddr) } fmt.Println(*dcap) diff --git a/cmd/lotus-seal-worker/storage.go b/cmd/lotus-seal-worker/storage.go index afb566166..be662a6c3 100644 --- a/cmd/lotus-seal-worker/storage.go +++ b/cmd/lotus-seal-worker/storage.go @@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{ } if !(cfg.CanStore || cfg.CanSeal) { - return xerrors.Errorf("must specify at least one of --store of --seal") + return xerrors.Errorf("must specify at least one of --store or --seal") } b, err := json.MarshalIndent(cfg, "", " ") diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 381232133..1cce52a41 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -470,7 +470,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode AllowPreCommit2: true, AllowCommit: true, AllowUnseal: true, - }, sa, wsts, smsts) + }, wsts, smsts) if err != nil { return err } diff --git a/cmd/lotus-storage-miner/init_restore.go b/cmd/lotus-storage-miner/init_restore.go index 77d3bfc06..cc8979f18 100644 --- a/cmd/lotus-storage-miner/init_restore.go +++ b/cmd/lotus-storage-miner/init_restore.go @@ -31,7 +31,6 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) -// ctx context.Context, api lapi.FullNode, addr address.Address, peerid peer.ID func restore(ctx context.Context, cctx *cli.Context, manageConfig func(*config.StorageMiner) error, after func(api lapi.FullNode, addr address.Address, peerid peer.ID, mi miner.MinerInfo) error) error { if cctx.Args().Len() != 1 { return xerrors.Errorf("expected 1 argument") diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 35c2ab3dc..241d1b2c2 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -175,8 +175,6 @@ var runCmd = &cli.Command{ if err := minerapi.NetConnect(ctx, remoteAddrs); err != nil { return xerrors.Errorf("connecting to full node (libp2p): %w", err) } - } else { - log.Infof("No markets subsystem enabled, so no libp2p network bootstrapping") } log.Infof("Remote version %s", v) diff --git a/cmd/lotus-storage-miner/storage.go b/cmd/lotus-storage-miner/storage.go index b4ab26ad3..f2068ea86 100644 --- a/cmd/lotus-storage-miner/storage.go +++ b/cmd/lotus-storage-miner/storage.go @@ -145,7 +145,7 @@ over time } if !(cfg.CanStore || cfg.CanSeal) { - return xerrors.Errorf("must specify at least one of --store of --seal") + return xerrors.Errorf("must specify at least one of --store or --seal") } b, err := json.MarshalIndent(cfg, "", " ") diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index 41268f207..c4c63aab1 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -2140,7 +2140,7 @@ USAGE: lotus chain export [command options] [outputPath] OPTIONS: - --tipset value + --tipset value specify tipset to start the export from (default: "@head") --recent-stateroots value specify the number of recent state roots to include in the export (default: 0) --skip-old-msgs (default: false) --help, -h show help (default: false) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 87de597fd..a3af6eb7c 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -120,7 +120,7 @@ type StorageAuth http.Header type WorkerStateStore *statestore.StateStore type ManagerStateStore *statestore.StateStore -func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { +func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si}) if err != nil { return nil, xerrors.Errorf("creating prover instance: %w", err) @@ -526,10 +526,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, } } + pathType := storiface.PathStorage + { + sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false) + if err != nil { + return xerrors.Errorf("finding sealed sector: %w", err) + } + + for _, store := range sealedStores { + if store.CanSeal { + pathType = storiface.PathSealing + break + } + } + } + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) return err diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go index e04853d75..d6fa14574 100644 --- a/extern/sector-storage/piece_provider_test.go +++ b/extern/sector-storage/piece_provider_test.go @@ -216,7 +216,7 @@ func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorPro wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls"))) smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) - mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, nil, wsts, smsts) + mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, wsts, smsts) require.NoError(t, err) // start a http server on the manager to serve sector file requests. diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index c49dddbb4..6f8efc03e 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -53,7 +53,6 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storifa } func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler partialFileHandler) *Remote { - fmt.Printf("Creating NewRemote: %#v \n", auth) return &Remote{ local: local, index: index, diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 5ee39e58f..5ba23026d 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -62,7 +62,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api } if proposal.PieceCID != p.Piece.PieceCID { - return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)} + return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID)} } if p.Piece.Size != proposal.PieceSize { diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index ea246136d..63bd3c7db 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -106,6 +106,7 @@ func (b *CommitBatcher) run() { panic(err) } + timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) for { if forceRes != nil { forceRes <- lastMsg @@ -121,7 +122,7 @@ func (b *CommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): + case <-timer.C: // do nothing case fr := <-b.force: // user triggered forceRes = fr @@ -132,17 +133,26 @@ func (b *CommitBatcher) run() { if err != nil { log.Warnw("CommitBatcher processBatch error", "error", err) } + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) } } -func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { +func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { now := time.Now() b.lk.Lock() defer b.lk.Unlock() if len(b.todo) == 0 { - return nil + return maxWait } var cutoff time.Time @@ -160,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time } if cutoff.IsZero() { - return time.After(maxWait) + return maxWait } cutoff = cutoff.Add(-slack) if cutoff.Before(now) { - return time.After(time.Nanosecond) // can't return 0 + return time.Nanosecond // can't return 0 } wait := cutoff.Sub(now) @@ -173,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time wait = maxWait } - return time.After(wait) + return wait } func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) { @@ -196,7 +206,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, var res []sealiface.CommitBatchRes - if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors { + individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors) + + if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) { + tok, _, err := b.api.ChainHead(b.mctx) + if err != nil { + return nil, err + } + + bf, err := b.api.ChainBaseFee(b.mctx, tok) + if err != nil { + return nil, xerrors.Errorf("couldn't get base fee: %w", err) + } + + if bf.LessThan(cfg.AggregateAboveBaseFee) { + individual = true + } + } + + if individual { res, err = b.processIndividually() } else { res, err = b.processBatch(cfg) @@ -232,7 +260,9 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa total := len(b.todo) - var res sealiface.CommitBatchRes + res := sealiface.CommitBatchRes{ + FailedSectors: map[abi.SectorNumber]string{}, + } params := miner5.ProveCommitAggregateParams{ SectorNumbers: bitfield.New(), @@ -346,7 +376,8 @@ func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error for sn, info := range b.todo { r := sealiface.CommitBatchRes{ - Sectors: []abi.SectorNumber{sn}, + Sectors: []abi.SectorNumber{sn}, + FailedSectors: map[abi.SectorNumber]string{}, } mcid, err := b.processSingle(mi, sn, info, tok) diff --git a/extern/storage-sealing/commit_batch_test.go b/extern/storage-sealing/commit_batch_test.go index ad2bc8f6f..aea6d455e 100644 --- a/extern/storage-sealing/commit_batch_test.go +++ b/extern/storage-sealing/commit_batch_test.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/extern/storage-sealing/mocks" @@ -58,6 +59,8 @@ func TestCommitBatcher(t *testing.T) { CommitBatchWait: 24 * time.Hour, CommitBatchSlack: 1 * time.Hour, + AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL + TerminateBatchMin: 1, TerminateBatchMax: 100, TerminateBatchWait: 5 * time.Minute, @@ -143,7 +146,7 @@ func TestCommitBatcher(t *testing.T) { } } - expectSend := func(expect []abi.SectorNumber) action { + expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action { return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil) @@ -153,14 +156,40 @@ func TestCommitBatcher(t *testing.T) { batch = true ti = 1 } + + basefee := types.PicoFil + if aboveBalancer { + basefee = types.NanoFil + } + + if batch { + s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) + s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) + } + + if !aboveBalancer { + batch = false + ti = len(expect) + } + s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) + + pciC := len(expect) + if failOnePCI { + s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found + pciC = len(expect) - 1 + if !batch { + ti-- + } + } s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{ PreCommitDeposit: big.Zero(), - }, nil).Times(len(expect)) - s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect)) + }, nil).Times(pciC) + s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC) + if batch { s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) - s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil) + s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) } s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool { @@ -183,11 +212,11 @@ func TestCommitBatcher(t *testing.T) { } } - flush := func(expect []abi.SectorNumber) action { + flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action { return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { - _ = expectSend(expect)(t, s, pcb) + _ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb) - batch := len(expect) >= minBatch + batch := len(expect) >= minBatch && aboveBalancer r, err := pcb.Flush(ctx) require.NoError(t, err) @@ -198,6 +227,13 @@ func TestCommitBatcher(t *testing.T) { return r[0].Sectors[i] < r[0].Sectors[j] }) require.Equal(t, expect, r[0].Sectors) + if !failOnePCI { + require.Len(t, r[0].FailedSectors, 0) + } else { + require.Len(t, r[0].FailedSectors, 1) + _, found := r[0].FailedSectors[1] + require.True(t, found) + } } else { require.Len(t, r, len(expect)) for _, res := range r { @@ -209,6 +245,13 @@ func TestCommitBatcher(t *testing.T) { }) for i, res := range r { require.Equal(t, abi.SectorNumber(i), res.Sectors[0]) + if failOnePCI && res.Sectors[0] == 1 { + require.Len(t, res.FailedSectors, 1) + _, found := res.FailedSectors[1] + require.True(t, found) + } else { + require.Empty(t, res.FailedSectors) + } } } @@ -227,33 +270,75 @@ func TestCommitBatcher(t *testing.T) { tcs := map[string]struct { actions []action }{ - "addSingle": { + "addSingle-aboveBalancer": { actions: []action{ addSector(0), waitPending(1), - flush([]abi.SectorNumber{0}), + flush([]abi.SectorNumber{0}, true, false), }, }, - "addTwo": { + "addTwo-aboveBalancer": { actions: []action{ addSectors(getSectors(2)), waitPending(2), - flush(getSectors(2)), + flush(getSectors(2), true, false), }, }, - "addAte": { + "addAte-aboveBalancer": { actions: []action{ addSectors(getSectors(8)), waitPending(8), - flush(getSectors(8)), + flush(getSectors(8), true, false), }, }, - "addMax": { + "addMax-aboveBalancer": { actions: []action{ - expectSend(getSectors(maxBatch)), + expectSend(getSectors(maxBatch), true, false), addSectors(getSectors(maxBatch)), }, }, + "addSingle-belowBalancer": { + actions: []action{ + addSector(0), + waitPending(1), + flush([]abi.SectorNumber{0}, false, false), + }, + }, + "addTwo-belowBalancer": { + actions: []action{ + addSectors(getSectors(2)), + waitPending(2), + flush(getSectors(2), false, false), + }, + }, + "addAte-belowBalancer": { + actions: []action{ + addSectors(getSectors(8)), + waitPending(8), + flush(getSectors(8), false, false), + }, + }, + "addMax-belowBalancer": { + actions: []action{ + expectSend(getSectors(maxBatch), false, false), + addSectors(getSectors(maxBatch)), + }, + }, + + "addAte-aboveBalancer-failOne": { + actions: []action{ + addSectors(getSectors(8)), + waitPending(8), + flush(getSectors(8), true, true), + }, + }, + "addAte-belowBalancer-failOne": { + actions: []action{ + addSectors(getSectors(8)), + waitPending(8), + flush(getSectors(8), false, true), + }, + }, } for name, tc := range tcs { diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index d3e1e9d52..d04aef790 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -115,6 +115,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto SubmitCommitAggregate: planOne( on(SectorCommitAggregateSent{}, CommitWait), on(SectorCommitFailed{}, CommitFailed), + on(SectorRetrySubmitCommit{}, SubmitCommit), ), CommitWait: planOne( on(SectorProving{}, FinalizeSector), diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index 3dc3510c2..8b132a2eb 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() { panic(err) } + timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)) for { if forceRes != nil { forceRes <- lastRes @@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): + case <-timer.C: // do nothing case fr := <-b.force: // user triggered forceRes = fr @@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() { if err != nil { log.Warnw("PreCommitBatcher processBatch error", "error", err) } + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)) } } -func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { +func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { now := time.Now() b.lk.Lock() defer b.lk.Unlock() if len(b.todo) == 0 { - return nil + return maxWait } var cutoff time.Time @@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T } if cutoff.IsZero() { - return time.After(maxWait) + return maxWait } cutoff = cutoff.Add(-slack) if cutoff.Before(now) { - return time.After(time.Nanosecond) // can't return 0 + return time.Nanosecond // can't return 0 } wait := cutoff.Sub(now) @@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T wait = maxWait } - return time.After(wait) + return wait } func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) { diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index b237072d3..0410b92c0 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -1,6 +1,10 @@ package sealiface -import "time" +import ( + "time" + + "github.com/filecoin-project/go-state-types/abi" +) // this has to be in a separate package to not make lotus API depend on filecoin-ffi @@ -31,6 +35,8 @@ type Config struct { CommitBatchWait time.Duration CommitBatchSlack time.Duration + AggregateAboveBaseFee abi.TokenAmount + TerminateBatchMax uint64 TerminateBatchMin uint64 TerminateBatchWait time.Duration diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 7bef19b92..bd5f489b4 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -142,7 +142,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI } if pci.Info.SealedCID != *sector.CommR { - log.Warnf("sector %d is precommitted on chain, with different CommR: %x != %x", sector.SectorNumber, pci.Info.SealedCID, sector.CommR) + log.Warnf("sector %d is precommitted on chain, with different CommR: %s != %s", sector.SectorNumber, pci.Info.SealedCID, sector.CommR) return nil // TODO: remove when the actor allows re-precommit } @@ -182,7 +182,7 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect } func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error { - tok, height, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil @@ -216,33 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo } } - if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil { - switch err.(type) { - case *ErrApi: - log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err) - return nil - case *ErrBadCommD: - return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)}) - case *ErrExpiredTicket: - return ctx.Send(SectorTicketExpired{xerrors.Errorf("ticket expired error, removing sector: %w", err)}) - case *ErrBadTicket: - return ctx.Send(SectorTicketExpired{xerrors.Errorf("expired ticket, removing sector: %w", err)}) - case *ErrInvalidDeals: - log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) - return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed}) - case *ErrExpiredDeals: - return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) - case nil: - return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)}) - case *ErrPrecommitOnChain: - // noop, this is expected - case *ErrSectorNumberAllocated: - // noop, already committed? - default: - return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err) - } - } - if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { switch err.(type) { case *ErrApi: @@ -381,7 +354,7 @@ func (m *Sealing) handleRecoverDealIDs(ctx statemachine.Context, sector SectorIn } if proposal.PieceCID != p.Piece.PieceCID { - log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %x != %x", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID) + log.Warnf("piece %d (of %d) of sector %d refers deal %d with wrong PieceCID: %s != %s", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, p.Piece.PieceCID, proposal.PieceCID) toFix = append(toFix, i) continue } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 24d5fe3f7..730b11072 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -105,48 +105,66 @@ func checkTicketExpired(ticket, head abi.ChainEpoch) bool { return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations } -func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) { +func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool { + return currEpoch > preCommitEpoch+msd +} + +func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) { tok, epoch, err := m.api.ChainHead(ctx.Context()) if err != nil { - log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) - return nil, 0, nil + log.Errorf("getTicket: api error, not proceeding: %+v", err) + return nil, 0, false, nil + } + + // the reason why the StateMinerSectorAllocated function is placed here, if it is outside, + // if the MarshalCBOR function and StateSectorPreCommitInfo function return err, it will be executed + allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if aerr != nil { + log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr) + return nil, 0, false, nil } ticketEpoch := epoch - policy.SealRandomnessLookback buf := new(bytes.Buffer) if err := m.maddr.MarshalCBOR(buf); err != nil { - return nil, 0, err + return nil, 0, allocated, err } pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) if err != nil { - return nil, 0, xerrors.Errorf("getting precommit info: %w", err) + return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err) } if pci != nil { ticketEpoch = pci.Info.SealRandEpoch - if checkTicketExpired(ticketEpoch, epoch) { - return nil, 0, xerrors.Errorf("ticket expired for precommitted sector") + nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + if err != nil { + return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err) } + + msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType) + + if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) { + return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector") + } + } + + if pci == nil && allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove + return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber) } rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes()) if err != nil { - return nil, 0, err + return nil, 0, allocated, err } - return abi.SealRandomness(rand), ticketEpoch, nil + return abi.SealRandomness(rand), ticketEpoch, allocated, nil } func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error { - ticketValue, ticketEpoch, err := m.getTicket(ctx, sector) + ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector) if err != nil { - allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil) - if aerr != nil { - log.Errorf("error checking if sector is allocated: %+v", aerr) - } - if allocated { if sector.CommitMessage != nil { // Some recovery paths with unfortunate timing lead here @@ -182,14 +200,35 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } } - _, height, err := m.api.ChainHead(ctx.Context()) + tok, height, err := m.api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil } if checkTicketExpired(sector.TicketEpoch, height) { - return ctx.Send(SectorOldTicket{}) // go get new ticket + pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + if err != nil { + log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err) + return nil + } + + if pci == nil { + return ctx.Send(SectorOldTicket{}) // go get new ticket + } + + nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + if err != nil { + log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err) + return nil + } + + msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType) + + // if height > PreCommitEpoch + msd, there is no need to recalculate + if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) { + return ctx.Send(SectorOldTicket{}) // will be removed + } } pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos()) @@ -485,7 +524,7 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) log.Info("scheduling seal proof computation...") - log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD) + log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%s; d:%s", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD) if sector.CommD == nil || sector.CommR == nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) @@ -624,11 +663,21 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S Spt: sector.SectorType, }) if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)}) + return ctx.Send(SectorRetrySubmitCommit{}) } if res.Error != "" { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)}) + tok, _, err := m.api.ChainHead(ctx.Context()) + if err != nil { + log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) + return nil + } + + if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) + } + + return ctx.Send(SectorRetrySubmitCommit{}) } if e, found := res.FailedSectors[sector.SectorNumber]; found { diff --git a/go.mod b/go.mod index d196a8004..9ef427abc 100644 --- a/go.mod +++ b/go.mod @@ -163,8 +163,6 @@ replace github.com/libp2p/go-libp2p-yamux => github.com/libp2p/go-libp2p-yamux v replace github.com/filecoin-project/lotus => ./ -replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v1.18.0 - replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi replace github.com/filecoin-project/test-vectors => ./extern/test-vectors diff --git a/itests/deals_test.go b/itests/deals_test.go index ce8c70910..f2e106f1f 100644 --- a/itests/deals_test.go +++ b/itests/deals_test.go @@ -21,7 +21,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) { policy.SetPreCommitChallengeDelay(oldDelay) }) - var blockTime = 1 * time.Second + var blockTime = 50 * time.Millisecond client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.WithAllSubsystems()) // no mock proofs. ens.InterconnectAll().BeginMining(blockTime) diff --git a/itests/gateway_test.go b/itests/gateway_test.go index 0a349b158..f9e4a0fb6 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "math" + "net" "testing" "time" @@ -270,7 +271,10 @@ func startNodes( handler, err := gateway.Handler(gwapi) require.NoError(t, err) - srv, _ := kit.CreateRPCServer(t, handler, nil) + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + srv, _ := kit.CreateRPCServer(t, handler, l) // Create a gateway client API that connects to the gateway server var gapi api.Gateway diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 313ca2194..d6d8fc850 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -363,9 +363,6 @@ func (n *Ensemble) Start() *Ensemble { Method: power.Methods.CreateMiner, Params: params, - - GasLimit: 0, - GasPremium: big.NewInt(5252), } signed, err := m.FullNode.FullNode.MpoolPushMessage(ctx, createStorageMinerMsg, nil) require.NoError(n.t, err) diff --git a/itests/kit/ensemble_presets.go b/itests/kit/ensemble_presets.go index 1f836f6d6..5e9e37da5 100644 --- a/itests/kit/ensemble_presets.go +++ b/itests/kit/ensemble_presets.go @@ -37,7 +37,6 @@ func EnsembleWithMinerAndMarketNodes(t *testing.T, opts ...interface{}) (*TestFu blockTime := 100 * time.Millisecond ens := NewEnsemble(t, eopts...).FullNode(&fullnode, nopts...).Miner(&main, &fullnode, mainNodeOpts...).Start() ens.BeginMining(blockTime) - //ens.InterconnectAll().BeginMining(blockTime) marketNodeOpts := []NodeOpt{OwnerAddr(fullnode.DefaultKey), MainMiner(&main), WithSubsystem(SStorageMarket)} marketNodeOpts = append(marketNodeOpts, nopts...) diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index ee144afee..903102f00 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -2,23 +2,30 @@ package kit import ( "context" + "encoding/json" "fmt" + "io/ioutil" "net" + "os" + "path/filepath" "strings" "testing" "time" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/miner" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/require" ) type MinerSubsystem int @@ -151,3 +158,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) { fmt.Printf("COMMIT BATCH: %+v\n", cb) } } + +const metaFile = "sectorstore.json" + +func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) { + p, err := ioutil.TempDir("", "lotus-testsectors-") + require.NoError(t, err) + + if err := os.MkdirAll(p, 0755); err != nil { + if !os.IsExist(err) { + require.NoError(t, err) + } + } + + _, err = os.Stat(filepath.Join(p, metaFile)) + if !os.IsNotExist(err) { + require.NoError(t, err) + } + + cfg := &stores.LocalStorageMeta{ + ID: stores.ID(uuid.New().String()), + Weight: weight, + CanSeal: seal, + CanStore: store, + } + + if !(cfg.CanStore || cfg.CanSeal) { + t.Fatal("must specify at least one of CanStore or cfg.CanSeal") + } + + b, err := json.MarshalIndent(cfg, "", " ") + require.NoError(t, err) + + err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644) + require.NoError(t, err) + + err = tm.StorageAddLocal(ctx, p) + require.NoError(t, err) +} diff --git a/itests/kit/rpc.go b/itests/kit/rpc.go index 9b5951af6..35153eb64 100644 --- a/itests/kit/rpc.go +++ b/itests/kit/rpc.go @@ -16,10 +16,12 @@ import ( ) func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr) { - testServ := httptest.NewServer(handler) - if listener != nil { - testServ.Listener = listener + testServ := &httptest.Server{ + Listener: listener, + Config: &http.Server{Handler: handler}, } + testServ.Start() + t.Cleanup(testServ.Close) t.Cleanup(testServ.CloseClientConnections) @@ -33,7 +35,10 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode { handler, err := node.FullNodeHandler(f.FullNode, false) require.NoError(t, err) - srv, maddr := CreateRPCServer(t, handler, nil) + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + srv, maddr := CreateRPCServer(t, handler, l) cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil) require.NoError(t, err) diff --git a/itests/nonce_test.go b/itests/nonce_test.go new file mode 100644 index 000000000..b50fcbe26 --- /dev/null +++ b/itests/nonce_test.go @@ -0,0 +1,57 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" +) + +func TestNonceIncremental(t *testing.T) { + ctx := context.Background() + + kit.QuietMiningLogs() + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // create a new address where to send funds. + addr, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + // get the existing balance from the default wallet to then split it. + bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) + require.NoError(t, err) + + const iterations = 100 + + // we'll send half our balance (saving the other half for gas), + // in `iterations` increments. + toSend := big.Div(bal, big.NewInt(2)) + each := big.Div(toSend, big.NewInt(iterations)) + + var sms []*types.SignedMessage + for i := 0; i < iterations; i++ { + msg := &types.Message{ + From: client.DefaultKey.Address, + To: addr, + Value: each, + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + for _, sm := range sms { + _, err := client.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + } +} diff --git a/itests/sector_finalize_early_test.go b/itests/sector_finalize_early_test.go new file mode 100644 index 000000000..fa5cc9dd3 --- /dev/null +++ b/itests/sector_finalize_early_test.go @@ -0,0 +1,66 @@ +package itests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" +) + +func TestDealsWithFinalizeEarly(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + kit.QuietMiningLogs() + + var blockTime = 50 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts( + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + cf := config.DefaultStorageMiner() + cf.Sealing.FinalizeEarly = true + return modules.ToSealingConfig(cf), nil + }, nil + })))) // no mock proofs. + ens.InterconnectAll().BeginMining(blockTime) + dh := kit.NewDealHarness(t, client, miner, miner) + + ctx := context.Background() + + miner.AddStorage(ctx, t, 1000000000, true, false) + miner.AddStorage(ctx, t, 1000000000, false, true) + + sl, err := miner.StorageList(ctx) + require.NoError(t, err) + for si, d := range sl { + i, err := miner.StorageInfo(ctx, si) + require.NoError(t, err) + + fmt.Printf("stor d:%d %+v\n", len(d), i) + } + + t.Run("single", func(t *testing.T) { + dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1}) + }) + + sl, err = miner.StorageList(ctx) + require.NoError(t, err) + for si, d := range sl { + i, err := miner.StorageInfo(ctx, si) + require.NoError(t, err) + + fmt.Printf("stor d:%d %+v\n", len(d), i) + } +} diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index 3fbb9b635..06ebd989a 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" "github.com/ipfs/go-cid" @@ -104,8 +105,7 @@ func (rpn *retrievalProviderNode) GetChainHead(ctx context.Context) (shared.TipS } func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { - //TODO(anteva): maybe true? show on chain info?? - si, err := rpn.secb.SectorsStatus(ctx, sectorID, false) + si, err := rpn.sectorsStatus(ctx, sectorID, true) if err != nil { return false, xerrors.Errorf("failed to get sector info: %w", err) } @@ -120,7 +120,7 @@ func (rpn *retrievalProviderNode) IsUnsealed(ctx context.Context, sectorID abi.S Miner: abi.ActorID(mid), Number: sectorID, }, - ProofType: si.SealProof, //TODO: confirm this is correct + ProofType: si.SealProof, } log.Debugf("will call IsUnsealed now sector=%+v, offset=%d, size=%d", sectorID, offset, length) @@ -139,10 +139,14 @@ func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, } tsk := head.Key() + var mErr error + for _, dealID := range storageDeals { ds, err := rpn.full.StateMarketStorageDeal(ctx, dealID, tsk) if err != nil { - return resp, xerrors.Errorf("failed to look up deal %d on chain: err=%w", dealID, err) + log.Warnf("failed to look up deal %d on chain: err=%w", dealID, err) + mErr = multierror.Append(mErr, err) + continue } if ds.Proposal.VerifiedDeal { resp.VerifiedDeal = true @@ -162,7 +166,11 @@ func (rpn *retrievalProviderNode) GetRetrievalPricingInput(ctx context.Context, // Note: The piece size can never actually be zero. We only use it to here // to assert that we didn't find a matching piece. if resp.PieceSize == 0 { - return resp, xerrors.New("failed to find matching piece") + if mErr == nil { + return resp, xerrors.New("failed to find matching piece") + } + + return resp, xerrors.Errorf("failed to fetch storage deal state: %w", mErr) } return resp, nil diff --git a/markets/retrievaladapter/provider_test.go b/markets/retrievaladapter/provider_test.go index 5cdf5d060..eca3b1152 100644 --- a/markets/retrievaladapter/provider_test.go +++ b/markets/retrievaladapter/provider_test.go @@ -66,6 +66,31 @@ func TestGetPricingInput(t *testing.T) { expectedErrorStr: "failed to find matching piece", }, + "error when fails to fetch deal state": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: pcid, + PieceSize: paddedSize, + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + VerifiedDeal: true, + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, xerrors.New("error 1")), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, xerrors.New("error 2")), + ) + + }, + expectedErrorStr: "failed to fetch storage deal state", + }, + "verified is true even if one deal is verified and we get the correct piecesize": { fFnc: func(n *mocks.MockFullNode) { out1 := &api.MarketDeal{ @@ -92,6 +117,32 @@ func TestGetPricingInput(t *testing.T) { expectedVerified: true, }, + "success even if one deal state fetch errors out but the other deal is verified and has the required piececid": { + fFnc: func(n *mocks.MockFullNode) { + out1 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: testnet.GenerateCids(1)[0], + }, + } + out2 := &api.MarketDeal{ + Proposal: market.DealProposal{ + PieceCID: pcid, + PieceSize: paddedSize, + VerifiedDeal: true, + }, + } + + n.EXPECT().ChainHead(gomock.Any()).Return(tsk, nil).Times(1) + gomock.InOrder( + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[0], key).Return(out1, xerrors.New("some error")), + n.EXPECT().StateMarketStorageDeal(gomock.Any(), deals[1], key).Return(out2, nil), + ) + + }, + expectedPieceSize: unpaddedSize, + expectedVerified: true, + }, + "verified is false if both deals are unverified and we get the correct piece size": { fFnc: func(n *mocks.MockFullNode) { out1 := &api.MarketDeal{ diff --git a/node/config/def.go b/node/config/def.go index 4b993eb2e..f19466fa8 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -155,6 +155,10 @@ type SealingConfig struct { // time buffer for forceful batch submission before sectors/deals in batch would start expiring CommitBatchSlack Duration + // network BaseFee below which to stop doing commit aggregation, instead + // submitting proofs to the chain individually + AggregateAboveBaseFee types.FIL + TerminateBatchMax uint64 TerminateBatchMin uint64 TerminateBatchWait Duration @@ -341,6 +345,8 @@ func DefaultStorageMiner() *StorageMiner { CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration + AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL + TerminateBatchMin: 1, TerminateBatchMax: 100, TerminateBatchWait: Duration(5 * time.Minute), diff --git a/node/impl/full/state.go b/node/impl/full/state.go index b3639c5e0..d8545ae13 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -705,7 +705,7 @@ func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid. return nil, xerrors.Errorf("failed to load new state tree: %w", err) } - return state.Diff(oldTree, newTree) + return state.Diff(ctx, oldTree, newTree) } func (a *StateAPI) StateMinerSectorCount(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MinerSectors, error) { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index dac36d0fe..7eb1cc3b5 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -212,8 +212,6 @@ type StorageMinerParams struct { } func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) { - fmt.Printf("setting up storage miner with %#v \n", fc) - return func(params StorageMinerParams) (*storage.Miner, error) { var ( ds = params.MetadataDS @@ -689,13 +687,13 @@ func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage. return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{}) } -func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) { +func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) { ctx := helpers.LifecycleCtx(mctx, lc) wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix)) smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix)) - sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, sa, wsts, smsts) + sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, wsts, smsts) if err != nil { return nil, err } @@ -719,7 +717,6 @@ func StorageAuth(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.Storage } func StorageAuthWithURL(url string) func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) { - log.Infow("Setting auth token based on URL", "url", url) return func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) { s := strings.Split(url, ":") if len(s) != 2 { @@ -873,11 +870,12 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait), PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack), - AggregateCommits: cfg.AggregateCommits, - MinCommitBatch: cfg.MinCommitBatch, - MaxCommitBatch: cfg.MaxCommitBatch, - CommitBatchWait: config.Duration(cfg.CommitBatchWait), - CommitBatchSlack: config.Duration(cfg.CommitBatchSlack), + AggregateCommits: cfg.AggregateCommits, + MinCommitBatch: cfg.MinCommitBatch, + MaxCommitBatch: cfg.MaxCommitBatch, + CommitBatchWait: config.Duration(cfg.CommitBatchWait), + CommitBatchSlack: config.Duration(cfg.CommitBatchSlack), + AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee), TerminateBatchMax: cfg.TerminateBatchMax, TerminateBatchMin: cfg.TerminateBatchMin, @@ -888,32 +886,37 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error }, nil } +func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config { + return sealiface.Config{ + MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, + MaxSealingSectors: cfg.Sealing.MaxSealingSectors, + MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, + WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, + FinalizeEarly: cfg.Sealing.FinalizeEarly, + + BatchPreCommits: cfg.Sealing.BatchPreCommits, + MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, + PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), + PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), + + AggregateCommits: cfg.Sealing.AggregateCommits, + MinCommitBatch: cfg.Sealing.MinCommitBatch, + MaxCommitBatch: cfg.Sealing.MaxCommitBatch, + CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), + CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), + AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee), + + TerminateBatchMax: cfg.Sealing.TerminateBatchMax, + TerminateBatchMin: cfg.Sealing.TerminateBatchMin, + TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), + } +} + func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { return func() (out sealiface.Config, err error) { err = readCfg(r, func(cfg *config.StorageMiner) { - out = sealiface.Config{ - MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, - MaxSealingSectors: cfg.Sealing.MaxSealingSectors, - MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, - WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), - AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, - FinalizeEarly: cfg.Sealing.FinalizeEarly, - - BatchPreCommits: cfg.Sealing.BatchPreCommits, - MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, - PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), - PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), - - AggregateCommits: cfg.Sealing.AggregateCommits, - MinCommitBatch: cfg.Sealing.MinCommitBatch, - MaxCommitBatch: cfg.Sealing.MaxCommitBatch, - CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), - CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), - - TerminateBatchMax: cfg.Sealing.TerminateBatchMax, - TerminateBatchMin: cfg.Sealing.TerminateBatchMin, - TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), - } + out = ToSealingConfig(cfg) }) return }, nil diff --git a/node/rpc.go b/node/rpc.go index af0898d67..b283f6ac1 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -118,7 +118,6 @@ func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) { mapi = api.PermissionedStorMinerAPI(mapi) } - //_, _ = a.ActorAddress(context.Background()) readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder() rpcServer := jsonrpc.NewServer(readerServerOpt) rpcServer.Register("Filecoin", mapi) diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index ccf2c67d2..ad4ffc0db 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -47,7 +47,7 @@ func DsKeyToDealID(key datastore.Key) (uint64, error) { return dealID, nil } -type SectorBuilder interface { // todo: apify, make work remote +type SectorBuilder interface { SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d api.PieceDealInfo) (api.SectorOffset, error) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) }