Merge pull request #775 from filecoin-project/fix/faults

Reintegrate sector faults
This commit is contained in:
Łukasz Magiera 2019-12-09 21:23:11 +01:00 committed by GitHub
commit e230d8d50c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 493 additions and 63 deletions

View File

@ -29,6 +29,10 @@ const (
CommitFailed
FailedUnrecoverable
Faulty // sector is corrupted or gone for some reason
FaultReported // sector has been declared as a fault on chain
FaultedFinal // fault declared on chain
)
var SectorStates = []string{
@ -39,6 +43,7 @@ var SectorStates = []string{
PreCommitting: "PreCommitting",
PreCommitted: "PreCommitted",
Committing: "Committing",
CommitWait: "CommitWait",
Proving: "Proving",
SealFailed: "SealFailed",
@ -47,6 +52,10 @@ var SectorStates = []string{
CommitFailed: "CommitFailed",
FailedUnrecoverable: "FailedUnrecoverable",
Faulty: "Faulty",
FaultReported: "FaultReported",
FaultedFinal: "FaultedFinal",
}
// StorageMiner is a low-level interface to the Filecoin network storage miner node

View File

@ -53,14 +53,10 @@ type StorageMinerActorState struct {
// Contains mostly static info about this miner
Info cid.Cid
// Faulty sectors reported since last SubmitPost,
// up to the current proving period's challenge time.
CurrentFaultSet types.BitField
// Faulty sectors reported since last SubmitPost
FaultSet types.BitField
// Faults submitted after the current proving period's challenge time,
// but before the PoSt for that period is submitted.
// These become the currentFaultSet when a PoSt is submitted.
NextFaultSet types.BitField
LastFaultSubmission uint64
// Amount of power this miner has.
Power types.BigInt
@ -340,7 +336,7 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC
return nil, aerrors.Wrapf(err, "failed to compute data commitment (sector %d, deals: %v)", params.SectorID, params.DealIDs)
}
if ok, err := ValidatePoRep(ctx, maddr, mi.SectorSize, commD, us.Info.CommR, ticket, params.Proof, seed, params.SectorID); err != nil {
if ok, err := vmctx.Sys().ValidatePoRep(ctx, maddr, mi.SectorSize, commD, us.Info.CommR, ticket, params.Proof, seed, params.SectorID); err != nil {
return nil, err
} else if !ok {
return nil, aerrors.Newf(2, "porep proof was invalid (t:%x; s:%x(%d); p:%s)", ticket, seed, us.ReceivedEpoch+build.InteractivePoRepDelay, truncateHexPrint(params.Proof))
@ -463,8 +459,17 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM
return nil, aerrors.HandleExternalError(lerr, "could not load proving set node")
}
faults, nerr := self.FaultSet.AllMap()
if nerr != nil {
return nil, aerrors.Absorb(err, 5, "RLE+ invalid")
}
var sectorInfos []ffi.PublicSectorInfo
if err := pss.ForEach(func(id uint64, v *cbg.Deferred) error {
if faults[id] {
return nil
}
var comms [][]byte
if err := cbor.DecodeInto(v.Raw, &comms); err != nil {
return xerrors.New("could not decode comms")
@ -485,12 +490,6 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM
return nil, aerrors.Absorb(err, 3, "could not decode sectorset")
}
faults, nerr := self.CurrentFaultSet.All()
if nerr != nil {
return nil, aerrors.Absorb(err, 5, "RLE+ invalid")
}
_ = faults
proverID := vmctx.Message().To // TODO: normalize to ID address
var candidates []sectorbuilder.EPostCandidate
@ -598,6 +597,25 @@ func GetFromSectorSet(ctx context.Context, s types.Storage, ss cid.Cid, sectorID
return true, comms[0], comms[1], nil
}
func RemoveFromSectorSet(ctx context.Context, s types.Storage, ss cid.Cid, ids []uint64) (cid.Cid, aerrors.ActorError) {
ssr, err := amt.LoadAMT(types.WrapStorage(s), ss)
if err != nil {
return cid.Undef, aerrors.HandleExternalError(err, "could not load sector set node")
}
if err := ssr.BatchDelete(ids); err != nil {
return cid.Undef, aerrors.HandleExternalError(err, "failed to delete from sector set")
}
ncid, err := ssr.Flush()
if err != nil {
return cid.Undef, aerrors.HandleExternalError(err, "failed to flush sector set")
}
return ncid, nil
}
func ValidatePoRep(ctx context.Context, maddr address.Address, ssize uint64, commD, commR, ticket, proof, seed []byte, sectorID uint64) (bool, ActorError) {
_, span := trace.StartSpan(ctx, "ValidatePoRep")
defer span.End()
@ -787,34 +805,28 @@ type DeclareFaultsParams struct {
}
func (sma StorageMinerActor) DeclareFaults(act *types.Actor, vmctx types.VMContext, params *DeclareFaultsParams) ([]byte, ActorError) {
/*
oldstate, self, aerr := loadState(vmctx)
if aerr != nil {
return nil, aerr
}
oldstate, self, aerr := loadState(vmctx)
if aerr != nil {
return nil, aerr
}
challengeHeight := self.ProvingPeriodEnd - build.PoStChallangeTime
nfaults, err := types.MergeBitFields(params.Faults, self.FaultSet)
if err != nil {
return nil, aerrors.Absorb(err, 1, "failed to merge bitfields")
}
if vmctx.BlockHeight() < challengeHeight {
// TODO: optimized bitfield methods
for _, v := range params.Faults.All() {
self.CurrentFaultSet.Set(v)
}
} else {
for _, v := range params.Faults.All() {
self.NextFaultSet.Set(v)
}
}
self.FaultSet = nfaults
nstate, err := vmctx.Storage().Put(self)
if err != nil {
return nil, err
}
if err := vmctx.Storage().Commit(oldstate, nstate); err != nil {
return nil, err
}
self.LastFaultSubmission = vmctx.BlockHeight()
nstate, aerr := vmctx.Storage().Put(self)
if err != nil {
return nil, aerr
}
if err := vmctx.Storage().Commit(oldstate, nstate); err != nil {
return nil, err
}
*/
return nil, nil
}
@ -906,10 +918,12 @@ func onSuccessfulPoSt(self *StorageMinerActorState, vmctx types.VMContext) aerro
return aerrors.HandleExternalError(nerr, "failed to load proving set")
}
self.CurrentFaultSet = self.NextFaultSet
self.NextFaultSet = types.NewBitField()
faults, nerr := self.FaultSet.All()
if nerr != nil {
return aerrors.Absorb(nerr, 1, "invalid bitfield (fatal?)")
}
faults := []uint64{} // TODO
self.FaultSet = types.NewBitField()
oldPower := self.Power
self.Power = types.BigMul(types.NewInt(pss.Count-uint64(len(faults))),
@ -941,7 +955,13 @@ func onSuccessfulPoSt(self *StorageMinerActorState, vmctx types.VMContext) aerro
return err
}
self.ProvingSet = self.Sectors
ncid, err := RemoveFromSectorSet(vmctx.Context(), vmctx.Storage(), self.Sectors, faults)
if err != nil {
return err
}
self.Sectors = ncid
self.ProvingSet = ncid
self.ElectionPeriodStart = vmctx.BlockHeight()
return nil
}

View File

@ -0,0 +1,157 @@
package actors_test
import (
"bytes"
"context"
"math/rand"
"testing"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/aerrors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
hamt "github.com/ipfs/go-hamt-ipld"
blockstore "github.com/ipfs/go-ipfs-blockstore"
cbg "github.com/whyrusleeping/cbor-gen"
)
func TestMinerCommitSectors(t *testing.T) {
var worker, client address.Address
var minerAddr address.Address
opts := []HarnessOpt{
HarnessAddr(&worker, 1000000),
HarnessAddr(&client, 1000000),
HarnessActor(&minerAddr, &worker, actors.StorageMinerCodeCid,
func() cbg.CBORMarshaler {
return &actors.StorageMinerConstructorParams{
Owner: worker,
Worker: worker,
SectorSize: 1024,
PeerID: "fakepeerid",
}
}),
}
h := NewHarness(t, opts...)
h.vm.Syscalls.ValidatePoRep = func(ctx context.Context, maddr address.Address, ssize uint64, commD, commR, ticket, proof, seed []byte, sectorID uint64) (bool, aerrors.ActorError) {
// all proofs are valid
return true, nil
}
ret, _ := h.SendFunds(t, worker, minerAddr, types.NewInt(100000))
ApplyOK(t, ret)
ret, _ = h.InvokeWithValue(t, client, actors.StorageMarketAddress, actors.SMAMethods.AddBalance, types.NewInt(2000), nil)
ApplyOK(t, ret)
addSectorToMiner(h, t, minerAddr, worker, client, 1)
assertSectorIDs(h, t, minerAddr, []uint64{1})
}
func addSectorToMiner(h *Harness, t *testing.T, minerAddr, worker, client address.Address, sid uint64) {
t.Helper()
s := sectorbuilder.UserBytesForSectorSize(1024)
deal := h.makeFakeDeal(t, minerAddr, worker, client, s)
ret, _ := h.Invoke(t, worker, actors.StorageMarketAddress, actors.SMAMethods.PublishStorageDeals,
&actors.PublishStorageDealsParams{
Deals: []actors.StorageDealProposal{*deal},
})
ApplyOK(t, ret)
var dealIds actors.PublishStorageDealResponse
if err := dealIds.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil {
t.Fatal(err)
}
dealid := dealIds.DealIDs[0]
ret, _ = h.Invoke(t, worker, minerAddr, actors.MAMethods.PreCommitSector,
&actors.SectorPreCommitInfo{
SectorNumber: sid,
CommR: []byte("cats"),
SealEpoch: 10,
DealIDs: []uint64{dealid},
})
ApplyOK(t, ret)
h.BlockHeight += 100
ret, _ = h.Invoke(t, worker, minerAddr, actors.MAMethods.ProveCommitSector,
&actors.SectorProveCommitInfo{
Proof: []byte("prooofy"),
SectorID: sid,
DealIDs: []uint64{dealid}, // TODO: weird that i have to pass this again
})
ApplyOK(t, ret)
}
func assertSectorIDs(h *Harness, t *testing.T, maddr address.Address, ids []uint64) {
t.Helper()
sectors, err := getMinerSectorSet(context.TODO(), h.vm.StateTree(), h.bs, maddr)
if err != nil {
t.Fatal(err)
}
if len(sectors) != len(ids) {
t.Fatal("miner has wrong number of sectors in their sector set")
}
all := make(map[uint64]bool)
for _, s := range sectors {
all[s.SectorID] = true
}
for _, id := range ids {
if !all[id] {
t.Fatal("expected to find sector ID: ", id)
}
}
}
func getMinerSectorSet(ctx context.Context, st types.StateTree, bs blockstore.Blockstore, maddr address.Address) ([]*api.ChainSectorInfo, error) {
mact, err := st.GetActor(maddr)
if err != nil {
return nil, err
}
cst := hamt.CSTFromBstore(bs)
var mstate actors.StorageMinerActorState
if err := cst.Get(ctx, mact.Head, &mstate); err != nil {
return nil, err
}
return stmgr.LoadSectorsFromSet(ctx, bs, mstate.Sectors)
}
func (h *Harness) makeFakeDeal(t *testing.T, miner, worker, client address.Address, size uint64) *actors.StorageDealProposal {
data := make([]byte, size)
rand.Read(data)
commP, err := sectorbuilder.GeneratePieceCommitment(bytes.NewReader(data), size)
if err != nil {
t.Fatal(err)
}
prop := actors.StorageDealProposal{
PieceRef: commP[:],
PieceSize: size,
//PieceSerialization SerializationMode // Needs to be here as it tells how data in the sector maps to PieceRef cid
Client: client,
Provider: miner,
ProposalExpiration: 10000,
Duration: 150,
StoragePricePerEpoch: types.NewInt(1),
StorageCollateral: types.NewInt(0),
}
if err := api.SignWith(context.TODO(), h.w.Sign, client, &prop); err != nil {
t.Fatal(err)
}
return &prop
}

View File

@ -83,7 +83,7 @@ func TestPaychUpdate(t *testing.T) {
ApplyOK(t, ret)
// now we have to 'wait' for the chain to advance.
h.vm.SetBlockHeight(1000)
h.BlockHeight = 1000
ret, _ = h.Invoke(t, targetAddr, pch, actors.PCAMethods.Collect, nil)
ApplyOK(t, ret)

View File

@ -248,13 +248,13 @@ func (t *StorageMinerActorState) MarshalCBOR(w io.Writer) error {
return xerrors.Errorf("failed to write cid field t.Info: %w", err)
}
// t.t.CurrentFaultSet (types.BitField) (struct)
if err := t.CurrentFaultSet.MarshalCBOR(w); err != nil {
// t.t.FaultSet (types.BitField) (struct)
if err := t.FaultSet.MarshalCBOR(w); err != nil {
return err
}
// t.t.NextFaultSet (types.BitField) (struct)
if err := t.NextFaultSet.MarshalCBOR(w); err != nil {
// t.t.LastFaultSubmission (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.LastFaultSubmission))); err != nil {
return err
}
@ -384,24 +384,25 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error {
t.Info = c
}
// t.t.CurrentFaultSet (types.BitField) (struct)
// t.t.FaultSet (types.BitField) (struct)
{
if err := t.CurrentFaultSet.UnmarshalCBOR(br); err != nil {
if err := t.FaultSet.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.t.NextFaultSet (types.BitField) (struct)
{
if err := t.NextFaultSet.UnmarshalCBOR(br); err != nil {
return err
}
// t.t.LastFaultSubmission (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.LastFaultSubmission = uint64(extra)
// t.t.Power (types.BigInt) (struct)
{
@ -1031,6 +1032,49 @@ func (t *UpdatePeerIDParams) UnmarshalCBOR(r io.Reader) error {
return nil
}
func (t *DeclareFaultsParams) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{129}); err != nil {
return err
}
// t.t.Faults (types.BitField) (struct)
if err := t.Faults.MarshalCBOR(w); err != nil {
return err
}
return nil
}
func (t *DeclareFaultsParams) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 1 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.Faults (types.BitField) (struct)
{
if err := t.Faults.UnmarshalCBOR(br); err != nil {
return err
}
}
return nil
}
func (t *MultiSigActorState) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)

View File

@ -3,6 +3,7 @@ package actors_test
import (
"bytes"
"context"
"math/rand"
"testing"
"github.com/ipfs/go-cid"
@ -41,10 +42,12 @@ const (
type HarnessOpt func(testing.TB, *Harness) error
type Harness struct {
HI HarnessInit
Stage HarnessStage
Nonces map[address.Address]uint64
GasCharges map[address.Address]types.BigInt
HI HarnessInit
Stage HarnessStage
Nonces map[address.Address]uint64
GasCharges map[address.Address]types.BigInt
Rand vm.Rand
BlockHeight uint64
lastBalanceCheck map[address.Address]types.BigInt
@ -127,6 +130,7 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness {
h := &Harness{
Stage: HarnessPreInit,
Nonces: make(map[address.Address]uint64),
Rand: &fakeRand{},
HI: HarnessInit{
NAddrs: 1,
Miner: blsaddr(0),
@ -140,6 +144,7 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness {
w: w,
ctx: context.Background(),
bs: bstore.NewBlockstore(dstore.NewMapDatastore()),
BlockHeight: 0,
}
for _, opt := range options {
err := opt(t, h)
@ -157,8 +162,14 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness {
if err != nil {
t.Fatal(err)
}
stateroot, err = gen.SetupStorageMarketActor(h.bs, stateroot, nil)
if err != nil {
t.Fatal(err)
}
h.cs = store.NewChainStore(h.bs, nil)
h.vm, err = vm.NewVM(stateroot, 1, nil, h.HI.Miner, h.cs.Blockstore())
h.vm, err = vm.NewVM(stateroot, 1, h.Rand, h.HI.Miner, h.cs.Blockstore())
if err != nil {
t.Fatal(err)
}
@ -246,6 +257,7 @@ func (h *Harness) Invoke(t testing.TB, from address.Address, to address.Address,
func (h *Harness) InvokeWithValue(t testing.TB, from address.Address, to address.Address,
method uint64, value types.BigInt, params cbg.CBORMarshaler) (*vm.ApplyRet, *state.StateTree) {
t.Helper()
h.vm.SetBlockHeight(h.BlockHeight)
return h.Apply(t, types.Message{
To: to,
From: from,
@ -315,3 +327,11 @@ func DumpObject(t testing.TB, obj cbg.CBORMarshaler) []byte {
}
return b.Bytes()
}
type fakeRand struct{}
func (fr *fakeRand) GetRandomness(ctx context.Context, h int64) ([]byte, error) {
out := make([]byte, 32)
rand.New(rand.NewSource(h)).Read(out)
return out, nil
}

View File

@ -34,6 +34,38 @@ func BitFieldFromSet(setBits []uint64) BitField {
return res
}
func MergeBitFields(a, b BitField) (BitField, error) {
ra, err := a.rle.RunIterator()
if err != nil {
return BitField{}, err
}
rb, err := b.rle.RunIterator()
if err != nil {
return BitField{}, err
}
merge, err := rlepluslazy.Sum(ra, rb)
if err != nil {
return BitField{}, err
}
mergebytes, err := rlepluslazy.EncodeRuns(merge, nil)
if err != nil {
return BitField{}, err
}
rle, err := rlepluslazy.FromBuf(mergebytes)
if err != nil {
return BitField{}, err
}
return BitField{
rle: rle,
bits: make(map[uint64]struct{}),
}, nil
}
func (bf BitField) sum() (rlepluslazy.RunIterator, error) {
if len(bf.bits) == 0 {
return bf.rle.RunIterator()
@ -86,7 +118,26 @@ func (bf BitField) All() ([]uint64, error) {
return nil, err
}
return res, err
return res, nil
}
func (bf BitField) AllMap() (map[uint64]bool, error) {
runs, err := bf.sum()
if err != nil {
return nil, err
}
res, err := rlepluslazy.SliceFromRuns(runs)
if err != nil {
return nil, err
}
out := make(map[uint64]bool)
for _, i := range res {
out[i] = true
}
return out, nil
}
func (bf BitField) MarshalCBOR(w io.Writer) error {

View File

@ -40,10 +40,15 @@ type VMContext interface {
ChargeGas(uint64) aerrors.ActorError
GetRandomness(height uint64) ([]byte, aerrors.ActorError)
GetBalance(address.Address) (BigInt, aerrors.ActorError)
Sys() *VMSyscalls
Context() context.Context
}
type VMSyscalls struct {
ValidatePoRep func(context.Context, address.Address, uint64, []byte, []byte, []byte, []byte, []byte, uint64) (bool, aerrors.ActorError)
}
type storageWrapper struct {
s Storage
}

View File

@ -48,7 +48,7 @@ func makeActor(st *state.StateTree, addr address.Address) (*types.Actor, aerrors
case address.SECP256K1:
return NewSecp256k1AccountActor(st, addr)
case address.ID:
return nil, aerrors.New(1, "no actor with given ID")
return nil, aerrors.Newf(1, "no actor with given ID: %s", addr)
case address.Actor:
return nil, aerrors.Newf(1, "no such actor: %s", addr)
default:

14
chain/vm/syscalls.go Normal file
View File

@ -0,0 +1,14 @@
package vm
import (
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
)
// Actual type is defined in chain/types/vmcontext.go because the VMContext interface is there
func DefaultSyscalls() *types.VMSyscalls {
return &types.VMSyscalls{
ValidatePoRep: actors.ValidatePoRep,
}
}

View File

@ -54,6 +54,8 @@ type VMContext struct {
gasAvailable types.BigInt
gasUsed types.BigInt
sys *types.VMSyscalls
// root cid of the state of the actor this invocation will be on
sroot cid.Cid
@ -75,6 +77,10 @@ func (vmc *VMContext) GetRandomness(height uint64) ([]byte, aerrors.ActorError)
return res, nil
}
func (vmc *VMContext) Sys() *types.VMSyscalls {
return vmc.sys
}
// Storage interface
func (vmc *VMContext) Put(i cbg.CBORMarshaler) (cid.Cid, aerrors.ActorError) {
@ -284,6 +290,7 @@ func (vm *VM) makeVMContext(ctx context.Context, sroot cid.Cid, msg *types.Messa
msg: msg,
origin: origin,
height: vm.blockHeight,
sys: vm.Syscalls,
gasUsed: usedGas,
gasAvailable: msg.GasLimit,
@ -304,6 +311,8 @@ type VM struct {
blockMiner address.Address
inv *invoker
rand Rand
Syscalls *types.VMSyscalls
}
func NewVM(base cid.Cid, height uint64, r Rand, maddr address.Address, cbs blockstore.Blockstore) (*VM, error) {
@ -323,6 +332,7 @@ func NewVM(base cid.Cid, height uint64, r Rand, maddr address.Address, cbs block
blockMiner: maddr,
inv: newInvoker(),
rand: r,
Syscalls: DefaultSyscalls(),
}, nil
}

View File

@ -96,6 +96,7 @@ func main() {
actors.SubmitFallbackPoStParams{},
actors.PaymentVerifyParams{},
actors.UpdatePeerIDParams{},
actors.DeclareFaultsParams{},
actors.MultiSigActorState{},
actors.MultiSigConstructorParams{},
actors.MultiSigProposeParams{},

View File

@ -342,6 +342,18 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.t.FaultReportMsg (cid.Cid) (struct)
if t.FaultReportMsg == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.FaultReportMsg); err != nil {
return xerrors.Errorf("failed to write cid field t.FaultReportMsg: %w", err)
}
}
// t.t.LastErr (string) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.LastErr)))); err != nil {
return err
@ -574,6 +586,30 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
t.CommitMessage = &c
}
}
// t.t.FaultReportMsg (cid.Cid) (struct)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.FaultReportMsg: %w", err)
}
t.FaultReportMsg = &c
}
}
// t.t.LastErr (string) (string)

View File

@ -233,3 +233,57 @@ func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sector
return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
})
}
func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpdate {
// TODO: check if the fault has already been reported, and that this sector is even valid
// TODO: coalesce faulty sector reporting
bf := types.NewBitField()
bf.Set(sector.SectorID)
fp := &actors.DeclareFaultsParams{bf}
_ = fp
enc, aerr := actors.SerializeParams(nil)
if aerr != nil {
return sector.upd().fatal(xerrors.Errorf("failed to serialize declare fault params: %w", aerr))
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.DeclareFaults,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to push declare faults message to network: %w", err))
}
return sector.upd().to(api.FaultReported).state(func(info *SectorInfo) {
c := smsg.Cid()
info.FaultReportMsg = &c
})
}
func (m *Miner) handleFaultReported(ctx context.Context, sector SectorInfo) *sectorUpdate {
if sector.FaultReportMsg == nil {
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("entered fault reported state without a FaultReportMsg cid"))
}
mw, err := m.api.StateWaitMsg(ctx, *sector.FaultReportMsg)
if err != nil {
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for fault declaration: %w", err))
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode))
}
return sector.upd().to(api.FaultedFinal).state(func(info *SectorInfo) {})
}

View File

@ -71,6 +71,9 @@ type SectorInfo struct {
// Committing
CommitMessage *cid.Cid
// Faults
FaultReportMsg *cid.Cid
// Debug
LastErr string
}

View File

@ -255,6 +255,12 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
case api.CommitFailed:
log.Warn("sector %d entered unimplemented state 'CommitFailed'", update.id)
// Faults
case api.Faulty:
m.handleSectorUpdate(ctx, sector, m.handleFaulty)
case api.FaultReported:
m.handleSectorUpdate(ctx, sector, m.handleFaultReported)
// Fatal errors
case api.UndefinedSectorState:
log.Error("sector update with undefined state!")