SectorTerminateFlush API

This commit is contained in:
Łukasz Magiera 2021-01-13 23:32:04 +01:00
parent db977a2f91
commit 3522c8d45a
7 changed files with 70 additions and 6 deletions

View File

@ -68,8 +68,12 @@ type StorageMiner interface {
// SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
// be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.
SectorRemove(context.Context, abi.SectorNumber) error
// SectorTerminate terminates the sector on-chain, then automatically removes it from storage
// SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then
// automatically removes it from storage
SectorTerminate(context.Context, abi.SectorNumber) error
// SectorTerminateFlush immediately sends a terminate message with sectors batched for termination.
// Returns null if message wasn't sent
SectorTerminateFlush(ctx context.Context) (*cid.Cid, error)
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)

View File

@ -315,6 +315,7 @@ type StorageMinerStruct struct {
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
@ -1315,6 +1316,10 @@ func (c *StorageMinerStruct) SectorTerminate(ctx context.Context, number abi.Sec
return c.Internal.SectorTerminate(ctx, number)
}
func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) {
return c.Internal.SectorTerminateFlush(ctx)
}
func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error {
return c.Internal.SectorMarkForUpgrade(ctx, number)
}

View File

@ -100,6 +100,7 @@
* [SectorSetSealDelay](#SectorSetSealDelay)
* [SectorStartSealing](#SectorStartSealing)
* [SectorTerminate](#SectorTerminate)
* [SectorTerminateFlush](#SectorTerminateFlush)
* [Sectors](#Sectors)
* [SectorsList](#SectorsList)
* [SectorsListInStates](#SectorsListInStates)
@ -1539,7 +1540,8 @@ Inputs:
Response: `{}`
### SectorTerminate
SectorTerminate terminates the sector on-chain, then automatically removes it from storage
SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then
automatically removes it from storage
Perms: admin
@ -1553,6 +1555,17 @@ Inputs:
Response: `{}`
### SectorTerminateFlush
SectorTerminateFlush immediately sends a terminate message with sectors batched for termination.
Returns null if message wasn't sent
Perms: admin
Inputs: `null`
Response: `null`
## Sectors

View File

@ -283,6 +283,10 @@ func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorTerminate{})
}
func (m *Sealing) TerminateFlush(ctx context.Context) (*cid.Cid, error) {
return m.terminator.Flush(ctx)
}
// Caller should NOT hold m.unsealedInfoMap.lk
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
// locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else

View File

@ -46,8 +46,9 @@ type TerminateBatcher struct {
waiting map[SectorLocation][]chan cid.Cid
notify, force, stop, stopped chan struct{}
lk sync.Mutex
notify, stop, stopped chan struct{}
force chan chan *cid.Cid
lk sync.Mutex
}
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig) *TerminateBatcher {
@ -62,7 +63,7 @@ func NewTerminationBatcher(mctx context.Context, maddr address.Address, api Term
waiting: map[SectorLocation][]chan cid.Cid{},
notify: make(chan struct{}, 1),
force: make(chan struct{}),
force: make(chan chan *cid.Cid),
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
@ -73,7 +74,16 @@ func NewTerminationBatcher(mctx context.Context, maddr address.Address, api Term
}
func (b *TerminateBatcher) run() {
var forceRes chan *cid.Cid
var lastMsg *cid.Cid
for {
if forceRes != nil {
forceRes <- lastMsg
forceRes = nil
}
lastMsg = nil
var notif, after bool
select {
case <-b.stop:
@ -83,7 +93,8 @@ func (b *TerminateBatcher) run() {
notif = true // send above max
case <-time.After(TerminateBatchWait):
after = true // send above min
case <-b.force: // user triggered
case fr := <-b.force: // user triggered
forceRes = fr
}
dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil)
@ -163,6 +174,7 @@ func (b *TerminateBatcher) run() {
b.lk.Unlock()
continue
}
lastMsg = &mcid
log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations))
for _, t := range params.Terminations {
@ -177,6 +189,7 @@ func (b *TerminateBatcher) run() {
ch <- mcid // buffered
}
}
b.waiting = map[SectorLocation][]chan cid.Cid{}
b.lk.Unlock()
@ -224,6 +237,21 @@ func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (
}
}
func (b *TerminateBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
resCh := make(chan *cid.Cid, 1)
select {
case b.force <- resCh:
select {
case res := <-resCh:
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (b *TerminateBatcher) Stop(ctx context.Context) error {
close(b.stop)

View File

@ -332,6 +332,10 @@ func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNum
return sm.Miner.TerminateSector(ctx, id)
}
func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) {
return sm.Miner.TerminateFlush(ctx)
}
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error {
return sm.Miner.MarkForUpgrade(id)
}

View File

@ -4,6 +4,8 @@ import (
"context"
"io"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
@ -48,6 +50,10 @@ func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error
return m.sealing.Terminate(ctx, id)
}
func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) {
return m.sealing.TerminateFlush(ctx)
}
func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error {
return m.sealing.MarkForUpgrade(id)
}