perning termination API

This commit is contained in:
Łukasz Magiera 2021-01-14 12:37:23 +01:00
parent 1cfb73cc3b
commit 144b5a1350
7 changed files with 57 additions and 0 deletions

View File

@ -74,6 +74,8 @@ type StorageMiner interface {
// SectorTerminateFlush immediately sends a terminate message with sectors batched for termination.
// Returns null if message wasn't sent
SectorTerminateFlush(ctx context.Context) (*cid.Cid, error)
// SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message
SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error)
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
@ -232,4 +234,5 @@ const (
type AddressConfig struct {
PreCommitControl []address.Address
CommitControl []address.Address
TerminateControl []address.Address
}

View File

@ -316,6 +316,7 @@ type StorageMinerStruct struct {
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"`
SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
@ -1320,6 +1321,10 @@ func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid
return c.Internal.SectorTerminateFlush(ctx)
}
func (c *StorageMinerStruct) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return c.Internal.SectorTerminatePending(ctx)
}
func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error {
return c.Internal.SectorMarkForUpgrade(ctx, number)
}

View File

@ -287,6 +287,10 @@ func (m *Sealing) TerminateFlush(ctx context.Context) (*cid.Cid, error) {
return m.terminator.Flush(ctx)
}
func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return m.terminator.Pending(ctx)
}
// Caller should NOT hold m.unsealedInfoMap.lk
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
// locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else

View File

@ -3,6 +3,7 @@ package sealing
import (
"bytes"
"context"
"sort"
"sync"
"time"
@ -252,6 +253,40 @@ func (b *TerminateBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
}
}
func (b *TerminateBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
b.lk.Lock()
defer b.lk.Unlock()
mid, err := address.IDFromAddress(b.maddr)
if err != nil {
return nil, err
}
res := make([]abi.SectorID, 0)
for _, bf := range b.todo {
err := bf.ForEach(func(id uint64) error {
res = append(res, abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(id),
})
return nil
})
if err != nil {
return nil, err
}
}
sort.Slice(res, func(i, j int) bool {
if res[i].Miner != res[j].Miner {
return res[i].Miner < res[j].Miner
}
return res[i].Number < res[j].Number
})
return res, nil
}
func (b *TerminateBatcher) Stop(ctx context.Context) error {
close(b.stop)

View File

@ -336,6 +336,10 @@ func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid,
return sm.Miner.TerminateFlush(ctx)
}
func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return sm.Miner.TerminatePending(ctx)
}
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error {
return sm.Miner.MarkForUpgrade(id)
}

View File

@ -30,6 +30,8 @@ func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi m
addrs = append(addrs, as.PreCommitControl...)
case api.CommitAddr:
addrs = append(addrs, as.CommitControl...)
case api.TerminateSectorsAddr:
addrs = append(addrs, as.TerminateControl...)
default:
defaultCtl := map[address.Address]struct{}{}
for _, a := range mi.ControlAddresses {

View File

@ -54,6 +54,10 @@ func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) {
return m.sealing.TerminateFlush(ctx)
}
func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.TerminatePending(ctx)
}
func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error {
return m.sealing.MarkForUpgrade(id)
}