diff --git a/api/api_storage.go b/api/api_storage.go index c2f3a3d57..e50fedc19 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -83,7 +83,7 @@ type StorageMiner interface { SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin // SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit. // Returns null if message wasn't sent - SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin + SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) //perm:admin // SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin // SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit. diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index d70c6aa0d..acdc0f9b5 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -650,7 +650,7 @@ type StorageMinerStruct struct { SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"` - SectorPreCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"` + SectorPreCommitFlush func(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"` SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"` @@ -1952,7 +1952,7 @@ func (s *StorageMinerStruct) SectorMarkForUpgrade(p0 context.Context, p1 abi.Sec return s.Internal.SectorMarkForUpgrade(p0, p1) } -func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) { +func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) { return s.Internal.SectorPreCommitFlush(p0) } diff --git a/api/test/pledge.go b/api/test/pledge.go index b4bf88b59..08548dc60 100644 --- a/api/test/pledge.go +++ b/api/test/pledge.go @@ -169,7 +169,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe pcb, err := miner.SectorPreCommitFlush(ctx) require.NoError(t, err) if pcb != nil { - fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) + fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb) } } @@ -319,7 +319,7 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod pcb, err := miner.SectorPreCommitFlush(ctx) require.NoError(t, err) if pcb != nil { - fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) + fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb) } cb, err := miner.SectorCommitFlush(ctx) diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index dd1b12856..f60d44df0 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index c76d4a249..2476c16e8 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -1059,15 +1059,26 @@ var sectorsBatchingPendingPreCommit = &cli.Command{ ctx := lcli.ReqContext(cctx) if cctx.Bool("publish-now") { - cid, err := api.SectorPreCommitFlush(ctx) + res, err := api.SectorPreCommitFlush(ctx) if err != nil { return xerrors.Errorf("flush: %w", err) } - if cid == nil { + if res == nil { return xerrors.Errorf("no sectors to publish") } - fmt.Println("sector batch published: ", cid) + for i, re := range res { + fmt.Printf("Batch %d:\n", i) + if re.Error != "" { + fmt.Printf("\tError: %s\n", re.Error) + } else { + fmt.Printf("\tMessage: %s\n", re.Msg) + } + fmt.Printf("\tSectors:\n") + for _, sector := range re.Sectors { + fmt.Printf("\t\t%d\tOK\n", sector) + } + } return nil } diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index bce8e21d5..dd674d331 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) type PreCommitBatcherApi interface { @@ -41,10 +42,10 @@ type PreCommitBatcher struct { deadlines map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]*preCommitEntry - waiting map[abi.SectorNumber][]chan cid.Cid + waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes notify, stop, stopped chan struct{} - force chan chan *cid.Cid + force chan chan []sealiface.PreCommitBatchRes lk sync.Mutex } @@ -59,10 +60,10 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom deadlines: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]*preCommitEntry{}, - waiting: map[abi.SectorNumber][]chan cid.Cid{}, + waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{}, notify: make(chan struct{}, 1), - force: make(chan chan *cid.Cid), + force: make(chan chan []sealiface.PreCommitBatchRes), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -73,8 +74,8 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom } func (b *PreCommitBatcher) run() { - var forceRes chan *cid.Cid - var lastMsg *cid.Cid + var forceRes chan []sealiface.PreCommitBatchRes + var lastRes []sealiface.PreCommitBatchRes cfg, err := b.getConfig() if err != nil { @@ -83,10 +84,10 @@ func (b *PreCommitBatcher) run() { for { if forceRes != nil { - forceRes <- lastMsg + forceRes <- lastRes forceRes = nil } - lastMsg = nil + lastRes = nil var sendAboveMax, sendAboveMin bool select { @@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() { } var err error - lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) + lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) if err != nil { log.Warnw("PreCommitBatcher processBatch error", "error", err) } @@ -150,10 +151,9 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T return time.After(wait) } -func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { +func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() - params := miner5.PreCommitSectorBatchParams{} total := len(b.todo) if total == 0 { @@ -173,7 +173,35 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { return nil, nil } + // todo support multiple batches + res, err := b.processBatch(cfg) + if err != nil && len(res) == 0 { + return nil, err + } + + for _, r := range res { + if err != nil { + r.Error = err.Error() + } + + for _, sn := range r.Sectors { + for _, ch := range b.waiting[sn] { + ch <- r // buffered + } + + delete(b.waiting, sn) + delete(b.todo, sn) + delete(b.deadlines, sn) + } + } + + return res, nil +} + +func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) { + params := miner5.PreCommitSectorBatchParams{} deposit := big.Zero() + var res sealiface.PreCommitBatchRes for _, p := range b.todo { if len(params.Sectors) >= cfg.MaxPreCommitBatch { @@ -181,54 +209,46 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { break } + res.Sectors = append(res.Sectors, p.pci.SectorNumber) params.Sectors = append(params.Sectors, *p.pci) deposit = big.Add(deposit, p.deposit) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { - return nil, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { - return nil, xerrors.Errorf("couldn't get miner info: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee) from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) if err != nil { - return nil, xerrors.Errorf("no good address found: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes()) if err != nil { - return nil, xerrors.Errorf("sending message failed: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) } - log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total) + res.Msg = &mcid - for _, sector := range params.Sectors { - sn := sector.SectorNumber + log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo)) - for _, ch := range b.waiting[sn] { - ch <- mcid // buffered - } - delete(b.waiting, sn) - delete(b.todo, sn) - delete(b.deadlines, sn) - } - - return &mcid, nil + return []sealiface.PreCommitBatchRes{res}, nil } // register PreCommit, wait for batch message, return message CID -func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) { +func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) { _, curEpoch, err := b.api.ChainHead(b.mctx) if err != nil { log.Errorf("getting chain head: %s", err) - return cid.Undef, nil + return sealiface.PreCommitBatchRes{}, err } sn := s.SectorNumber @@ -240,7 +260,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos pci: in, } - sent := make(chan cid.Cid, 1) + sent := make(chan sealiface.PreCommitBatchRes, 1) b.waiting[sn] = append(b.waiting[sn], sent) select { @@ -253,12 +273,12 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos case c := <-sent: return c, nil case <-ctx.Done(): - return cid.Undef, ctx.Err() + return sealiface.PreCommitBatchRes{}, ctx.Err() } } -func (b *PreCommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) { - resCh := make(chan *cid.Cid, 1) +func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { + resCh := make(chan []sealiface.PreCommitBatchRes, 1) select { case b.force <- resCh: select { diff --git a/extern/storage-sealing/sealiface/batching.go b/extern/storage-sealing/sealiface/batching.go index e7c2cadbb..d0e6d4178 100644 --- a/extern/storage-sealing/sealiface/batching.go +++ b/extern/storage-sealing/sealiface/batching.go @@ -14,3 +14,10 @@ type CommitBatchRes struct { Msg *cid.Cid Error string // if set, means that all sectors are failed, implies Msg==nil } + +type PreCommitBatchRes struct { + Sectors []abi.SectorNumber + + Msg *cid.Cid + Error string // if set, means that all sectors are failed, implies Msg==nil +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 61360dc12..e69ce5be0 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -207,7 +207,7 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) return m.terminator.Pending(ctx) } -func (m *Sealing) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Sealing) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return m.precommiter.Flush(ctx) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 6f4c57bfd..815ad6ac0 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -355,7 +355,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error { if sector.CommD == nil || sector.CommR == nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) + return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("sector had nil commR or commD")}) } params, deposit, _, err := m.preCommitParams(ctx, sector) @@ -363,12 +363,20 @@ func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector Se return err } - mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) + res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)}) + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)}) } - return ctx.Send(SectorPreCommitBatchSent{mcid}) + if res.Error != "" { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit batch error: %s", res.Error)}) + } + + if res.Msg == nil { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("batch message was nil")}) + } + + return ctx.Send(SectorPreCommitBatchSent{*res.Msg}) } func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 9b6f65207..e10925927 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -375,7 +375,7 @@ func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.Se return sm.Miner.TerminatePending(ctx) } -func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return sm.Miner.SectorPreCommitFlush(ctx) } diff --git a/storage/sealing.go b/storage/sealing.go index bd8241197..6a1195826 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -60,7 +60,7 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { return m.sealing.TerminatePending(ctx) } -func (m *Miner) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return m.sealing.SectorPreCommitFlush(ctx) }