Merge pull request #6367 from filecoin-project/fix/robustify-commit-batcher
Robustify commit batcher
This commit is contained in:
commit
d69032b2f0
@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
// StorageMiner is a low-level interface to the Filecoin network storage miner node
|
// StorageMiner is a low-level interface to the Filecoin network storage miner node
|
||||||
@ -82,12 +83,12 @@ type StorageMiner interface {
|
|||||||
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin
|
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin
|
||||||
// SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit.
|
// SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit.
|
||||||
// Returns null if message wasn't sent
|
// Returns null if message wasn't sent
|
||||||
SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
|
SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) //perm:admin
|
||||||
// SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message
|
// SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message
|
||||||
SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
|
SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
|
||||||
// SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit.
|
// SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit.
|
||||||
// Returns null if message wasn't sent
|
// Returns null if message wasn't sent
|
||||||
SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
|
SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) //perm:admin
|
||||||
// SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message
|
// SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message
|
||||||
SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
|
SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
|
||||||
|
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/specs-storage/storage"
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
@ -639,7 +640,7 @@ type StorageMinerStruct struct {
|
|||||||
|
|
||||||
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
|
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
|
||||||
|
|
||||||
SectorCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"`
|
SectorCommitFlush func(p0 context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"`
|
||||||
|
|
||||||
SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
|
SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
|
||||||
|
|
||||||
@ -649,7 +650,7 @@ type StorageMinerStruct struct {
|
|||||||
|
|
||||||
SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
|
SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
|
||||||
|
|
||||||
SectorPreCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"`
|
SectorPreCommitFlush func(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"`
|
||||||
|
|
||||||
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
|
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
|
||||||
|
|
||||||
@ -1931,7 +1932,7 @@ func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (inte
|
|||||||
return s.Internal.SealingSchedDiag(p0, p1)
|
return s.Internal.SealingSchedDiag(p0, p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) {
|
func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||||
return s.Internal.SectorCommitFlush(p0)
|
return s.Internal.SectorCommitFlush(p0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1951,7 +1952,7 @@ func (s *StorageMinerStruct) SectorMarkForUpgrade(p0 context.Context, p1 abi.Sec
|
|||||||
return s.Internal.SectorMarkForUpgrade(p0, p1)
|
return s.Internal.SectorMarkForUpgrade(p0, p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) {
|
func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
return s.Internal.SectorPreCommitFlush(p0)
|
return s.Internal.SectorPreCommitFlush(p0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,7 +169,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe
|
|||||||
pcb, err := miner.SectorPreCommitFlush(ctx)
|
pcb, err := miner.SectorPreCommitFlush(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if pcb != nil {
|
if pcb != nil {
|
||||||
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
|
fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,7 +178,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe
|
|||||||
cb, err := miner.SectorCommitFlush(ctx)
|
cb, err := miner.SectorCommitFlush(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if cb != nil {
|
if cb != nil {
|
||||||
fmt.Printf("COMMIT BATCH: %s\n", *cb)
|
fmt.Printf("COMMIT BATCH: %+v\n", cb)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -319,13 +319,13 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod
|
|||||||
pcb, err := miner.SectorPreCommitFlush(ctx)
|
pcb, err := miner.SectorPreCommitFlush(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if pcb != nil {
|
if pcb != nil {
|
||||||
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
|
fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb)
|
||||||
}
|
}
|
||||||
|
|
||||||
cb, err := miner.SectorCommitFlush(ctx)
|
cb, err := miner.SectorCommitFlush(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if cb != nil {
|
if cb != nil {
|
||||||
fmt.Printf("COMMIT BATCH: %s\n", *cb)
|
fmt.Printf("COMMIT BATCH: %+v\n", cb)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -980,7 +980,7 @@ var sectorsBatching = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
var sectorsBatchingPendingCommit = &cli.Command{
|
var sectorsBatchingPendingCommit = &cli.Command{
|
||||||
Name: "pending-commit",
|
Name: "commit",
|
||||||
Usage: "list sectors waiting in commit batch queue",
|
Usage: "list sectors waiting in commit batch queue",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
@ -997,15 +997,30 @@ var sectorsBatchingPendingCommit = &cli.Command{
|
|||||||
ctx := lcli.ReqContext(cctx)
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
if cctx.Bool("publish-now") {
|
if cctx.Bool("publish-now") {
|
||||||
cid, err := api.SectorCommitFlush(ctx)
|
res, err := api.SectorCommitFlush(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("flush: %w", err)
|
return xerrors.Errorf("flush: %w", err)
|
||||||
}
|
}
|
||||||
if cid == nil {
|
if res == nil {
|
||||||
return xerrors.Errorf("no sectors to publish")
|
return xerrors.Errorf("no sectors to publish")
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("sector batch published: ", cid)
|
for i, re := range res {
|
||||||
|
fmt.Printf("Batch %d:\n", i)
|
||||||
|
if re.Error != "" {
|
||||||
|
fmt.Printf("\tError: %s\n", re.Error)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("\tMessage: %s\n", re.Msg)
|
||||||
|
}
|
||||||
|
fmt.Printf("\tSectors:\n")
|
||||||
|
for _, sector := range re.Sectors {
|
||||||
|
if e, found := re.FailedSectors[sector]; found {
|
||||||
|
fmt.Printf("\t\t%d\tERROR %s\n", sector, e)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("\t\t%d\tOK\n", sector)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1027,7 +1042,7 @@ var sectorsBatchingPendingCommit = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
var sectorsBatchingPendingPreCommit = &cli.Command{
|
var sectorsBatchingPendingPreCommit = &cli.Command{
|
||||||
Name: "pending-precommit",
|
Name: "precommit",
|
||||||
Usage: "list sectors waiting in precommit batch queue",
|
Usage: "list sectors waiting in precommit batch queue",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
@ -1044,15 +1059,26 @@ var sectorsBatchingPendingPreCommit = &cli.Command{
|
|||||||
ctx := lcli.ReqContext(cctx)
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
if cctx.Bool("publish-now") {
|
if cctx.Bool("publish-now") {
|
||||||
cid, err := api.SectorPreCommitFlush(ctx)
|
res, err := api.SectorPreCommitFlush(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("flush: %w", err)
|
return xerrors.Errorf("flush: %w", err)
|
||||||
}
|
}
|
||||||
if cid == nil {
|
if res == nil {
|
||||||
return xerrors.Errorf("no sectors to publish")
|
return xerrors.Errorf("no sectors to publish")
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("sector batch published: ", cid)
|
for i, re := range res {
|
||||||
|
fmt.Printf("Batch %d:\n", i)
|
||||||
|
if re.Error != "" {
|
||||||
|
fmt.Printf("\tError: %s\n", re.Error)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("\tMessage: %s\n", re.Msg)
|
||||||
|
}
|
||||||
|
fmt.Printf("\tSectors:\n")
|
||||||
|
for _, sector := range re.Sectors {
|
||||||
|
fmt.Printf("\t\t%d\tOK\n", sector)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
215
extern/storage-sealing/commit_batch.go
vendored
215
extern/storage-sealing/commit_batch.go
vendored
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
const arp = abi.RegisteredAggregationProof_SnarkPackV1
|
const arp = abi.RegisteredAggregationProof_SnarkPackV1
|
||||||
@ -30,6 +31,9 @@ type CommitBatcherApi interface {
|
|||||||
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
||||||
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
|
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
|
||||||
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
|
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
|
||||||
|
|
||||||
|
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
|
||||||
|
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type AggregateInput struct {
|
type AggregateInput struct {
|
||||||
@ -49,10 +53,10 @@ type CommitBatcher struct {
|
|||||||
|
|
||||||
deadlines map[abi.SectorNumber]time.Time
|
deadlines map[abi.SectorNumber]time.Time
|
||||||
todo map[abi.SectorNumber]AggregateInput
|
todo map[abi.SectorNumber]AggregateInput
|
||||||
waiting map[abi.SectorNumber][]chan cid.Cid
|
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
|
||||||
|
|
||||||
notify, stop, stopped chan struct{}
|
notify, stop, stopped chan struct{}
|
||||||
force chan chan *cid.Cid
|
force chan chan []sealiface.CommitBatchRes
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,10 +72,10 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
|
|||||||
|
|
||||||
deadlines: map[abi.SectorNumber]time.Time{},
|
deadlines: map[abi.SectorNumber]time.Time{},
|
||||||
todo: map[abi.SectorNumber]AggregateInput{},
|
todo: map[abi.SectorNumber]AggregateInput{},
|
||||||
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
|
||||||
|
|
||||||
notify: make(chan struct{}, 1),
|
notify: make(chan struct{}, 1),
|
||||||
force: make(chan chan *cid.Cid),
|
force: make(chan chan []sealiface.CommitBatchRes),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
stopped: make(chan struct{}),
|
stopped: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -82,8 +86,8 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) run() {
|
func (b *CommitBatcher) run() {
|
||||||
var forceRes chan *cid.Cid
|
var forceRes chan []sealiface.CommitBatchRes
|
||||||
var lastMsg *cid.Cid
|
var lastMsg []sealiface.CommitBatchRes
|
||||||
|
|
||||||
cfg, err := b.getConfig()
|
cfg, err := b.getConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -111,7 +115,7 @@ func (b *CommitBatcher) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
|
lastMsg, err = b.maybeStartBatch(sendAboveMax, sendAboveMin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("CommitBatcher processBatch error", "error", err)
|
log.Warnw("CommitBatcher processBatch error", "error", err)
|
||||||
}
|
}
|
||||||
@ -159,12 +163,9 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
|
|||||||
return time.After(wait)
|
return time.After(wait)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBatchRes, error) {
|
||||||
b.lk.Lock()
|
b.lk.Lock()
|
||||||
defer b.lk.Unlock()
|
defer b.lk.Unlock()
|
||||||
params := miner5.ProveCommitAggregateParams{
|
|
||||||
SectorNumbers: bitfield.New(),
|
|
||||||
}
|
|
||||||
|
|
||||||
total := len(b.todo)
|
total := len(b.todo)
|
||||||
if total == 0 {
|
if total == 0 {
|
||||||
@ -184,8 +185,53 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var res []sealiface.CommitBatchRes
|
||||||
|
|
||||||
|
if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {
|
||||||
|
res, err = b.processIndividually()
|
||||||
|
} else {
|
||||||
|
res, err = b.processBatch(cfg)
|
||||||
|
}
|
||||||
|
if err != nil && len(res) == 0 {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, r := range res {
|
||||||
|
if err != nil {
|
||||||
|
r.Error = err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sn := range r.Sectors {
|
||||||
|
for _, ch := range b.waiting[sn] {
|
||||||
|
ch <- r // buffered
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(b.waiting, sn)
|
||||||
|
delete(b.todo, sn)
|
||||||
|
delete(b.deadlines, sn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
|
||||||
|
tok, _, err := b.api.ChainHead(b.mctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
total := len(b.todo)
|
||||||
|
|
||||||
|
var res sealiface.CommitBatchRes
|
||||||
|
|
||||||
|
params := miner5.ProveCommitAggregateParams{
|
||||||
|
SectorNumbers: bitfield.New(),
|
||||||
|
}
|
||||||
|
|
||||||
proofs := make([][]byte, 0, total)
|
proofs := make([][]byte, 0, total)
|
||||||
infos := make([]proof5.AggregateSealVerifyInfo, 0, total)
|
infos := make([]proof5.AggregateSealVerifyInfo, 0, total)
|
||||||
|
collateral := big.Zero()
|
||||||
|
|
||||||
for id, p := range b.todo {
|
for id, p := range b.todo {
|
||||||
if len(infos) >= cfg.MaxCommitBatch {
|
if len(infos) >= cfg.MaxCommitBatch {
|
||||||
@ -193,6 +239,15 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sc, err := b.getSectorCollateral(id, tok)
|
||||||
|
if err != nil {
|
||||||
|
res.FailedSectors[id] = err.Error()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
collateral = big.Add(collateral, sc)
|
||||||
|
|
||||||
|
res.Sectors = append(res.Sectors, id)
|
||||||
params.SectorNumbers.Set(uint64(id))
|
params.SectorNumbers.Set(uint64(id))
|
||||||
infos = append(infos, p.info)
|
infos = append(infos, p.info)
|
||||||
}
|
}
|
||||||
@ -207,7 +262,7 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
|
|
||||||
mid, err := address.IDFromAddress(b.maddr)
|
mid, err := address.IDFromAddress(b.maddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting miner id: %w", err)
|
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
|
params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
|
||||||
@ -217,55 +272,107 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
Infos: infos,
|
Infos: infos,
|
||||||
}, proofs)
|
}, proofs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("aggregating proofs: %w", err)
|
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
enc := new(bytes.Buffer)
|
enc := new(bytes.Buffer)
|
||||||
if err := params.MarshalCBOR(enc); err != nil {
|
if err := params.MarshalCBOR(enc); err != nil {
|
||||||
return nil, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
|
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||||
|
if err != nil {
|
||||||
|
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
goodFunds := big.Add(b.feeCfg.MaxCommitGasFee, collateral)
|
||||||
|
|
||||||
|
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
|
||||||
|
if err != nil {
|
||||||
|
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
res.Msg = &mcid
|
||||||
|
|
||||||
|
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
|
||||||
|
|
||||||
|
return []sealiface.CommitBatchRes{res}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error) {
|
||||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee)
|
tok, _, err := b.api.ChainHead(b.mctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("no good address found: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes())
|
var res []sealiface.CommitBatchRes
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("sending message failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
|
for sn, info := range b.todo {
|
||||||
|
r := sealiface.CommitBatchRes{
|
||||||
err = params.SectorNumbers.ForEach(func(us uint64) error {
|
Sectors: []abi.SectorNumber{sn},
|
||||||
sn := abi.SectorNumber(us)
|
|
||||||
|
|
||||||
for _, ch := range b.waiting[sn] {
|
|
||||||
ch <- mcid // buffered
|
|
||||||
}
|
}
|
||||||
delete(b.waiting, sn)
|
|
||||||
delete(b.todo, sn)
|
mcid, err := b.processSingle(mi, sn, info, tok)
|
||||||
delete(b.deadlines, sn)
|
if err != nil {
|
||||||
return nil
|
log.Errorf("process single error: %+v", err) // todo: return to user
|
||||||
})
|
r.FailedSectors[sn] = err.Error()
|
||||||
if err != nil {
|
} else {
|
||||||
return nil, xerrors.Errorf("done sectors foreach: %w", err)
|
r.Msg = &mcid
|
||||||
|
}
|
||||||
|
|
||||||
|
res = append(res, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mcid, nil
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, info AggregateInput, tok TipSetToken) (cid.Cid, error) {
|
||||||
|
enc := new(bytes.Buffer)
|
||||||
|
params := &miner.ProveCommitSectorParams{
|
||||||
|
SectorNumber: sn,
|
||||||
|
Proof: info.proof,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := params.MarshalCBOR(enc); err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
collateral, err := b.getSectorCollateral(sn, tok)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
goodFunds := big.Add(collateral, b.feeCfg.MaxCommitGasFee)
|
||||||
|
|
||||||
|
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitSector, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return mcid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// register commit, wait for batch message, return message CID
|
// register commit, wait for batch message, return message CID
|
||||||
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) {
|
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
|
||||||
_, curEpoch, err := b.api.ChainHead(b.mctx)
|
_, curEpoch, err := b.api.ChainHead(b.mctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("getting chain head: %s", err)
|
log.Errorf("getting chain head: %s", err)
|
||||||
return cid.Undef, nil
|
return sealiface.CommitBatchRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sn := s.SectorNumber
|
sn := s.SectorNumber
|
||||||
@ -274,7 +381,7 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat
|
|||||||
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
|
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
|
||||||
b.todo[sn] = in
|
b.todo[sn] = in
|
||||||
|
|
||||||
sent := make(chan cid.Cid, 1)
|
sent := make(chan sealiface.CommitBatchRes, 1)
|
||||||
b.waiting[sn] = append(b.waiting[sn], sent)
|
b.waiting[sn] = append(b.waiting[sn], sent)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -284,15 +391,15 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat
|
|||||||
b.lk.Unlock()
|
b.lk.Unlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case c := <-sent:
|
case r := <-sent:
|
||||||
return c, nil
|
return r, nil
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return cid.Undef, ctx.Err()
|
return sealiface.CommitBatchRes{}, ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *CommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
|
func (b *CommitBatcher) Flush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||||
resCh := make(chan *cid.Cid, 1)
|
resCh := make(chan []sealiface.CommitBatchRes, 1)
|
||||||
select {
|
select {
|
||||||
case b.force <- resCh:
|
case b.force <- resCh:
|
||||||
select {
|
select {
|
||||||
@ -364,3 +471,25 @@ func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
|
|||||||
|
|
||||||
return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
|
return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {
|
||||||
|
pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, sn, tok)
|
||||||
|
if err != nil {
|
||||||
|
return big.Zero(), xerrors.Errorf("getting precommit info: %w", err)
|
||||||
|
}
|
||||||
|
if pci == nil {
|
||||||
|
return big.Zero(), xerrors.Errorf("precommit info not found on chain")
|
||||||
|
}
|
||||||
|
|
||||||
|
collateral, err := b.api.StateMinerInitialPledgeCollateral(b.mctx, b.maddr, pci.Info, tok)
|
||||||
|
if err != nil {
|
||||||
|
return big.Zero(), xerrors.Errorf("getting initial pledge collateral: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
collateral = big.Sub(collateral, pci.PreCommitDeposit)
|
||||||
|
if collateral.LessThan(big.Zero()) {
|
||||||
|
collateral = big.Zero()
|
||||||
|
}
|
||||||
|
|
||||||
|
return collateral, nil
|
||||||
|
}
|
||||||
|
86
extern/storage-sealing/precommit_batch.go
vendored
86
extern/storage-sealing/precommit_batch.go
vendored
@ -18,6 +18,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PreCommitBatcherApi interface {
|
type PreCommitBatcherApi interface {
|
||||||
@ -41,10 +42,10 @@ type PreCommitBatcher struct {
|
|||||||
|
|
||||||
deadlines map[abi.SectorNumber]time.Time
|
deadlines map[abi.SectorNumber]time.Time
|
||||||
todo map[abi.SectorNumber]*preCommitEntry
|
todo map[abi.SectorNumber]*preCommitEntry
|
||||||
waiting map[abi.SectorNumber][]chan cid.Cid
|
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
|
||||||
|
|
||||||
notify, stop, stopped chan struct{}
|
notify, stop, stopped chan struct{}
|
||||||
force chan chan *cid.Cid
|
force chan chan []sealiface.PreCommitBatchRes
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -59,10 +60,10 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
|
|||||||
|
|
||||||
deadlines: map[abi.SectorNumber]time.Time{},
|
deadlines: map[abi.SectorNumber]time.Time{},
|
||||||
todo: map[abi.SectorNumber]*preCommitEntry{},
|
todo: map[abi.SectorNumber]*preCommitEntry{},
|
||||||
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
|
||||||
|
|
||||||
notify: make(chan struct{}, 1),
|
notify: make(chan struct{}, 1),
|
||||||
force: make(chan chan *cid.Cid),
|
force: make(chan chan []sealiface.PreCommitBatchRes),
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
stopped: make(chan struct{}),
|
stopped: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -73,8 +74,8 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *PreCommitBatcher) run() {
|
func (b *PreCommitBatcher) run() {
|
||||||
var forceRes chan *cid.Cid
|
var forceRes chan []sealiface.PreCommitBatchRes
|
||||||
var lastMsg *cid.Cid
|
var lastRes []sealiface.PreCommitBatchRes
|
||||||
|
|
||||||
cfg, err := b.getConfig()
|
cfg, err := b.getConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -83,10 +84,10 @@ func (b *PreCommitBatcher) run() {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
if forceRes != nil {
|
if forceRes != nil {
|
||||||
forceRes <- lastMsg
|
forceRes <- lastRes
|
||||||
forceRes = nil
|
forceRes = nil
|
||||||
}
|
}
|
||||||
lastMsg = nil
|
lastRes = nil
|
||||||
|
|
||||||
var sendAboveMax, sendAboveMin bool
|
var sendAboveMax, sendAboveMin bool
|
||||||
select {
|
select {
|
||||||
@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
|
lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
log.Warnw("PreCommitBatcher processBatch error", "error", err)
|
||||||
}
|
}
|
||||||
@ -150,10 +151,9 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
|
|||||||
return time.After(wait)
|
return time.After(wait)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
b.lk.Lock()
|
b.lk.Lock()
|
||||||
defer b.lk.Unlock()
|
defer b.lk.Unlock()
|
||||||
params := miner5.PreCommitSectorBatchParams{}
|
|
||||||
|
|
||||||
total := len(b.todo)
|
total := len(b.todo)
|
||||||
if total == 0 {
|
if total == 0 {
|
||||||
@ -173,7 +173,35 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo support multiple batches
|
||||||
|
res, err := b.processBatch(cfg)
|
||||||
|
if err != nil && len(res) == 0 {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, r := range res {
|
||||||
|
if err != nil {
|
||||||
|
r.Error = err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sn := range r.Sectors {
|
||||||
|
for _, ch := range b.waiting[sn] {
|
||||||
|
ch <- r // buffered
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(b.waiting, sn)
|
||||||
|
delete(b.todo, sn)
|
||||||
|
delete(b.deadlines, sn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
|
params := miner5.PreCommitSectorBatchParams{}
|
||||||
deposit := big.Zero()
|
deposit := big.Zero()
|
||||||
|
var res sealiface.PreCommitBatchRes
|
||||||
|
|
||||||
for _, p := range b.todo {
|
for _, p := range b.todo {
|
||||||
if len(params.Sectors) >= cfg.MaxPreCommitBatch {
|
if len(params.Sectors) >= cfg.MaxPreCommitBatch {
|
||||||
@ -181,54 +209,46 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res.Sectors = append(res.Sectors, p.pci.SectorNumber)
|
||||||
params.Sectors = append(params.Sectors, *p.pci)
|
params.Sectors = append(params.Sectors, *p.pci)
|
||||||
deposit = big.Add(deposit, p.deposit)
|
deposit = big.Add(deposit, p.deposit)
|
||||||
}
|
}
|
||||||
|
|
||||||
enc := new(bytes.Buffer)
|
enc := new(bytes.Buffer)
|
||||||
if err := params.MarshalCBOR(enc); err != nil {
|
if err := params.MarshalCBOR(enc); err != nil {
|
||||||
return nil, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
|
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee)
|
goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee)
|
||||||
|
|
||||||
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit)
|
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("no good address found: %w", err)
|
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes())
|
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("sending message failed: %w", err)
|
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total)
|
res.Msg = &mcid
|
||||||
|
|
||||||
for _, sector := range params.Sectors {
|
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo))
|
||||||
sn := sector.SectorNumber
|
|
||||||
|
|
||||||
for _, ch := range b.waiting[sn] {
|
return []sealiface.PreCommitBatchRes{res}, nil
|
||||||
ch <- mcid // buffered
|
|
||||||
}
|
|
||||||
delete(b.waiting, sn)
|
|
||||||
delete(b.todo, sn)
|
|
||||||
delete(b.deadlines, sn)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &mcid, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// register PreCommit, wait for batch message, return message CID
|
// register PreCommit, wait for batch message, return message CID
|
||||||
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) {
|
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) {
|
||||||
_, curEpoch, err := b.api.ChainHead(b.mctx)
|
_, curEpoch, err := b.api.ChainHead(b.mctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("getting chain head: %s", err)
|
log.Errorf("getting chain head: %s", err)
|
||||||
return cid.Undef, nil
|
return sealiface.PreCommitBatchRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sn := s.SectorNumber
|
sn := s.SectorNumber
|
||||||
@ -240,7 +260,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
|
|||||||
pci: in,
|
pci: in,
|
||||||
}
|
}
|
||||||
|
|
||||||
sent := make(chan cid.Cid, 1)
|
sent := make(chan sealiface.PreCommitBatchRes, 1)
|
||||||
b.waiting[sn] = append(b.waiting[sn], sent)
|
b.waiting[sn] = append(b.waiting[sn], sent)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -253,12 +273,12 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
|
|||||||
case c := <-sent:
|
case c := <-sent:
|
||||||
return c, nil
|
return c, nil
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return cid.Undef, ctx.Err()
|
return sealiface.PreCommitBatchRes{}, ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *PreCommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
|
func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
resCh := make(chan *cid.Cid, 1)
|
resCh := make(chan []sealiface.PreCommitBatchRes, 1)
|
||||||
select {
|
select {
|
||||||
case b.force <- resCh:
|
case b.force <- resCh:
|
||||||
select {
|
select {
|
||||||
|
23
extern/storage-sealing/sealiface/batching.go
vendored
Normal file
23
extern/storage-sealing/sealiface/batching.go
vendored
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
package sealiface
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
)
|
||||||
|
|
||||||
|
type CommitBatchRes struct {
|
||||||
|
Sectors []abi.SectorNumber
|
||||||
|
|
||||||
|
FailedSectors map[abi.SectorNumber]string
|
||||||
|
|
||||||
|
Msg *cid.Cid
|
||||||
|
Error string // if set, means that all sectors are failed, implies Msg==nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type PreCommitBatchRes struct {
|
||||||
|
Sectors []abi.SectorNumber
|
||||||
|
|
||||||
|
Msg *cid.Cid
|
||||||
|
Error string // if set, means that all sectors are failed, implies Msg==nil
|
||||||
|
}
|
5
extern/storage-sealing/sealing.go
vendored
5
extern/storage-sealing/sealing.go
vendored
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
const SectorStorePrefix = "/sectors"
|
const SectorStorePrefix = "/sectors"
|
||||||
@ -206,7 +207,7 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
|
|||||||
return m.terminator.Pending(ctx)
|
return m.terminator.Pending(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
func (m *Sealing) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
return m.precommiter.Flush(ctx)
|
return m.precommiter.Flush(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,7 +215,7 @@ func (m *Sealing) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, e
|
|||||||
return m.precommiter.Pending(ctx)
|
return m.precommiter.Pending(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) {
|
func (m *Sealing) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||||
return m.commiter.Flush(ctx)
|
return m.commiter.Flush(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
32
extern/storage-sealing/states_sealing.go
vendored
32
extern/storage-sealing/states_sealing.go
vendored
@ -355,7 +355,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
|||||||
|
|
||||||
func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if sector.CommD == nil || sector.CommR == nil {
|
if sector.CommD == nil || sector.CommR == nil {
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("sector had nil commR or commD")})
|
||||||
}
|
}
|
||||||
|
|
||||||
params, deposit, _, err := m.preCommitParams(ctx, sector)
|
params, deposit, _, err := m.preCommitParams(ctx, sector)
|
||||||
@ -363,12 +363,20 @@ func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector Se
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
|
res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorPreCommitBatchSent{mcid})
|
if res.Error != "" {
|
||||||
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit batch error: %s", res.Error)})
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Msg == nil {
|
||||||
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("batch message was nil")})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorPreCommitBatchSent{*res.Msg})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
@ -581,7 +589,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
|
|||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
||||||
}
|
}
|
||||||
|
|
||||||
mcid, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
|
res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
|
||||||
info: proof.AggregateSealVerifyInfo{
|
info: proof.AggregateSealVerifyInfo{
|
||||||
Number: sector.SectorNumber,
|
Number: sector.SectorNumber,
|
||||||
Randomness: sector.TicketValue,
|
Randomness: sector.TicketValue,
|
||||||
@ -596,7 +604,19 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
|
|||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorCommitAggregateSent{mcid})
|
if res.Error != "" {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)})
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, found := res.FailedSectors[sector.SectorNumber]; found {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector failed in aggregate processing: %s", e)})
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Msg == nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate message was nil")})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorCommitAggregateSent{*res.Msg})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
@ -271,7 +271,7 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
PreCommitBatchSlack: Duration(3 * time.Hour),
|
PreCommitBatchSlack: Duration(3 * time.Hour),
|
||||||
|
|
||||||
AggregateCommits: true,
|
AggregateCommits: true,
|
||||||
MinCommitBatch: 1, // we must have at least one proof to aggregate
|
MinCommitBatch: miner5.MinAggregatedSectors, // we must have at least four proofs to aggregate
|
||||||
MaxCommitBatch: miner5.MaxAggregatedSectors, // this is the maximum aggregation per FIP13
|
MaxCommitBatch: miner5.MaxAggregatedSectors, // this is the maximum aggregation per FIP13
|
||||||
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days
|
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days
|
||||||
CommitBatchSlack: Duration(1 * time.Hour),
|
CommitBatchSlack: Duration(1 * time.Hour),
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/api/apistruct"
|
"github.com/filecoin-project/lotus/api/apistruct"
|
||||||
@ -374,7 +375,7 @@ func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.Se
|
|||||||
return sm.Miner.TerminatePending(ctx)
|
return sm.Miner.TerminatePending(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
return sm.Miner.SectorPreCommitFlush(ctx)
|
return sm.Miner.SectorPreCommitFlush(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -386,7 +387,7 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect
|
|||||||
return sm.Miner.MarkForUpgrade(id)
|
return sm.Miner.MarkForUpgrade(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||||
return sm.Miner.CommitFlush(ctx)
|
return sm.Miner.CommitFlush(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/filecoin-project/specs-storage/storage"
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
|
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: refactor this to be direct somehow
|
// TODO: refactor this to be direct somehow
|
||||||
@ -59,7 +60,7 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
|||||||
return m.sealing.TerminatePending(ctx)
|
return m.sealing.TerminatePending(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) {
|
func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||||
return m.sealing.SectorPreCommitFlush(ctx)
|
return m.sealing.SectorPreCommitFlush(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,7 +68,7 @@ func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, err
|
|||||||
return m.sealing.SectorPreCommitPending(ctx)
|
return m.sealing.SectorPreCommitPending(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) {
|
func (m *Miner) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
||||||
return m.sealing.CommitFlush(ctx)
|
return m.sealing.CommitFlush(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user