From d2448912a62dfb45d832b338794af216fbaa310c Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 6 Dec 2019 15:06:42 +0100 Subject: [PATCH 1/2] Implement basic fault handling WIP: miner actor tests working miner actor test rebase and fix tests --- api/api_storage.go | 9 ++ chain/actors/actor_miner.go | 104 +++++++++++--------- chain/actors/actor_miner_test.go | 157 +++++++++++++++++++++++++++++++ chain/actors/actor_paych_test.go | 2 +- chain/actors/cbor_gen.go | 70 +++++++++++--- chain/actors/harness2_test.go | 30 +++++- chain/types/bitfield.go | 53 ++++++++++- chain/types/vmcontext.go | 5 + chain/vm/mkactor.go | 2 +- chain/vm/syscalls.go | 14 +++ chain/vm/vm.go | 10 ++ gen/main.go | 1 + storage/cbor_gen.go | 36 +++++++ storage/sector_states.go | 54 +++++++++++ storage/sector_types.go | 3 + storage/sectors.go | 6 ++ 16 files changed, 493 insertions(+), 63 deletions(-) create mode 100644 chain/actors/actor_miner_test.go create mode 100644 chain/vm/syscalls.go diff --git a/api/api_storage.go b/api/api_storage.go index 816ad74bf..5b3c200ce 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -29,6 +29,10 @@ const ( CommitFailed FailedUnrecoverable + + Faulty // sector is corrupted or gone for some reason + FaultReported // sector has been declared as a fault on chain + FaultedFinal // fault declared on chain ) var SectorStates = []string{ @@ -39,6 +43,7 @@ var SectorStates = []string{ PreCommitting: "PreCommitting", PreCommitted: "PreCommitted", Committing: "Committing", + CommitWait: "CommitWait", Proving: "Proving", SealFailed: "SealFailed", @@ -47,6 +52,10 @@ var SectorStates = []string{ CommitFailed: "CommitFailed", FailedUnrecoverable: "FailedUnrecoverable", + + Faulty: "Faulty", + FaultReported: "FaultReported", + FaultedFinal: "FaultedFinal", } // StorageMiner is a low-level interface to the Filecoin network storage miner node diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 6c775eac0..4e1ba5570 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -53,14 +53,10 @@ type StorageMinerActorState struct { // Contains mostly static info about this miner Info cid.Cid - // Faulty sectors reported since last SubmitPost, - // up to the current proving period's challenge time. - CurrentFaultSet types.BitField + // Faulty sectors reported since last SubmitPost + FaultSet types.BitField - // Faults submitted after the current proving period's challenge time, - // but before the PoSt for that period is submitted. - // These become the currentFaultSet when a PoSt is submitted. - NextFaultSet types.BitField + LastFaultSubmission uint64 // Amount of power this miner has. Power types.BigInt @@ -340,7 +336,7 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC return nil, aerrors.Wrapf(err, "failed to compute data commitment (sector %d, deals: %v)", params.SectorID, params.DealIDs) } - if ok, err := ValidatePoRep(ctx, maddr, mi.SectorSize, commD, us.Info.CommR, ticket, params.Proof, seed, params.SectorID); err != nil { + if ok, err := vmctx.Sys().ValidatePoRep(ctx, maddr, mi.SectorSize, commD, us.Info.CommR, ticket, params.Proof, seed, params.SectorID); err != nil { return nil, err } else if !ok { return nil, aerrors.Newf(2, "porep proof was invalid (t:%x; s:%x(%d); p:%s)", ticket, seed, us.ReceivedEpoch+build.InteractivePoRepDelay, truncateHexPrint(params.Proof)) @@ -463,8 +459,17 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM return nil, aerrors.HandleExternalError(lerr, "could not load proving set node") } + faults, nerr := self.FaultSet.AllMap() + if nerr != nil { + return nil, aerrors.Absorb(err, 5, "RLE+ invalid") + } + var sectorInfos []ffi.PublicSectorInfo if err := pss.ForEach(func(id uint64, v *cbg.Deferred) error { + if faults[id] { + return nil + } + var comms [][]byte if err := cbor.DecodeInto(v.Raw, &comms); err != nil { return xerrors.New("could not decode comms") @@ -485,12 +490,6 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM return nil, aerrors.Absorb(err, 3, "could not decode sectorset") } - faults, nerr := self.CurrentFaultSet.All() - if nerr != nil { - return nil, aerrors.Absorb(err, 5, "RLE+ invalid") - } - _ = faults - proverID := vmctx.Message().To // TODO: normalize to ID address var candidates []sectorbuilder.EPostCandidate @@ -598,6 +597,25 @@ func GetFromSectorSet(ctx context.Context, s types.Storage, ss cid.Cid, sectorID return true, comms[0], comms[1], nil } +func RemoveFromSectorSet(ctx context.Context, s types.Storage, ss cid.Cid, ids []uint64) (cid.Cid, aerrors.ActorError) { + + ssr, err := amt.LoadAMT(types.WrapStorage(s), ss) + if err != nil { + return cid.Undef, aerrors.HandleExternalError(err, "could not load sector set node") + } + + if err := ssr.BatchDelete(ids); err != nil { + return cid.Undef, aerrors.HandleExternalError(err, "failed to delete from sector set") + } + + ncid, err := ssr.Flush() + if err != nil { + return cid.Undef, aerrors.HandleExternalError(err, "failed to flush sector set") + } + + return ncid, nil +} + func ValidatePoRep(ctx context.Context, maddr address.Address, ssize uint64, commD, commR, ticket, proof, seed []byte, sectorID uint64) (bool, ActorError) { _, span := trace.StartSpan(ctx, "ValidatePoRep") defer span.End() @@ -787,34 +805,28 @@ type DeclareFaultsParams struct { } func (sma StorageMinerActor) DeclareFaults(act *types.Actor, vmctx types.VMContext, params *DeclareFaultsParams) ([]byte, ActorError) { - /* - oldstate, self, aerr := loadState(vmctx) - if aerr != nil { - return nil, aerr - } + oldstate, self, aerr := loadState(vmctx) + if aerr != nil { + return nil, aerr + } - challengeHeight := self.ProvingPeriodEnd - build.PoStChallangeTime + nfaults, err := types.MergeBitFields(params.Faults, self.FaultSet) + if err != nil { + return nil, aerrors.Absorb(err, 1, "failed to merge bitfields") + } - if vmctx.BlockHeight() < challengeHeight { - // TODO: optimized bitfield methods - for _, v := range params.Faults.All() { - self.CurrentFaultSet.Set(v) - } - } else { - for _, v := range params.Faults.All() { - self.NextFaultSet.Set(v) - } - } + self.FaultSet = nfaults - nstate, err := vmctx.Storage().Put(self) - if err != nil { - return nil, err - } - if err := vmctx.Storage().Commit(oldstate, nstate); err != nil { - return nil, err - } + self.LastFaultSubmission = vmctx.BlockHeight() + + nstate, aerr := vmctx.Storage().Put(self) + if err != nil { + return nil, aerr + } + if err := vmctx.Storage().Commit(oldstate, nstate); err != nil { + return nil, err + } - */ return nil, nil } @@ -906,10 +918,12 @@ func onSuccessfulPoSt(self *StorageMinerActorState, vmctx types.VMContext) aerro return aerrors.HandleExternalError(nerr, "failed to load proving set") } - self.CurrentFaultSet = self.NextFaultSet - self.NextFaultSet = types.NewBitField() + faults, nerr := self.FaultSet.All() + if nerr != nil { + return aerrors.Absorb(nerr, 1, "invalid bitfield (fatal?)") + } - faults := []uint64{} // TODO + self.FaultSet = types.NewBitField() oldPower := self.Power self.Power = types.BigMul(types.NewInt(pss.Count-uint64(len(faults))), @@ -941,7 +955,13 @@ func onSuccessfulPoSt(self *StorageMinerActorState, vmctx types.VMContext) aerro return err } - self.ProvingSet = self.Sectors + ncid, err := RemoveFromSectorSet(vmctx.Context(), vmctx.Storage(), self.Sectors, faults) + if err != nil { + return err + } + + self.Sectors = ncid + self.ProvingSet = ncid self.ElectionPeriodStart = vmctx.BlockHeight() return nil } diff --git a/chain/actors/actor_miner_test.go b/chain/actors/actor_miner_test.go new file mode 100644 index 000000000..28a5ce046 --- /dev/null +++ b/chain/actors/actor_miner_test.go @@ -0,0 +1,157 @@ +package actors_test + +import ( + "bytes" + "context" + "math/rand" + "testing" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/aerrors" + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sectorbuilder" + hamt "github.com/ipfs/go-hamt-ipld" + blockstore "github.com/ipfs/go-ipfs-blockstore" + cbg "github.com/whyrusleeping/cbor-gen" +) + +func TestMinerCommitSectors(t *testing.T) { + var worker, client address.Address + var minerAddr address.Address + opts := []HarnessOpt{ + HarnessAddr(&worker, 1000000), + HarnessAddr(&client, 1000000), + HarnessActor(&minerAddr, &worker, actors.StorageMinerCodeCid, + func() cbg.CBORMarshaler { + return &actors.StorageMinerConstructorParams{ + Owner: worker, + Worker: worker, + SectorSize: 1024, + PeerID: "fakepeerid", + } + }), + } + + h := NewHarness(t, opts...) + h.vm.Syscalls.ValidatePoRep = func(ctx context.Context, maddr address.Address, ssize uint64, commD, commR, ticket, proof, seed []byte, sectorID uint64) (bool, aerrors.ActorError) { + // all proofs are valid + return true, nil + } + + ret, _ := h.SendFunds(t, worker, minerAddr, types.NewInt(100000)) + ApplyOK(t, ret) + + ret, _ = h.InvokeWithValue(t, client, actors.StorageMarketAddress, actors.SMAMethods.AddBalance, types.NewInt(2000), nil) + ApplyOK(t, ret) + + addSectorToMiner(h, t, minerAddr, worker, client, 1) + + assertSectorIDs(h, t, minerAddr, []uint64{1}) +} + +func addSectorToMiner(h *Harness, t *testing.T, minerAddr, worker, client address.Address, sid uint64) { + t.Helper() + s := sectorbuilder.UserBytesForSectorSize(1024) + deal := h.makeFakeDeal(t, minerAddr, worker, client, s) + ret, _ := h.Invoke(t, worker, actors.StorageMarketAddress, actors.SMAMethods.PublishStorageDeals, + &actors.PublishStorageDealsParams{ + Deals: []actors.StorageDealProposal{*deal}, + }) + ApplyOK(t, ret) + var dealIds actors.PublishStorageDealResponse + if err := dealIds.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil { + t.Fatal(err) + } + + dealid := dealIds.DealIDs[0] + + ret, _ = h.Invoke(t, worker, minerAddr, actors.MAMethods.PreCommitSector, + &actors.SectorPreCommitInfo{ + SectorNumber: sid, + CommR: []byte("cats"), + SealEpoch: 10, + DealIDs: []uint64{dealid}, + }) + ApplyOK(t, ret) + + h.BlockHeight += 100 + ret, _ = h.Invoke(t, worker, minerAddr, actors.MAMethods.ProveCommitSector, + &actors.SectorProveCommitInfo{ + Proof: []byte("prooofy"), + SectorID: sid, + DealIDs: []uint64{dealid}, // TODO: weird that i have to pass this again + }) + ApplyOK(t, ret) +} + +func assertSectorIDs(h *Harness, t *testing.T, maddr address.Address, ids []uint64) { + t.Helper() + sectors, err := getMinerSectorSet(context.TODO(), h.vm.StateTree(), h.bs, maddr) + if err != nil { + t.Fatal(err) + } + + if len(sectors) != len(ids) { + t.Fatal("miner has wrong number of sectors in their sector set") + } + + all := make(map[uint64]bool) + for _, s := range sectors { + all[s.SectorID] = true + } + + for _, id := range ids { + if !all[id] { + t.Fatal("expected to find sector ID: ", id) + } + } +} + +func getMinerSectorSet(ctx context.Context, st types.StateTree, bs blockstore.Blockstore, maddr address.Address) ([]*api.ChainSectorInfo, error) { + mact, err := st.GetActor(maddr) + if err != nil { + return nil, err + } + + cst := hamt.CSTFromBstore(bs) + + var mstate actors.StorageMinerActorState + if err := cst.Get(ctx, mact.Head, &mstate); err != nil { + return nil, err + } + + return stmgr.LoadSectorsFromSet(ctx, bs, mstate.Sectors) +} + +func (h *Harness) makeFakeDeal(t *testing.T, miner, worker, client address.Address, size uint64) *actors.StorageDealProposal { + data := make([]byte, size) + rand.Read(data) + commP, err := sectorbuilder.GeneratePieceCommitment(bytes.NewReader(data), size) + if err != nil { + t.Fatal(err) + } + + prop := actors.StorageDealProposal{ + PieceRef: commP[:], + PieceSize: size, + //PieceSerialization SerializationMode // Needs to be here as it tells how data in the sector maps to PieceRef cid + + Client: client, + Provider: miner, + + ProposalExpiration: 10000, + Duration: 150, + + StoragePricePerEpoch: types.NewInt(1), + StorageCollateral: types.NewInt(0), + } + + if err := api.SignWith(context.TODO(), h.w.Sign, client, &prop); err != nil { + t.Fatal(err) + } + + return &prop +} diff --git a/chain/actors/actor_paych_test.go b/chain/actors/actor_paych_test.go index e23acdf4a..55de306c0 100644 --- a/chain/actors/actor_paych_test.go +++ b/chain/actors/actor_paych_test.go @@ -83,7 +83,7 @@ func TestPaychUpdate(t *testing.T) { ApplyOK(t, ret) // now we have to 'wait' for the chain to advance. - h.vm.SetBlockHeight(1000) + h.BlockHeight = 1000 ret, _ = h.Invoke(t, targetAddr, pch, actors.PCAMethods.Collect, nil) ApplyOK(t, ret) diff --git a/chain/actors/cbor_gen.go b/chain/actors/cbor_gen.go index 475add9da..bdace2e38 100644 --- a/chain/actors/cbor_gen.go +++ b/chain/actors/cbor_gen.go @@ -248,13 +248,13 @@ func (t *StorageMinerActorState) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.Info: %w", err) } - // t.t.CurrentFaultSet (types.BitField) (struct) - if err := t.CurrentFaultSet.MarshalCBOR(w); err != nil { + // t.t.FaultSet (types.BitField) (struct) + if err := t.FaultSet.MarshalCBOR(w); err != nil { return err } - // t.t.NextFaultSet (types.BitField) (struct) - if err := t.NextFaultSet.MarshalCBOR(w); err != nil { + // t.t.LastFaultSubmission (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.LastFaultSubmission))); err != nil { return err } @@ -384,24 +384,25 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error { t.Info = c } - // t.t.CurrentFaultSet (types.BitField) (struct) + // t.t.FaultSet (types.BitField) (struct) { - if err := t.CurrentFaultSet.UnmarshalCBOR(br); err != nil { + if err := t.FaultSet.UnmarshalCBOR(br); err != nil { return err } } - // t.t.NextFaultSet (types.BitField) (struct) - - { - - if err := t.NextFaultSet.UnmarshalCBOR(br); err != nil { - return err - } + // t.t.LastFaultSubmission (uint64) (uint64) + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.LastFaultSubmission = uint64(extra) // t.t.Power (types.BigInt) (struct) { @@ -1031,6 +1032,49 @@ func (t *UpdatePeerIDParams) UnmarshalCBOR(r io.Reader) error { return nil } +func (t *DeclareFaultsParams) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.t.Faults (types.BitField) (struct) + if err := t.Faults.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *DeclareFaultsParams) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.Faults (types.BitField) (struct) + + { + + if err := t.Faults.UnmarshalCBOR(br); err != nil { + return err + } + + } + return nil +} + func (t *MultiSigActorState) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) diff --git a/chain/actors/harness2_test.go b/chain/actors/harness2_test.go index 0ffc02b1b..6f1da8d58 100644 --- a/chain/actors/harness2_test.go +++ b/chain/actors/harness2_test.go @@ -3,6 +3,7 @@ package actors_test import ( "bytes" "context" + "math/rand" "testing" "github.com/ipfs/go-cid" @@ -41,10 +42,12 @@ const ( type HarnessOpt func(testing.TB, *Harness) error type Harness struct { - HI HarnessInit - Stage HarnessStage - Nonces map[address.Address]uint64 - GasCharges map[address.Address]types.BigInt + HI HarnessInit + Stage HarnessStage + Nonces map[address.Address]uint64 + GasCharges map[address.Address]types.BigInt + Rand vm.Rand + BlockHeight uint64 lastBalanceCheck map[address.Address]types.BigInt @@ -127,6 +130,7 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness { h := &Harness{ Stage: HarnessPreInit, Nonces: make(map[address.Address]uint64), + Rand: &fakeRand{}, HI: HarnessInit{ NAddrs: 1, Miner: blsaddr(0), @@ -140,6 +144,7 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness { w: w, ctx: context.Background(), bs: bstore.NewBlockstore(dstore.NewMapDatastore()), + BlockHeight: 0, } for _, opt := range options { err := opt(t, h) @@ -157,8 +162,14 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness { if err != nil { t.Fatal(err) } + + stateroot, err = gen.SetupStorageMarketActor(h.bs, stateroot, nil) + if err != nil { + t.Fatal(err) + } + h.cs = store.NewChainStore(h.bs, nil) - h.vm, err = vm.NewVM(stateroot, 1, nil, h.HI.Miner, h.cs.Blockstore()) + h.vm, err = vm.NewVM(stateroot, 1, h.Rand, h.HI.Miner, h.cs.Blockstore()) if err != nil { t.Fatal(err) } @@ -246,6 +257,7 @@ func (h *Harness) Invoke(t testing.TB, from address.Address, to address.Address, func (h *Harness) InvokeWithValue(t testing.TB, from address.Address, to address.Address, method uint64, value types.BigInt, params cbg.CBORMarshaler) (*vm.ApplyRet, *state.StateTree) { t.Helper() + h.vm.SetBlockHeight(h.BlockHeight) return h.Apply(t, types.Message{ To: to, From: from, @@ -315,3 +327,11 @@ func DumpObject(t testing.TB, obj cbg.CBORMarshaler) []byte { } return b.Bytes() } + +type fakeRand struct{} + +func (fr *fakeRand) GetRandomness(ctx context.Context, h int64) ([]byte, error) { + out := make([]byte, 32) + rand.New(rand.NewSource(h)).Read(out) + return out, nil +} diff --git a/chain/types/bitfield.go b/chain/types/bitfield.go index b4687f5e8..20e63dd95 100644 --- a/chain/types/bitfield.go +++ b/chain/types/bitfield.go @@ -34,6 +34,38 @@ func BitFieldFromSet(setBits []uint64) BitField { return res } +func MergeBitFields(a, b BitField) (BitField, error) { + ra, err := a.rle.RunIterator() + if err != nil { + return BitField{}, err + } + + rb, err := b.rle.RunIterator() + if err != nil { + return BitField{}, err + } + + merge, err := rlepluslazy.Sum(ra, rb) + if err != nil { + return BitField{}, err + } + + mergebytes, err := rlepluslazy.EncodeRuns(merge, nil) + if err != nil { + return BitField{}, err + } + + rle, err := rlepluslazy.FromBuf(mergebytes) + if err != nil { + return BitField{}, err + } + + return BitField{ + rle: rle, + bits: make(map[uint64]struct{}), + }, nil +} + func (bf BitField) sum() (rlepluslazy.RunIterator, error) { if len(bf.bits) == 0 { return bf.rle.RunIterator() @@ -86,7 +118,26 @@ func (bf BitField) All() ([]uint64, error) { return nil, err } - return res, err + return res, nil +} + +func (bf BitField) AllMap() (map[uint64]bool, error) { + + runs, err := bf.sum() + if err != nil { + return nil, err + } + + res, err := rlepluslazy.SliceFromRuns(runs) + if err != nil { + return nil, err + } + + out := make(map[uint64]bool) + for _, i := range res { + out[i] = true + } + return out, nil } func (bf BitField) MarshalCBOR(w io.Writer) error { diff --git a/chain/types/vmcontext.go b/chain/types/vmcontext.go index 5e2963483..39667d2a4 100644 --- a/chain/types/vmcontext.go +++ b/chain/types/vmcontext.go @@ -40,10 +40,15 @@ type VMContext interface { ChargeGas(uint64) aerrors.ActorError GetRandomness(height uint64) ([]byte, aerrors.ActorError) GetBalance(address.Address) (BigInt, aerrors.ActorError) + Sys() *VMSyscalls Context() context.Context } +type VMSyscalls struct { + ValidatePoRep func(context.Context, address.Address, uint64, []byte, []byte, []byte, []byte, []byte, uint64) (bool, aerrors.ActorError) +} + type storageWrapper struct { s Storage } diff --git a/chain/vm/mkactor.go b/chain/vm/mkactor.go index 1e1885825..3419f9f3c 100644 --- a/chain/vm/mkactor.go +++ b/chain/vm/mkactor.go @@ -48,7 +48,7 @@ func makeActor(st *state.StateTree, addr address.Address) (*types.Actor, aerrors case address.SECP256K1: return NewSecp256k1AccountActor(st, addr) case address.ID: - return nil, aerrors.New(1, "no actor with given ID") + return nil, aerrors.Newf(1, "no actor with given ID: %s", addr) case address.Actor: return nil, aerrors.Newf(1, "no such actor: %s", addr) default: diff --git a/chain/vm/syscalls.go b/chain/vm/syscalls.go new file mode 100644 index 000000000..92f9ea269 --- /dev/null +++ b/chain/vm/syscalls.go @@ -0,0 +1,14 @@ +package vm + +import ( + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" +) + +// Actual type is defined in chain/types/vmcontext.go because the VMContext interface is there + +func DefaultSyscalls() *types.VMSyscalls { + return &types.VMSyscalls{ + ValidatePoRep: actors.ValidatePoRep, + } +} diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 1eed0f895..7c1c5a802 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -54,6 +54,8 @@ type VMContext struct { gasAvailable types.BigInt gasUsed types.BigInt + sys *types.VMSyscalls + // root cid of the state of the actor this invocation will be on sroot cid.Cid @@ -75,6 +77,10 @@ func (vmc *VMContext) GetRandomness(height uint64) ([]byte, aerrors.ActorError) return res, nil } +func (vmc *VMContext) Sys() *types.VMSyscalls { + return vmc.sys +} + // Storage interface func (vmc *VMContext) Put(i cbg.CBORMarshaler) (cid.Cid, aerrors.ActorError) { @@ -284,6 +290,7 @@ func (vm *VM) makeVMContext(ctx context.Context, sroot cid.Cid, msg *types.Messa msg: msg, origin: origin, height: vm.blockHeight, + sys: vm.Syscalls, gasUsed: usedGas, gasAvailable: msg.GasLimit, @@ -304,6 +311,8 @@ type VM struct { blockMiner address.Address inv *invoker rand Rand + + Syscalls *types.VMSyscalls } func NewVM(base cid.Cid, height uint64, r Rand, maddr address.Address, cbs blockstore.Blockstore) (*VM, error) { @@ -323,6 +332,7 @@ func NewVM(base cid.Cid, height uint64, r Rand, maddr address.Address, cbs block blockMiner: maddr, inv: newInvoker(), rand: r, + Syscalls: DefaultSyscalls(), }, nil } diff --git a/gen/main.go b/gen/main.go index 9356f5a8b..35bee00b4 100644 --- a/gen/main.go +++ b/gen/main.go @@ -96,6 +96,7 @@ func main() { actors.SubmitFallbackPoStParams{}, actors.PaymentVerifyParams{}, actors.UpdatePeerIDParams{}, + actors.DeclareFaultsParams{}, actors.MultiSigActorState{}, actors.MultiSigConstructorParams{}, actors.MultiSigProposeParams{}, diff --git a/storage/cbor_gen.go b/storage/cbor_gen.go index d7a84430c..340479941 100644 --- a/storage/cbor_gen.go +++ b/storage/cbor_gen.go @@ -342,6 +342,18 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.t.FaultReportMsg (cid.Cid) (struct) + + if t.FaultReportMsg == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.FaultReportMsg); err != nil { + return xerrors.Errorf("failed to write cid field t.FaultReportMsg: %w", err) + } + } + // t.t.LastErr (string) (string) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.LastErr)))); err != nil { return err @@ -574,6 +586,30 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.CommitMessage = &c } + } + // t.t.FaultReportMsg (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.FaultReportMsg: %w", err) + } + + t.FaultReportMsg = &c + } + } // t.t.LastErr (string) (string) diff --git a/storage/sector_states.go b/storage/sector_states.go index 00c070a04..0addd1376 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -233,3 +233,57 @@ func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sector return sector.upd().to(api.Proving).state(func(info *SectorInfo) { }) } + +func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpdate { + // TODO: check if the fault has already been reported, and that this sector is even valid + + // TODO: coalesce faulty sector reporting + bf := types.NewBitField() + bf.Set(sector.SectorID) + + fp := &actors.DeclareFaultsParams{bf} + _ = fp + enc, aerr := actors.SerializeParams(nil) + if aerr != nil { + return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to serialize declare fault params: %w", aerr)) + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.DeclareFaults, + Params: enc, + Value: types.NewInt(0), // TODO: need to ensure sufficient collateral + GasLimit: types.NewInt(1000000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + smsg, err := m.api.MpoolPushMessage(ctx, msg) + if err != nil { + return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to push declare faults message to network: %w", err)) + } + + return sector.upd().to(api.FaultReported).state(func(info *SectorInfo) { + c := smsg.Cid() + info.FaultReportMsg = &c + }) +} + +func (m *Miner) handleFaultReported(ctx context.Context, sector SectorInfo) *sectorUpdate { + if sector.FaultReportMsg == nil { + return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")) + } + + mw, err := m.api.StateWaitMsg(ctx, *sector.FaultReportMsg) + if err != nil { + return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for fault declaration: %w", err)) + } + + if mw.Receipt.ExitCode != 0 { + log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID) + return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)) + } + + return sector.upd().to(api.FaultedFinal).state(func(info *SectorInfo) {}) + +} diff --git a/storage/sector_types.go b/storage/sector_types.go index dccc885dc..11a3120a7 100644 --- a/storage/sector_types.go +++ b/storage/sector_types.go @@ -71,6 +71,9 @@ type SectorInfo struct { // Committing CommitMessage *cid.Cid + // Faults + FaultReportMsg *cid.Cid + // Debug LastErr string } diff --git a/storage/sectors.go b/storage/sectors.go index 251c10331..973869418 100644 --- a/storage/sectors.go +++ b/storage/sectors.go @@ -255,6 +255,12 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { case api.CommitFailed: log.Warn("sector %d entered unimplemented state 'CommitFailed'", update.id) + // Faults + case api.Faulty: + m.handleSectorUpdate(ctx, sector, m.handleFaulty) + case api.FaultReported: + m.handleSectorUpdate(ctx, sector, m.handleFaultReported) + // Fatal errors case api.UndefinedSectorState: log.Error("sector update with undefined state!") From 2f2a5824b327c7425eea9f63795dd3ca5f8b42e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 9 Dec 2019 21:19:46 +0100 Subject: [PATCH 2/2] storage: Use fatal instead of .to.error --- storage/sector_states.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/sector_states.go b/storage/sector_states.go index 0addd1376..ffad0119e 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -245,7 +245,7 @@ func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpda _ = fp enc, aerr := actors.SerializeParams(nil) if aerr != nil { - return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to serialize declare fault params: %w", aerr)) + return sector.upd().fatal(xerrors.Errorf("failed to serialize declare fault params: %w", aerr)) } msg := &types.Message{