precommit batcher: Improve error propagation
This commit is contained in:
parent
5c3ebd424c
commit
482e1110c2
@ -83,7 +83,7 @@ type StorageMiner interface {
|
||||
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin
|
||||
// SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit.
|
||||
// Returns null if message wasn't sent
|
||||
SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
|
||||
SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) //perm:admin
|
||||
// SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message
|
||||
SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
|
||||
// SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit.
|
||||
|
@ -650,7 +650,7 @@ type StorageMinerStruct struct {
|
||||
|
||||
SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
|
||||
|
||||
SectorPreCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"`
|
||||
SectorPreCommitFlush func(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"`
|
||||
|
||||
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
|
||||
|
||||
@ -1952,7 +1952,7 @@ func (s *StorageMinerStruct) SectorMarkForUpgrade(p0 context.Context, p1 abi.Sec
|
||||
return s.Internal.SectorMarkForUpgrade(p0, p1)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) {
|
||||
func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
return s.Internal.SectorPreCommitFlush(p0)
|
||||
}
|
||||
|
||||
|
@ -169,7 +169,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe
|
||||
pcb, err := miner.SectorPreCommitFlush(ctx)
|
||||
require.NoError(t, err)
|
||||
if pcb != nil {
|
||||
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
|
||||
fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb)
|
||||
}
|
||||
}
|
||||
|
||||
@ -319,7 +319,7 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod
|
||||
pcb, err := miner.SectorPreCommitFlush(ctx)
|
||||
require.NoError(t, err)
|
||||
if pcb != nil {
|
||||
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
|
||||
fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb)
|
||||
}
|
||||
|
||||
cb, err := miner.SectorCommitFlush(ctx)
|
||||
|
Binary file not shown.
@ -1059,15 +1059,26 @@ var sectorsBatchingPendingPreCommit = &cli.Command{
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
if cctx.Bool("publish-now") {
|
||||
cid, err := api.SectorPreCommitFlush(ctx)
|
||||
res, err := api.SectorPreCommitFlush(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("flush: %w", err)
|
||||
}
|
||||
if cid == nil {
|
||||
if res == nil {
|
||||
return xerrors.Errorf("no sectors to publish")
|
||||
}
|
||||
|
||||
fmt.Println("sector batch published: ", cid)
|
||||
for i, re := range res {
|
||||
fmt.Printf("Batch %d:\n", i)
|
||||
if re.Error != "" {
|
||||
fmt.Printf("\tError: %s\n", re.Error)
|
||||
} else {
|
||||
fmt.Printf("\tMessage: %s\n", re.Msg)
|
||||
}
|
||||
fmt.Printf("\tSectors:\n")
|
||||
for _, sector := range re.Sectors {
|
||||
fmt.Printf("\t\t%d\tOK\n", sector)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
86
extern/storage-sealing/precommit_batch.go
vendored
86
extern/storage-sealing/precommit_batch.go
vendored
@ -18,6 +18,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
)
|
||||
|
||||
type PreCommitBatcherApi interface {
|
||||
@ -41,10 +42,10 @@ type PreCommitBatcher struct {
|
||||
|
||||
deadlines map[abi.SectorNumber]time.Time
|
||||
todo map[abi.SectorNumber]*preCommitEntry
|
||||
waiting map[abi.SectorNumber][]chan cid.Cid
|
||||
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
|
||||
|
||||
notify, stop, stopped chan struct{}
|
||||
force chan chan *cid.Cid
|
||||
force chan chan []sealiface.PreCommitBatchRes
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
@ -59,10 +60,10 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
|
||||
|
||||
deadlines: map[abi.SectorNumber]time.Time{},
|
||||
todo: map[abi.SectorNumber]*preCommitEntry{},
|
||||
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
||||
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
|
||||
|
||||
notify: make(chan struct{}, 1),
|
||||
force: make(chan chan *cid.Cid),
|
||||
force: make(chan chan []sealiface.PreCommitBatchRes),
|
||||
stop: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
}
|
||||
@ -73,8 +74,8 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) run() {
|
||||
var forceRes chan *cid.Cid
|
||||
var lastMsg *cid.Cid
|
||||
var forceRes chan []sealiface.PreCommitBatchRes
|
||||
var lastRes []sealiface.PreCommitBatchRes
|
||||
|
||||
cfg, err := b.getConfig()
|
||||
if err != nil {
|
||||
@ -83,10 +84,10 @@ func (b *PreCommitBatcher) run() {
|
||||
|
||||
for {
|
||||
if forceRes != nil {
|
||||
forceRes <- lastMsg
|
||||
forceRes <- lastRes
|
||||
forceRes = nil
|
||||
}
|
||||
lastMsg = nil
|
||||
lastRes = nil
|
||||
|
||||
var sendAboveMax, sendAboveMin bool
|
||||
select {
|
||||
@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
|
||||
}
|
||||
|
||||
var err error
|
||||
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
|
||||
lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin)
|
||||
if err != nil {
|
||||
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
||||
}
|
||||
@ -150,10 +151,9 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
|
||||
return time.After(wait)
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) {
|
||||
b.lk.Lock()
|
||||
defer b.lk.Unlock()
|
||||
params := miner5.PreCommitSectorBatchParams{}
|
||||
|
||||
total := len(b.todo)
|
||||
if total == 0 {
|
||||
@ -173,7 +173,35 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// todo support multiple batches
|
||||
res, err := b.processBatch(cfg)
|
||||
if err != nil && len(res) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, r := range res {
|
||||
if err != nil {
|
||||
r.Error = err.Error()
|
||||
}
|
||||
|
||||
for _, sn := range r.Sectors {
|
||||
for _, ch := range b.waiting[sn] {
|
||||
ch <- r // buffered
|
||||
}
|
||||
|
||||
delete(b.waiting, sn)
|
||||
delete(b.todo, sn)
|
||||
delete(b.deadlines, sn)
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) {
|
||||
params := miner5.PreCommitSectorBatchParams{}
|
||||
deposit := big.Zero()
|
||||
var res sealiface.PreCommitBatchRes
|
||||
|
||||
for _, p := range b.todo {
|
||||
if len(params.Sectors) >= cfg.MaxPreCommitBatch {
|
||||
@ -181,54 +209,46 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
break
|
||||
}
|
||||
|
||||
res.Sectors = append(res.Sectors, p.pci.SectorNumber)
|
||||
params.Sectors = append(params.Sectors, *p.pci)
|
||||
deposit = big.Add(deposit, p.deposit)
|
||||
}
|
||||
|
||||
enc := new(bytes.Buffer)
|
||||
if err := params.MarshalCBOR(enc); err != nil {
|
||||
return nil, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
|
||||
}
|
||||
|
||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||
}
|
||||
|
||||
goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee)
|
||||
|
||||
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("no good address found: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
|
||||
}
|
||||
|
||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("sending message failed: %w", err)
|
||||
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
|
||||
}
|
||||
|
||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total)
|
||||
res.Msg = &mcid
|
||||
|
||||
for _, sector := range params.Sectors {
|
||||
sn := sector.SectorNumber
|
||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo))
|
||||
|
||||
for _, ch := range b.waiting[sn] {
|
||||
ch <- mcid // buffered
|
||||
}
|
||||
delete(b.waiting, sn)
|
||||
delete(b.todo, sn)
|
||||
delete(b.deadlines, sn)
|
||||
}
|
||||
|
||||
return &mcid, nil
|
||||
return []sealiface.PreCommitBatchRes{res}, nil
|
||||
}
|
||||
|
||||
// register PreCommit, wait for batch message, return message CID
|
||||
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) {
|
||||
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) {
|
||||
_, curEpoch, err := b.api.ChainHead(b.mctx)
|
||||
if err != nil {
|
||||
log.Errorf("getting chain head: %s", err)
|
||||
return cid.Undef, nil
|
||||
return sealiface.PreCommitBatchRes{}, err
|
||||
}
|
||||
|
||||
sn := s.SectorNumber
|
||||
@ -240,7 +260,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
|
||||
pci: in,
|
||||
}
|
||||
|
||||
sent := make(chan cid.Cid, 1)
|
||||
sent := make(chan sealiface.PreCommitBatchRes, 1)
|
||||
b.waiting[sn] = append(b.waiting[sn], sent)
|
||||
|
||||
select {
|
||||
@ -253,12 +273,12 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
|
||||
case c := <-sent:
|
||||
return c, nil
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, ctx.Err()
|
||||
return sealiface.PreCommitBatchRes{}, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *PreCommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
|
||||
resCh := make(chan *cid.Cid, 1)
|
||||
func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
resCh := make(chan []sealiface.PreCommitBatchRes, 1)
|
||||
select {
|
||||
case b.force <- resCh:
|
||||
select {
|
||||
|
7
extern/storage-sealing/sealiface/batching.go
vendored
7
extern/storage-sealing/sealiface/batching.go
vendored
@ -14,3 +14,10 @@ type CommitBatchRes struct {
|
||||
Msg *cid.Cid
|
||||
Error string // if set, means that all sectors are failed, implies Msg==nil
|
||||
}
|
||||
|
||||
type PreCommitBatchRes struct {
|
||||
Sectors []abi.SectorNumber
|
||||
|
||||
Msg *cid.Cid
|
||||
Error string // if set, means that all sectors are failed, implies Msg==nil
|
||||
}
|
||||
|
2
extern/storage-sealing/sealing.go
vendored
2
extern/storage-sealing/sealing.go
vendored
@ -207,7 +207,7 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
|
||||
return m.terminator.Pending(ctx)
|
||||
}
|
||||
|
||||
func (m *Sealing) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
func (m *Sealing) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
return m.precommiter.Flush(ctx)
|
||||
}
|
||||
|
||||
|
16
extern/storage-sealing/states_sealing.go
vendored
16
extern/storage-sealing/states_sealing.go
vendored
@ -355,7 +355,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
||||
|
||||
func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if sector.CommD == nil || sector.CommR == nil {
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("sector had nil commR or commD")})
|
||||
}
|
||||
|
||||
params, deposit, _, err := m.preCommitParams(ctx, sector)
|
||||
@ -363,12 +363,20 @@ func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector Se
|
||||
return err
|
||||
}
|
||||
|
||||
mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
|
||||
res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
|
||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorPreCommitBatchSent{mcid})
|
||||
if res.Error != "" {
|
||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit batch error: %s", res.Error)})
|
||||
}
|
||||
|
||||
if res.Msg == nil {
|
||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("batch message was nil")})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorPreCommitBatchSent{*res.Msg})
|
||||
}
|
||||
|
||||
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||
|
@ -375,7 +375,7 @@ func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.Se
|
||||
return sm.Miner.TerminatePending(ctx)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
return sm.Miner.SectorPreCommitFlush(ctx)
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
||||
return m.sealing.TerminatePending(ctx)
|
||||
}
|
||||
|
||||
func (m *Miner) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
return m.sealing.SectorPreCommitFlush(ctx)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user