feat: proving: Introduce manual sector fault recovery (#9144)

* rebase

* fix lint error

* fix errors

* add itest

* make gen after rebase

* apply suggestion from review

* make gen after latest rebase

* gen after rebase

* fix waitgroup

* change as per review

* refactor Recovery Batches

* fix CLI

* gen after rebase

* close the channel
This commit is contained in:
LexLuthr 2022-09-06 21:17:30 +05:30 committed by GitHub
parent 783fc6a20d
commit 67d4f905e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 459 additions and 9 deletions

View File

@ -314,6 +314,11 @@ type StorageMiner interface {
CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef, expensive bool) (map[abi.SectorNumber]string, error) //perm:admin
ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, poStEpoch abi.ChainEpoch, nv abinetwork.Version) ([]builtin.PoStProof, error) //perm:read
// RecoverFault can be used to declare recoveries manually. It sends messages
// to the miner actor with details of recovered sectors and returns the CID of messages. It honors the
// maxPartitionsPerRecoveryMessage from the config
RecoverFault(ctx context.Context, sectors []abi.SectorNumber) ([]cid.Cid, error) //perm:admin
}
var _ storiface.WorkerReturn = *new(StorageMiner)

View File

@ -770,6 +770,8 @@ type StorageMinerStruct struct {
PledgeSector func(p0 context.Context) (abi.SectorID, error) `perm:"write"`
RecoverFault func(p0 context.Context, p1 []abi.SectorNumber) ([]cid.Cid, error) `perm:"admin"`
ReturnAddPiece func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
@ -4617,6 +4619,17 @@ func (s *StorageMinerStub) PledgeSector(p0 context.Context) (abi.SectorID, error
return *new(abi.SectorID), ErrNotSupported
}
func (s *StorageMinerStruct) RecoverFault(p0 context.Context, p1 []abi.SectorNumber) ([]cid.Cid, error) {
if s.Internal.RecoverFault == nil {
return *new([]cid.Cid), ErrNotSupported
}
return s.Internal.RecoverFault(p0, p1)
}
func (s *StorageMinerStub) RecoverFault(p0 context.Context, p1 []abi.SectorNumber) ([]cid.Cid, error) {
return *new([]cid.Cid), ErrNotSupported
}
func (s *StorageMinerStruct) ReturnAddPiece(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error {
if s.Internal.ReturnAddPiece == nil {
return ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -7,10 +7,12 @@ import (
"os"
"strconv"
"strings"
"sync"
"text/tabwriter"
"time"
"github.com/fatih/color"
"github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
@ -19,6 +21,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -37,6 +40,7 @@ var provingCmd = &cli.Command{
provingCheckProvableCmd,
workersCmd(false),
provingComputeCmd,
provingRecoverFaultsCmd,
},
}
@ -644,3 +648,82 @@ It will not send any messages to the chain.`,
return nil
},
}
var provingRecoverFaultsCmd = &cli.Command{
Name: "recover-faults",
Usage: "Manually recovers faulty sectors on chain",
ArgsUsage: "<faulty sectors>",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "confidence",
Usage: "number of block confirmations to wait for",
Value: int(build.MessageConfidence),
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 1 {
return xerrors.Errorf("must pass at least 1 sector number")
}
arglist := cctx.Args().Slice()
var sectors []abi.SectorNumber
for _, v := range arglist {
s, err := strconv.ParseUint(v, 10, 64)
if err != nil {
return xerrors.Errorf("failed to convert sectors, please check the arguments: %w", err)
}
sectors = append(sectors, abi.SectorNumber(s))
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
api, acloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer acloser()
ctx := lcli.ReqContext(cctx)
msgs, err := nodeApi.RecoverFault(ctx, sectors)
if err != nil {
return err
}
// wait for msgs to get mined into a block
var wg sync.WaitGroup
wg.Add(len(msgs))
results := make(chan error, len(msgs))
for _, msg := range msgs {
go func(m cid.Cid) {
defer wg.Done()
wait, err := api.StateWaitMsg(ctx, m, uint64(cctx.Int("confidence")))
if err != nil {
results <- xerrors.Errorf("Timeout waiting for message to land on chain %s", wait.Message)
return
}
if wait.Receipt.ExitCode != 0 {
results <- xerrors.Errorf("Failed to execute message %s: %w", wait.Message, wait.Receipt.ExitCode.Error())
return
}
results <- nil
return
}(msg)
}
wg.Wait()
close(results)
for v := range results {
if v != nil {
fmt.Println("Failed to execute the message %w", v)
}
}
return nil
},
}

View File

@ -106,6 +106,8 @@
* [PiecesListPieces](#PiecesListPieces)
* [Pledge](#Pledge)
* [PledgeSector](#PledgeSector)
* [Recover](#Recover)
* [RecoverFault](#RecoverFault)
* [Return](#Return)
* [ReturnAddPiece](#ReturnAddPiece)
* [ReturnDataCid](#ReturnDataCid)
@ -2265,6 +2267,36 @@ Response:
}
```
## Recover
### RecoverFault
RecoverFault can be used to declare recoveries manually. It sends messages
to the miner actor with details of recovered sectors and returns the CID of messages. It honors the
maxPartitionsPerRecoveryMessage from the config
Perms: admin
Inputs:
```json
[
[
123,
124
]
]
```
Response:
```json
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
]
```
## Return

View File

@ -2100,6 +2100,7 @@ COMMANDS:
check Check sectors provable
workers list workers
compute Compute simulated proving tasks
recover-faults Manually recovers faulty sectors on chain
help, h Shows a list of commands or help for one command
OPTIONS:
@ -2210,6 +2211,19 @@ OPTIONS:
```
```
### lotus-miner proving recover-faults
```
NAME:
lotus-miner proving recover-faults - Manually recovers faulty sectors on chain
USAGE:
lotus-miner proving recover-faults [command options] <faulty sectors>
OPTIONS:
--confidence value number of block confirmations to wait for (default: 5)
```
## lotus-miner storage
```
NAME:

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit f997fe6c77632c0bc58d0b1fdf53ee7a93f6027c
Subproject commit 32afd6e1f1419b6bb7d0f4b3944287fde593ca64

View File

@ -300,3 +300,170 @@ func TestWindowPostMaxSectorsRecoveryConfig(t *testing.T) {
sectors = p.MinerPower.RawBytePower.Uint64() / uint64(ssz)
require.Equal(t, nSectors+kit.DefaultPresealsPerBootstrapMiner-1, int(sectors)) // -1 not recovered sector
}
func TestWindowPostManualSectorsRecovery(t *testing.T) {
oldVal := wdpost.RecoveringSectorLimit
defer func() {
wdpost.RecoveringSectorLimit = oldVal
}()
wdpost.RecoveringSectorLimit = 1
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, miner, ens := kit.EnsembleMinimal(t,
kit.LatestActorsAt(-1),
kit.MockProofs())
ens.InterconnectAll().BeginMining(2 * time.Millisecond)
nSectors := 10
miner.PledgeSectors(ctx, nSectors, 0, nil)
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
mid, err := address.IDFromAddress(maddr)
require.NoError(t, err)
t.Log("Running one proving period")
waitUntil := di.Open + di.WPoStProvingPeriod
t.Logf("End for head.Height > %d", waitUntil)
ts := client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Logf("Now head.Height = %d", ts.Height())
p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
ssz, err := miner.ActorSectorSize(ctx, maddr)
require.NoError(t, err)
require.Equal(t, p.MinerPower, p.TotalPower)
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*uint64(nSectors+kit.DefaultPresealsPerBootstrapMiner)))
failed, err := client.StateMinerFaults(ctx, maddr, types.TipSetKey{})
require.NoError(t, err)
failedCount, err := failed.Count()
require.NoError(t, err)
require.Equal(t, failedCount, uint64(0))
t.Log("Drop some sectors")
// Drop 2 sectors from deadline 2 partition 0 (full partition / deadline)
parts, err := client.StateMinerPartitions(ctx, maddr, 2, types.EmptyTSK)
require.NoError(t, err)
require.Greater(t, len(parts), 0)
secs := parts[0].AllSectors
n, err := secs.Count()
require.NoError(t, err)
require.Equal(t, uint64(2), n)
var failedSectors []abi.SectorNumber
// Drop the partition
err = secs.ForEach(func(sid uint64) error {
failedSectors = append(failedSectors, abi.SectorNumber(sid))
return miner.StorageMiner.(*impl.StorageMinerAPI).IStorageMgr.(*mock.SectorMgr).MarkFailed(storiface.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(sid),
},
}, true)
})
require.NoError(t, err)
di, err = client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
t.Log("Go through another PP, wait for sectors to become faulty")
waitUntil = di.Open + di.WPoStProvingPeriod
t.Logf("End for head.Height > %d", waitUntil)
ts = client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Logf("Now head.Height = %d", ts.Height())
failed, err = client.StateMinerFaults(ctx, maddr, types.TipSetKey{})
require.NoError(t, err)
failedCount, err = failed.Count()
require.NoError(t, err)
require.Equal(t, failedCount, uint64(2))
recovered, err := client.StateMinerRecoveries(ctx, maddr, types.TipSetKey{})
require.NoError(t, err)
recoveredCount, err := recovered.Count()
require.NoError(t, err)
require.Equal(t, recoveredCount, uint64(0))
p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, p.MinerPower, p.TotalPower)
t.Log("Make the sectors recoverable")
err = secs.ForEach(func(sid uint64) error {
return miner.StorageMiner.(*impl.StorageMinerAPI).IStorageMgr.(*mock.SectorMgr).MarkFailed(storiface.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(sid),
},
}, false)
})
require.NoError(t, err)
// Try to manually recover the sector
t.Log("Send recovery message")
_, err = miner.RecoverFault(ctx, failedSectors)
require.NoError(t, err)
currentHeight, err := client.ChainHead(ctx)
require.NoError(t, err)
ts = client.WaitTillChain(ctx, kit.HeightAtLeast(currentHeight.Height()+abi.ChainEpoch(10)))
t.Logf("Now head.Height = %d", ts.Height())
failed, err = client.StateMinerFaults(ctx, maddr, types.TipSetKey{})
require.NoError(t, err)
failedCount, err = failed.Count()
require.NoError(t, err)
require.Equal(t, failedCount, uint64(2))
recovered, err = client.StateMinerRecoveries(ctx, maddr, types.TipSetKey{})
require.NoError(t, err)
recoveredCount, err = recovered.Count()
require.NoError(t, err)
require.Equal(t, recoveredCount, uint64(2))
di, err = client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
t.Log("Go through another PP, wait for sectors to become faulty")
waitUntil = di.Open + di.WPoStProvingPeriod
t.Logf("End for head.Height > %d", waitUntil)
ts = client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Logf("Now head.Height = %d", ts.Height())
failed, err = client.StateMinerFaults(ctx, maddr, types.TipSetKey{})
require.NoError(t, err)
failedCount, err = failed.Count()
require.NoError(t, err)
require.Equal(t, failedCount, uint64(0))
recovered, err = client.StateMinerRecoveries(ctx, maddr, types.TipSetKey{})
require.NoError(t, err)
recoveredCount, err = recovered.Count()
require.NoError(t, err)
require.Equal(t, recoveredCount, uint64(0))
}

View File

@ -1323,6 +1323,27 @@ func (sm *StorageMinerAPI) ComputeProof(ctx context.Context, ssi []builtin.Exten
return sm.Epp.ComputeProof(ctx, ssi, rand, poStEpoch, nv)
}
func (sm *StorageMinerAPI) RecoverFault(ctx context.Context, sectors []abi.SectorNumber) ([]cid.Cid, error) {
allsectors, err := sm.Miner.ListSectors()
if err != nil {
return nil, xerrors.Errorf("could not get a list of all sectors from the miner: %w", err)
}
var found bool
for _, v := range sectors {
found = false
for _, s := range allsectors {
if v == s.SectorNumber {
found = true
break
}
}
if !found {
return nil, xerrors.Errorf("sectors %d not found in the sector list for miner", v)
}
}
return sm.WdPoSt.ManualFaultRecovery(ctx, sm.Miner.Address(), sectors)
}
func (sm *StorageMinerAPI) RuntimeSubsystems(context.Context) (res api.MinerSubsystems, err error) {
return sm.EnabledSubsystems, nil
}

View File

@ -717,3 +717,7 @@ func (s *WindowPoStScheduler) ComputePoSt(ctx context.Context, dlIdx uint64, ts
return s.runPoStCycle(ctx, true, *dl, ts)
}
func (s *WindowPoStScheduler) ManualFaultRecovery(ctx context.Context, maddr address.Address, sectors []abi.SectorNumber) ([]cid.Cid, error) {
return s.declareManualRecoveries(ctx, maddr, sectors, types.TipSetKey{})
}

View File

@ -10,6 +10,7 @@ import (
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin"
@ -345,3 +346,111 @@ func (s *WindowPoStScheduler) asyncFaultRecover(di dline.Info, ts *types.TipSet)
}
}()
}
// declareRecoveries identifies sectors that were previously marked as faulty
// for our miner, but are now recovered (i.e. are now provable again) and
// still not reported as such.
//
// It then reports the recovery on chain via a `DeclareFaultsRecovered`
// message to our miner actor.
//
// This is always invoked ahead of time, before the deadline for the evaluated
// sectors arrives. That way, recoveries are declared in preparation for those
// sectors to be proven.
//
// If a declaration is made, it awaits for build.MessageConfidence confirmations
// on chain before returning.
func (s *WindowPoStScheduler) declareManualRecoveries(ctx context.Context, maddr address.Address, sectors []abi.SectorNumber, tsk types.TipSetKey) ([]cid.Cid, error) {
var RecoveryDecls []miner.RecoveryDeclaration
var RecoveryBatches [][]miner.RecoveryDeclaration
type ptx struct {
deadline uint64
partition uint64
}
smap := make(map[ptx][]uint64)
var mcids []cid.Cid
for _, sector := range sectors {
ptxID, err := s.api.StateSectorPartition(ctx, maddr, sector, types.TipSetKey{})
if err != nil {
return nil, xerrors.Errorf("failed to fetch partition and deadline details for sector %d: %w", sector, err)
}
ptxinfo := ptx{
deadline: ptxID.Deadline,
partition: ptxID.Partition,
}
slist := smap[ptxinfo]
sn := uint64(sector)
slist = append(slist, sn)
smap[ptxinfo] = slist
}
for i, v := range smap {
sectorinbit := bitfield.NewFromSet(v)
RecoveryDecls = append(RecoveryDecls, miner.RecoveryDeclaration{
Deadline: i.deadline,
Partition: i.partition,
Sectors: sectorinbit,
})
}
// Batch if maxPartitionsPerRecoveryMessage is set
if s.maxPartitionsPerRecoveryMessage > 0 {
// Create batched
for len(RecoveryDecls) > s.maxPartitionsPerPostMessage {
Batch := RecoveryDecls[len(RecoveryDecls)-s.maxPartitionsPerRecoveryMessage:]
RecoveryDecls = RecoveryDecls[:len(RecoveryDecls)-s.maxPartitionsPerPostMessage]
RecoveryBatches = append(RecoveryBatches, Batch)
}
// Add remaining as new batch
RecoveryBatches = append(RecoveryBatches, RecoveryDecls)
} else {
RecoveryBatches = append(RecoveryBatches, RecoveryDecls)
}
for _, Batch := range RecoveryBatches {
msg, err := s.manualRecoveryMsg(ctx, Batch)
if err != nil {
return nil, err
}
mcids = append(mcids, msg)
}
return mcids, nil
}
func (s *WindowPoStScheduler) manualRecoveryMsg(ctx context.Context, Recovery []miner.RecoveryDeclaration) (cid.Cid, error) {
params := &miner.DeclareFaultsRecoveredParams{
Recoveries: Recovery,
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return cid.Undef, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
}
msg := &types.Message{
To: s.actor,
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.prepareMessage(ctx, msg, spec); err != nil {
return cid.Undef, err
}
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}
return sm.Cid(), nil
}

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
@ -44,6 +45,7 @@ type NodeAPI interface {
StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*lminer.SectorLocation, error)
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)