From 07487b6d2046ea33fbb68dee4af5622667b573c3 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Mon, 28 Jun 2021 22:43:14 -0400 Subject: [PATCH 01/21] Update version.go --- build/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/version.go b/build/version.go index 5a4a494fc..c6a1be3e2 100644 --- a/build/version.go +++ b/build/version.go @@ -34,7 +34,7 @@ func buildType() string { } // BuildVersion is the local build version, set by build system -const BuildVersion = "1.11.0-dev" +const BuildVersion = "1.11.1-dev" func UserVersion() string { if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" { From 5c3d67a81144f035c663829c54b5ff0ba57aae10 Mon Sep 17 00:00:00 2001 From: Jerry <1032246642@qq.com> Date: Tue, 29 Jun 2021 14:06:41 +0800 Subject: [PATCH 02/21] fix: miner balance is not enough, so that ProveCommitAggregate msg exec failed --- extern/storage-sealing/commit_batch.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 3c0af1176..ad0a106a0 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -305,14 +305,16 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa aggFee := policy.AggregateNetworkFee(nv, len(infos), bf) - goodFunds := big.Add(maxFee, big.Add(collateral, aggFee)) + needFunds := big.Add(collateral, aggFee) - from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral) + goodFunds := big.Add(maxFee, needFunds) + + from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, needFunds) if err != nil { return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } - mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, maxFee, enc.Bytes()) + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) if err != nil { return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) } From 73704c92bab22b8b11aa7c4dc0cb3f723a523e73 Mon Sep 17 00:00:00 2001 From: Jerry <1032246642@qq.com> Date: Tue, 29 Jun 2021 14:36:26 +0800 Subject: [PATCH 03/21] ensure agg fee is adequate --- extern/storage-sealing/commit_batch.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index ad0a106a0..f8b7a0392 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -32,6 +32,9 @@ import ( const arp = abi.RegisteredAggregationProof_SnarkPackV1 +var aggFeeNum = big.NewInt(110) +var aggFeeDen = big.NewInt(100) + type CommitBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) @@ -303,7 +306,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err) } - aggFee := policy.AggregateNetworkFee(nv, len(infos), bf) + aggFee := big.Div(big.Mul(policy.AggregateNetworkFee(nv, len(infos), bf), aggFeeNum), aggFeeDen) needFunds := big.Add(collateral, aggFee) From df86efbd43ada6797a1b5cafaf47422816a5880c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Jun 2021 11:27:06 +0200 Subject: [PATCH 04/21] docsgen --- build/openrpc/full.json.gz | Bin 23440 -> 23440 bytes build/openrpc/miner.json.gz | Bin 8102 -> 8102 bytes build/openrpc/worker.json.gz | Bin 2513 -> 2513 bytes documentation/en/cli-lotus-miner.md | 2 +- documentation/en/cli-lotus-worker.md | 2 +- documentation/en/cli-lotus.md | 2 +- 6 files changed, 3 insertions(+), 3 deletions(-) diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 8306fab569c17d4acaffb7bcb42fdeedd2b2f339..9723932bf93ea4af01b44970a31edce361e27e05 100644 GIT binary patch delta 24 ecmbQRopAyXwKHv-x3Tko6vzIiW%B+@Sr`D0H42vi delta 24 ecmbQRopAyXwKJK`-`M#-isN^z&V2u+EDQjLL<%(k diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index b57a1b3191e1a54cc00a88523c8092f5542bf6ee..cc9a8948a2e85881f60d7ba36719fe394826a91d 100644 GIT binary patch delta 21 ccmZ2xzs!C@D&U}4W67FJI(K{?J8nm003r=2&w=8 delta 21 ccmca8d{KBpJ>$8J4W67F(##Q`x{4SW0A5)KmH+?% diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index c8abb574b..aab71bede 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -7,7 +7,7 @@ USAGE: lotus-miner [global options] command [command options] [arguments...] VERSION: - 1.11.0-dev + 1.11.1-dev COMMANDS: init Initialize a lotus miner repo diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index 0b29da503..dbfc8da29 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -7,7 +7,7 @@ USAGE: lotus-worker [global options] command [command options] [arguments...] VERSION: - 1.11.0-dev + 1.11.1-dev COMMANDS: run Start lotus worker diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index a1583d522..41268f207 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -7,7 +7,7 @@ USAGE: lotus [global options] command [command options] [arguments...] VERSION: - 1.11.0-dev + 1.11.1-dev COMMANDS: daemon Start a lotus daemon process From 022d4b548a2eed048db52aac194a8e30ec4a9930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Jun 2021 20:41:40 +0200 Subject: [PATCH 05/21] shed tool to estimate aggregate network fees --- cmd/lotus-shed/math.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/cmd/lotus-shed/math.go b/cmd/lotus-shed/math.go index 434559f09..c6d4ed0c9 100644 --- a/cmd/lotus-shed/math.go +++ b/cmd/lotus-shed/math.go @@ -8,8 +8,10 @@ import ( "strings" "github.com/urfave/cli/v2" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/types" + miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" ) var mathCmd = &cli.Command{ @@ -17,6 +19,7 @@ var mathCmd = &cli.Command{ Usage: "utility commands around doing math on a list of numbers", Subcommands: []*cli.Command{ mathSumCmd, + mathAggFeesCmd, }, } @@ -101,3 +104,30 @@ var mathSumCmd = &cli.Command{ return nil }, } + +var mathAggFeesCmd = &cli.Command{ + Name: "agg-fees", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "size", + Required: true, + }, + &cli.StringFlag{ + Name: "base-fee", + Usage: "baseFee aFIL", + Required: true, + }, + }, + Action: func(cctx *cli.Context) error { + as := cctx.Int("size") + + bf, err := types.BigFromString(cctx.String("base-fee")) + if err != nil { + return xerrors.Errorf("parsing basefee: %w", err) + } + + fmt.Println(types.FIL(miner5.AggregateNetworkFee(as, bf))) + + return nil + }, +} From a4342f3997e0038f5dc283eab600bf69cbf93a13 Mon Sep 17 00:00:00 2001 From: wangchao Date: Mon, 28 Jun 2021 17:50:31 +0800 Subject: [PATCH 06/21] remove precommit check in handleCommitFailed --- extern/storage-sealing/states_failed.go | 28 +------------------------ 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 7bef19b92..3a0177978 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -182,7 +182,7 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect } func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error { - tok, height, err := m.api.ChainHead(ctx.Context()) + tok, _, err := m.api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil @@ -216,32 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo } } - if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil { - switch err.(type) { - case *ErrApi: - log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err) - return nil - case *ErrBadCommD: - return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)}) - case *ErrExpiredTicket: - return ctx.Send(SectorTicketExpired{xerrors.Errorf("ticket expired error, removing sector: %w", err)}) - case *ErrBadTicket: - return ctx.Send(SectorTicketExpired{xerrors.Errorf("expired ticket, removing sector: %w", err)}) - case *ErrInvalidDeals: - log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) - return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed}) - case *ErrExpiredDeals: - return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) - case nil: - return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)}) - case *ErrPrecommitOnChain: - // noop, this is expected - case *ErrSectorNumberAllocated: - // noop, already committed? - default: - return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err) - } - } if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { switch err.(type) { From 3dd3476bfd6e0b7b30fe88958f8ea09ac72fc130 Mon Sep 17 00:00:00 2001 From: llifezou Date: Wed, 30 Jun 2021 16:32:44 +0800 Subject: [PATCH 07/21] fix ticket expiration check, otherwise it may cause a large number of loops to retry GetTicket when retrying PreCommit1 --- extern/storage-sealing/states_sealing.go | 38 ++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 360eeafa6..b6df0f79d 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -105,6 +105,10 @@ func checkTicketExpired(ticket, head abi.ChainEpoch) bool { return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations } +func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool { + return currEpoch > preCommitEpoch+msd +} + func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) { tok, epoch, err := m.api.ChainHead(ctx.Context()) if err != nil { @@ -126,7 +130,14 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se if pci != nil { ticketEpoch = pci.Info.SealRandEpoch - if checkTicketExpired(ticketEpoch, epoch) { + nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + if err != nil { + return nil, 0, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err) + } + + msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType) + + if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) { return nil, 0, xerrors.Errorf("ticket expired for precommitted sector") } } @@ -182,14 +193,35 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } } - _, height, err := m.api.ChainHead(ctx.Context()) + tok, height, err := m.api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil } if checkTicketExpired(sector.TicketEpoch, height) { - return ctx.Send(SectorOldTicket{}) // go get new ticket + pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + if err != nil { + log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err) + return nil + } + + if pci == nil { + return ctx.Send(SectorOldTicket{}) // go get new ticket + } + + nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + if err != nil { + log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err) + return nil + } + + msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType) + + // if height > PreCommitEpoch + msd, there is no need to recalculate + if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) { + return ctx.Send(SectorOldTicket{}) // will be removed + } } pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos()) From b0b070ffe7fda00ffc2f00d9bead55d0902855b9 Mon Sep 17 00:00:00 2001 From: johnli-helloworld Date: Wed, 30 Jun 2021 16:56:40 +0800 Subject: [PATCH 08/21] to optimize the batchwait --- extern/storage-sealing/commit_batch.go | 22 ++++++++++++++++------ extern/storage-sealing/precommit_batch.go | 22 ++++++++++++++++------ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index ea246136d..4c04e4f90 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -106,6 +106,7 @@ func (b *CommitBatcher) run() { panic(err) } + timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) for { if forceRes != nil { forceRes <- lastMsg @@ -121,7 +122,7 @@ func (b *CommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): + case <-timer.C: // do nothing case fr := <-b.force: // user triggered forceRes = fr @@ -132,17 +133,26 @@ func (b *CommitBatcher) run() { if err != nil { log.Warnw("CommitBatcher processBatch error", "error", err) } + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) } } -func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { +func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { now := time.Now() b.lk.Lock() defer b.lk.Unlock() if len(b.todo) == 0 { - return nil + return maxWait } var cutoff time.Time @@ -160,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time } if cutoff.IsZero() { - return time.After(maxWait) + return maxWait } cutoff = cutoff.Add(-slack) if cutoff.Before(now) { - return time.After(time.Nanosecond) // can't return 0 + return time.Nanosecond // can't return 0 } wait := cutoff.Sub(now) @@ -173,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time wait = maxWait } - return time.After(wait) + return wait } func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) { diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index 3dc3510c2..1b43bc6b9 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() { panic(err) } + timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) for { if forceRes != nil { forceRes <- lastRes @@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() { return case <-b.notify: sendAboveMax = true - case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): + case <-timer.C: // do nothing case fr := <-b.force: // user triggered forceRes = fr @@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() { if err != nil { log.Warnw("PreCommitBatcher processBatch error", "error", err) } + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) } } -func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { +func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration { now := time.Now() b.lk.Lock() defer b.lk.Unlock() if len(b.todo) == 0 { - return nil + return maxWait } var cutoff time.Time @@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T } if cutoff.IsZero() { - return time.After(maxWait) + return maxWait } cutoff = cutoff.Add(-slack) if cutoff.Before(now) { - return time.After(time.Nanosecond) // can't return 0 + return time.Nanosecond // can't return 0 } wait := cutoff.Sub(now) @@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T wait = maxWait } - return time.After(wait) + return wait } func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) { From f45340461dae942ac91d2788dc09c6acb3c46e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 30 Jun 2021 15:19:29 +0200 Subject: [PATCH 09/21] gofmt --- extern/storage-sealing/states_failed.go | 1 - 1 file changed, 1 deletion(-) diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 3a0177978..201c4456f 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -216,7 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo } } - if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { switch err.(type) { case *ErrApi: From 229d5e5c8021144082dd55fca45951ae1b3d7f5b Mon Sep 17 00:00:00 2001 From: johnli-helloworld Date: Fri, 25 Jun 2021 17:25:17 +0800 Subject: [PATCH 10/21] handleSubmitCommitAggregate() exception handling --- extern/storage-sealing/fsm.go | 1 + extern/storage-sealing/states_sealing.go | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index d3e1e9d52..d04aef790 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -115,6 +115,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto SubmitCommitAggregate: planOne( on(SectorCommitAggregateSent{}, CommitWait), on(SectorCommitFailed{}, CommitFailed), + on(SectorRetrySubmitCommit{}, SubmitCommit), ), CommitWait: planOne( on(SectorProving{}, FinalizeSector), diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 360eeafa6..bd566cdcd 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -624,11 +624,21 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S Spt: sector.SectorType, }) if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)}) + return ctx.Send(SectorRetrySubmitCommit{}) } if res.Error != "" { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)}) + tok, _, err := m.api.ChainHead(ctx.Context()) + if err != nil { + log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) + return nil + } + + if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) + } + + return ctx.Send(SectorRetrySubmitCommit{}) } if e, found := res.FailedSectors[sector.SectorNumber]; found { From 73e58f7af1636d54db8b9b2ac532bc7f61ec183d Mon Sep 17 00:00:00 2001 From: llifezou Date: Thu, 1 Jul 2021 10:53:42 +0800 Subject: [PATCH 11/21] fix getTicket: sector precommitted but expired case --- extern/storage-sealing/states_sealing.go | 37 ++++++++++++++---------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index b6df0f79d..e8e8c7601 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -109,22 +109,30 @@ func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.C return currEpoch > preCommitEpoch+msd } -func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) { +func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) { tok, epoch, err := m.api.ChainHead(ctx.Context()) if err != nil { - log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) - return nil, 0, nil + log.Errorf("getTicket: api error, not proceeding: %+v", err) + return nil, 0, false, nil + } + + // the reason why the StateMinerSectorAllocated function is placed here, if it is outside, + // if the MarshalCBOR function and StateSectorPreCommitInfo function return err, it will be executed + allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if aerr != nil { + log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr) + return nil, 0, false, nil } ticketEpoch := epoch - policy.SealRandomnessLookback buf := new(bytes.Buffer) if err := m.maddr.MarshalCBOR(buf); err != nil { - return nil, 0, err + return nil, 0, allocated, err } pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) if err != nil { - return nil, 0, xerrors.Errorf("getting precommit info: %w", err) + return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err) } if pci != nil { @@ -132,32 +140,31 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) if err != nil { - return nil, 0, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err) + return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err) } msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType) if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) { - return nil, 0, xerrors.Errorf("ticket expired for precommitted sector") + return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector") } } + if allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove + return nil, 0, allocated, xerrors.Errorf("Sector %s precommitted but expired", sector.SectorNumber) + } + rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes()) if err != nil { - return nil, 0, err + return nil, 0, allocated, err } - return abi.SealRandomness(rand), ticketEpoch, nil + return abi.SealRandomness(rand), ticketEpoch, allocated, nil } func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error { - ticketValue, ticketEpoch, err := m.getTicket(ctx, sector) + ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector) if err != nil { - allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil) - if aerr != nil { - log.Errorf("error checking if sector is allocated: %+v", aerr) - } - if allocated { if sector.CommitMessage != nil { // Some recovery paths with unfortunate timing lead here From 88bb9f422ec396ffba47f29d207a094de6b75f2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 1 Jul 2021 12:15:58 +0200 Subject: [PATCH 12/21] commit batch: Initialize the FailedSectors map --- extern/storage-sealing/commit_batch.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 4c04e4f90..bc924fdc6 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -242,7 +242,9 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa total := len(b.todo) - var res sealiface.CommitBatchRes + res := sealiface.CommitBatchRes{ + FailedSectors: map[abi.SectorNumber]string{}, + } params := miner5.ProveCommitAggregateParams{ SectorNumbers: bitfield.New(), @@ -356,7 +358,8 @@ func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error for sn, info := range b.todo { r := sealiface.CommitBatchRes{ - Sectors: []abi.SectorNumber{sn}, + Sectors: []abi.SectorNumber{sn}, + FailedSectors: map[abi.SectorNumber]string{}, } mcid, err := b.processSingle(mi, sn, info, tok) From c094aa82ec7e773f225bc234b2869f4184253eb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 1 Jul 2021 13:33:54 +0200 Subject: [PATCH 13/21] commit batch: AggregateAboveBaseFee config --- chain/types/fil.go | 5 ++ extern/storage-sealing/commit_batch.go | 20 +++++- extern/storage-sealing/commit_batch_test.go | 72 +++++++++++++++++---- extern/storage-sealing/sealiface/config.go | 8 ++- node/config/def.go | 6 ++ node/modules/storageminer.go | 22 ++++--- 6 files changed, 108 insertions(+), 25 deletions(-) diff --git a/chain/types/fil.go b/chain/types/fil.go index 7438410c8..21125e6d6 100644 --- a/chain/types/fil.go +++ b/chain/types/fil.go @@ -23,6 +23,11 @@ func (f FIL) Unitless() string { return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") } +var AttoFil = NewInt(1) +var FemtoFil = BigMul(AttoFil, NewInt(1000)) +var PicoFil = BigMul(FemtoFil, NewInt(1000)) +var NanoFil = BigMul(PicoFil, NewInt(1000)) + var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"} func (f FIL) Short() string { diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index bc924fdc6..63bd3c7db 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -206,7 +206,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, var res []sealiface.CommitBatchRes - if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors { + individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors) + + if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) { + tok, _, err := b.api.ChainHead(b.mctx) + if err != nil { + return nil, err + } + + bf, err := b.api.ChainBaseFee(b.mctx, tok) + if err != nil { + return nil, xerrors.Errorf("couldn't get base fee: %w", err) + } + + if bf.LessThan(cfg.AggregateAboveBaseFee) { + individual = true + } + } + + if individual { res, err = b.processIndividually() } else { res, err = b.processBatch(cfg) diff --git a/extern/storage-sealing/commit_batch_test.go b/extern/storage-sealing/commit_batch_test.go index ad2bc8f6f..d7cf4a9a4 100644 --- a/extern/storage-sealing/commit_batch_test.go +++ b/extern/storage-sealing/commit_batch_test.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/extern/storage-sealing/mocks" @@ -58,6 +59,8 @@ func TestCommitBatcher(t *testing.T) { CommitBatchWait: 24 * time.Hour, CommitBatchSlack: 1 * time.Hour, + AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL + TerminateBatchMin: 1, TerminateBatchMax: 100, TerminateBatchWait: 5 * time.Minute, @@ -143,7 +146,7 @@ func TestCommitBatcher(t *testing.T) { } } - expectSend := func(expect []abi.SectorNumber) action { + expectSend := func(expect []abi.SectorNumber, aboveBalancer bool) action { return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil) @@ -153,6 +156,22 @@ func TestCommitBatcher(t *testing.T) { batch = true ti = 1 } + + basefee := types.PicoFil + if aboveBalancer { + basefee = types.NanoFil + } + + if batch { + s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) + s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) + } + + if !aboveBalancer { + batch = false + ti = len(expect) + } + s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{ PreCommitDeposit: big.Zero(), @@ -160,7 +179,7 @@ func TestCommitBatcher(t *testing.T) { s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect)) if batch { s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) - s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil) + s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) } s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool { @@ -183,11 +202,11 @@ func TestCommitBatcher(t *testing.T) { } } - flush := func(expect []abi.SectorNumber) action { + flush := func(expect []abi.SectorNumber, aboveBalancer bool) action { return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { - _ = expectSend(expect)(t, s, pcb) + _ = expectSend(expect, aboveBalancer)(t, s, pcb) - batch := len(expect) >= minBatch + batch := len(expect) >= minBatch && aboveBalancer r, err := pcb.Flush(ctx) require.NoError(t, err) @@ -227,30 +246,57 @@ func TestCommitBatcher(t *testing.T) { tcs := map[string]struct { actions []action }{ - "addSingle": { + "addSingle-aboveBalancer": { actions: []action{ addSector(0), waitPending(1), - flush([]abi.SectorNumber{0}), + flush([]abi.SectorNumber{0}, true), }, }, - "addTwo": { + "addTwo-aboveBalancer": { actions: []action{ addSectors(getSectors(2)), waitPending(2), - flush(getSectors(2)), + flush(getSectors(2), true), }, }, - "addAte": { + "addAte-aboveBalancer": { actions: []action{ addSectors(getSectors(8)), waitPending(8), - flush(getSectors(8)), + flush(getSectors(8), true), }, }, - "addMax": { + "addMax-aboveBalancer": { actions: []action{ - expectSend(getSectors(maxBatch)), + expectSend(getSectors(maxBatch), true), + addSectors(getSectors(maxBatch)), + }, + }, + "addSingle-belowBalancer": { + actions: []action{ + addSector(0), + waitPending(1), + flush([]abi.SectorNumber{0}, false), + }, + }, + "addTwo-belowBalancer": { + actions: []action{ + addSectors(getSectors(2)), + waitPending(2), + flush(getSectors(2), false), + }, + }, + "addAte-belowBalancer": { + actions: []action{ + addSectors(getSectors(8)), + waitPending(8), + flush(getSectors(8), false), + }, + }, + "addMax-belowBalancer": { + actions: []action{ + expectSend(getSectors(maxBatch), false), addSectors(getSectors(maxBatch)), }, }, diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index b237072d3..0410b92c0 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -1,6 +1,10 @@ package sealiface -import "time" +import ( + "time" + + "github.com/filecoin-project/go-state-types/abi" +) // this has to be in a separate package to not make lotus API depend on filecoin-ffi @@ -31,6 +35,8 @@ type Config struct { CommitBatchWait time.Duration CommitBatchSlack time.Duration + AggregateAboveBaseFee abi.TokenAmount + TerminateBatchMax uint64 TerminateBatchMin uint64 TerminateBatchWait time.Duration diff --git a/node/config/def.go b/node/config/def.go index b331b1f49..240fadbd9 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -144,6 +144,10 @@ type SealingConfig struct { // time buffer for forceful batch submission before sectors/deals in batch would start expiring CommitBatchSlack Duration + // network BaseFee below which to stop doing commit aggregation, instead + // submitting proofs to the chain individually + AggregateAboveBaseFee types.FIL + TerminateBatchMax uint64 TerminateBatchMin uint64 TerminateBatchWait Duration @@ -330,6 +334,8 @@ func DefaultStorageMiner() *StorageMiner { CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration + AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL + TerminateBatchMin: 1, TerminateBatchMax: 100, TerminateBatchWait: Duration(5 * time.Minute), diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 09b1e2dfd..3d1d08071 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -866,11 +866,12 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait), PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack), - AggregateCommits: cfg.AggregateCommits, - MinCommitBatch: cfg.MinCommitBatch, - MaxCommitBatch: cfg.MaxCommitBatch, - CommitBatchWait: config.Duration(cfg.CommitBatchWait), - CommitBatchSlack: config.Duration(cfg.CommitBatchSlack), + AggregateCommits: cfg.AggregateCommits, + MinCommitBatch: cfg.MinCommitBatch, + MaxCommitBatch: cfg.MaxCommitBatch, + CommitBatchWait: config.Duration(cfg.CommitBatchWait), + CommitBatchSlack: config.Duration(cfg.CommitBatchSlack), + AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee), TerminateBatchMax: cfg.TerminateBatchMax, TerminateBatchMin: cfg.TerminateBatchMin, @@ -897,11 +898,12 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), - AggregateCommits: cfg.Sealing.AggregateCommits, - MinCommitBatch: cfg.Sealing.MinCommitBatch, - MaxCommitBatch: cfg.Sealing.MaxCommitBatch, - CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), - CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), + AggregateCommits: cfg.Sealing.AggregateCommits, + MinCommitBatch: cfg.Sealing.MinCommitBatch, + MaxCommitBatch: cfg.Sealing.MaxCommitBatch, + CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), + CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), + AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee), TerminateBatchMax: cfg.Sealing.TerminateBatchMax, TerminateBatchMin: cfg.Sealing.TerminateBatchMin, From 7c2c8b2a95f8f8b85f69c79ad41a6c4135fda09f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 1 Jul 2021 13:51:11 +0200 Subject: [PATCH 14/21] commit batch: Regression test nil FailedSectors map --- extern/storage-sealing/commit_batch_test.go | 65 ++++++++++++++++----- 1 file changed, 52 insertions(+), 13 deletions(-) diff --git a/extern/storage-sealing/commit_batch_test.go b/extern/storage-sealing/commit_batch_test.go index d7cf4a9a4..aea6d455e 100644 --- a/extern/storage-sealing/commit_batch_test.go +++ b/extern/storage-sealing/commit_batch_test.go @@ -146,7 +146,7 @@ func TestCommitBatcher(t *testing.T) { } } - expectSend := func(expect []abi.SectorNumber, aboveBalancer bool) action { + expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action { return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil) @@ -173,10 +173,20 @@ func TestCommitBatcher(t *testing.T) { } s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) + + pciC := len(expect) + if failOnePCI { + s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found + pciC = len(expect) - 1 + if !batch { + ti-- + } + } s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{ PreCommitDeposit: big.Zero(), - }, nil).Times(len(expect)) - s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect)) + }, nil).Times(pciC) + s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC) + if batch { s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) @@ -202,9 +212,9 @@ func TestCommitBatcher(t *testing.T) { } } - flush := func(expect []abi.SectorNumber, aboveBalancer bool) action { + flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action { return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { - _ = expectSend(expect, aboveBalancer)(t, s, pcb) + _ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb) batch := len(expect) >= minBatch && aboveBalancer @@ -217,6 +227,13 @@ func TestCommitBatcher(t *testing.T) { return r[0].Sectors[i] < r[0].Sectors[j] }) require.Equal(t, expect, r[0].Sectors) + if !failOnePCI { + require.Len(t, r[0].FailedSectors, 0) + } else { + require.Len(t, r[0].FailedSectors, 1) + _, found := r[0].FailedSectors[1] + require.True(t, found) + } } else { require.Len(t, r, len(expect)) for _, res := range r { @@ -228,6 +245,13 @@ func TestCommitBatcher(t *testing.T) { }) for i, res := range r { require.Equal(t, abi.SectorNumber(i), res.Sectors[0]) + if failOnePCI && res.Sectors[0] == 1 { + require.Len(t, res.FailedSectors, 1) + _, found := res.FailedSectors[1] + require.True(t, found) + } else { + require.Empty(t, res.FailedSectors) + } } } @@ -250,26 +274,26 @@ func TestCommitBatcher(t *testing.T) { actions: []action{ addSector(0), waitPending(1), - flush([]abi.SectorNumber{0}, true), + flush([]abi.SectorNumber{0}, true, false), }, }, "addTwo-aboveBalancer": { actions: []action{ addSectors(getSectors(2)), waitPending(2), - flush(getSectors(2), true), + flush(getSectors(2), true, false), }, }, "addAte-aboveBalancer": { actions: []action{ addSectors(getSectors(8)), waitPending(8), - flush(getSectors(8), true), + flush(getSectors(8), true, false), }, }, "addMax-aboveBalancer": { actions: []action{ - expectSend(getSectors(maxBatch), true), + expectSend(getSectors(maxBatch), true, false), addSectors(getSectors(maxBatch)), }, }, @@ -277,29 +301,44 @@ func TestCommitBatcher(t *testing.T) { actions: []action{ addSector(0), waitPending(1), - flush([]abi.SectorNumber{0}, false), + flush([]abi.SectorNumber{0}, false, false), }, }, "addTwo-belowBalancer": { actions: []action{ addSectors(getSectors(2)), waitPending(2), - flush(getSectors(2), false), + flush(getSectors(2), false, false), }, }, "addAte-belowBalancer": { actions: []action{ addSectors(getSectors(8)), waitPending(8), - flush(getSectors(8), false), + flush(getSectors(8), false, false), }, }, "addMax-belowBalancer": { actions: []action{ - expectSend(getSectors(maxBatch), false), + expectSend(getSectors(maxBatch), false, false), addSectors(getSectors(maxBatch)), }, }, + + "addAte-aboveBalancer-failOne": { + actions: []action{ + addSectors(getSectors(8)), + waitPending(8), + flush(getSectors(8), true, true), + }, + }, + "addAte-belowBalancer-failOne": { + actions: []action{ + addSectors(getSectors(8)), + waitPending(8), + flush(getSectors(8), false, true), + }, + }, } for name, tc := range tcs { From fe3ad4fd60515ad68243b83a564182fd40d0975c Mon Sep 17 00:00:00 2001 From: llifezou <46102475+llifezou@users.noreply.github.com> Date: Fri, 2 Jul 2021 11:38:04 +0800 Subject: [PATCH 15/21] Update extern/storage-sealing/states_sealing.go fix log Co-authored-by: Aayush Rajasekaran --- extern/storage-sealing/states_sealing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index e8e8c7601..3273ff76e 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -151,7 +151,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se } if allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove - return nil, 0, allocated, xerrors.Errorf("Sector %s precommitted but expired", sector.SectorNumber) + return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber) } rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes()) From df53b97fe4d1cf19f1fcf07f5b1a8fc483161d1f Mon Sep 17 00:00:00 2001 From: llifezou <46102475+llifezou@users.noreply.github.com> Date: Fri, 2 Jul 2021 11:44:46 +0800 Subject: [PATCH 16/21] Update extern/storage-sealing/states_sealing.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix sector precommitted but expired judgment Co-authored-by: Łukasz Magiera --- extern/storage-sealing/states_sealing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 3273ff76e..570b24884 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -150,7 +150,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se } } - if allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove + if pci == nil && allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber) } From 71e1577a62b411dbce6c3e1a2504f3c3a132a789 Mon Sep 17 00:00:00 2001 From: zhoutian527 Date: Fri, 2 Jul 2021 15:53:21 +0800 Subject: [PATCH 17/21] Fix: precommit_batch method used the wrong cfg.PreCommitBatchWait --- extern/storage-sealing/precommit_batch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index 1b43bc6b9..8b132a2eb 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -88,7 +88,7 @@ func (b *PreCommitBatcher) run() { panic(err) } - timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) + timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)) for { if forceRes != nil { forceRes <- lastRes @@ -122,7 +122,7 @@ func (b *PreCommitBatcher) run() { } } - timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) + timer.Reset(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)) } } From 592b8c4f4ad00bb9ee9ebc2571d4cffb2fb4927a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 2 Jul 2021 17:04:13 +0100 Subject: [PATCH 18/21] add an incremental nonce itest. --- itests/nonce_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 itests/nonce_test.go diff --git a/itests/nonce_test.go b/itests/nonce_test.go new file mode 100644 index 000000000..b50fcbe26 --- /dev/null +++ b/itests/nonce_test.go @@ -0,0 +1,57 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" +) + +func TestNonceIncremental(t *testing.T) { + ctx := context.Background() + + kit.QuietMiningLogs() + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // create a new address where to send funds. + addr, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + // get the existing balance from the default wallet to then split it. + bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) + require.NoError(t, err) + + const iterations = 100 + + // we'll send half our balance (saving the other half for gas), + // in `iterations` increments. + toSend := big.Div(bal, big.NewInt(2)) + each := big.Div(toSend, big.NewInt(iterations)) + + var sms []*types.SignedMessage + for i := 0; i < iterations; i++ { + msg := &types.Message{ + From: client.DefaultKey.Address, + To: addr, + Value: each, + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + for _, sm := range sms { + _, err := client.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + } +} From ca402340318baa1268d126280596035c5f2a57a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 2 Jul 2021 17:04:57 +0100 Subject: [PATCH 19/21] update circleci config. --- .circleci/config.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9f8f28b86..dd236e7fb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -811,6 +811,11 @@ workflows: suite: itest-multisig target: "./itests/multisig_test.go" + - test: + name: test-itest-nonce + suite: itest-nonce + target: "./itests/nonce_test.go" + - test: name: test-itest-paych_api suite: itest-paych_api From 30efcf21f07986d8db5e01f6cad74e9cb20b9bd0 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Fri, 2 Jul 2021 18:02:56 +0200 Subject: [PATCH 20/21] Fix tiny error in check-client-datacap --- cli/filplus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/filplus.go b/cli/filplus.go index 53dc5092b..007071ea2 100644 --- a/cli/filplus.go +++ b/cli/filplus.go @@ -210,7 +210,7 @@ var filplusCheckClientCmd = &cli.Command{ return err } if dcap == nil { - return xerrors.Errorf("client %s is not a verified client", err) + return xerrors.Errorf("client %s is not a verified client", caddr) } fmt.Println(*dcap) From 8a94ab676e5dba50849aef017aef84ff27f1c665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 1 Jul 2021 21:07:53 +0200 Subject: [PATCH 21/21] storage: Fix FinalizeSector with sectors in stoage paths --- .circleci/config.yml | 5 +++ cmd/lotus-seal-worker/storage.go | 2 +- cmd/lotus-storage-miner/storage.go | 2 +- extern/sector-storage/manager.go | 17 ++++++- itests/deals_test.go | 2 +- itests/kit/node_miner.go | 47 +++++++++++++++++++- itests/sector_finalize_early_test.go | 66 ++++++++++++++++++++++++++++ node/modules/storageminer.go | 52 ++++++++++++---------- 8 files changed, 164 insertions(+), 29 deletions(-) create mode 100644 itests/sector_finalize_early_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 9f8f28b86..d151026f0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -826,6 +826,11 @@ workflows: suite: itest-sdr_upgrade target: "./itests/sdr_upgrade_test.go" + - test: + name: test-itest-sector_finalize_early + suite: itest-sector_finalize_early + target: "./itests/sector_finalize_early_test.go" + - test: name: test-itest-sector_pledge suite: itest-sector_pledge diff --git a/cmd/lotus-seal-worker/storage.go b/cmd/lotus-seal-worker/storage.go index afb566166..be662a6c3 100644 --- a/cmd/lotus-seal-worker/storage.go +++ b/cmd/lotus-seal-worker/storage.go @@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{ } if !(cfg.CanStore || cfg.CanSeal) { - return xerrors.Errorf("must specify at least one of --store of --seal") + return xerrors.Errorf("must specify at least one of --store or --seal") } b, err := json.MarshalIndent(cfg, "", " ") diff --git a/cmd/lotus-storage-miner/storage.go b/cmd/lotus-storage-miner/storage.go index b4ab26ad3..f2068ea86 100644 --- a/cmd/lotus-storage-miner/storage.go +++ b/cmd/lotus-storage-miner/storage.go @@ -145,7 +145,7 @@ over time } if !(cfg.CanStore || cfg.CanSeal) { - return xerrors.Errorf("must specify at least one of --store of --seal") + return xerrors.Errorf("must specify at least one of --store or --seal") } b, err := json.MarshalIndent(cfg, "", " ") diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 136c00252..79eca74d6 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, } } + pathType := storiface.PathStorage + { + sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false) + if err != nil { + return xerrors.Errorf("finding sealed sector: %w", err) + } + + for _, store := range sealedStores { + if store.CanSeal { + pathType = storiface.PathSealing + break + } + } + } + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) return err diff --git a/itests/deals_test.go b/itests/deals_test.go index f8389bbd6..8aff414e0 100644 --- a/itests/deals_test.go +++ b/itests/deals_test.go @@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) { kit.QuietMiningLogs() - var blockTime = 1 * time.Second + var blockTime = 50 * time.Millisecond client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs. ens.InterconnectAll().BeginMining(blockTime) diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index d3f0d2e3c..eea2bc0c1 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -2,22 +2,29 @@ package kit import ( "context" + "encoding/json" "fmt" + "io/ioutil" + "os" + "path/filepath" "strings" "testing" "time" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/miner" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/require" ) // TestMiner represents a miner enrolled in an Ensemble. @@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) { fmt.Printf("COMMIT BATCH: %+v\n", cb) } } + +const metaFile = "sectorstore.json" + +func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) { + p, err := ioutil.TempDir("", "lotus-testsectors-") + require.NoError(t, err) + + if err := os.MkdirAll(p, 0755); err != nil { + if !os.IsExist(err) { + require.NoError(t, err) + } + } + + _, err = os.Stat(filepath.Join(p, metaFile)) + if !os.IsNotExist(err) { + require.NoError(t, err) + } + + cfg := &stores.LocalStorageMeta{ + ID: stores.ID(uuid.New().String()), + Weight: weight, + CanSeal: seal, + CanStore: store, + } + + if !(cfg.CanStore || cfg.CanSeal) { + t.Fatal("must specify at least one of CanStore or cfg.CanSeal") + } + + b, err := json.MarshalIndent(cfg, "", " ") + require.NoError(t, err) + + err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644) + require.NoError(t, err) + + err = tm.StorageAddLocal(ctx, p) + require.NoError(t, err) +} diff --git a/itests/sector_finalize_early_test.go b/itests/sector_finalize_early_test.go new file mode 100644 index 000000000..3eb980f9e --- /dev/null +++ b/itests/sector_finalize_early_test.go @@ -0,0 +1,66 @@ +package itests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" +) + +func TestDealsWithFinalizeEarly(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + kit.QuietMiningLogs() + + var blockTime = 50 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts( + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + cf := config.DefaultStorageMiner() + cf.Sealing.FinalizeEarly = true + return modules.ToSealingConfig(cf), nil + }, nil + })))) // no mock proofs. + ens.InterconnectAll().BeginMining(blockTime) + dh := kit.NewDealHarness(t, client, miner) + + ctx := context.Background() + + miner.AddStorage(ctx, t, 1000000000, true, false) + miner.AddStorage(ctx, t, 1000000000, false, true) + + sl, err := miner.StorageList(ctx) + require.NoError(t, err) + for si, d := range sl { + i, err := miner.StorageInfo(ctx, si) + require.NoError(t, err) + + fmt.Printf("stor d:%d %+v\n", len(d), i) + } + + t.Run("single", func(t *testing.T) { + dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1}) + }) + + sl, err = miner.StorageList(ctx) + require.NoError(t, err) + for si, d := range sl { + i, err := miner.StorageInfo(ctx, si) + require.NoError(t, err) + + fmt.Printf("stor d:%d %+v\n", len(d), i) + } +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 3d1d08071..8508850d3 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -882,33 +882,37 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error }, nil } +func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config { + return sealiface.Config{ + MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, + MaxSealingSectors: cfg.Sealing.MaxSealingSectors, + MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, + WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, + FinalizeEarly: cfg.Sealing.FinalizeEarly, + + BatchPreCommits: cfg.Sealing.BatchPreCommits, + MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, + PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), + PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), + + AggregateCommits: cfg.Sealing.AggregateCommits, + MinCommitBatch: cfg.Sealing.MinCommitBatch, + MaxCommitBatch: cfg.Sealing.MaxCommitBatch, + CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), + CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), + AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee), + + TerminateBatchMax: cfg.Sealing.TerminateBatchMax, + TerminateBatchMin: cfg.Sealing.TerminateBatchMin, + TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), + } +} + func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { return func() (out sealiface.Config, err error) { err = readCfg(r, func(cfg *config.StorageMiner) { - out = sealiface.Config{ - MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, - MaxSealingSectors: cfg.Sealing.MaxSealingSectors, - MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, - WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), - AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, - FinalizeEarly: cfg.Sealing.FinalizeEarly, - - BatchPreCommits: cfg.Sealing.BatchPreCommits, - MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, - PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), - PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), - - AggregateCommits: cfg.Sealing.AggregateCommits, - MinCommitBatch: cfg.Sealing.MinCommitBatch, - MaxCommitBatch: cfg.Sealing.MaxCommitBatch, - CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), - CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), - AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee), - - TerminateBatchMax: cfg.Sealing.TerminateBatchMax, - TerminateBatchMin: cfg.Sealing.TerminateBatchMin, - TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), - } + out = ToSealingConfig(cfg) }) return }, nil