resolve conflicts with master

Anton Evangelatov 2021-07-05 13:13:32 +02:00
commit 4be0a7a215
20 changed files with 477 additions and 114 deletions

View File

@@ -811,6 +811,11 @@ workflows:
       suite: itest-multisig
       target: "./itests/multisig_test.go"
+  - test:
+      name: test-itest-nonce
+      suite: itest-nonce
+      target: "./itests/nonce_test.go"
+
   - test:
       name: test-itest-paych_api
       suite: itest-paych_api
@@ -826,6 +831,11 @@ workflows:
       suite: itest-sdr_upgrade
       target: "./itests/sdr_upgrade_test.go"
+  - test:
+      name: test-itest-sector_finalize_early
+      suite: itest-sector_finalize_early
+      target: "./itests/sector_finalize_early_test.go"
+
   - test:
       name: test-itest-sector_pledge
       suite: itest-sector_pledge

View File

@@ -23,6 +23,11 @@ func (f FIL) Unitless() string {
 	return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
 }
 
+var AttoFil = NewInt(1)
+var FemtoFil = BigMul(AttoFil, NewInt(1000))
+var PicoFil = BigMul(FemtoFil, NewInt(1000))
+var NanoFil = BigMul(PicoFil, NewInt(1000))
+
 var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"}
 
 func (f FIL) Short() string {
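The new denominations compose by factors of 1,000 upward from attoFIL (1 FIL = 10^18 aFIL). A minimal standalone sketch, not part of the diff, of how later hunks in this commit express the 0.15 nFIL default with these constants:

	package main

	import (
		"fmt"

		"github.com/filecoin-project/lotus/chain/types"
	)

	func main() {
		// 150 pFIL == 0.15 nFIL: the AggregateAboveBaseFee default
		// introduced further down in this commit.
		fee := types.BigMul(types.PicoFil, types.NewInt(150))
		fmt.Println(fee.String()) // 150000000 (in attoFIL)
	}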

View File

@@ -210,7 +210,7 @@ var filplusCheckClientCmd = &cli.Command{
 			return err
 		}
 		if dcap == nil {
-			return xerrors.Errorf("client %s is not a verified client", err)
+			return xerrors.Errorf("client %s is not a verified client", caddr)
 		}
 
 		fmt.Println(*dcap)

View File

@@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
 			}
 
 			if !(cfg.CanStore || cfg.CanSeal) {
-				return xerrors.Errorf("must specify at least one of --store of --seal")
+				return xerrors.Errorf("must specify at least one of --store or --seal")
 			}
 
 			b, err := json.MarshalIndent(cfg, "", " ")

View File

@@ -145,7 +145,7 @@ over time
 			}
 
 			if !(cfg.CanStore || cfg.CanSeal) {
-				return xerrors.Errorf("must specify at least one of --store of --seal")
+				return xerrors.Errorf("must specify at least one of --store or --seal")
 			}
 
 			b, err := json.MarshalIndent(cfg, "", " ")

View File

@@ -526,10 +526,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
 		}
 	}
 
+	pathType := storiface.PathStorage
+	{
+		sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
+		if err != nil {
+			return xerrors.Errorf("finding sealed sector: %w", err)
+		}
+
+		for _, store := range sealedStores {
+			if store.CanSeal {
+				pathType = storiface.PathSealing
+				break
+			}
+		}
+	}
+
 	selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
 
 	err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
-		m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
+		m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
 		func(ctx context.Context, w Worker) error {
 			_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
 			return err

View File

@@ -106,6 +106,7 @@ func (b *CommitBatcher) run() {
 		panic(err)
 	}
 
+	timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
 	for {
 		if forceRes != nil {
 			forceRes <- lastMsg
@@ -121,7 +122,7 @@ func (b *CommitBatcher) run() {
 			return
 		case <-b.notify:
 			sendAboveMax = true
-		case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
+		case <-timer.C:
 			// do nothing
 		case fr := <-b.force: // user triggered
 			forceRes = fr
@@ -132,17 +133,26 @@ func (b *CommitBatcher) run() {
 		if err != nil {
 			log.Warnw("CommitBatcher processBatch error", "error", err)
 		}
+
+		if !timer.Stop() {
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+
+		timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
 	}
 }
 
-func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
+func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
 	now := time.Now()
 
 	b.lk.Lock()
 	defer b.lk.Unlock()
 
 	if len(b.todo) == 0 {
-		return nil
+		return maxWait
 	}
 
 	var cutoff time.Time
@@ -160,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
 	}
 
 	if cutoff.IsZero() {
-		return time.After(maxWait)
+		return maxWait
 	}
 
 	cutoff = cutoff.Add(-slack)
 	if cutoff.Before(now) {
-		return time.After(time.Nanosecond) // can't return 0
+		return time.Nanosecond // can't return 0
 	}
 
 	wait := cutoff.Sub(now)
@@ -173,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
 		wait = maxWait
 	}
 
-	return time.After(wait)
+	return wait
 }
 
 func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
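Before this change every loop iteration called time.After (via batchWait), allocating a timer that could not be reclaimed until it fired, up to the 24-hour CommitBatchWait later. Returning a plain time.Duration and reusing a single time.Timer avoids that leak. The Stop/drain/Reset sequence is the standard Go idiom for reusing a timer whose channel may already hold a tick; a minimal standalone sketch, not part of the diff:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		timer := time.NewTimer(50 * time.Millisecond)
		for i := 0; i < 3; i++ {
			select {
			case <-timer.C:
				fmt.Println("timer fired", i)
			case <-time.After(10 * time.Millisecond):
				fmt.Println("other event won", i)
			}
			// Stop reports false if the timer already fired; its channel
			// may then still hold a tick, so drain it non-blockingly
			// before Reset to avoid a spurious wakeup next iteration.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(50 * time.Millisecond)
		}
	}

The same pattern is applied to PreCommitBatcher later in this commit.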
@@ -196,7 +206,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
 
 	var res []sealiface.CommitBatchRes
 
-	if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {
+	individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors)
+
+	if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
+		tok, _, err := b.api.ChainHead(b.mctx)
+		if err != nil {
+			return nil, err
+		}
+
+		bf, err := b.api.ChainBaseFee(b.mctx, tok)
+		if err != nil {
+			return nil, xerrors.Errorf("couldn't get base fee: %w", err)
+		}
+
+		if bf.LessThan(cfg.AggregateAboveBaseFee) {
+			individual = true
+		}
+	}
+
+	if individual {
 		res, err = b.processIndividually()
 	} else {
 		res, err = b.processBatch(cfg)
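Aggregation mainly pays off when the network base fee is high; below the configured AggregateAboveBaseFee cutoff the batcher now falls back to individual ProveCommit messages. The decision, extracted as a pure function with illustrative names (a sketch, not the lotus API):

	package sketch

	import "github.com/filecoin-project/go-state-types/big"

	// shouldAggregate mirrors the decision above: aggregate only when the
	// batch is large enough and, unless gating is disabled with a zero
	// threshold, the current base fee is at or above the cutoff.
	func shouldAggregate(total, minBatch int, baseFee, threshold big.Int) bool {
		if total < minBatch {
			return false // too few sectors to aggregate
		}
		if threshold.Equals(big.Zero()) {
			return true // gating disabled
		}
		return !baseFee.LessThan(threshold)
	}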
@@ -232,7 +260,9 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
 	total := len(b.todo)
 
-	var res sealiface.CommitBatchRes
+	res := sealiface.CommitBatchRes{
+		FailedSectors: map[abi.SectorNumber]string{},
+	}
 
 	params := miner5.ProveCommitAggregateParams{
 		SectorNumbers: bitfield.New(),
@@ -347,6 +377,7 @@ func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error
 	for sn, info := range b.todo {
 		r := sealiface.CommitBatchRes{
 			Sectors:       []abi.SectorNumber{sn},
+			FailedSectors: map[abi.SectorNumber]string{},
 		}
 
 		mcid, err := b.processSingle(mi, sn, info, tok)

View File

@@ -20,6 +20,7 @@ import (
 	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
+	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
 	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
 	"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
@@ -58,6 +59,8 @@ func TestCommitBatcher(t *testing.T) {
 		CommitBatchWait:  24 * time.Hour,
 		CommitBatchSlack: 1 * time.Hour,
 
+		AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL
+
 		TerminateBatchMin:  1,
 		TerminateBatchMax:  100,
 		TerminateBatchWait: 5 * time.Minute,
@@ -143,7 +146,7 @@ func TestCommitBatcher(t *testing.T) {
 		}
 	}
 
-	expectSend := func(expect []abi.SectorNumber) action {
+	expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
 		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
 			s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
@@ -153,14 +156,40 @@ func TestCommitBatcher(t *testing.T) {
 				batch = true
 				ti = 1
 			}
 
+			basefee := types.PicoFil
+			if aboveBalancer {
+				basefee = types.NanoFil
+			}
+
+			if batch {
 				s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
+				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
+			}
+
+			if !aboveBalancer {
+				batch = false
+				ti = len(expect)
+			}
+
+			s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
+
+			pciC := len(expect)
+			if failOnePCI {
+				s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found
+				pciC = len(expect) - 1
+				if !batch {
+					ti--
+				}
+			}
 			s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
 				PreCommitDeposit: big.Zero(),
-			}, nil).Times(len(expect))
-			s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
+			}, nil).Times(pciC)
+			s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC)
 
 			if batch {
 				s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
-				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
+				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
 			}
 
 			s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
@@ -183,11 +212,11 @@ func TestCommitBatcher(t *testing.T) {
 		}
 	}
 
-	flush := func(expect []abi.SectorNumber) action {
+	flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
 		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
-			_ = expectSend(expect)(t, s, pcb)
+			_ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb)
 
-			batch := len(expect) >= minBatch
+			batch := len(expect) >= minBatch && aboveBalancer
 
 			r, err := pcb.Flush(ctx)
 			require.NoError(t, err)
@@ -198,6 +227,13 @@ func TestCommitBatcher(t *testing.T) {
 					return r[0].Sectors[i] < r[0].Sectors[j]
 				})
 				require.Equal(t, expect, r[0].Sectors)
+				if !failOnePCI {
+					require.Len(t, r[0].FailedSectors, 0)
+				} else {
+					require.Len(t, r[0].FailedSectors, 1)
+					_, found := r[0].FailedSectors[1]
+					require.True(t, found)
+				}
 			} else {
 				require.Len(t, r, len(expect))
 				for _, res := range r {
@@ -209,6 +245,13 @@ func TestCommitBatcher(t *testing.T) {
 				})
 				for i, res := range r {
 					require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
+					if failOnePCI && res.Sectors[0] == 1 {
+						require.Len(t, res.FailedSectors, 1)
+						_, found := res.FailedSectors[1]
+						require.True(t, found)
+					} else {
+						require.Empty(t, res.FailedSectors)
+					}
 				}
 			}
@@ -227,33 +270,75 @@ func TestCommitBatcher(t *testing.T) {
 	tcs := map[string]struct {
 		actions []action
 	}{
-		"addSingle": {
+		"addSingle-aboveBalancer": {
 			actions: []action{
 				addSector(0),
 				waitPending(1),
-				flush([]abi.SectorNumber{0}),
+				flush([]abi.SectorNumber{0}, true, false),
 			},
 		},
-		"addTwo": {
+		"addTwo-aboveBalancer": {
 			actions: []action{
 				addSectors(getSectors(2)),
 				waitPending(2),
-				flush(getSectors(2)),
+				flush(getSectors(2), true, false),
 			},
 		},
-		"addAte": {
+		"addAte-aboveBalancer": {
 			actions: []action{
 				addSectors(getSectors(8)),
 				waitPending(8),
-				flush(getSectors(8)),
+				flush(getSectors(8), true, false),
 			},
 		},
-		"addMax": {
+		"addMax-aboveBalancer": {
 			actions: []action{
-				expectSend(getSectors(maxBatch)),
+				expectSend(getSectors(maxBatch), true, false),
 				addSectors(getSectors(maxBatch)),
 			},
 		},
+		"addSingle-belowBalancer": {
+			actions: []action{
+				addSector(0),
+				waitPending(1),
+				flush([]abi.SectorNumber{0}, false, false),
+			},
+		},
+		"addTwo-belowBalancer": {
+			actions: []action{
+				addSectors(getSectors(2)),
+				waitPending(2),
+				flush(getSectors(2), false, false),
+			},
+		},
+		"addAte-belowBalancer": {
+			actions: []action{
+				addSectors(getSectors(8)),
+				waitPending(8),
+				flush(getSectors(8), false, false),
+			},
+		},
+		"addMax-belowBalancer": {
+			actions: []action{
+				expectSend(getSectors(maxBatch), false, false),
+				addSectors(getSectors(maxBatch)),
+			},
+		},
+		"addAte-aboveBalancer-failOne": {
+			actions: []action{
+				addSectors(getSectors(8)),
+				waitPending(8),
+				flush(getSectors(8), true, true),
+			},
+		},
+		"addAte-belowBalancer-failOne": {
+			actions: []action{
+				addSectors(getSectors(8)),
+				waitPending(8),
+				flush(getSectors(8), false, true),
+			},
+		},
 	}
 
 	for name, tc := range tcs {

View File

@@ -115,6 +115,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
 	SubmitCommitAggregate: planOne(
 		on(SectorCommitAggregateSent{}, CommitWait),
 		on(SectorCommitFailed{}, CommitFailed),
+		on(SectorRetrySubmitCommit{}, SubmitCommit),
 	),
 	CommitWait: planOne(
 		on(SectorProving{}, FinalizeSector),

View File

@@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() {
 		panic(err)
 	}
 
+	timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
 	for {
 		if forceRes != nil {
 			forceRes <- lastRes
@@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
 			return
 		case <-b.notify:
 			sendAboveMax = true
-		case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
+		case <-timer.C:
 			// do nothing
 		case fr := <-b.force: // user triggered
 			forceRes = fr
@@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() {
 		if err != nil {
 			log.Warnw("PreCommitBatcher processBatch error", "error", err)
 		}
+
+		if !timer.Stop() {
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+
+		timer.Reset(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
 	}
 }
 
-func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
+func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
 	now := time.Now()
 
 	b.lk.Lock()
 	defer b.lk.Unlock()
 
 	if len(b.todo) == 0 {
-		return nil
+		return maxWait
 	}
 
 	var cutoff time.Time
@@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
 	}
 
 	if cutoff.IsZero() {
-		return time.After(maxWait)
+		return maxWait
 	}
 
 	cutoff = cutoff.Add(-slack)
 	if cutoff.Before(now) {
-		return time.After(time.Nanosecond) // can't return 0
+		return time.Nanosecond // can't return 0
 	}
 
 	wait := cutoff.Sub(now)
@@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
 		wait = maxWait
 	}
 
-	return time.After(wait)
+	return wait
 }
 
 func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {

View File

@@ -1,6 +1,10 @@
 package sealiface
 
-import "time"
+import (
+	"time"
+
+	"github.com/filecoin-project/go-state-types/abi"
+)
 
 // this has to be in a separate package to not make lotus API depend on filecoin-ffi
 
@@ -31,6 +35,8 @@ type Config struct {
 	CommitBatchWait  time.Duration
 	CommitBatchSlack time.Duration
 
+	AggregateAboveBaseFee abi.TokenAmount
+
 	TerminateBatchMax  uint64
 	TerminateBatchMin  uint64
 	TerminateBatchWait time.Duration

View File

@@ -182,7 +182,7 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
 }
 
 func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
-	tok, height, err := m.api.ChainHead(ctx.Context())
+	tok, _, err := m.api.ChainHead(ctx.Context())
 	if err != nil {
 		log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
 		return nil
@@ -216,33 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
 		}
 	}
 
-	if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil {
-		switch err.(type) {
-		case *ErrApi:
-			log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
-			return nil
-		case *ErrBadCommD:
-			return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)})
-		case *ErrExpiredTicket:
-			return ctx.Send(SectorTicketExpired{xerrors.Errorf("ticket expired error, removing sector: %w", err)})
-		case *ErrBadTicket:
-			return ctx.Send(SectorTicketExpired{xerrors.Errorf("expired ticket, removing sector: %w", err)})
-		case *ErrInvalidDeals:
-			log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
-			return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed})
-		case *ErrExpiredDeals:
-			return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
-		case nil:
-			return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
-		case *ErrPrecommitOnChain:
-			// noop, this is expected
-		case *ErrSectorNumberAllocated:
-			// noop, already committed?
-		default:
-			return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
-		}
-	}
-
 	if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
 		switch err.(type) {
 		case *ErrApi:

View File

@@ -105,48 +105,66 @@ func checkTicketExpired(ticket, head abi.ChainEpoch) bool {
 	return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations
 }
 
-func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) {
+func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool {
+	return currEpoch > preCommitEpoch+msd
+}
+
+func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) {
 	tok, epoch, err := m.api.ChainHead(ctx.Context())
 	if err != nil {
-		log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
-		return nil, 0, nil
+		log.Errorf("getTicket: api error, not proceeding: %+v", err)
+		return nil, 0, false, nil
 	}
+
+	// Check StateMinerSectorAllocated up front so the allocated flag is
+	// available on every return path below, even when MarshalCBOR or
+	// StateSectorPreCommitInfo fail.
+	allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
+	if aerr != nil {
+		log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr)
+		return nil, 0, false, nil
+	}
 
 	ticketEpoch := epoch - policy.SealRandomnessLookback
 	buf := new(bytes.Buffer)
 	if err := m.maddr.MarshalCBOR(buf); err != nil {
-		return nil, 0, err
+		return nil, 0, allocated, err
 	}
 
 	pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
 	if err != nil {
-		return nil, 0, xerrors.Errorf("getting precommit info: %w", err)
+		return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err)
 	}
 
 	if pci != nil {
 		ticketEpoch = pci.Info.SealRandEpoch
 
-		if checkTicketExpired(ticketEpoch, epoch) {
-			return nil, 0, xerrors.Errorf("ticket expired for precommitted sector")
+		nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
+		if err != nil {
+			return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
+		}
+
+		msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
+
+		if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) {
+			return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
 		}
 	}
+
+	if pci == nil && allocated { // allocated but no precommit on chain: it expired; will SectorCommitFailed or SectorRemove
+		return nil, 0, allocated, xerrors.Errorf("sector %d precommitted but expired", sector.SectorNumber)
+	}
 
 	rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes())
 	if err != nil {
-		return nil, 0, err
+		return nil, 0, allocated, err
 	}
 
-	return abi.SealRandomness(rand), ticketEpoch, nil
+	return abi.SealRandomness(rand), ticketEpoch, allocated, nil
 }
 
 func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
-	ticketValue, ticketEpoch, err := m.getTicket(ctx, sector)
+	ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector)
 	if err != nil {
-		allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
-		if aerr != nil {
-			log.Errorf("error checking if sector is allocated: %+v", aerr)
-		}
-
 		if allocated {
 			if sector.CommitMessage != nil {
 				// Some recovery paths with unfortunate timing lead here
@@ -182,16 +200,37 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
 		}
 	}
 
-	_, height, err := m.api.ChainHead(ctx.Context())
+	tok, height, err := m.api.ChainHead(ctx.Context())
 	if err != nil {
 		log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
 		return nil
 	}
 
 	if checkTicketExpired(sector.TicketEpoch, height) {
+		pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
+		if err != nil {
+			log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err)
+			return nil
+		}
+
+		if pci == nil {
 			return ctx.Send(SectorOldTicket{}) // go get new ticket
 		}
+
+		nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
+		if err != nil {
+			log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err)
+			return nil
+		}
+
+		msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
+
+		// if height > PreCommitEpoch + msd, there is no need to recalculate
+		if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) {
+			return ctx.Send(SectorOldTicket{}) // will be removed
+		}
+	}
 
 	pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
 	if err != nil {
 		return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
@@ -624,11 +663,21 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
 		Spt:   sector.SectorType,
 	})
 	if err != nil {
-		return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
+		return ctx.Send(SectorRetrySubmitCommit{})
 	}
 
 	if res.Error != "" {
-		return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)})
+		tok, _, err := m.api.ChainHead(ctx.Context())
+		if err != nil {
+			log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
+			return nil
+		}
+
+		if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
+			return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
+		}
+
+		return ctx.Send(SectorRetrySubmitCommit{})
 	}
 
 	if e, found := res.FailedSectors[sector.SectorNumber]; found {

go.mod
View File

@@ -165,8 +165,6 @@ replace github.com/libp2p/go-libp2p-yamux => github.com/libp2p/go-libp2p-yamux v
 
 replace github.com/filecoin-project/lotus => ./
 
-replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v1.18.0
-
 replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
 
 replace github.com/filecoin-project/test-vectors => ./extern/test-vectors

View File

@@ -21,7 +21,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
 		policy.SetPreCommitChallengeDelay(oldDelay)
 	})
 
-	var blockTime = 1 * time.Second
+	var blockTime = 50 * time.Millisecond
 
 	client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.WithAllSubsystems()) // no mock proofs.
 	ens.InterconnectAll().BeginMining(blockTime)

View File

@@ -2,23 +2,30 @@ package kit
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"net"
+	"os"
+	"path/filepath"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
 
 	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/build"
 	"github.com/filecoin-project/lotus/chain/wallet"
+	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
 	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
 	"github.com/filecoin-project/lotus/miner"
 	libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/multiformats/go-multiaddr"
-	"github.com/stretchr/testify/require"
 )
 
 type MinerSubsystem int
@@ -151,3 +158,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
 		fmt.Printf("COMMIT BATCH: %+v\n", cb)
 	}
 }
+
+const metaFile = "sectorstore.json"
+
+func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
+	p, err := ioutil.TempDir("", "lotus-testsectors-")
+	require.NoError(t, err)
+
+	if err := os.MkdirAll(p, 0755); err != nil {
+		if !os.IsExist(err) {
+			require.NoError(t, err)
+		}
+	}
+
+	_, err = os.Stat(filepath.Join(p, metaFile))
+	if !os.IsNotExist(err) {
+		require.NoError(t, err)
+	}
+
+	cfg := &stores.LocalStorageMeta{
+		ID:       stores.ID(uuid.New().String()),
+		Weight:   weight,
+		CanSeal:  seal,
+		CanStore: store,
+	}
+
+	if !(cfg.CanStore || cfg.CanSeal) {
+		t.Fatal("must specify at least one of CanStore or CanSeal")
+	}
+
+	b, err := json.MarshalIndent(cfg, "", " ")
+	require.NoError(t, err)
+
+	err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
+	require.NoError(t, err)
+
+	err = tm.StorageAddLocal(ctx, p)
+	require.NoError(t, err)
+}
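The finalize-early itest added below uses this helper to attach one seal-only and one store-only path:

	miner.AddStorage(ctx, t, 1000000000, true, false) // can seal, cannot store
	miner.AddStorage(ctx, t, 1000000000, false, true) // can store, cannot seal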

itests/nonce_test.go
View File

@@ -0,0 +1,57 @@
+package itests
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/filecoin-project/go-state-types/big"
+	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/itests/kit"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNonceIncremental(t *testing.T) {
+	ctx := context.Background()
+
+	kit.QuietMiningLogs()
+
+	client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
+	ens.InterconnectAll().BeginMining(10 * time.Millisecond)
+
+	// create a new address where to send funds.
+	addr, err := client.WalletNew(ctx, types.KTBLS)
+	require.NoError(t, err)
+
+	// get the existing balance from the default wallet to then split it.
+	bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
+	require.NoError(t, err)
+
+	const iterations = 100
+
+	// we'll send half our balance (saving the other half for gas),
+	// in `iterations` increments.
+	toSend := big.Div(bal, big.NewInt(2))
+	each := big.Div(toSend, big.NewInt(iterations))
+
+	var sms []*types.SignedMessage
+	for i := 0; i < iterations; i++ {
+		msg := &types.Message{
+			From:  client.DefaultKey.Address,
+			To:    addr,
+			Value: each,
+		}
+
+		sm, err := client.MpoolPushMessage(ctx, msg, nil)
+		require.NoError(t, err)
+		require.EqualValues(t, i, sm.Message.Nonce)
+
+		sms = append(sms, sm)
+	}
+
+	for _, sm := range sms {
+		_, err := client.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true)
+		require.NoError(t, err)
+	}
+}

View File

@@ -0,0 +1,66 @@
+package itests
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
+	"github.com/filecoin-project/lotus/itests/kit"
+	"github.com/filecoin-project/lotus/node"
+	"github.com/filecoin-project/lotus/node/config"
+	"github.com/filecoin-project/lotus/node/modules"
+	"github.com/filecoin-project/lotus/node/modules/dtypes"
+	"github.com/filecoin-project/lotus/node/repo"
+)
+
+func TestDealsWithFinalizeEarly(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode")
+	}
+
+	kit.QuietMiningLogs()
+
+	var blockTime = 50 * time.Millisecond
+
+	client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
+		node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
+			return func() (sealiface.Config, error) {
+				cf := config.DefaultStorageMiner()
+				cf.Sealing.FinalizeEarly = true
+				return modules.ToSealingConfig(cf), nil
+			}, nil
+		})))) // no mock proofs.
+	ens.InterconnectAll().BeginMining(blockTime)
+	dh := kit.NewDealHarness(t, client, miner)
+
+	ctx := context.Background()
+
+	miner.AddStorage(ctx, t, 1000000000, true, false)
+	miner.AddStorage(ctx, t, 1000000000, false, true)
+
+	sl, err := miner.StorageList(ctx)
+	require.NoError(t, err)
+	for si, d := range sl {
+		i, err := miner.StorageInfo(ctx, si)
+		require.NoError(t, err)
+
+		fmt.Printf("stor d:%d %+v\n", len(d), i)
+	}
+
+	t.Run("single", func(t *testing.T) {
+		dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
+	})
+
+	sl, err = miner.StorageList(ctx)
+	require.NoError(t, err)
+	for si, d := range sl {
+		i, err := miner.StorageInfo(ctx, si)
+		require.NoError(t, err)
+
+		fmt.Printf("stor d:%d %+v\n", len(d), i)
+	}
+}

View File

@@ -155,6 +155,10 @@ type SealingConfig struct {
 	// time buffer for forceful batch submission before sectors/deals in batch would start expiring
 	CommitBatchSlack Duration
 
+	// network BaseFee below which to stop doing commit aggregation, instead
+	// submitting proofs to the chain individually
+	AggregateAboveBaseFee types.FIL
+
 	TerminateBatchMax  uint64
 	TerminateBatchMin  uint64
 	TerminateBatchWait Duration
@@ -341,6 +345,8 @@ func DefaultStorageMiner() *StorageMiner {
 			CommitBatchWait:  Duration(24 * time.Hour), // this can be up to 30 days
 			CommitBatchSlack: Duration(1 * time.Hour),  // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
 
+			AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL
+
 			TerminateBatchMin:  1,
 			TerminateBatchMax:  100,
 			TerminateBatchWait: Duration(5 * time.Minute),
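Because the field rides through ToSealingConfig (added at the end of this commit), tests or embedders can override the cutoff the same way the finalize-early itest above overrides FinalizeEarly. A short sketch using names from this commit:

	cf := config.DefaultStorageMiner()
	// raise the cutoff to 1 nFIL so aggregation only kicks in at higher base fees
	cf.Sealing.AggregateAboveBaseFee = types.FIL(types.NanoFil)
	sealCfg := modules.ToSealingConfig(cf)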

View File

@@ -881,6 +881,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
 				MaxCommitBatch:   cfg.MaxCommitBatch,
 				CommitBatchWait:  config.Duration(cfg.CommitBatchWait),
 				CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
+				AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
 
 				TerminateBatchMax: cfg.TerminateBatchMax,
 				TerminateBatchMin: cfg.TerminateBatchMin,
@@ -891,10 +892,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
 	}, nil
 }
 
-func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
-	return func() (out sealiface.Config, err error) {
-		err = readCfg(r, func(cfg *config.StorageMiner) {
-			out = sealiface.Config{
+func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
+	return sealiface.Config{
 		MaxWaitDealsSectors:       cfg.Sealing.MaxWaitDealsSectors,
 		MaxSealingSectors:         cfg.Sealing.MaxSealingSectors,
 		MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
@@ -912,11 +911,18 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
 		MaxCommitBatch:   cfg.Sealing.MaxCommitBatch,
 		CommitBatchWait:  time.Duration(cfg.Sealing.CommitBatchWait),
 		CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
+		AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
 
 		TerminateBatchMax:  cfg.Sealing.TerminateBatchMax,
 		TerminateBatchMin:  cfg.Sealing.TerminateBatchMin,
 		TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
 	}
+}
+
+func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
+	return func() (out sealiface.Config, err error) {
+		err = readCfg(r, func(cfg *config.StorageMiner) {
+			out = ToSealingConfig(cfg)
 		})
 		return
 	}, nil