diff --git a/api/api_storage.go b/api/api_storage.go index 202b40f93..c2f3a3d57 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -24,6 +24,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) // StorageMiner is a low-level interface to the Filecoin network storage miner node @@ -87,7 +88,7 @@ type StorageMiner interface { SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin // SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit. // Returns null if message wasn't sent - SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin + SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) //perm:admin // SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 13375cf72..d70c6aa0d 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/specs-storage/storage" @@ -639,7 +640,7 @@ type StorageMinerStruct struct { SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"` - SectorCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"` + SectorCommitFlush func(p0 context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"` SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"` @@ -1931,7 +1932,7 @@ func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (inte return s.Internal.SealingSchedDiag(p0, p1) } -func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) { +func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) ([]sealiface.CommitBatchRes, error) { return s.Internal.SectorCommitFlush(p0) } diff --git a/api/test/pledge.go b/api/test/pledge.go index 8752ad4ac..b4bf88b59 100644 --- a/api/test/pledge.go +++ b/api/test/pledge.go @@ -178,7 +178,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe cb, err := miner.SectorCommitFlush(ctx) require.NoError(t, err) if cb != nil { - fmt.Printf("COMMIT BATCH: %s\n", *cb) + fmt.Printf("COMMIT BATCH: %+v\n", cb) } } @@ -325,7 +325,7 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod cb, err := miner.SectorCommitFlush(ctx) require.NoError(t, err) if cb != nil { - fmt.Printf("COMMIT BATCH: %s\n", *cb) + fmt.Printf("COMMIT BATCH: %+v\n", cb) } } diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 845400ccf..c01067872 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) const arp = abi.RegisteredAggregationProof_SnarkPackV1 @@ -30,6 +31,9 @@ type CommitBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) + + StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) + StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error) } type AggregateInput struct { @@ -49,10 +53,10 @@ type CommitBatcher struct { deadlines map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]AggregateInput - waiting map[abi.SectorNumber][]chan cid.Cid + waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes notify, stop, stopped chan struct{} - force chan chan *cid.Cid + force chan chan []sealiface.CommitBatchRes lk sync.Mutex } @@ -68,10 +72,10 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat deadlines: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]AggregateInput{}, - waiting: map[abi.SectorNumber][]chan cid.Cid{}, + waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{}, notify: make(chan struct{}, 1), - force: make(chan chan *cid.Cid), + force: make(chan chan []sealiface.CommitBatchRes), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -82,8 +86,8 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat } func (b *CommitBatcher) run() { - var forceRes chan *cid.Cid - var lastMsg *cid.Cid + var forceRes chan []sealiface.CommitBatchRes + var lastMsg []sealiface.CommitBatchRes cfg, err := b.getConfig() if err != nil { @@ -111,7 +115,7 @@ func (b *CommitBatcher) run() { } var err error - lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) + lastMsg, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) if err != nil { log.Warnw("CommitBatcher processBatch error", "error", err) } @@ -159,12 +163,9 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time return time.After(wait) } -func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { +func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() - params := miner5.ProveCommitAggregateParams{ - SectorNumbers: bitfield.New(), - } total := len(b.todo) if total == 0 { @@ -184,6 +185,45 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { return nil, nil } + var res []sealiface.CommitBatchRes + + if total < cfg.MinCommitBatch || total < miner5.PreCommitSectorBatchMaxSize { + res, err = b.processIndividually() + } else { + res, err = b.processBatch(cfg) + } + if err != nil && len(res) == 0 { + return nil, err + } + + for _, r := range res { + if err != nil { + r.Error = err.Error() + } + + for _, sn := range r.Sectors { + for _, ch := range b.waiting[sn] { + ch <- r // buffered + } + + delete(b.waiting, sn) + delete(b.todo, sn) + delete(b.deadlines, sn) + } + } + + return res, nil +} + +func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { + total := len(b.todo) + + var res sealiface.CommitBatchRes + + params := miner5.ProveCommitAggregateParams{ + SectorNumbers: bitfield.New(), + } + proofs := make([][]byte, 0, total) infos := make([]proof5.AggregateSealVerifyInfo, 0, total) @@ -202,12 +242,13 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { }) for _, info := range infos { + res.Sectors = append(res.Sectors, info.Number) proofs = append(proofs, b.todo[info.Number].proof) } mid, err := address.IDFromAddress(b.maddr) if err != nil { - return nil, xerrors.Errorf("getting miner id: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) } params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{ @@ -217,55 +258,107 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { Infos: infos, }, proofs) if err != nil { - return nil, xerrors.Errorf("aggregating proofs: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { - return nil, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) } + mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) + if err != nil { + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) + } + + from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee) + if err != nil { + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) + } + + // todo: collateral + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes()) + if err != nil { + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) + } + + res.Msg = &mcid + + log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) + + return []sealiface.CommitBatchRes{res}, nil +} + +func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error) { mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { return nil, xerrors.Errorf("couldn't get miner info: %w", err) } - from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee) + tok, _, err := b.api.ChainHead(b.mctx) if err != nil { - return nil, xerrors.Errorf("no good address found: %w", err) + return nil, err } - mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes()) - if err != nil { - return nil, xerrors.Errorf("sending message failed: %w", err) - } + var res []sealiface.CommitBatchRes - log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) - - err = params.SectorNumbers.ForEach(func(us uint64) error { - sn := abi.SectorNumber(us) - - for _, ch := range b.waiting[sn] { - ch <- mcid // buffered + for sn, info := range b.todo { + r := sealiface.CommitBatchRes{ + Sectors: []abi.SectorNumber{sn}, } - delete(b.waiting, sn) - delete(b.todo, sn) - delete(b.deadlines, sn) - return nil - }) - if err != nil { - return nil, xerrors.Errorf("done sectors foreach: %w", err) + + mcid, err := b.processSingle(mi, sn, info, tok) + if err != nil { + log.Errorf("process single error: %+v", err) // todo: return to user + r.FailedSectors[sn] = err.Error() + } else { + r.Msg = &mcid + } + + res = append(res, r) } - return &mcid, nil + return res, nil +} + +func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, info AggregateInput, tok TipSetToken) (cid.Cid, error) { + enc := new(bytes.Buffer) + params := &miner.ProveCommitSectorParams{ + SectorNumber: sn, + Proof: info.proof, + } + + if err := params.MarshalCBOR(enc); err != nil { + return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err) + } + + collateral, err := b.getSectorCollateral(sn, tok) + if err != nil { + return cid.Undef, err + } + + goodFunds := big.Add(collateral, b.feeCfg.MaxCommitGasFee) + + from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral) + if err != nil { + return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err) + } + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitSector, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes()) + if err != nil { + return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err) + } + + return mcid, nil } // register commit, wait for batch message, return message CID -func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) { +func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) { _, curEpoch, err := b.api.ChainHead(b.mctx) if err != nil { log.Errorf("getting chain head: %s", err) - return cid.Undef, nil + return sealiface.CommitBatchRes{}, nil } sn := s.SectorNumber @@ -274,7 +367,7 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat b.deadlines[sn] = getSectorDeadline(curEpoch, s) b.todo[sn] = in - sent := make(chan cid.Cid, 1) + sent := make(chan sealiface.CommitBatchRes, 1) b.waiting[sn] = append(b.waiting[sn], sent) select { @@ -284,15 +377,15 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat b.lk.Unlock() select { - case c := <-sent: - return c, nil + case r := <-sent: + return r, nil case <-ctx.Done(): - return cid.Undef, ctx.Err() + return sealiface.CommitBatchRes{}, ctx.Err() } } -func (b *CommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) { - resCh := make(chan *cid.Cid, 1) +func (b *CommitBatcher) Flush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { + resCh := make(chan []sealiface.CommitBatchRes, 1) select { case b.force <- resCh: select { @@ -364,3 +457,25 @@ func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time { return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second) } + +func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) { + pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, sn, tok) + if err != nil { + return big.Zero(), xerrors.Errorf("getting precommit info: %w", err) + } + if pci == nil { + return big.Zero(), xerrors.Errorf("precommit info not found on chain") + } + + collateral, err := b.api.StateMinerInitialPledgeCollateral(b.mctx, b.maddr, pci.Info, tok) + if err != nil { + return big.Zero(), xerrors.Errorf("getting initial pledge collateral: %w", err) + } + + collateral = big.Sub(collateral, pci.PreCommitDeposit) + if collateral.LessThan(big.Zero()) { + collateral = big.Zero() + } + + return collateral, nil +} diff --git a/extern/storage-sealing/sealiface/batching.go b/extern/storage-sealing/sealiface/batching.go new file mode 100644 index 000000000..e7c2cadbb --- /dev/null +++ b/extern/storage-sealing/sealiface/batching.go @@ -0,0 +1,16 @@ +package sealiface + +import ( + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-state-types/abi" +) + +type CommitBatchRes struct { + Sectors []abi.SectorNumber + + FailedSectors map[abi.SectorNumber]string + + Msg *cid.Cid + Error string // if set, means that all sectors are failed, implies Msg==nil +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index fc452cc6f..61360dc12 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) const SectorStorePrefix = "/sectors" @@ -214,7 +215,7 @@ func (m *Sealing) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, e return m.precommiter.Pending(ctx) } -func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Sealing) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { return m.commiter.Flush(ctx) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 391951dea..6f4c57bfd 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -581,7 +581,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) } - mcid, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{ + res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{ info: proof.AggregateSealVerifyInfo{ Number: sector.SectorNumber, Randomness: sector.TicketValue, @@ -596,7 +596,19 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)}) } - return ctx.Send(SectorCommitAggregateSent{mcid}) + if res.Error != "" { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)}) + } + + if e, found := res.FailedSectors[sector.SectorNumber]; found { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector failed in aggregate processing: %s", e)}) + } + + if res.Msg == nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate message was nil")}) + } + + return ctx.Send(SectorCommitAggregateSent{*res.Msg}) } func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 8660f1efb..9b6f65207 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -32,6 +32,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/apistruct" @@ -386,7 +387,7 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect return sm.Miner.MarkForUpgrade(id) } -func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { return sm.Miner.CommitFlush(ctx) } diff --git a/storage/sealing.go b/storage/sealing.go index cd215f238..bd8241197 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) // TODO: refactor this to be direct somehow @@ -67,7 +68,7 @@ func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, err return m.sealing.SectorPreCommitPending(ctx) } -func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Miner) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { return m.sealing.CommitFlush(ctx) }