From 45288b8810eba42d3228ce97d01caf3a87515c38 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 27 Nov 2019 21:36:34 -0600 Subject: [PATCH 1/5] WIP: uncomment out windowed post code, try to make it work --- chain/actors/actor_miner.go | 3 +- lib/sectorbuilder/sectorbuilder.go | 26 +++++++++++--- storage/post.go | 57 ++++++++++++++---------------- 3 files changed, 50 insertions(+), 36 deletions(-) diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 6661f7bc0..f358fafc2 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "fmt" + ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/lotus/build" @@ -389,7 +390,7 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC } type SubmitPoStParams struct { - Proof types.EPostProof + Proof []byte } func ProvingPeriodEnd(setPeriodEnd, height uint64) (uint64, uint64) { diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index fae3db3b9..53f2a672c 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -299,7 +299,7 @@ func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorIn return nil, err } - challengeCount := challangeCount(uint64(len(sectorInfo.Values()))) + challengeCount := challengeCount(uint64(len(sectorInfo.Values()))) proverID := addressToProverID(sb.Miner) return sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) @@ -328,8 +328,24 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo) (Sor return NewSortedPrivateSectorInfo(out), nil } -func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPrivateSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { - panic("NYI") +func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { + privsectors, err := sb.pubSectorToPriv(sectorInfo) + if err != nil { + return nil, err + } + + challengeCount := challengeCount(uint64(len(sectorInfo.Values()))) + if challengeCount > 10 { + challengeCount = 10 + } + + proverID := addressToProverID(sb.Miner) + candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) + if err != nil { + return nil, err + } + + return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates) } var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector @@ -357,7 +373,7 @@ func VerifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicS var challengeSeeda [CommLen]byte copy(challengeSeeda[:], challengeSeed) - challengeCount := challangeCount(uint64(len(sectorInfo.Values()))) + challengeCount := challengeCount(uint64(len(sectorInfo.Values()))) _, span := trace.StartSpan(ctx, "VerifyPoSt") defer span.End() @@ -383,7 +399,7 @@ func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]by return sectorbuilder.GenerateDataCommitment(ssize, pieces) } -func challangeCount(sectors uint64) uint64 { +func challengeCount(sectors uint64) uint64 { // ceil(sectors / build.SectorChallengeRatioDiv) return (sectors + build.SectorChallengeRatioDiv - 1) / build.SectorChallengeRatioDiv } diff --git a/storage/post.go b/storage/post.go index 22163dfc8..541d23b25 100644 --- a/storage/post.go +++ b/storage/post.go @@ -2,9 +2,10 @@ package storage import ( "context" - ffi "github.com/filecoin-project/filecoin-ffi" "time" + ffi "github.com/filecoin-project/filecoin-ffi" + "go.opencensus.io/trace" "golang.org/x/xerrors" @@ -170,20 +171,20 @@ func (p *post) preparePost(ctx context.Context) error { return nil } -func (p *post) sortedSectorInfo() sectorbuilder.SortedPrivateSectorInfo { +func (p *post) sortedSectorInfo() sectorbuilder.SortedPublicSectorInfo { panic("NYI") - sbsi := make([]ffi.PrivateSectorInfo, len(p.sset)) + sbsi := make([]ffi.PublicSectorInfo, len(p.sset)) for k, sector := range p.sset { var commR [sectorbuilder.CommLen]byte copy(commR[:], sector.CommR) - sbsi[k] = ffi.PrivateSectorInfo{ + sbsi[k] = ffi.PublicSectorInfo{ SectorID: sector.SectorID, CommR: commR, } } - return sectorbuilder.NewSortedPrivateSectorInfo(sbsi) + return sectorbuilder.NewSortedPublicSectorInfo(sbsi) } func (p *post) runPost(ctx context.Context) error { @@ -216,35 +217,31 @@ func (p *post) commitPost(ctx context.Context) (err error) { ctx, span := trace.StartSpan(ctx, "storage.commitPost") defer span.End() - panic("NYI") - /* + params := &actors.SubmitPoStParams{ + Proof: p.proof, + } - params := &actors.SubmitPoStParams{ - //Proof: p.proof, - } + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) + } - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) - } + msg := &types.Message{ + To: p.m.maddr, + From: p.m.worker, + Method: actors.MAMethods.SubmitPoSt, + Params: enc, + Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late + GasLimit: types.NewInt(1000000), // i dont know help + GasPrice: types.NewInt(1), + } - msg := &types.Message{ - To: p.m.maddr, - From: p.m.worker, - Method: actors.MAMethods.SubmitPoSt, - Params: enc, - Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late - GasLimit: types.NewInt(1000000), // i dont know help - GasPrice: types.NewInt(1), - } + log.Info("mpush") - log.Info("mpush") - - p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg) - if err != nil { - return xerrors.Errorf("pushing message to mpool: %w", err) - } - */ + p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg) + if err != nil { + return xerrors.Errorf("pushing message to mpool: %w", err) + } return nil } From 7afc0d4dbea6e31b929e0593bc514ed61415e5d3 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 27 Nov 2019 21:38:00 -0600 Subject: [PATCH 2/5] dedupe some code --- chain/actors/actor_miner.go | 39 +------------------------------------ 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index f358fafc2..c6ab993e3 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -485,11 +485,6 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM _ = seed //VerifyPoStRandomness() - convertToCandidates := func(wins []types.EPostTicket) []sectorbuilder.EPostCandidate { - panic("NYI") - } - winners := convertToCandidates(params.Proof.Winners) - proverID := vmctx.Message().To // TODO: normalize to ID address if ok, lerr := sectorbuilder.VerifyPost(vmctx.Context(), mi.SectorSize, @@ -504,42 +499,10 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM } // Post submission is successful! - self.CurrentFaultSet = self.NextFaultSet - self.NextFaultSet = types.NewBitField() - - oldPower := self.Power - self.Power = types.BigMul(types.NewInt(pss.Count-uint64(len(faults))), - types.NewInt(mi.SectorSize)) - - delta := types.BigSub(self.Power, oldPower) - if self.SlashedAt != 0 { - self.SlashedAt = 0 - delta = self.Power - } - - prevSlashingDeadline := self.ElectionPeriodStart + build.SlashablePowerDelay - if !self.Active { - self.Active = true - prevSlashingDeadline = 0 - } - - enc, err := SerializeParams(&UpdateStorageParams{ - Delta: delta, - NextProvingPeriodEnd: vmctx.BlockHeight() + build.SlashablePowerDelay, - PreviousProvingPeriodEnd: prevSlashingDeadline, - }) - if err != nil { + if err := onSuccessfulPoSt(self, vmctx); err != nil { return nil, err } - _, err = vmctx.Send(StoragePowerAddress, SPAMethods.UpdateStorage, types.NewInt(0), enc) - if err != nil { - return nil, err - } - - self.ProvingSet = self.Sectors - self.ElectionPeriodStart = vmctx.BlockHeight() - c, err := vmctx.Storage().Put(self) if err != nil { return nil, err From aefd432422e85c43cc182b4bc9a399d549b78629 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 28 Nov 2019 13:46:56 +0100 Subject: [PATCH 3/5] actors: Fallback post progress --- build/params_shared.go | 41 ++++++++++++++----------- chain/actors/actor_miner.go | 31 ++++++++++--------- chain/gen/gen.go | 2 +- chain/sync.go | 6 ++-- chain/types/blockheader.go | 6 ++-- chain/types/cbor_gen.go | 14 ++++----- chain/vm/vm_test.go | 2 +- cmd/lotus-bench/main.go | 2 +- lib/sectorbuilder/sectorbuilder.go | 38 +++++++++++++++++------ lib/sectorbuilder/sectorbuilder_test.go | 2 +- 10 files changed, 84 insertions(+), 60 deletions(-) diff --git a/build/params_shared.go b/build/params_shared.go index 44b66248d..c101d8db9 100644 --- a/build/params_shared.go +++ b/build/params_shared.go @@ -31,7 +31,7 @@ func SupportedSectorSize(ssize uint64) bool { // ///// // Payments -// Blocks +// Epochs const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours // ///// @@ -40,13 +40,13 @@ const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours // Seconds const AllowableClockDrift = BlockDelay * 2 -// Blocks +// Epochs const ForkLengthThreshold = Finality // Blocks (e) const BlocksPerEpoch = 5 -// Blocks +// Epochs const Finality = 500 // constants for Weight calculation @@ -57,40 +57,45 @@ const WRatioDen = 2 // ///// // Proofs -// PoStChallangeTime sets the window in which post computation should happen -// Blocks -const PoStChallangeTime = ProvingPeriodDuration - 6 - // PoStRandomnessLookback is additional randomness lookback for PoSt computation // To compute randomness epoch in a given proving period: // RandH = PPE - PoStChallangeTime - PoStRandomnessLookback // -// Blocks +// Epochs const PoStRandomnessLookback = 1 -// Blocks +// Epochs const SealRandomnessLookback = Finality -// Blocks +// Epochs const SealRandomnessLookbackLimit = SealRandomnessLookback + 2000 // 1 / n const SectorChallengeRatioDiv = 25 +const MaxFallbackPostChallengeCount = 10 + +// FallbackPoStBegin is the number of epochs the miner needs to wait after +// ElectionPeriodStart before starting fallback post computation +// +// Epochs +const FallbackPoStBegin = 1000 + +// SlashablePowerDelay is the number of epochs +// Epochs +const SlashablePowerDelay = 2000 + // ///// // Mining -// Blocks +// Epochs const EcRandomnessLookback = 300 -const FallbackPoStBegin = 1000 -const SlashablePowerDelay = 2000 - const PowerCollateralProportion = 5 const PerCapitaCollateralProportion = 1 const CollateralPrecision = 1000 -// Blocks +// Epochs const InteractivePoRepDelay = 10 // ///// @@ -106,8 +111,8 @@ var InitialReward *big.Int const FilecoinPrecision = 1_000_000_000_000_000_000 // six years -// Blocks -const HalvingPeriodBlocks = 6 * 365 * 24 * 60 * 2 +// Epochs +const HalvingPeriodEpochs = 6 * 365 * 24 * 60 * 2 // TODO: Move other important consts here @@ -125,6 +130,6 @@ func init() { // Sync const BadBlockCacheSize = 1 << 15 -// assuming 4000 blocks per round, this lets us not lose any messages across a +// assuming 4000 messages per round, this lets us not lose any messages across a // 10 block reorg. const BlsSignatureCacheSize = 40000 diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index c6ab993e3..b1fe46fb7 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -366,7 +366,7 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC if pss.Count == 0 { self.ProvingSet = self.Sectors // TODO: probably want to wait until the miner is above a certain - // threshold before starting this + // threshold before starting this self.ElectionPeriodStart = vmctx.BlockHeight() } @@ -390,15 +390,8 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC } type SubmitPoStParams struct { - Proof []byte -} - -func ProvingPeriodEnd(setPeriodEnd, height uint64) (uint64, uint64) { - offset := setPeriodEnd % build.ProvingPeriodDuration - period := ((height - offset - 1) / build.ProvingPeriodDuration) + 1 - end := (period * build.ProvingPeriodDuration) + offset - - return end, period + Proof []byte + Candidates []types.EPostTicket } func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitPoStParams) ([]byte, ActorError) { @@ -482,13 +475,21 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM faults := self.CurrentFaultSet.All() _ = faults - _ = seed - //VerifyPoStRandomness() - proverID := vmctx.Message().To // TODO: normalize to ID address - if ok, lerr := sectorbuilder.VerifyPost(vmctx.Context(), mi.SectorSize, - sectorbuilder.NewSortedPublicSectorInfo(sectorInfos), params.Proof.PostRand, params.Proof.Proof, winners, proverID); !ok || lerr != nil { + var candidates []sectorbuilder.EPostCandidate + for _, t := range params.Candidates { + var partial [32]byte + copy(partial[:], t.Partial) + candidates = append(candidates, sectorbuilder.EPostCandidate{ + PartialTicket: partial, + SectorID: t.SectorID, + SectorChallengeIndex: t.ChallengeIndex, + }) + } + + if ok, lerr := sectorbuilder.VerifyFallbackPost(vmctx.Context(), mi.SectorSize, + sectorbuilder.NewSortedPublicSectorInfo(sectorInfos), seed[:], params.Proof, candidates, proverID); !ok || lerr != nil { if lerr != nil { // TODO: study PoST errors return nil, aerrors.Absorb(lerr, 4, "PoST error") diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 5121bdc7b..9189f76c6 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -506,7 +506,7 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add PostRand: vrfout, } for _, win := range winners { - ept.Winners = append(ept.Winners, types.EPostTicket{ + ept.Candidates = append(ept.Candidates, types.EPostTicket{ Partial: win.PartialTicket[:], SectorID: win.SectorID, ChallengeIndex: win.SectorChallengeIndex, diff --git a/chain/sync.go b/chain/sync.go index 4b299f504..abedd2ff0 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -555,7 +555,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err return xerrors.Errorf("failed to get sector size for block miner: %w", err) } - for _, t := range h.EPostProof.Winners { + for _, t := range h.EPostProof.Candidates { if !types.IsTicketWinner(t.Partial, ssize, tpow, 1) { return xerrors.Errorf("miner created a block but was not a winner") } @@ -667,7 +667,7 @@ func (syncer *Syncer) VerifyElectionPoStProof(ctx context.Context, h *types.Bloc } var winners []sectorbuilder.EPostCandidate - for _, t := range h.EPostProof.Winners { + for _, t := range h.EPostProof.Candidates { var partial [32]byte copy(partial[:], t.Partial) winners = append(winners, sectorbuilder.EPostCandidate{ @@ -689,7 +689,7 @@ func (syncer *Syncer) VerifyElectionPoStProof(ctx context.Context, h *types.Bloc return xerrors.Errorf("[TESTING] election post was invalid") } hvrf := sha256.Sum256(h.EPostProof.PostRand) - ok, err := sectorbuilder.VerifyPost(ctx, ssize, *sectorInfo, hvrf[:], h.EPostProof.Proof, winners, h.Miner) + ok, err := sectorbuilder.VerifyElectionPost(ctx, ssize, *sectorInfo, hvrf[:], h.EPostProof.Proof, winners, h.Miner) if err != nil { return xerrors.Errorf("failed to verify election post: %w", err) } diff --git a/chain/types/blockheader.go b/chain/types/blockheader.go index 9cac4fd42..79585f1d2 100644 --- a/chain/types/blockheader.go +++ b/chain/types/blockheader.go @@ -27,9 +27,9 @@ type EPostTicket struct { } type EPostProof struct { - Proof []byte - PostRand []byte - Winners []EPostTicket + Proof []byte + PostRand []byte + Candidates []EPostTicket } type BlockHeader struct { diff --git a/chain/types/cbor_gen.go b/chain/types/cbor_gen.go index aa821245c..d2b60f268 100644 --- a/chain/types/cbor_gen.go +++ b/chain/types/cbor_gen.go @@ -349,11 +349,11 @@ func (t *EPostProof) MarshalCBOR(w io.Writer) error { return err } - // t.t.Winners ([]types.EPostTicket) (slice) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Winners)))); err != nil { + // t.t.Candidates ([]types.EPostTicket) (slice) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Candidates)))); err != nil { return err } - for _, v := range t.Winners { + for _, v := range t.Candidates { if err := v.MarshalCBOR(w); err != nil { return err } @@ -410,21 +410,21 @@ func (t *EPostProof) UnmarshalCBOR(r io.Reader) error { if _, err := io.ReadFull(br, t.PostRand); err != nil { return err } - // t.t.Winners ([]types.EPostTicket) (slice) + // t.t.Candidates ([]types.EPostTicket) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("t.Winners: array too large (%d)", extra) + return fmt.Errorf("t.Candidates: array too large (%d)", extra) } if maj != cbg.MajArray { return fmt.Errorf("expected cbor array") } if extra > 0 { - t.Winners = make([]EPostTicket, extra) + t.Candidates = make([]EPostTicket, extra) } for i := 0; i < int(extra); i++ { @@ -433,7 +433,7 @@ func (t *EPostProof) UnmarshalCBOR(r io.Reader) error { return err } - t.Winners[i] = v + t.Candidates[i] = v } return nil diff --git a/chain/vm/vm_test.go b/chain/vm/vm_test.go index 7e4dd3e12..862bb4acf 100644 --- a/chain/vm/vm_test.go +++ b/chain/vm/vm_test.go @@ -12,7 +12,7 @@ import ( func TestBlockReward(t *testing.T) { coffer := types.FromFil(build.MiningRewardTotal).Int sum := new(big.Int) - N := build.HalvingPeriodBlocks + N := build.HalvingPeriodEpochs for i := 0; i < N; i++ { a := MiningReward(types.BigInt{coffer}) sum = sum.Add(sum, a.Int) diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index 03aa60658..cbdaa3048 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -192,7 +192,7 @@ func main() { epost := time.Now() - ok, err := sectorbuilder.VerifyPost(context.TODO(), sectorSize, sinfos, challenge[:], proof, candidates[:1], maddr) + ok, err := sectorbuilder.VerifyElectionPost(context.TODO(), sectorSize, sinfos, challenge[:], proof, candidates[:1], maddr) if err != nil { return err } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 53f2a672c..156bf4389 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -299,7 +299,7 @@ func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorIn return nil, err } - challengeCount := challengeCount(uint64(len(sectorInfo.Values()))) + challengeCount := electionPostChallengeCount(uint64(len(sectorInfo.Values()))) proverID := addressToProverID(sb.Miner) return sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) @@ -334,10 +334,7 @@ func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, return nil, err } - challengeCount := challengeCount(uint64(len(sectorInfo.Values()))) - if challengeCount > 10 { - challengeCount = 10 - } + challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values()))) proverID := addressToProverID(sb.Miner) candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) @@ -369,16 +366,29 @@ func NewSortedPublicSectorInfo(sectors []sectorbuilder.PublicSectorInfo) SortedP return sectorbuilder.NewSortedPublicSectorInfo(sectors...) } -func VerifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, winners []EPostCandidate, proverID address.Address) (bool, error) { +func VerifyElectionPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) { + challengeCount := electionPostChallengeCount(uint64(len(sectorInfo.Values()))) + return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID) +} + +func VerifyFallbackPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) { + challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values()))) + return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID) +} + +func verifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeCount uint64, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) { + if challengeCount != uint64(len(candidates)) { + log.Warnf("verifyPost with wrong candidate count: expected %d, got %d", challengeCount, len(candidates)) + return false, nil // user input, dont't error + } + var challengeSeeda [CommLen]byte copy(challengeSeeda[:], challengeSeed) - challengeCount := challengeCount(uint64(len(sectorInfo.Values()))) - _, span := trace.StartSpan(ctx, "VerifyPoSt") defer span.End() prover := addressToProverID(proverID) - return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeeda, challengeCount, proof, winners, prover) + return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeeda, challengeCount, proof, candidates, prover) } func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) { @@ -399,7 +409,15 @@ func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]by return sectorbuilder.GenerateDataCommitment(ssize, pieces) } -func challengeCount(sectors uint64) uint64 { +func electionPostChallengeCount(sectors uint64) uint64 { // ceil(sectors / build.SectorChallengeRatioDiv) return (sectors + build.SectorChallengeRatioDiv - 1) / build.SectorChallengeRatioDiv } + +func fallbackPostChallengeCount(sectors uint64) uint64 { + challengeCount := electionPostChallengeCount(sectors) + if challengeCount > build.MaxFallbackPostChallengeCount { + return build.MaxFallbackPostChallengeCount + } + return challengeCount +} diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index b55b6c5bf..17398d482 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -102,7 +102,7 @@ func (s *seal) post(t *testing.T, sb *sectorbuilder.SectorBuilder) time.Time { t.Fatalf("%+v", err) } - ok, err := sectorbuilder.VerifyPost(context.TODO(), sb.SectorSize(), ssi, cSeed[:], postProof, candndates, sb.Miner) + ok, err := sectorbuilder.VerifyElectionPost(context.TODO(), sb.SectorSize(), ssi, cSeed[:], postProof, candndates, sb.Miner) if err != nil { t.Fatalf("%+v", err) } From 920fd3ba9bdbdc1ad8057ad138d8583c27cc5311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 28 Nov 2019 18:44:49 +0100 Subject: [PATCH 4/5] storageminer: New fallback post scheduler --- api/api_full.go | 2 +- api/struct.go | 2 +- build/params_shared.go | 4 +- chain/actors/actor_miner.go | 6 +- chain/actors/cbor_gen.go | 11 +- chain/stmgr/utils.go | 5 +- cmd/lotus-storage-miner/info.go | 2 +- gen/main.go | 2 +- lib/sectorbuilder/sectorbuilder.go | 9 +- node/impl/full/state.go | 4 +- storage/fpost_run.go | 138 +++++++++++++ storage/fpost_sched.go | 141 ++++++++++++++ storage/miner.go | 11 +- storage/post.go | 298 ----------------------------- storage/sector_states.go | 2 - 15 files changed, 308 insertions(+), 329 deletions(-) create mode 100644 storage/fpost_run.go create mode 100644 storage/fpost_sched.go delete mode 100644 storage/post.go diff --git a/api/api_full.go b/api/api_full.go index fc23cf43b..2e0b81d1a 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -99,7 +99,7 @@ type FullNode interface { StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error) StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) - StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) + StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error) StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) diff --git a/api/struct.go b/api/struct.go index 1356bff2f..e8f27680f 100644 --- a/api/struct.go +++ b/api/struct.go @@ -362,7 +362,7 @@ func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address return c.Internal.StateMinerPeerID(ctx, m, ts) } -func (c *FullNodeStruct) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { +func (c *FullNodeStruct) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { return c.Internal.StateMinerProvingPeriodEnd(ctx, actor, ts) } diff --git a/build/params_shared.go b/build/params_shared.go index c101d8db9..e482ca9eb 100644 --- a/build/params_shared.go +++ b/build/params_shared.go @@ -75,11 +75,11 @@ const SectorChallengeRatioDiv = 25 const MaxFallbackPostChallengeCount = 10 -// FallbackPoStBegin is the number of epochs the miner needs to wait after +// FallbackPoStDelay is the number of epochs the miner needs to wait after // ElectionPeriodStart before starting fallback post computation // // Epochs -const FallbackPoStBegin = 1000 +const FallbackPoStDelay = 1000 // SlashablePowerDelay is the number of epochs // Epochs diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index b1fe46fb7..1de5ae2f1 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -389,12 +389,12 @@ func (sma StorageMinerActor) ProveCommitSector(act *types.Actor, vmctx types.VMC return nil, err } -type SubmitPoStParams struct { +type SubmitFallbackPoStParams struct { Proof []byte Candidates []types.EPostTicket } -func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitPoStParams) ([]byte, ActorError) { +func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VMContext, params *SubmitFallbackPoStParams) ([]byte, ActorError) { oldstate, self, err := loadState(vmctx) if err != nil { return nil, err @@ -427,7 +427,7 @@ func (sma StorageMinerActor) SubmitFallbackPoSt(act *types.Actor, vmctx types.VM var seed [sectorbuilder.CommLen]byte { - randHeight := self.ElectionPeriodStart + build.FallbackPoStBegin + randHeight := self.ElectionPeriodStart + build.FallbackPoStDelay if vmctx.BlockHeight() <= randHeight { // TODO: spec, retcode return nil, aerrors.Newf(1, "submit fallback PoSt called too early (%d < %d)", vmctx.BlockHeight(), randHeight) diff --git a/chain/actors/cbor_gen.go b/chain/actors/cbor_gen.go index 8a8688208..1666adeb2 100644 --- a/chain/actors/cbor_gen.go +++ b/chain/actors/cbor_gen.go @@ -813,7 +813,7 @@ func (t *MinerInfo) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error { +func (t *SubmitFallbackPoStParams) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err @@ -823,13 +823,10 @@ func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error { } // t.t.Proof (types.EPostProof) (struct) - if err := t.Proof.MarshalCBOR(w); err != nil { - return err - } return nil } -func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error { +func (t *SubmitFallbackPoStParams) UnmarshalCBOR(r io.Reader) error { br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) @@ -848,10 +845,6 @@ func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error { { - if err := t.Proof.UnmarshalCBOR(br); err != nil { - return err - } - } return nil } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 1a1a46ad0..a8c5378c1 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -149,15 +149,14 @@ func GetMinerWorker(ctx context.Context, sm *StateManager, ts *types.TipSet, mad return address.NewFromBytes(recp.Return) } -func GetMinerProvingPeriodEnd(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) { +func GetMinerElectionPeriodStart(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) { var mas actors.StorageMinerActorState _, err := sm.LoadActorState(ctx, maddr, &mas, ts) if err != nil { return 0, xerrors.Errorf("failed to load miner actor state: %w", err) } - panic("idk what to do") - //return mas.ProvingPeriodEnd, nil + return mas.ElectionPeriodStart, nil } func GetMinerProvingSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]*api.ChainSectorInfo, error) { diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index b1df59cb3..900a7c611 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -60,7 +60,7 @@ var infoCmd = &cli.Command{ } fmt.Printf("Worker use: %d / %d (+%d)\n", wstat.Total-wstat.Reserved-wstat.Free, wstat.Total, wstat.Reserved) - ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil) + ppe, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil) if err != nil { return err } diff --git a/gen/main.go b/gen/main.go index 48668d6da..4aad7622c 100644 --- a/gen/main.go +++ b/gen/main.go @@ -93,7 +93,7 @@ func main() { actors.SectorPreCommitInfo{}, actors.PreCommittedSector{}, actors.MinerInfo{}, - actors.SubmitPoStParams{}, + actors.SubmitFallbackPoStParams{}, actors.PaymentVerifyParams{}, actors.UpdatePeerIDParams{}, actors.MultiSigActorState{}, diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 156bf4389..f953dea14 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -328,10 +328,10 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo) (Sor return NewSortedPrivateSectorInfo(out), nil } -func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { +func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) { privsectors, err := sb.pubSectorToPriv(sectorInfo) if err != nil { - return nil, err + return nil, nil, err } challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values()))) @@ -339,10 +339,11 @@ func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, proverID := addressToProverID(sb.Miner) candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) if err != nil { - return nil, err + return nil, nil, err } - return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates) + proof, err := sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates) + return candidates, proof, err } var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector diff --git a/node/impl/full/state.go b/node/impl/full/state.go index a51276d6c..04037c079 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -76,8 +76,8 @@ func (a *StateAPI) StateMinerPeerID(ctx context.Context, m address.Address, ts * return stmgr.GetMinerPeerID(ctx, a.StateManager, ts, m) } -func (a *StateAPI) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { - return stmgr.GetMinerProvingPeriodEnd(ctx, a.StateManager, ts, actor) +func (a *StateAPI) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { + return stmgr.GetMinerElectionPeriodStart(ctx, a.StateManager, ts, actor) } func (a *StateAPI) StateMinerSectorSize(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { diff --git a/storage/fpost_run.go b/storage/fpost_run.go new file mode 100644 index 000000000..ba0990ef3 --- /dev/null +++ b/storage/fpost_run.go @@ -0,0 +1,138 @@ +package storage + +import ( + "context" + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sectorbuilder" + "go.opencensus.io/trace" + "golang.org/x/xerrors" + "time" +) + +func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) { + ctx, abort := context.WithCancel(ctx) + + s.abort = abort + s.activeEPS = eps + + go func() { + defer abort() + + ctx, span := trace.StartSpan(ctx, "fpostScheduler.doPost") + defer span.End() + + proof, err := s.runPost(ctx, eps, ts) + if err != nil { + log.Errorf("runPost failed: %+v", err) + return + } + + if err := s.submitPost(ctx, proof); err != nil { + log.Errorf("submitPost failed: %+v", err) + return + } + }() +} + +func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipSet) (*actors.SubmitFallbackPoStParams, error) { + ctx, span := trace.StartSpan(ctx, "storage.runPost") + defer span.End() + + challengeRound := int64(eps + build.FallbackPoStDelay) + + rand, err := s.api.ChainGetRandomness(ctx, ts.Key(), challengeRound) + if err != nil { + return nil, xerrors.Errorf("failed to get chain randomness for fpost (ts=%d; eps=%d): %w", ts.Height(), eps, err) + } + + ssi, err := s.sortedSectorInfo(ctx, ts) + if err != nil { + return nil, xerrors.Errorf("getting sorted sector info: %w", err) + } + + log.Infow("running fPoSt", "chain-random", rand, "eps", eps, "height", ts.Height()) + + tsStart := time.Now() + var faults []uint64 // TODO + + var seed [32]byte + copy(seed[:], rand) + + scandidates, proof, err := s.sb.GenerateFallbackPoSt(ssi, seed, faults) + if err != nil { + return nil, xerrors.Errorf("running post failed: %w", err) + } + + elapsed := time.Since(tsStart) + log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed) + + candidates := make([]types.EPostTicket, len(scandidates)) + for i, sc := range scandidates { + candidates[i] = types.EPostTicket{ + Partial: sc.PartialTicket[:], + SectorID: sc.SectorID, + ChallengeIndex: sc.SectorChallengeIndex, + } + } + + return &actors.SubmitFallbackPoStParams{ + Proof: proof, + Candidates: candidates, + }, nil +} + +func (s *fpostScheduler) sortedSectorInfo(ctx context.Context, ts *types.TipSet) (sectorbuilder.SortedPublicSectorInfo, error) { + sset, err := s.api.StateMinerProvingSet(ctx, s.actor, ts) + if err != nil { + return sectorbuilder.SortedPublicSectorInfo{}, xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", ts.Height(), err) + } + if len(sset) == 0 { + log.Warn("empty proving set! (ts.H: %d)", ts.Height()) + } + + sbsi := make([]ffi.PublicSectorInfo, len(sset)) + for k, sector := range sset { + var commR [sectorbuilder.CommLen]byte + copy(commR[:], sector.CommR) + + sbsi[k] = ffi.PublicSectorInfo{ + SectorID: sector.SectorID, + CommR: commR, + } + } + + return sectorbuilder.NewSortedPublicSectorInfo(sbsi), nil +} + +func (s *fpostScheduler) submitPost(ctx context.Context, proof *actors.SubmitFallbackPoStParams) error { + ctx, span := trace.StartSpan(ctx, "storage.commitPost") + defer span.End() + + enc, aerr := actors.SerializeParams(proof) + if aerr != nil { + return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) + } + + msg := &types.Message{ + To: s.actor, + From: s.worker, + Method: actors.MAMethods.SubmitFallbackPoSt, + Params: enc, + Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late + GasLimit: types.NewInt(10000000), // i dont know help + GasPrice: types.NewInt(1), + } + + // TODO: consider maybe caring about the output + sm, err := s.api.MpoolPushMessage(ctx, msg) + if err != nil { + return xerrors.Errorf("pushing message to mpool: %w", err) + } + + log.Infof("Submitted fallback post: %s", sm.Cid()) + + return nil +} diff --git a/storage/fpost_sched.go b/storage/fpost_sched.go new file mode 100644 index 000000000..565a3604f --- /dev/null +++ b/storage/fpost_sched.go @@ -0,0 +1,141 @@ +package storage + +import ( + "context" + + "go.opencensus.io/trace" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sectorbuilder" +) + +const Inactive = 0 + +const StartConfidence = 4 // TODO: config + +type fpostScheduler struct { + api storageMinerApi + sb *sectorbuilder.SectorBuilder + + actor address.Address + worker address.Address + + cur *types.TipSet + + // if a post is in progress, this indicates for which ElectionPeriodStart + activeEPS uint64 + abort context.CancelFunc +} + +func (s *fpostScheduler) run(ctx context.Context) { + notifs, err := s.api.ChainNotify(ctx) + if err != nil { + return + } + + current := <-notifs + if len(current) != 1 { + panic("expected first notif to have len = 1") + } + if current[0].Type == store.HCCurrent { + panic("expected first notif to tell current ts") + } + + if err := s.update(ctx, current[0].Val); err != nil { + panic(err) + } + + // not fine to panic after this point + for { + select { + case changes := <-notifs: + ctx, span := trace.StartSpan(ctx, "fpostScheduler.headChange") + + var lowest, highest *types.TipSet = s.cur, nil + + for _, change := range changes { + switch change.Type { + case store.HCRevert: + lowest = change.Val + case store.HCApply: + highest = change.Val + } + } + + if err := s.revert(ctx, lowest); err != nil { + log.Error("handling head reverts in fallbackPost sched: %+v", err) + } + if err := s.update(ctx, highest); err != nil { + log.Error("handling head updates in fallbackPost sched: %+v", err) + } + + span.End() + } + } +} + +func (s *fpostScheduler) revert(ctx context.Context, newLowest *types.TipSet) error { + if s.cur == newLowest { + return nil + } + s.cur = newLowest + + newEPS, _, err := s.shouldFallbackPost(ctx, newLowest) + if err != nil { + return err + } + + if newEPS != s.activeEPS { + s.abortActivePoSt() + } + + return nil +} + +func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error { + newEPS, start, err := s.shouldFallbackPost(ctx, new) + if err != nil { + return err + } + + if newEPS != s.activeEPS { + s.abortActivePoSt() + } + + if newEPS != Inactive && start { + s.doPost(ctx, newEPS, new) + } + + return nil +} + +func (s *fpostScheduler) abortActivePoSt() { + if s.activeEPS == Inactive { + return // noop + } + + if s.abort != nil { + s.abort() + } + + log.Warnf("Aborting Fallback PoSt (EPS: %d)", s.activeEPS) + + s.activeEPS = 0 + s.abort = nil +} + +func (s *fpostScheduler) shouldFallbackPost(ctx context.Context, ts *types.TipSet) (uint64, bool, error) { + eps, err := s.api.StateMinerElectionPeriodStart(ctx, s.actor, ts) + if err != nil { + return 0, false, xerrors.Errorf("getting ElectionPeriodStart: %w", err) + } + + if ts.Height() >= eps+build.FallbackPoStDelay { + return eps, ts.Height() >= eps+build.FallbackPoStDelay+StartConfidence, nil + } + return 0, false, nil +} diff --git a/storage/miner.go b/storage/miner.go index 206bbbe4d..15f65e60c 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -53,7 +53,7 @@ type storageMinerApi interface { // Call a read only method on actors (no interaction with the chain required) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) - StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error) + StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) @@ -99,7 +99,14 @@ func (m *Miner) Run(ctx context.Context) error { m.events = events.NewEvents(ctx, m.api) - go m.beginPosting(ctx) + fps := &fpostScheduler{ + api: m.api, + sb: m.sb, + actor: m.maddr, + worker: m.worker, + } + + go fps.run(ctx) go m.sectorStateLoop(ctx) return nil } diff --git a/storage/post.go b/storage/post.go deleted file mode 100644 index 541d23b25..000000000 --- a/storage/post.go +++ /dev/null @@ -1,298 +0,0 @@ -package storage - -import ( - "context" - "time" - - ffi "github.com/filecoin-project/filecoin-ffi" - - "go.opencensus.io/trace" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/sectorbuilder" -) - -const postMsgTimeout = 20 - -func (m *Miner) beginPosting(ctx context.Context) { - ts, err := m.api.ChainHead(context.TODO()) - if err != nil { - log.Error(err) - return - } - - sppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) - if err != nil { - log.Errorf("failed to get proving period end for miner (ts h: %d): %s", ts.Height(), err) - return - } - - if sppe == 0 { - log.Warn("Not proving yet") - return - } - - m.postLk.Lock() - if m.schedPost > 0 { - log.Warnf("PoSts already running %d", m.schedPost) - m.postLk.Unlock() - return - } - - // height needs to be +1, because otherwise we'd be trying to schedule PoSt - // at current block height - ppe, _ := actors.ProvingPeriodEnd(sppe, ts.Height()+1) - m.schedPost = ppe - - m.postLk.Unlock() - - if build.PoStChallangeTime > ppe { - ppe = build.PoStChallangeTime - } - - log.Infof("Scheduling post at height %d (begin ts: %d, statePPE: %d)", ppe-build.PoStChallangeTime, ts.Height(), sppe) - err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert - // TODO: Cancel post - log.Errorf("TODO: Cancel PoSt, re-run") - return nil - }, PoStConfidence, ppe-build.PoStChallangeTime) - if err != nil { - // TODO: This is BAD, figure something out - log.Errorf("scheduling PoSt failed: %s", err) - return - } -} - -func (m *Miner) scheduleNextPost(ppe uint64) { - ts, err := m.api.ChainHead(context.TODO()) - if err != nil { - log.Error(err) - // TODO: retry - return - } - - headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height()) - if headPPE > ppe { - log.Warnw("PoSt computation running behind chain", "headPPE", headPPE, "ppe", ppe) - ppe = headPPE - } - - m.postLk.Lock() - if m.schedPost >= ppe { - // this probably can't happen - log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe) - m.postLk.Unlock() - return - } - - m.schedPost = ppe - m.postLk.Unlock() - - log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime, - "height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod) - err = m.events.ChainAt(m.computePost(ppe), func(ctx context.Context, ts *types.TipSet) error { // Revert - // TODO: Cancel post - log.Errorf("TODO: Cancel PoSt, re-run") - return nil - }, PoStConfidence, ppe-build.PoStChallangeTime) - if err != nil { - // TODO: This is BAD, figure something out - log.Errorf("scheduling PoSt failed: %+v", err) - return - } -} - -type post struct { - m *Miner - - ppe uint64 - ts *types.TipSet - - // prep - sset []*api.ChainSectorInfo - r []byte - - // run - proof []byte - - // commit - smsg *types.SignedMessage -} - -func (p *post) doPost(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.computePost") - defer span.End() - - if err := p.preparePost(ctx); err != nil { - return xerrors.Errorf("prepare: %w", err) - } - - if err := p.runPost(ctx); err != nil { - return xerrors.Errorf("run: %w", err) - } - - if err := p.commitPost(ctx); err != nil { - return xerrors.Errorf("commit: %w", err) - } - - if err := p.waitCommit(ctx); err != nil { - return xerrors.Errorf("wait: %w", err) - } - - return nil -} - -func (p *post) preparePost(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.preparePost") - defer span.End() - log.Info("preparePost") - - sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts) - if err != nil { - return xerrors.Errorf("failed to get proving set for miner (tsH: %d): %w", p.ts.Height(), err) - } - if len(sset) == 0 { - log.Warn("empty proving set! (ts.H: %d)", p.ts.Height()) - } - - p.sset = sset - - challengeRound := int64(p.ppe) - int64(build.PoStChallangeTime+build.PoStRandomnessLookback) - r, err := p.m.api.ChainGetRandomness(ctx, p.ts.Key(), challengeRound) - if err != nil { - return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", p.ts.Height(), p.ppe, err) - } - p.r = r - - return nil -} - -func (p *post) sortedSectorInfo() sectorbuilder.SortedPublicSectorInfo { - panic("NYI") - sbsi := make([]ffi.PublicSectorInfo, len(p.sset)) - for k, sector := range p.sset { - var commR [sectorbuilder.CommLen]byte - copy(commR[:], sector.CommR) - - sbsi[k] = ffi.PublicSectorInfo{ - SectorID: sector.SectorID, - CommR: commR, - } - } - - return sectorbuilder.NewSortedPublicSectorInfo(sbsi) -} - -func (p *post) runPost(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.runPost") - defer span.End() - - log.Infow("running PoSt", "delayed-by", - int64(p.ts.Height())-(int64(p.ppe)-int64(build.PoStChallangeTime)), - "chain-random", p.r, "ppe", p.ppe, "height", p.ts.Height(), "sectors", len(p.sset)) - - tsStart := time.Now() - var faults []uint64 // TODO - - var seed [32]byte - copy(seed[:], p.r) - - proof, err := p.m.sb.GenerateFallbackPoSt(p.sortedSectorInfo(), seed, faults) - if err != nil { - return xerrors.Errorf("running post failed: %w", err) - } - elapsed := time.Since(tsStart) - - p.proof = proof - log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed) - - return nil -} - -func (p *post) commitPost(ctx context.Context) (err error) { - ctx, span := trace.StartSpan(ctx, "storage.commitPost") - defer span.End() - - params := &actors.SubmitPoStParams{ - Proof: p.proof, - } - - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) - } - - msg := &types.Message{ - To: p.m.maddr, - From: p.m.worker, - Method: actors.MAMethods.SubmitPoSt, - Params: enc, - Value: types.NewInt(1000), // currently hard-coded late fee in actor, returned if not late - GasLimit: types.NewInt(1000000), // i dont know help - GasPrice: types.NewInt(1), - } - - log.Info("mpush") - - p.smsg, err = p.m.api.MpoolPushMessage(ctx, msg) - if err != nil { - return xerrors.Errorf("pushing message to mpool: %w", err) - } - - return nil -} - -func (p *post) waitCommit(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "storage.waitPost") - defer span.End() - - log.Infof("Waiting for post %s to appear on chain", p.smsg.Cid()) - - err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { - if rec.ExitCode != 0 { - log.Warnf("SubmitPoSt EXIT: %d", rec.ExitCode) - } - - log.Infof("Post made it on chain! (height=%d)", ts.Height()) - - return false, nil - }, func(ctx context.Context, ts *types.TipSet) error { - log.Warn("post message reverted") - return nil - }, 3, postMsgTimeout, p.smsg) - if err != nil { - return xerrors.Errorf("waiting for post to appear on chain: %w", err) - } - - return nil -} - -func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipSet, curH uint64) error { - called := 0 - return func(ctx context.Context, ts *types.TipSet, curH uint64) error { - called++ - if called > 1 { - log.Errorw("BUG: computePost callback called again", "ppe", ppe, - "height", ts.Height(), "curH", curH, "called", called-1) - return nil - } - - err := (&post{ - m: m, - ppe: ppe, - ts: ts, - }).doPost(ctx) - - m.scheduleNextPost(ppe + build.ProvingPeriodDuration) - - if err != nil { - return xerrors.Errorf("doPost: %w", err) - } - - return nil - } -} diff --git a/storage/sector_states.go b/storage/sector_states.go index ed5666285..455fa15b8 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -220,8 +220,6 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector return nil, xerrors.New("UNHANDLED: submitting sector proof failed") } - m.beginPosting(ctx) - return func(info *SectorInfo) { mcid := smsg.Cid() info.CommitMessage = &mcid From 367512d0670a3ff44bb9fa367f101986ac8426d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 28 Nov 2019 19:08:10 +0100 Subject: [PATCH 5/5] Some fpost fixes --- api/struct.go | 46 +++++++++++++++++++++--------------------- chain/vm/invoker.go | 2 +- storage/fpost_run.go | 8 +++++--- storage/fpost_sched.go | 2 +- 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/api/struct.go b/api/struct.go index e8f27680f..f47836f6a 100644 --- a/api/struct.go +++ b/api/struct.go @@ -84,28 +84,28 @@ type FullNodeStruct struct { ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"` ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) `perm:"read"` - StateMinerSectors func(context.Context, address.Address, *types.TipSet) ([]*ChainSectorInfo, error) `perm:"read"` - StateMinerProvingSet func(context.Context, address.Address, *types.TipSet) ([]*ChainSectorInfo, error) `perm:"read"` - StateMinerPower func(context.Context, address.Address, *types.TipSet) (MinerPower, error) `perm:"read"` - StateMinerWorker func(context.Context, address.Address, *types.TipSet) (address.Address, error) `perm:"read"` - StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` - StateMinerProvingPeriodEnd func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"` - StateMinerSectorSize func(context.Context, address.Address, *types.TipSet) (uint64, error) `perm:"read"` - StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` - StateReplay func(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) `perm:"read"` - StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` - StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` - StatePledgeCollateral func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"` - StateWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` - StateListMiners func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"` - StateListActors func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"` - StateMarketBalance func(context.Context, address.Address, *types.TipSet) (actors.StorageParticipantBalance, error) `perm:"read"` - StateMarketParticipants func(context.Context, *types.TipSet) (map[string]actors.StorageParticipantBalance, error) `perm:"read"` - StateMarketDeals func(context.Context, *types.TipSet) (map[string]actors.OnChainDeal, error) `perm:"read"` - StateMarketStorageDeal func(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) `perm:"read"` - StateLookupID func(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) `perm:"read"` - StateChangedActors func(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) `perm:"read"` - StateGetReceipt func(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` + StateMinerSectors func(context.Context, address.Address, *types.TipSet) ([]*ChainSectorInfo, error) `perm:"read"` + StateMinerProvingSet func(context.Context, address.Address, *types.TipSet) ([]*ChainSectorInfo, error) `perm:"read"` + StateMinerPower func(context.Context, address.Address, *types.TipSet) (MinerPower, error) `perm:"read"` + StateMinerWorker func(context.Context, address.Address, *types.TipSet) (address.Address, error) `perm:"read"` + StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` + StateMinerElectionPeriodStart func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"` + StateMinerSectorSize func(context.Context, address.Address, *types.TipSet) (uint64, error) `perm:"read"` + StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` + StateReplay func(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) `perm:"read"` + StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` + StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` + StatePledgeCollateral func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"` + StateWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` + StateListMiners func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"` + StateListActors func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"` + StateMarketBalance func(context.Context, address.Address, *types.TipSet) (actors.StorageParticipantBalance, error) `perm:"read"` + StateMarketParticipants func(context.Context, *types.TipSet) (map[string]actors.StorageParticipantBalance, error) `perm:"read"` + StateMarketDeals func(context.Context, *types.TipSet) (map[string]actors.OnChainDeal, error) `perm:"read"` + StateMarketStorageDeal func(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) `perm:"read"` + StateLookupID func(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) `perm:"read"` + StateChangedActors func(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) `perm:"read"` + StateGetReceipt func(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` MarketEnsureAvailable func(context.Context, address.Address, types.BigInt) error `perm:"sign"` @@ -363,7 +363,7 @@ func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address } func (c *FullNodeStruct) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { - return c.Internal.StateMinerProvingPeriodEnd(ctx, actor, ts) + return c.Internal.StateMinerElectionPeriodStart(ctx, actor, ts) } func (c *FullNodeStruct) StateMinerSectorSize(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { diff --git a/chain/vm/invoker.go b/chain/vm/invoker.go index 3fa02d4c1..c53b75c24 100644 --- a/chain/vm/invoker.go +++ b/chain/vm/invoker.go @@ -159,7 +159,7 @@ func DumpActorState(code cid.Cid, b []byte) (interface{}, error) { typ, ok := i.builtInState[code] if !ok { - return nil, xerrors.New("state type for actor not found") + return nil, xerrors.Errorf("state type for actor %s not found", code) } rv := reflect.New(typ) diff --git a/storage/fpost_run.go b/storage/fpost_run.go index ba0990ef3..a2cf88810 100644 --- a/storage/fpost_run.go +++ b/storage/fpost_run.go @@ -2,14 +2,16 @@ package storage import ( "context" + "time" + ffi "github.com/filecoin-project/filecoin-ffi" + "go.opencensus.io/trace" + "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/sectorbuilder" - "go.opencensus.io/trace" - "golang.org/x/xerrors" - "time" ) func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSet) { diff --git a/storage/fpost_sched.go b/storage/fpost_sched.go index 565a3604f..9480dc3c2 100644 --- a/storage/fpost_sched.go +++ b/storage/fpost_sched.go @@ -41,7 +41,7 @@ func (s *fpostScheduler) run(ctx context.Context) { if len(current) != 1 { panic("expected first notif to have len = 1") } - if current[0].Type == store.HCCurrent { + if current[0].Type != store.HCCurrent { panic("expected first notif to tell current ts") }