From 144b5a1350226916771ce84b84cea5686df073b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 12:37:23 +0100 Subject: [PATCH] perning termination API --- api/api_storage.go | 3 ++ api/apistruct/struct.go | 5 ++++ extern/storage-sealing/sealing.go | 4 +++ extern/storage-sealing/terminate_batch.go | 35 +++++++++++++++++++++++ node/impl/storminer.go | 4 +++ storage/addresses.go | 2 ++ storage/sealing.go | 4 +++ 7 files changed, 57 insertions(+) diff --git a/api/api_storage.go b/api/api_storage.go index 9e193351d..042dad73b 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -74,6 +74,8 @@ type StorageMiner interface { // SectorTerminateFlush immediately sends a terminate message with sectors batched for termination. // Returns null if message wasn't sent SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) + // SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message + SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) @@ -232,4 +234,5 @@ const ( type AddressConfig struct { PreCommitControl []address.Address CommitControl []address.Address + TerminateControl []address.Address } diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 30ee19262..1569f1b2a 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -316,6 +316,7 @@ type StorageMinerStruct struct { SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"` + SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm @@ -1320,6 +1321,10 @@ func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid return c.Internal.SectorTerminateFlush(ctx) } +func (c *StorageMinerStruct) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return c.Internal.SectorTerminatePending(ctx) +} + func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error { return c.Internal.SectorMarkForUpgrade(ctx, number) } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index acede4726..891bf8a1a 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -287,6 +287,10 @@ func (m *Sealing) TerminateFlush(ctx context.Context) (*cid.Cid, error) { return m.terminator.Flush(ctx) } +func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return m.terminator.Pending(ctx) +} + // Caller should NOT hold m.unsealedInfoMap.lk func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go index ca68af8ad..53eec2d82 100644 --- a/extern/storage-sealing/terminate_batch.go +++ b/extern/storage-sealing/terminate_batch.go @@ -3,6 +3,7 @@ package sealing import ( "bytes" "context" + "sort" "sync" "time" @@ -252,6 +253,40 @@ func (b *TerminateBatcher) Flush(ctx context.Context) (*cid.Cid, error) { } } +func (b *TerminateBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) { + b.lk.Lock() + defer b.lk.Unlock() + + mid, err := address.IDFromAddress(b.maddr) + if err != nil { + return nil, err + } + + res := make([]abi.SectorID, 0) + for _, bf := range b.todo { + err := bf.ForEach(func(id uint64) error { + res = append(res, abi.SectorID{ + Miner: abi.ActorID(mid), + Number: abi.SectorNumber(id), + }) + return nil + }) + if err != nil { + return nil, err + } + } + + sort.Slice(res, func(i, j int) bool { + if res[i].Miner != res[j].Miner { + return res[i].Miner < res[j].Miner + } + + return res[i].Number < res[j].Number + }) + + return res, nil +} + func (b *TerminateBatcher) Stop(ctx context.Context) error { close(b.stop) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 970990129..fe79817a5 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -336,6 +336,10 @@ func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, return sm.Miner.TerminateFlush(ctx) } +func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return sm.Miner.TerminatePending(ctx) +} + func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error { return sm.Miner.MarkForUpgrade(id) } diff --git a/storage/addresses.go b/storage/addresses.go index 5da8643cd..f406394b4 100644 --- a/storage/addresses.go +++ b/storage/addresses.go @@ -30,6 +30,8 @@ func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi m addrs = append(addrs, as.PreCommitControl...) case api.CommitAddr: addrs = append(addrs, as.CommitControl...) + case api.TerminateSectorsAddr: + addrs = append(addrs, as.TerminateControl...) default: defaultCtl := map[address.Address]struct{}{} for _, a := range mi.ControlAddresses { diff --git a/storage/sealing.go b/storage/sealing.go index f987ac882..d07a14810 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -54,6 +54,10 @@ func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) { return m.sealing.TerminateFlush(ctx) } +func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return m.sealing.TerminatePending(ctx) +} + func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error { return m.sealing.MarkForUpgrade(id) }