diff --git a/api/api.go b/api/api.go index c531624fe..c86215f03 100644 --- a/api/api.go +++ b/api/api.go @@ -9,7 +9,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -45,14 +44,18 @@ type FullNode interface { Common // chain - ChainNotify(context.Context) (<-chan *store.HeadChange, error) + + // ChainNotify returns channel with chain head updates + // First message is guaranteed to be of len == 1, and type == 'current' + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization - ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization + ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error) ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error) ChainGetBlockReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error) + ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) // messages @@ -68,7 +71,7 @@ type FullNode interface { MinerRegister(context.Context, address.Address) error MinerUnregister(context.Context, address.Address) error MinerAddresses(context.Context) ([]address.Address, error) - MinerCreateBlock(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*chain.BlockMsg, error) + MinerCreateBlock(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*types.BlockMsg, error) // // UX ? @@ -101,17 +104,19 @@ type FullNode interface { //ClientListAsks() []Ask - StateMinerSectors(context.Context, address.Address) ([]*SectorInfo, error) - StateMinerProvingSet(context.Context, address.Address) ([]*SectorInfo, error) - StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error) - StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) - StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) // if tipset is nil, we'll use heaviest StateCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) StateReplay(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error) + StateMinerSectors(context.Context, address.Address) ([]*SectorInfo, error) + StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*SectorInfo, error) + StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error) + StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) + StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) + StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) + PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) PaychList(context.Context) ([]address.Address, error) PaychStatus(context.Context, address.Address) (*PaychStatus, error) diff --git a/api/struct.go b/api/struct.go index 87d891d6b..e6db41d14 100644 --- a/api/struct.go +++ b/api/struct.go @@ -5,7 +5,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" - "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -39,14 +38,15 @@ type FullNodeStruct struct { CommonStruct Internal struct { - ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"` - ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"` - ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` - ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"` - ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` - ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` - ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"` - ChainGetBlockReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` + ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"` + ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` + ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` + ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"` + ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` + ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` + ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"` + ChainGetBlockReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` + ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` @@ -55,7 +55,7 @@ type FullNodeStruct struct { MinerRegister func(context.Context, address.Address) error `perm:"admin"` MinerUnregister func(context.Context, address.Address) error `perm:"admin"` MinerAddresses func(context.Context) ([]address.Address, error) `perm:"write"` - MinerCreateBlock func(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*chain.BlockMsg, error) `perm:"write"` + MinerCreateBlock func(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*types.BlockMsg, error) `perm:"write"` WalletNew func(context.Context, string) (address.Address, error) `perm:"write"` WalletHas func(context.Context, address.Address) (bool, error) `perm:"write"` @@ -75,15 +75,16 @@ type FullNodeStruct struct { ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"` ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) `perm:"read"` - StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` - StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` - StateMinerPower func(context.Context, address.Address, *types.TipSet) (MinerPower, error) `perm:"read"` - StateMinerWorker func(context.Context, address.Address, *types.TipSet) (address.Address, error) `perm:"read"` - StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` - StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` - StateReplay func(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) `perm:"read"` - StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` - StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` + StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` + StateMinerProvingSet func(context.Context, address.Address, *types.TipSet) ([]*SectorInfo, error) `perm:"read"` + StateMinerPower func(context.Context, address.Address, *types.TipSet) (MinerPower, error) `perm:"read"` + StateMinerWorker func(context.Context, address.Address, *types.TipSet) (address.Address, error) `perm:"read"` + StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` + StateMinerProvingPeriodEnd func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"` + StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` + StateReplay func(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) `perm:"read"` + StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` + StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"` PaychList func(context.Context) ([]address.Address, error) `perm:"read"` @@ -211,11 +212,11 @@ func (c *FullNodeStruct) MinerAddresses(ctx context.Context) ([]address.Address, return c.Internal.MinerAddresses(ctx) } -func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *types.TipSet, tickets []*types.Ticket, eproof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*chain.BlockMsg, error) { +func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *types.TipSet, tickets []*types.Ticket, eproof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*types.BlockMsg, error) { return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs, ts) } -func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { +func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error { return c.Internal.ChainSubmitBlock(ctx, blk) } @@ -231,6 +232,10 @@ func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWa return c.Internal.ChainWaitMsg(ctx, msgc) } +func (c *FullNodeStruct) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) { + return c.Internal.ChainGetTipSetByHeight(ctx, h, ts) +} + func (c *FullNodeStruct) WalletNew(ctx context.Context, typ string) (address.Address, error) { return c.Internal.WalletNew(ctx, typ) } @@ -275,7 +280,7 @@ func (c *FullNodeStruct) ChainGetBlockReceipts(ctx context.Context, b cid.Cid) ( return c.Internal.ChainGetBlockReceipts(ctx, b) } -func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { +func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) { return c.Internal.ChainNotify(ctx) } @@ -283,8 +288,8 @@ func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Add return c.Internal.StateMinerSectors(ctx, addr) } -func (c *FullNodeStruct) StateMinerProvingSet(ctx context.Context, addr address.Address) ([]*SectorInfo, error) { - return c.Internal.StateMinerProvingSet(ctx, addr) +func (c *FullNodeStruct) StateMinerProvingSet(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*SectorInfo, error) { + return c.Internal.StateMinerProvingSet(ctx, addr, ts) } func (c *FullNodeStruct) StateMinerPower(ctx context.Context, a address.Address, ts *types.TipSet) (MinerPower, error) { @@ -298,6 +303,9 @@ func (c *FullNodeStruct) StateMinerWorker(ctx context.Context, m address.Address func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) { return c.Internal.StateMinerPeerID(ctx, m, ts) } +func (c *FullNodeStruct) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { + return c.Internal.StateMinerProvingPeriodEnd(ctx, actor, ts) +} func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) { return c.Internal.StateCall(ctx, msg, ts) diff --git a/build/chain.go b/build/chain.go index b490aeedf..ae00b8e55 100644 --- a/build/chain.go +++ b/build/chain.go @@ -1,5 +1,7 @@ package build -const BlockDelay = 5 +// Seconds +const BlockDelay = 3 +// Seconds const AllowableClockDrift = BlockDelay * 2 diff --git a/build/params.go b/build/params.go index 28da4058c..365fdc656 100644 --- a/build/params.go +++ b/build/params.go @@ -7,14 +7,20 @@ const UnixfsLinksPerLevel = 1024 const SectorSize = 1024 +// Blocks const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours +// Blocks const DealVoucherSkewLimit = 10 +// Blocks const ForkLengthThreshold = 20 + +// Blocks const RandomnessLookback = 20 -const ProvingPeriodDuration = 2 * 60 // an hour, for now -const PoSTChallangeTime = 1 * 60 +// Blocks +const ProvingPeriodDuration = 10 +const PoSTChallangeTime = 5 // TODO: Move other important consts here diff --git a/chain/actors/actor_miner.go b/chain/actors/actor_miner.go index 02f6b5d08..18c656004 100644 --- a/chain/actors/actor_miner.go +++ b/chain/actors/actor_miner.go @@ -33,13 +33,11 @@ type StorageMinerActorState struct { DePledgeTime types.BigInt // All sectors this miner has committed. - Sectors cid.Cid // TODO: Using a HAMT for now, needs to be an AMT once we implement it - SectorSetSize uint64 // TODO: the AMT should be able to tell us how many items are in it. This field won't be needed at that point + Sectors cid.Cid // Sectors this miner is currently mining. It is only updated // when a PoSt is submitted (not as each new sector commitment is added). - ProvingSet cid.Cid - ProvingSetSize uint64 + ProvingSet cid.Cid // Faulty sectors reported since last SubmitPost, // up to the current proving period's challenge time. @@ -268,9 +266,13 @@ func (sma StorageMinerActor) CommitSector(act *types.Actor, vmctx types.VMContex // // Note: Proving period is a function of sector size; small sectors take less // time to prove than large sectors do. Sector size is selected when pledging. - if self.ProvingSetSize == 0 { + pss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet) + if lerr != nil { + return nil, aerrors.Escalate(lerr, "could not load proving set node") + } + + if pss.Count == 0 { self.ProvingSet = self.Sectors - self.ProvingSetSize = self.SectorSetSize self.ProvingPeriodEnd = vmctx.BlockHeight() + build.ProvingPeriodDuration } @@ -357,7 +359,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, pss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet) if lerr != nil { - return nil, aerrors.Escalate(lerr, "could not load sector set node") + return nil, aerrors.Escalate(lerr, "could not load proving set node") } var sectorInfos []sectorbuilder.SectorInfo @@ -400,7 +402,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, ss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet) if lerr != nil { - return nil, aerrors.Escalate(lerr, "could not load sector set node") + return nil, aerrors.Escalate(lerr, "could not load proving set node") } if err := ss.BatchDelete(params.DoneSet.All()); err != nil { @@ -416,7 +418,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, } oldPower := self.Power - self.Power = types.BigMul(types.NewInt(self.ProvingSetSize-uint64(len(faults))), + self.Power = types.BigMul(types.NewInt(pss.Count-uint64(len(faults))), mi.SectorSize) enc, err := SerializeParams(&UpdateStorageParams{Delta: types.BigSub(self.Power, oldPower)}) @@ -430,7 +432,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext, } self.ProvingSet = self.Sectors - self.ProvingSetSize = self.SectorSetSize + self.ProvingPeriodEnd = nextProvingPeriodEnd self.NextDoneSet = params.DoneSet c, err := vmctx.Storage().Put(self) diff --git a/chain/actors/actors_test.go b/chain/actors/actors_test.go index 6611e8c51..5a3be6c13 100644 --- a/chain/actors/actors_test.go +++ b/chain/actors/actors_test.go @@ -51,7 +51,8 @@ func setupVMTestEnv(t *testing.T) (*vm.VM, []address.Address) { cs := store.NewChainStore(bs, nil) - vm, err := vm.NewVM(stateroot, 1, maddr, cs) + // TODO: should probabaly mock out the randomness bit, nil works for now + vm, err := vm.NewVM(stateroot, 1, nil, maddr, cs) if err != nil { t.Fatal(err) } diff --git a/chain/actors/cbor_gen.go b/chain/actors/cbor_gen.go index 381f22b7b..388ae29c8 100644 --- a/chain/actors/cbor_gen.go +++ b/chain/actors/cbor_gen.go @@ -197,7 +197,7 @@ func (t *StorageMinerActorState) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{142}); err != nil { + if _, err := w.Write([]byte{140}); err != nil { return err } @@ -223,22 +223,12 @@ func (t *StorageMinerActorState) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.Sectors: %w", err) } - // t.t.SectorSetSize (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorSetSize)); err != nil { - return err - } - // t.t.ProvingSet (cid.Cid) if err := cbg.WriteCid(w, t.ProvingSet); err != nil { return xerrors.Errorf("failed to write cid field t.ProvingSet: %w", err) } - // t.t.ProvingSetSize (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.ProvingSetSize)); err != nil { - return err - } - // t.t.CurrentFaultSet (types.BitField) if err := t.CurrentFaultSet.MarshalCBOR(w); err != nil { return err @@ -287,7 +277,7 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 14 { + if extra != 12 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -333,16 +323,6 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error { t.Sectors = c } - // t.t.SectorSetSize (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.SectorSetSize = extra // t.t.ProvingSet (cid.Cid) { @@ -355,16 +335,6 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error { t.ProvingSet = c } - // t.t.ProvingSetSize (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.ProvingSetSize = extra // t.t.CurrentFaultSet (types.BitField) { @@ -759,7 +729,20 @@ func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{128}); err != nil { + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.t.Proof ([]uint8) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Proof)))); err != nil { + return err + } + if _, err := w.Write(t.Proof); err != nil { + return err + } + + // t.t.DoneSet (types.BitField) + if err := t.DoneSet.MarshalCBOR(w); err != nil { return err } return nil @@ -776,10 +759,36 @@ func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 0 { + if extra != 2 { return fmt.Errorf("cbor input had wrong number of fields") } + // t.t.Proof ([]uint8) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("array too large") + } + + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.Proof = make([]byte, extra) + if _, err := io.ReadFull(br, t.Proof); err != nil { + return err + } + // t.t.DoneSet (types.BitField) + + { + + if err := t.DoneSet.UnmarshalCBOR(br); err != nil { + return err + } + + } return nil } diff --git a/chain/actors/harness2_test.go b/chain/actors/harness2_test.go index 55d766a8c..3e08a2bc2 100644 --- a/chain/actors/harness2_test.go +++ b/chain/actors/harness2_test.go @@ -158,7 +158,7 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness { t.Fatal(err) } h.cs = store.NewChainStore(h.bs, nil) - h.vm, err = vm.NewVM(stateroot, 1, h.HI.Miner, h.cs) + h.vm, err = vm.NewVM(stateroot, 1, nil, h.HI.Miner, h.cs) if err != nil { t.Fatal(err) } diff --git a/chain/cbor_gen.go b/chain/cbor_gen.go index 0a263e20a..e7f2a40b8 100644 --- a/chain/cbor_gen.go +++ b/chain/cbor_gen.go @@ -472,109 +472,3 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { return nil } - -func (t *BlockMsg) MarshalCBOR(w io.Writer) error { - if _, err := w.Write([]byte{131}); err != nil { - return err - } - - // t.t.Header (types.BlockHeader) - if err := t.Header.MarshalCBOR(w); err != nil { - return err - } - - // t.t.BlsMessages ([]cid.Cid) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMessages)))); err != nil { - return err - } - for _, v := range t.BlsMessages { - if err := cbg.WriteCid(w, v); err != nil { - return xerrors.Errorf("failed writing cid field t.BlsMessages: %w", err) - } - } - - // t.t.SecpkMessages ([]cid.Cid) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMessages)))); err != nil { - return err - } - for _, v := range t.SecpkMessages { - if err := cbg.WriteCid(w, v); err != nil { - return xerrors.Errorf("failed writing cid field t.SecpkMessages: %w", err) - } - } - return nil -} - -func (t *BlockMsg) UnmarshalCBOR(br io.Reader) error { - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 3 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.t.Header (types.BlockHeader) - - t.Header = new(types.BlockHeader) - - if err := t.Header.UnmarshalCBOR(br); err != nil { - return err - } - // t.t.BlsMessages ([]cid.Cid) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if extra > 8192 { - return fmt.Errorf("array too large") - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - if extra > 0 { - t.BlsMessages = make([]cid.Cid, extra) - } - for i := 0; i < int(extra); i++ { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("reading cid field t.BlsMessages failed: %w", err) - } - t.BlsMessages[i] = c - } - - // t.t.SecpkMessages ([]cid.Cid) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if extra > 8192 { - return fmt.Errorf("array too large") - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - if extra > 0 { - t.SecpkMessages = make([]cid.Cid, extra) - } - for i := 0; i < int(extra); i++ { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("reading cid field t.SecpkMessages failed: %w", err) - } - t.SecpkMessages[i] = c - } - - return nil -} diff --git a/chain/events/events.go b/chain/events/events.go index f333de9d1..c58ab4e81 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -1,12 +1,17 @@ package events import ( + "context" "sync" + "time" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" ) @@ -23,30 +28,32 @@ type heightHandler struct { revert RevertHandler } -type eventChainStore interface { - SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) - - GetHeaviestTipSet() *types.TipSet - MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) +type eventApi interface { + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) + ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) + ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) } type Events struct { - cs eventChainStore + api eventApi tsc *tipSetCache lk sync.Mutex + ready sync.WaitGroup + readyOnce sync.Once + heightEvents calledEvents } -func NewEvents(cs eventChainStore) *Events { +func NewEvents(ctx context.Context, api eventApi) *Events { gcConfidence := 2 * build.ForkLengthThreshold - tsc := newTSCache(gcConfidence) + tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight) e := &Events{ - cs: cs, + api: api, tsc: tsc, @@ -60,7 +67,7 @@ func NewEvents(cs eventChainStore) *Events { }, calledEvents: calledEvents{ - cs: cs, + cs: api, tsc: tsc, gcConfidence: uint64(gcConfidence), @@ -72,14 +79,82 @@ func NewEvents(cs eventChainStore) *Events { }, } - _ = e.tsc.add(cs.GetHeaviestTipSet()) - cs.SubscribeHeadChanges(e.headChange) + e.ready.Add(1) + + go e.listenHeadChanges(ctx) + + e.ready.Wait() // TODO: cleanup/gc goroutine return e } +func (e *Events) listenHeadChanges(ctx context.Context) { + for { + if err := e.listenHeadChangesOnce(ctx); err != nil { + log.Errorf("listen head changes errored: %s", err) + } else { + log.Warn("listenHeadChanges quit") + } + if ctx.Err() != nil { + log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err()) + return + } + time.Sleep(time.Second) + log.Info("restarting listenHeadChanges") + } +} + +func (e *Events) listenHeadChangesOnce(ctx context.Context) error { + notifs, err := e.api.ChainNotify(ctx) + if err != nil { + // TODO: retry + return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err) + } + + cur, ok := <-notifs // TODO: timeout? + if !ok { + return xerrors.Errorf("notification channel closed") + } + + if len(cur) != 1 { + return xerrors.Errorf("unexpected initial head notification length: %d", len(cur)) + } + + if cur[0].Type != store.HCCurrent { + return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type) + } + + if err := e.tsc.add(cur[0].Val); err != nil { + log.Warn("tsc.add: adding current tipset failed: %w", err) + } + + e.readyOnce.Do(func() { + e.ready.Done() + }) + + for notif := range notifs { + var rev, app []*types.TipSet + for _, notif := range notif { + switch notif.Type { + case store.HCRevert: + rev = append(rev, notif.Val) + case store.HCApply: + app = append(app, notif.Val) + default: + log.Warnf("unexpected head change notification type: '%s'", notif.Type) + } + } + + if err := e.headChange(rev, app); err != nil { + log.Warnf("headChange failed: %s", err) + } + } + + return nil +} + func (e *Events) headChange(rev, app []*types.TipSet) error { if len(app) == 0 { return xerrors.New("events.headChange expected at least one applied tipset") diff --git a/chain/events/events_called.go b/chain/events/events_called.go index f4789c6e1..5b94010cf 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -1,6 +1,7 @@ package events import ( + "context" "math" "sync" @@ -53,7 +54,7 @@ type queuedEvent struct { } type calledEvents struct { - cs eventChainStore + cs eventApi tsc *tipSetCache gcConfidence uint64 @@ -81,9 +82,6 @@ type callTuple struct { } func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { - e.lk.Lock() - defer e.lk.Unlock() - for _, ts := range rev { e.handleReverts(ts) } @@ -235,14 +233,15 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa seen := map[cid.Cid]struct{}{} for _, tsb := range ts.Blocks() { - bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb) + + msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) if err != nil { log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) // this is quite bad, but probably better than missing all the other updates continue } - for _, m := range bmsgs { + for _, m := range msgs.BlsMessages { _, ok := seen[m.Cid()] if ok { continue @@ -252,7 +251,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa consume(m) } - for _, m := range smsgs { + for _, m := range msgs.SecpkMessages { _, ok := seen[m.Message.Cid()] if ok { continue diff --git a/chain/events/events_height.go b/chain/events/events_height.go index 8f12f626d..2fb3a4319 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -20,10 +20,7 @@ type heightEvents struct { } func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { - e.lk.Lock() - defer e.lk.Unlock() - - // highest tipset is always the first (see cs.ReorgOps) + // highest tipset is always the first (see api.ReorgOps) newH := app[0].Height() for _, ts := range rev { diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 37a1d653a..4628b5fae 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1,8 +1,12 @@ package events import ( + "context" "fmt" + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/store" "testing" + "time" "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" @@ -29,9 +33,14 @@ type fakeCS struct { h uint64 tsc *tipSetCache - msgs map[cid.Cid]fakeMsg + msgs map[cid.Cid]fakeMsg + blkMsgs map[cid.Cid]cid.Cid - sub func(rev, app []*types.TipSet) error + sub func(rev, app []*types.TipSet) +} + +func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) { + panic("Not Implemented") } func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet { @@ -50,24 +59,44 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet { return ts } -func (fcs *fakeCS) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) { - if fcs.sub != nil { - fcs.t.Fatal("sub should be nil") - } - fcs.sub = f -} +func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) { + out := make(chan []*store.HeadChange, 1) + out <- []*store.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}} -func (fcs *fakeCS) GetHeaviestTipSet() *types.TipSet { - return fcs.tsc.best() -} + fcs.sub = func(rev, app []*types.TipSet) { + notif := make([]*store.HeadChange, len(rev)+len(app)) -func (fcs *fakeCS) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { - ms, ok := fcs.msgs[b.Messages] - if ok { - return ms.bmsgs, ms.smsgs, nil + for i, r := range rev { + notif[i] = &store.HeadChange{ + Type: store.HCRevert, + Val: r, + } + } + for i, r := range app { + notif[i+len(rev)] = &store.HeadChange{ + Type: store.HCApply, + Val: r, + } + } + + out <- notif } - return []*types.Message{}, []*types.SignedMessage{}, nil + return out, nil +} + +func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) { + messages, ok := fcs.blkMsgs[blk] + if !ok { + return &api.BlockMessages{}, nil + } + + ms, ok := fcs.msgs[messages] + if !ok { + return &api.BlockMessages{}, nil + } + return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil + } func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid { @@ -102,32 +131,36 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow for i := 0; i < app; i++ { fcs.h++ - mc, _ := msgs[i] - if mc == cid.Undef { + mc, hasMsgs := msgs[i] + if !hasMsgs { mc = dummyCid } ts := makeTs(fcs.t, fcs.h, mc) require.NoError(fcs.t, fcs.tsc.add(ts)) + if hasMsgs { + fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc + } + apps[app-i-1] = ts } - err := fcs.sub(revs, apps) - require.NoError(fcs.t, err) + fcs.sub(revs, apps) + time.Sleep(100 * time.Millisecond) // TODO: :c } -var _ eventChainStore = &fakeCS{} +var _ eventApi = &fakeCS{} func TestAt(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, - tsc: newTSCache(2 * build.ForkLengthThreshold), + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) var applied bool var reverted bool @@ -178,17 +211,82 @@ func TestAt(t *testing.T) { require.Equal(t, false, reverted) } +func TestAtStart(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + fcs.advance(0, 5, nil) // 6 + + var applied bool + var reverted bool + + err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { + require.Equal(t, 5, int(ts.Height())) + require.Equal(t, 8, int(curH)) + applied = true + return nil + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 5) + require.NoError(t, err) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + fcs.advance(0, 5, nil) // 11 + require.Equal(t, true, applied) + require.Equal(t, false, reverted) +} + +func TestAtStartConfidence(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + fcs.advance(0, 10, nil) // 11 + + var applied bool + var reverted bool + + err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { + require.Equal(t, 5, int(ts.Height())) + require.Equal(t, 11, int(curH)) + applied = true + return nil + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 5) + require.NoError(t, err) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) +} + func TestCalled(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) @@ -380,12 +478,13 @@ func TestCalledTimeout(t *testing.T) { t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) @@ -419,12 +518,13 @@ func TestCalledTimeout(t *testing.T) { t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events = NewEvents(fcs) + events = NewEvents(context.Background(), fcs) err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil @@ -452,12 +552,13 @@ func TestCalledOrder(t *testing.T) { t: t, h: 1, - msgs: map[cid.Cid]fakeMsg{}, - tsc: newTSCache(2 * build.ForkLengthThreshold), + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) diff --git a/chain/events/tscache.go b/chain/events/tscache.go index 630aed0d0..e6d492bfa 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -1,24 +1,31 @@ package events import ( + "context" "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/types" ) +type tsByHFunc func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + // tipSetCache implements a simple ring-buffer cache to keep track of recent // tipsets type tipSetCache struct { cache []*types.TipSet start int len int + + storage tsByHFunc } -func newTSCache(cap int) *tipSetCache { +func newTSCache(cap int, storage tsByHFunc) *tipSetCache { return &tipSetCache{ cache: make([]*types.TipSet, cap), start: 0, len: 0, + + storage: storage, } } @@ -63,15 +70,14 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) { return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH) } - tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height() + clen := len(tsc.cache) + tailH := tsc.cache[((tsc.start-tsc.len+1)%clen+clen)%clen].Height() if height < tailH { - // TODO: we can try to walk parents, but that shouldn't happen in - // practice, so it's probably not worth implementing - return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH) + return tsc.storage(context.TODO(), height, tsc.cache[tailH]) } - return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil + return tsc.cache[(int(height-tailH+1)%clen+clen)%clen], nil } func (tsc *tipSetCache) best() *types.TipSet { diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 9a06af5af..d7d1bef86 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -360,7 +360,7 @@ type mca struct { } func (mca mca) ChainGetRandomness(ctx context.Context, pts *types.TipSet, ticks []*types.Ticket, lb int) ([]byte, error) { - return mca.sm.ChainStore().GetRandomness(ctx, pts, ticks, lb) + return mca.sm.ChainStore().GetRandomness(ctx, pts.Cids(), ticks, int64(lb)) } func (mca mca) StateMinerPower(ctx context.Context, maddr address.Address, ts *types.TipSet) (api.MinerPower, error) { diff --git a/chain/gen/mining.go b/chain/gen/mining.go index 0d484f4ca..af0adccb9 100644 --- a/chain/gen/mining.go +++ b/chain/gen/mining.go @@ -27,7 +27,8 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wal height := parents.Height() + uint64(len(tickets)) - vmi, err := vm.NewVM(st, height, miner, sm.ChainStore()) + r := vm.NewChainRand(sm.ChainStore(), parents.Cids(), parents.Height(), tickets) + vmi, err := vm.NewVM(st, height, r, miner, sm.ChainStore()) if err != nil { return nil, err } diff --git a/chain/gen/utils.go b/chain/gen/utils.go index 7ee1053de..1a1f9fcb4 100644 --- a/chain/gen/utils.go +++ b/chain/gen/utils.go @@ -169,7 +169,7 @@ func mustEnc(i cbg.CBORMarshaler) []byte { } func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid, gmcfg *GenMinerCfg) (cid.Cid, error) { - vm, err := vm.NewVM(sroot, 0, actors.NetworkAddress, cs) + vm, err := vm.NewVM(sroot, 0, nil, actors.NetworkAddress, cs) if err != nil { return cid.Undef, xerrors.Errorf("failed to create NewVM: %w", err) } diff --git a/chain/messagepool.go b/chain/messagepool.go index 550aa8428..c15af4054 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -122,7 +122,7 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { return mset.nextNonce, nil } - act, err := mp.sm.GetActor(addr) + act, err := mp.sm.GetActor(addr, nil) if err != nil { return 0, err } diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index 62c7acabe..a2fae8c63 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -12,8 +12,8 @@ import ( "golang.org/x/xerrors" ) -func (sm *StateManager) CallRaw(ctx context.Context, msg *types.Message, bstate cid.Cid, bheight uint64) (*types.MessageReceipt, error) { - vmi, err := vm.NewVM(bstate, bheight, actors.NetworkAddress, sm.cs) +func (sm *StateManager) CallRaw(ctx context.Context, msg *types.Message, bstate cid.Cid, r vm.Rand, bheight uint64) (*types.MessageReceipt, error) { + vmi, err := vm.NewVM(bstate, bheight, r, actors.NetworkAddress, sm.cs) if err != nil { return nil, xerrors.Errorf("failed to set up vm: %w", err) } @@ -58,7 +58,9 @@ func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types. return nil, err } - return sm.CallRaw(ctx, msg, state, ts.Height()) + r := vm.NewChainRand(sm.cs, ts.Cids(), ts.Height(), nil) + + return sm.CallRaw(ctx, msg, state, r, ts.Height()) } var errHaltExecution = fmt.Errorf("halt") diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 70511fa5e..1d1afd846 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -77,7 +77,14 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl return cid.Undef, xerrors.Errorf("recursive TipSetState failed: %w", err) } - vmi, err := vm.NewVM(pstate, blks[0].Height, address.Undef, sm.cs) + cids := make([]cid.Cid, len(blks)) + for i, v := range blks { + cids[i] = v.Cid() + } + + r := vm.NewChainRand(sm.cs, cids, blks[0].Height, nil) + + vmi, err := vm.NewVM(pstate, blks[0].Height, r, address.Undef, sm.cs) if err != nil { return cid.Undef, xerrors.Errorf("instantiating VM failed: %w", err) } @@ -131,8 +138,11 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl return vmi.Flush(ctx) } -func (sm *StateManager) GetActor(addr address.Address) (*types.Actor, error) { - ts := sm.cs.GetHeaviestTipSet() +func (sm *StateManager) GetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + if ts == nil { + ts = sm.cs.GetHeaviestTipSet() + } + stcid, err := sm.TipSetState(ts.Cids()) if err != nil { return nil, xerrors.Errorf("tipset state: %w", err) @@ -147,8 +157,8 @@ func (sm *StateManager) GetActor(addr address.Address) (*types.Actor, error) { return state.GetActor(addr) } -func (sm *StateManager) GetBalance(addr address.Address) (types.BigInt, error) { - act, err := sm.GetActor(addr) +func (sm *StateManager) GetBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) { + act, err := sm.GetActor(addr, ts) if err != nil { return types.BigInt{}, xerrors.Errorf("get actor: %w", err) } @@ -160,8 +170,8 @@ func (sm *StateManager) ChainStore() *store.ChainStore { return sm.cs } -func (sm *StateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}) (*types.Actor, error) { - act, err := sm.GetActor(a) +func (sm *StateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) { + act, err := sm.GetActor(a, ts) if err != nil { return nil, err } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index da3f9e02d..07e6115ec 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -2,12 +2,18 @@ package stmgr import ( "context" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" + + amt "github.com/filecoin-project/go-amt-ipld" cid "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/libp2p/go-libp2p-core/peer" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" ) @@ -16,7 +22,7 @@ func GetMinerWorker(ctx context.Context, sm *StateManager, st cid.Cid, maddr add To: maddr, From: maddr, Method: actors.MAMethods.GetWorkerAddr, - }, st, 0) + }, st, nil, 0) if err != nil { return address.Undef, xerrors.Errorf("callRaw failed: %w", err) } @@ -42,7 +48,7 @@ func GetMinerOwner(ctx context.Context, sm *StateManager, st cid.Cid, maddr addr To: maddr, From: maddr, Method: actors.MAMethods.GetOwner, - }, st, 0) + }, st, nil, 0) if err != nil { return address.Undef, xerrors.Errorf("callRaw failed: %w", err) } @@ -122,3 +128,59 @@ func GetMinerPeerID(ctx context.Context, sm *StateManager, ts *types.TipSet, mad return peer.IDFromBytes(recp.Return) } + +func GetMinerProvingPeriodEnd(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) { + var mas actors.StorageMinerActorState + _, err := sm.LoadActorState(ctx, maddr, &mas, ts) + if err != nil { + return 0, xerrors.Errorf("failed to load miner actor state: %w", err) + } + + return mas.ProvingPeriodEnd, nil +} + +func GetMinerProvingSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]*api.SectorInfo, error) { + var mas actors.StorageMinerActorState + _, err := sm.LoadActorState(ctx, maddr, &mas, ts) + if err != nil { + return nil, xerrors.Errorf("failed to load miner actor state: %w", err) + } + + return LoadSectorsFromSet(ctx, sm.ChainStore().Blockstore(), mas.ProvingSet) +} + +func GetMinerSectorSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]*api.SectorInfo, error) { + var mas actors.StorageMinerActorState + _, err := sm.LoadActorState(ctx, maddr, &mas, ts) + if err != nil { + return nil, xerrors.Errorf("failed to load miner actor state: %w", err) + } + + return LoadSectorsFromSet(ctx, sm.ChainStore().Blockstore(), mas.Sectors) +} + +func LoadSectorsFromSet(ctx context.Context, bs blockstore.Blockstore, ssc cid.Cid) ([]*api.SectorInfo, error) { + blks := amt.WrapBlockstore(bs) + a, err := amt.LoadAMT(blks, ssc) + if err != nil { + return nil, err + } + + var sset []*api.SectorInfo + if err := a.ForEach(func(i uint64, v *cbg.Deferred) error { + var comms [][]byte + if err := cbor.DecodeInto(v.Raw, &comms); err != nil { + return err + } + sset = append(sset, &api.SectorInfo{ + SectorID: i, + CommR: comms[0], + CommD: comms[1], + }) + return nil + }); err != nil { + return nil, err + } + + return sset, nil +} diff --git a/chain/store/store.go b/chain/store/store.go index 590f5a402..e95ccb2a7 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/json" "fmt" + "github.com/filecoin-project/go-lotus/build" "sync" amt "github.com/filecoin-project/go-amt-ipld" @@ -35,6 +36,7 @@ type ChainStore struct { heaviest *types.TipSet bestTips *pubsub.PubSub + pubLk sync.Mutex tstLk sync.Mutex tipsets map[uint64][]cid.Cid @@ -51,18 +53,25 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { } hcnf := func(rev, app []*types.TipSet) error { - for _, r := range rev { - cs.bestTips.Pub(&HeadChange{ + cs.pubLk.Lock() + defer cs.pubLk.Unlock() + + notif := make([]*HeadChange, len(rev)+len(app)) + + for i, r := range rev { + notif[i] = &HeadChange{ Type: HCRevert, Val: r, - }, "headchange") + } } - for _, r := range app { - cs.bestTips.Pub(&HeadChange{ + for i, r := range app { + notif[i+len(rev)] = &HeadChange{ Type: HCApply, Val: r, - }, "headchange") + } } + + cs.bestTips.Pub(notif, "headchange") return nil } @@ -109,21 +118,10 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error { return nil } -func (cs *ChainStore) SubNewTips() chan *types.TipSet { - subch := cs.bestTips.Sub("best") - out := make(chan *types.TipSet) - go func() { - defer close(out) - for val := range subch { - out <- val.(*types.TipSet) - } - }() - return out -} - const ( - HCRevert = "revert" - HCApply = "apply" + HCRevert = "revert" + HCApply = "apply" + HCCurrent = "current" ) type HeadChange struct { @@ -131,9 +129,18 @@ type HeadChange struct { Val *types.TipSet } -func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange { +func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*HeadChange { + cs.pubLk.Lock() subch := cs.bestTips.Sub("headchange") - out := make(chan *HeadChange, 16) + head := cs.GetHeaviestTipSet() + cs.pubLk.Unlock() + + out := make(chan []*HeadChange, 16) + out <- []*HeadChange{{ + Type: HCCurrent, + Val: head, + }} + go func() { defer close(out) for { @@ -143,8 +150,11 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange { log.Warn("chain head sub exit loop") return } + if len(out) > 0 { + log.Warnf("head change sub is slow, has %d buffered entries", len(out)) + } select { - case out <- val.(*HeadChange): + case out <- val.([]*HeadChange): case <-ctx.Done(): } case <-ctx.Done(): @@ -597,20 +607,22 @@ func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid for { select { - case val, ok := <-tsub: + case notif, ok := <-tsub: if !ok { return cid.Undef, nil, ctx.Err() } - switch val.Type { - case HCRevert: - continue - case HCApply: - bc, r, err := cs.tipsetContainsMsg(val.Val, mcid) - if err != nil { - return cid.Undef, nil, err - } - if r != nil { - return bc, r, nil + for _, val := range notif { + switch val.Type { + case HCRevert: + continue + case HCApply: + bc, r, err := cs.tipsetContainsMsg(val.Val, mcid) + if err != nil { + return cid.Undef, nil, err + } + if r != nil { + return bc, r, nil + } } } case <-ctx.Done(): @@ -690,31 +702,36 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) { return NewFullTipSet(out), nil } -func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tickets []*types.Ticket, lb int) ([]byte, error) { - if lb < len(tickets) { +func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, tickets []*types.Ticket, lb int64) ([]byte, error) { + if lb < 0 { + return nil, fmt.Errorf("negative lookback parameters are not valid (got %d)", lb) + } + lt := int64(len(tickets)) + if lb < lt { log.Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug") - t := tickets[len(tickets)-(1+lb)] + t := tickets[lt-(1+lb)] return t.VDFResult, nil } - nv := lb - len(tickets) + nv := lb - lt - nextCids := pts.Cids() for { - nts, err := cs.LoadTipSet(nextCids) + nts, err := cs.LoadTipSet(blks) if err != nil { return nil, err } mtb := nts.MinTicketBlock() - if nv < len(mtb.Tickets) { - t := mtb.Tickets[len(mtb.Tickets)-(1+nv)] + lt := int64(len(mtb.Tickets)) + if nv < lt { + t := mtb.Tickets[lt-(1+nv)] + log.Infof("Returning randomness: H:%d, t:%d, mtb:%s", nts.Height(), lt-(1+nv), mtb.Cid()) return t.VDFResult, nil } - nv -= len(mtb.Tickets) + nv -= lt // special case for lookback behind genesis block // TODO(spec): this is not in the spec, need to sync that @@ -723,13 +740,40 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tick t := mtb.Tickets[0] rval := t.VDFResult - for i := 0; i < nv; i++ { + for i := int64(0); i < nv; i++ { h := sha256.Sum256(rval) rval = h[:] } return rval, nil } - nextCids = mtb.Parents + blks = mtb.Parents + } +} + +func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) { + if ts == nil { + ts = cs.GetHeaviestTipSet() + } + + if h > ts.Height() { + return nil, xerrors.Errorf("looking for tipset with height less than start point") + } + + if ts.Height()-h > build.ForkLengthThreshold { + log.Warnf("expensive call to GetTipsetByHeight, seeking %d levels", ts.Height()-h) + } + + for { + mtb := ts.MinTicketBlock() + if h >= ts.Height()-uint64(len(mtb.Tickets)) { + return ts, nil + } + + pts, err := cs.LoadTipSet(ts.Parents()) + if err != nil { + return nil, err + } + ts = pts } } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index d41eac887..5bcadf16f 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -2,6 +2,7 @@ package sub import ( "context" + logging "github.com/ipfs/go-log" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -23,7 +24,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha continue } - blk, err := chain.DecodeBlockMsg(msg.GetData()) + blk, err := types.DecodeBlockMsg(msg.GetData()) if err != nil { log.Error("got invalid block over pubsub: ", err) continue diff --git a/chain/sync.go b/chain/sync.go index 3af36f57f..1ebb2b9e9 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -313,7 +313,7 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) for _, b := range fts.Blocks { if err := syncer.ValidateBlock(ctx, b); err != nil { - return err + return xerrors.Errorf("validating block %s: %w", b.Cid(), err) } } return nil @@ -422,7 +422,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err return xerrors.Errorf("validating block tickets failed: %w", err) } - rand, err := syncer.sm.ChainStore().GetRandomness(ctx, baseTs, h.Tickets, build.RandomnessLookback) + rand, err := syncer.sm.ChainStore().GetRandomness(ctx, baseTs.Cids(), h.Tickets, build.RandomnessLookback) if err != nil { return xerrors.Errorf("failed to get randomness for verifying election proof: %w", err) } @@ -440,7 +440,8 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err return xerrors.Errorf("miner created a block but was not a winner") } - vmi, err := vm.NewVM(stateroot, h.Height, h.Miner, syncer.store) + r := vm.NewChainRand(syncer.store, baseTs.Cids(), baseTs.Height(), h.Tickets) + vmi, err := vm.NewVM(stateroot, h.Height, r, h.Miner, syncer.store) if err != nil { return xerrors.Errorf("failed to instantiate VM: %w", err) } diff --git a/chain/sync_test.go b/chain/sync_test.go index 75247493d..47b8f4ec3 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -105,8 +104,8 @@ func (tu *syncTestUtil) mineNewBlock(src int) { } } -func fblkToBlkMsg(fb *types.FullBlock) *chain.BlockMsg { - out := &chain.BlockMsg{ +func fblkToBlkMsg(fb *types.FullBlock) *types.BlockMsg { + out := &types.BlockMsg{ Header: fb.Header, } @@ -224,9 +223,11 @@ func (tu *syncTestUtil) waitUntilSync(from, to int) { } // TODO: some sort of timeout? - for c := range hc { - if c.Val.Equals(target) { - return + for n := range hc { + for _, c := range n { + if c.Val.Equals(target) { + return + } } } } diff --git a/chain/types/bigint.go b/chain/types/bigint.go index 061cdf8a4..de95d0a61 100644 --- a/chain/types/bigint.go +++ b/chain/types/bigint.go @@ -107,13 +107,12 @@ func (bi *BigInt) MarshalCBOR(w io.Writer) error { return zero.MarshalCBOR(w) } + tag := uint64(2) if bi.Sign() < 0 { - // right now we don't support negative integers. - // In the spec, everything is listed as a Uint. - return fmt.Errorf("BigInt does not support negative integers") + tag = 3 } - header := cbg.CborEncodeMajorType(cbg.MajTag, 2) + header := cbg.CborEncodeMajorType(cbg.MajTag, tag) if _, err := w.Write(header); err != nil { return err } @@ -138,10 +137,12 @@ func (bi *BigInt) UnmarshalCBOR(br io.Reader) error { return err } - if maj != cbg.MajTag && extra != 2 { + if maj != cbg.MajTag && extra != 2 && extra != 3 { return fmt.Errorf("cbor input for big int was not a tagged big int") } + minus := extra & 1 + maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err @@ -161,6 +162,9 @@ func (bi *BigInt) UnmarshalCBOR(br io.Reader) error { } bi.Int = big.NewInt(0).SetBytes(buf) + if minus > 0 { + bi.Int.Neg(bi.Int) + } return nil } diff --git a/chain/types/bitfield.go b/chain/types/bitfield.go index c59768ac1..347895f09 100644 --- a/chain/types/bitfield.go +++ b/chain/types/bitfield.go @@ -17,6 +17,14 @@ func NewBitField() BitField { return BitField{bits: make(map[uint64]struct{})} } +func BitFieldFromSet(setBits []uint64) BitField { + res := BitField{bits: make(map[uint64]struct{})} + for _, b := range setBits { + res.bits[b] = struct{}{} + } + return res +} + // Set ...s bit in the BitField func (bf BitField) Set(bit uint64) { bf.bits[bit] = struct{}{} diff --git a/chain/blockmsg.go b/chain/types/blockmsg.go similarity index 83% rename from chain/blockmsg.go rename to chain/types/blockmsg.go index 22afced32..f3114499d 100644 --- a/chain/blockmsg.go +++ b/chain/types/blockmsg.go @@ -1,15 +1,13 @@ -package chain +package types import ( "bytes" "github.com/ipfs/go-cid" - - "github.com/filecoin-project/go-lotus/chain/types" ) type BlockMsg struct { - Header *types.BlockHeader + Header *BlockHeader BlsMessages []cid.Cid SecpkMessages []cid.Cid } diff --git a/chain/types/cbor_gen.go b/chain/types/cbor_gen.go index 81a23081c..be91a35c7 100644 --- a/chain/types/cbor_gen.go +++ b/chain/types/cbor_gen.go @@ -1197,3 +1197,128 @@ func (t *MessageReceipt) UnmarshalCBOR(r io.Reader) error { } return nil } + +func (t *BlockMsg) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{131}); err != nil { + return err + } + + // t.t.Header (types.BlockHeader) + if err := t.Header.MarshalCBOR(w); err != nil { + return err + } + + // t.t.BlsMessages ([]cid.Cid) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMessages)))); err != nil { + return err + } + for _, v := range t.BlsMessages { + if err := cbg.WriteCid(w, v); err != nil { + return xerrors.Errorf("failed writing cid field t.BlsMessages: %w", err) + } + } + + // t.t.SecpkMessages ([]cid.Cid) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMessages)))); err != nil { + return err + } + for _, v := range t.SecpkMessages { + if err := cbg.WriteCid(w, v); err != nil { + return xerrors.Errorf("failed writing cid field t.SecpkMessages: %w", err) + } + } + return nil +} + +func (t *BlockMsg) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.Header (types.BlockHeader) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Header = new(BlockHeader) + if err := t.Header.UnmarshalCBOR(br); err != nil { + return err + } + } + + } + // t.t.BlsMessages ([]cid.Cid) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("array too large") + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.BlsMessages = make([]cid.Cid, extra) + } + for i := 0; i < int(extra); i++ { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("reading cid field t.BlsMessages failed: %w", err) + } + t.BlsMessages[i] = c + } + + // t.t.SecpkMessages ([]cid.Cid) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("array too large") + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.SecpkMessages = make([]cid.Cid, extra) + } + for i := 0; i < int(extra); i++ { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("reading cid field t.SecpkMessages failed: %w", err) + } + t.SecpkMessages[i] = c + } + + return nil +} diff --git a/chain/types/tipset.go b/chain/types/tipset.go index 2d763650a..5484b476a 100644 --- a/chain/types/tipset.go +++ b/chain/types/tipset.go @@ -128,3 +128,12 @@ func (ts *TipSet) MinTicketBlock() *BlockHeader { return min } + +func (ts *TipSet) Contains(oc cid.Cid) bool { + for _, c := range ts.cids { + if c == oc { + return true + } + } + return false +} diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 9bf902d84..c83187503 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -67,8 +67,8 @@ func (vmc *VMContext) Message() *types.Message { } func (vmc *VMContext) GetRandomness(height uint64) ([]byte, aerrors.ActorError) { - relHeight := int(vmc.BlockHeight()) - int(height) - res, err := vmc.vm.cs.GetRandomness(vmc.ctx, vmc.vm.cs.GetHeaviestTipSet(), nil, relHeight) + + res, err := vmc.vm.rand.GetRandomness(vmc.ctx, int64(height)) if err != nil { return nil, aerrors.Escalate(err, "could not get randomness") } @@ -288,9 +288,10 @@ type VM struct { blockHeight uint64 blockMiner address.Address inv *invoker + rand Rand } -func NewVM(base cid.Cid, height uint64, maddr address.Address, cs *store.ChainStore) (*VM, error) { +func NewVM(base cid.Cid, height uint64, r Rand, maddr address.Address, cs *store.ChainStore) (*VM, error) { buf := bufbstore.NewBufferedBstore(cs.Blockstore()) cst := hamt.CSTFromBstore(buf) state, err := state.LoadStateTree(cst, base) @@ -307,9 +308,35 @@ func NewVM(base cid.Cid, height uint64, maddr address.Address, cs *store.ChainSt blockHeight: height, blockMiner: maddr, inv: newInvoker(), + rand: r, }, nil } +type Rand interface { + GetRandomness(ctx context.Context, h int64) ([]byte, error) +} + +type chainRand struct { + cs *store.ChainStore + blks []cid.Cid + bh uint64 + tickets []*types.Ticket +} + +func NewChainRand(cs *store.ChainStore, blks []cid.Cid, bheight uint64, tickets []*types.Ticket) Rand { + return &chainRand{ + cs: cs, + blks: blks, + bh: bheight, + tickets: tickets, + } +} + +func (cr *chainRand) GetRandomness(ctx context.Context, h int64) ([]byte, error) { + lb := (int64(cr.bh) + int64(len(cr.tickets))) - h + return cr.cs.GetRandomness(ctx, cr.blks, cr.tickets, lb) +} + type ApplyRet struct { types.MessageReceipt ActorErr aerrors.ActorError diff --git a/cli/state.go b/cli/state.go index d9f07559b..942380a31 100644 --- a/cli/state.go +++ b/cli/state.go @@ -106,7 +106,7 @@ var stateProvingSetCmd = &cli.Command{ return err } - sectors, err := api.StateMinerProvingSet(ctx, maddr) + sectors, err := api.StateMinerProvingSet(ctx, maddr, nil) if err != nil { return err } diff --git a/gen/main.go b/gen/main.go index 56323cde3..1ef741e5c 100644 --- a/gen/main.go +++ b/gen/main.go @@ -21,6 +21,7 @@ func main() { types.Merge{}, types.Actor{}, types.MessageReceipt{}, + types.BlockMsg{}, ) if err != nil { fmt.Println(err) @@ -32,7 +33,6 @@ func main() { chain.BlockSyncRequest{}, chain.BlockSyncResponse{}, chain.BSTipSet{}, - chain.BlockMsg{}, ) if err != nil { fmt.Println(err) diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 2d22dd884..12d1bd172 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -17,6 +17,10 @@ type SectorSealingStatus = sectorbuilder.SectorSealingStatus type StagedSectorMetadata = sectorbuilder.StagedSectorMetadata +type SortedSectorInfo = sectorbuilder.SortedSectorInfo + +type SectorInfo = sectorbuilder.SectorInfo + const CommLen = sectorbuilder.CommitmentBytesLen type SectorBuilder struct { @@ -81,7 +85,7 @@ func (sb *SectorBuilder) GetAllStagedSectors() ([]StagedSectorMetadata, error) { return sectorbuilder.GetAllStagedSectors(sb.handle) } -func (sb *SectorBuilder) GeneratePoSt(sectorInfo sectorbuilder.SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { +func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { // Wait, this is a blocking method with no way of interrupting it? // does it checkpoint itself? return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults) @@ -107,9 +111,6 @@ func VerifyPieceInclusionProof(sectorSize uint64, pieceSize uint64, commP []byte return sectorbuilder.VerifyPieceInclusionProof(sectorSize, pieceSize, commPa, commDa, proof) } -type SortedSectorInfo = sectorbuilder.SortedSectorInfo -type SectorInfo = sectorbuilder.SectorInfo - func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo { return sectorbuilder.NewSortedSectorInfo(sectors...) } diff --git a/lotuspond/front/src/Address.js b/lotuspond/front/src/Address.js index b41f3cdcb..64984716d 100644 --- a/lotuspond/front/src/Address.js +++ b/lotuspond/front/src/Address.js @@ -84,16 +84,16 @@ class Address extends React.Component { return info } - add10k = async () => { - [...Array(10).keys()].map(() => async () => await this.props.add1k(this.props.addr)).reduce(async (p, c) => [await p, await c()], Promise.resolve(null)) + add200k = async () => { + [...Array(10).keys()].map(() => async () => await this.props.add20k(this.props.addr)).reduce(async (p, c) => [await p, await c()], Promise.resolve(null)) } render() { - let add1k = - if(this.props.add1k) { - add1k =   this.props.add1k(this.props.addr)}>[+1k] + let add20k = + if(this.props.add20k) { + add20k =   this.props.add20k(this.props.addr)}>[+20k] if (this.props.add10k) { - add1k = {add1k} [+10k] + add20k = {add20k} [+200k] } } let addr = truncAddr(this.props.addr, this.props.short ? 12 : 17) @@ -133,7 +133,7 @@ class Address extends React.Component { minerInfo =  Power: {this.state.minerInfo.MinerPower} ({this.state.minerInfo.MinerPower/this.state.minerInfo.TotalPower*100}%) } - return {addr}{balance}{actInfo}{nonce}{add1k}{transfer}{minerInfo} + return {addr}{balance}{actInfo}{nonce}{add20k}{transfer}{minerInfo} } } diff --git a/lotuspond/front/src/FullNode.js b/lotuspond/front/src/FullNode.js index edf620579..29a805273 100644 --- a/lotuspond/front/src/FullNode.js +++ b/lotuspond/front/src/FullNode.js @@ -15,7 +15,7 @@ class FullNode extends React.Component { this.newSecpAddr = this.newSecpAddr.bind(this) this.newBLSAddr = this.newBLSAddr.bind(this) this.startStorageMiner = this.startStorageMiner.bind(this) - this.add1k = this.add1k.bind(this) + this.add20k = this.add20k.bind(this) this.explorer = this.explorer.bind(this) this.client = this.client.bind(this) this.stop = this.stop.bind(this) @@ -84,8 +84,8 @@ class FullNode extends React.Component { this.props.spawnStorageNode(this.props.node.Repo, this.props.client) } - async add1k(to) { - await this.props.give1k(to) + async add20k(to) { + await this.props.give20k(to) } explorer() { @@ -123,14 +123,14 @@ class FullNode extends React.Component { let storageMine = let addresses = this.state.addrs.map((addr) => { - let line =
+ let line =
if (this.state.defaultAddr === addr) { line = {line} } return
{line}
}) let paychannels = this.state.paychs.map((addr, ak) => { - const line =
+ const line =
const vouchers = this.state.vouchers[ak].map(voucher => { let extra = if(voucher.Extra) { diff --git a/lotuspond/front/src/NodeList.js b/lotuspond/front/src/NodeList.js index 093a336c4..0466a5eb9 100644 --- a/lotuspond/front/src/NodeList.js +++ b/lotuspond/front/src/NodeList.js @@ -27,7 +27,7 @@ class NodeList extends React.Component { this.spawnStorageNode = this.spawnStorageNode.bind(this) this.connMgr = this.connMgr.bind(this) this.consensus = this.consensus.bind(this) - this.transfer1kFrom1 = this.transfer1kFrom1.bind(this) + this.transfer20kFrom1 = this.transfer20kFrom1.bind(this) this.getNodes() } @@ -52,7 +52,7 @@ class NodeList extends React.Component { node={{...node}} client={client} pondClient={this.props.client} - give1k={this.transfer1kFrom1} + give20k={this.transfer20kFrom1} mountWindow={this.props.mountWindow} spawnStorageNode={this.spawnStorageNode} stop={this.stopNode(node.ID, onClose)} @@ -81,7 +81,7 @@ class NodeList extends React.Component { this.setState({existingLoaded: true, nodes: nodes}) } - async transfer1kFrom1(to) { + async transfer20kFrom1(to) { const addrss = await this.state.nodes[1].conn.call('Filecoin.WalletList', []) const [bestaddr, bal] = await addrss.map(async addr => { let balance = 0 @@ -96,7 +96,7 @@ class NodeList extends React.Component { await pushMessage(this.state.nodes[1].conn, bestaddr, { To: to, From: bestaddr, - Value: "1000", + Value: "20000", }) } diff --git a/lotuspond/front/src/StorageNode.js b/lotuspond/front/src/StorageNode.js index 2c813605c..ed7dcc66b 100644 --- a/lotuspond/front/src/StorageNode.js +++ b/lotuspond/front/src/StorageNode.js @@ -59,7 +59,6 @@ class StorageNode extends React.Component { // this.props.onConnect(client, id) // TODO: dedupe connecting part - this.loadInfo() let updates = setInterval(this.loadInfo, 1050) client.on('close', () => clearInterval(updates)) }) @@ -72,7 +71,10 @@ class StorageNode extends React.Component { const peers = await this.state.client.call("Filecoin.NetPeers", []) const [actor] = await this.state.client.call("Filecoin.ActorAddresses", []) - this.setState({version: version, peers: peers.length, actor: actor}) + const stActor = await this.props.fullConn.call('Filecoin.StateGetActor', [actor, null]) + const actorState = await this.props.fullConn.call('Filecoin.StateReadState', [stActor, null]) + + this.setState({version: version, peers: peers.length, actor: actor, actorState: actorState}) await this.stagedList() } @@ -109,6 +111,7 @@ class StorageNode extends React.Component {
+  PPE: {this.state.actorState.State.ProvingPeriodEnd}
{this.state.statusCounts.map((c, i) => {sealCodes[i]}: {c} | )}
diff --git a/miner/miner.go b/miner/miner.go index cf2b0e4ef..b9e1c04b1 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -6,7 +6,6 @@ import ( "time" "github.com/filecoin-project/go-lotus/build" - chain "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/gen" @@ -191,7 +190,7 @@ func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) { }, nil } -func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*chain.BlockMsg, error) { +func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) { log.Debug("attempting to mine a block on:", base.ts.Cids()) ticket, err := m.scratchTicket(ctx, base) if err != nil { @@ -288,7 +287,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic }, nil } -func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*chain.BlockMsg, error) { +func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) { pending, err := m.api.MpoolPending(context.TODO(), base.ts) if err != nil { diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 0743cff55..51de87165 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -4,7 +4,6 @@ import ( "context" "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" "golang.org/x/xerrors" @@ -23,11 +22,11 @@ type ChainAPI struct { PubSub *pubsub.PubSub } -func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { +func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) { return a.Chain.SubHeadChanges(ctx), nil } -func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { +func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error { if err := a.Chain.AddBlock(blk.Header); err != nil { return xerrors.Errorf("AddBlock failed: %w", err) } @@ -46,7 +45,7 @@ func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) { } func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet, tickets []*types.Ticket, lb int) ([]byte, error) { - return a.Chain.GetRandomness(ctx, pts, tickets, lb) + return a.Chain.GetRandomness(ctx, pts.Cids(), tickets, int64(lb)) } func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { @@ -108,3 +107,7 @@ func (a *ChainAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]* return out, nil } + +func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) { + return a.Chain.GetTipsetByHeight(ctx, h, ts) +} diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 57141846f..2b1414d4d 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -2,18 +2,14 @@ package full import ( "context" - "fmt" - "strconv" cid "github.com/ipfs/go-cid" "github.com/ipfs/go-hamt-ipld" - cbor "github.com/ipfs/go-ipld-cbor" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/gen" @@ -38,119 +34,11 @@ type StateAPI struct { } func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { - ts := a.StateManager.ChainStore().GetHeaviestTipSet() - - stc, err := a.StateManager.TipSetState(ts.Cids()) - if err != nil { - return nil, err - } - - cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore()) - - st, err := state.LoadStateTree(cst, stc) - if err != nil { - return nil, err - } - - act, err := st.GetActor(addr) - if err != nil { - return nil, err - } - - var minerState actors.StorageMinerActorState - if err := cst.Get(ctx, act.Head, &minerState); err != nil { - return nil, err - } - - nd, err := hamt.LoadNode(ctx, cst, minerState.Sectors) - if err != nil { - return nil, err - } - - var sinfos []*api.SectorInfo - // Note to self: the hamt isnt a great data structure to use here... need to implement the sector set - err = nd.ForEach(ctx, func(k string, val interface{}) error { - sid, err := strconv.ParseUint(k, 10, 64) - if err != nil { - return err - } - - bval, ok := val.([]byte) - if !ok { - return fmt.Errorf("expected to get bytes in sector set hamt") - } - - var comms [][]byte - if err := cbor.DecodeInto(bval, &comms); err != nil { - return err - } - - sinfos = append(sinfos, &api.SectorInfo{ - SectorID: sid, - CommR: comms[0], - CommD: comms[1], - }) - return nil - }) - return sinfos, nil + return stmgr.GetMinerSectorSet(ctx, a.StateManager, nil, addr) } -func (a *StateAPI) StateMinerProvingSet(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { - ts := a.Chain.GetHeaviestTipSet() - - stc, err := a.StateManager.TipSetState(ts.Cids()) - if err != nil { - return nil, err - } - - cst := hamt.CSTFromBstore(a.Chain.Blockstore()) - - st, err := state.LoadStateTree(cst, stc) - if err != nil { - return nil, err - } - - act, err := st.GetActor(addr) - if err != nil { - return nil, err - } - - var minerState actors.StorageMinerActorState - if err := cst.Get(ctx, act.Head, &minerState); err != nil { - return nil, err - } - - nd, err := hamt.LoadNode(ctx, cst, minerState.ProvingSet) - if err != nil { - return nil, err - } - - var sinfos []*api.SectorInfo - // Note to self: the hamt isnt a great data structure to use here... need to implement the sector set - err = nd.ForEach(ctx, func(k string, val interface{}) error { - sid, err := strconv.ParseUint(k, 10, 64) - if err != nil { - return err - } - - bval, ok := val.([]byte) - if !ok { - return fmt.Errorf("expected to get bytes in sector set hamt") - } - - var comms [][]byte - if err := cbor.DecodeInto(bval, &comms); err != nil { - return err - } - - sinfos = append(sinfos, &api.SectorInfo{ - SectorID: sid, - CommR: comms[0], - CommD: comms[1], - }) - return nil - }) - return sinfos, nil +func (a *StateAPI) StateMinerProvingSet(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*api.SectorInfo, error) { + return stmgr.GetMinerProvingSet(ctx, a.StateManager, ts, addr) } func (a *StateAPI) StateMinerPower(ctx context.Context, maddr address.Address, ts *types.TipSet) (api.MinerPower, error) { @@ -191,6 +79,10 @@ func (a *StateAPI) StateMinerPeerID(ctx context.Context, m address.Address, ts * return stmgr.GetMinerPeerID(ctx, a.StateManager, ts, m) } +func (a *StateAPI) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) { + return stmgr.GetMinerProvingPeriodEnd(ctx, a.StateManager, ts, actor) +} + func (a *StateAPI) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) { return a.StateManager.Call(ctx, msg, ts) } @@ -260,13 +152,13 @@ func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, ts *typ } // This is on StateAPI because miner.Miner requires this, and MinerAPI requires miner.Miner -func (a *StateAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*chain.BlockMsg, error) { +func (a *StateAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*types.BlockMsg, error) { fblk, err := gen.MinerCreateBlock(ctx, a.StateManager, a.Wallet, addr, parents, tickets, proof, msgs, ts) if err != nil { return nil, err } - var out chain.BlockMsg + var out types.BlockMsg out.Header = fblk.Header for _, msg := range fblk.BlsMessages { out.BlsMessages = append(out.BlsMessages, msg.Cid()) diff --git a/node/impl/full/wallet.go b/node/impl/full/wallet.go index 38803460f..6f02f7ff5 100644 --- a/node/impl/full/wallet.go +++ b/node/impl/full/wallet.go @@ -32,7 +32,7 @@ func (a *WalletAPI) WalletList(ctx context.Context) ([]address.Address, error) { } func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { - return a.StateManager.GetBalance(addr) + return a.StateManager.GetBalance(addr, nil) } func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*types.Signature, error) { diff --git a/paych/paych.go b/paych/paych.go index 07dd8cbf8..5b0889be9 100644 --- a/paych/paych.go +++ b/paych/paych.go @@ -223,7 +223,7 @@ func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address func (pm *Manager) loadPaychState(ctx context.Context, ch address.Address) (*types.Actor, *actors.PaymentChannelActorState, error) { var pcast actors.PaymentChannelActorState - act, err := pm.sm.LoadActorState(ctx, ch, &pcast) + act, err := pm.sm.LoadActorState(ctx, ch, &pcast, nil) if err != nil { return nil, nil, err } diff --git a/storage/miner.go b/storage/miner.go index 9b4ff0ef2..00c69ea87 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,18 +2,20 @@ package storage import ( "context" - "fmt" - + "encoding/base64" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - host "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/host" "github.com/pkg/errors" + "golang.org/x/xerrors" + "sync" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/events" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" @@ -23,8 +25,11 @@ import ( var log = logging.Logger("storageminer") +const PoStConfidence = 1 + type Miner struct { - api storageMinerApi + api storageMinerApi + events *events.Events secst *sector.Store commt *commitment.Tracker @@ -36,6 +41,9 @@ type Miner struct { h host.Host ds datastore.Batching + + schedLk sync.Mutex + postSched uint64 } type storageMinerApi interface { @@ -44,21 +52,27 @@ type storageMinerApi interface { // Call a read only method on actors (no interaction with the chain required) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) + StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) + StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error) + StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error) - MpoolPush(context.Context, *types.SignedMessage) error - MpoolGetNonce(context.Context, address.Address) (uint64, error) + MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) + ChainHead(context.Context) (*types.TipSet, error) ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) - ChainNotify(context.Context) (<-chan *store.HeadChange, error) + ChainNotify(context.Context) (<-chan []*store.HeadChange, error) + ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) + ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) WalletBalance(context.Context, address.Address) (types.BigInt, error) - WalletSign(context.Context, address.Address, []byte) (*types.Signature, error) WalletHas(context.Context, address.Address) (bool, error) } func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) { return &Miner{ - api: api, + api: api, + maddr: addr, h: h, ds: ds, @@ -72,8 +86,15 @@ func (m *Miner) Run(ctx context.Context) error { return errors.Wrap(err, "miner preflight checks failed") } + m.events = events.NewEvents(ctx, m.api) + + ts, err := m.api.ChainHead(ctx) + if err != nil { + return err + } + go m.handlePostingSealedSectors(ctx) - go m.runPoSt(ctx) + go m.schedulePoSt(ctx, ts) return nil } @@ -126,7 +147,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal return errors.Wrap(aerr, "could not serialize commit sector parameters") } - msg := types.Message{ + msg := &types.Message{ To: m.maddr, From: m.worker, Method: actors.MAMethods.CommitSector, @@ -136,45 +157,181 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal GasPrice: types.NewInt(1), } - nonce, err := m.api.MpoolGetNonce(ctx, m.worker) + smsg, err := m.api.MpoolPushMessage(ctx, msg) if err != nil { - return errors.Wrap(err, "failed to get nonce") - } - - msg.Nonce = nonce - - data, err := msg.Serialize() - if err != nil { - return errors.Wrap(err, "serializing commit sector message") - } - - sig, err := m.api.WalletSign(ctx, m.worker, data) - if err != nil { - return errors.Wrap(err, "signing commit sector message") - } - - smsg := &types.SignedMessage{ - Message: msg, - Signature: *sig, - } - - if err := m.api.MpoolPush(ctx, smsg); err != nil { - return errors.Wrap(err, "pushing commit sector message to mpool") + return errors.Wrap(err, "pushing message to mpool") } if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil { return errors.Wrap(err, "tracking sector commitment") } + go func() { + _, err := m.api.ChainWaitMsg(ctx, smsg.Cid()) + if err != nil { + return + } + + m.schedulePoSt(ctx, nil) + }() + return nil } -func (m *Miner) runPoSt(ctx context.Context) { - log.Warning("dont care about posts yet") +func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) { + ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs) + if err != nil { + log.Errorf("failed to get proving period end for miner: %s", err) + return + } + + if ppe == 0 { + log.Errorf("Proving period end == 0") + // TODO: we probably want to call schedulePoSt after the first commitSector call + return + } + + m.schedLk.Lock() + + if m.postSched >= ppe { + log.Warnf("schedulePoSt already called for proving period >= %d", m.postSched) + m.schedLk.Unlock() + return + } + m.postSched = ppe + m.schedLk.Unlock() + + log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime) + err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert + // TODO: Cancel post + return nil + }, PoStConfidence, ppe-build.PoSTChallangeTime) + if err != nil { + // TODO: This is BAD, figure something out + log.Errorf("scheduling PoSt failed: %s", err) + return + } +} + +func (m *Miner) startPost(ts *types.TipSet, curH uint64) error { + log.Info("starting PoSt computation") + + head, err := m.api.ChainHead(context.TODO()) + if err != nil { + return err + } + + postWaitCh, _, err := m.maybeDoPost(context.TODO(), head) + if err != nil { + return err + } + + if postWaitCh == nil { + return errors.New("PoSt didn't start") + } + + go func() { + err := <-postWaitCh + if err != nil { + log.Errorf("got error back from postWaitCh: %s", err) + return + } + + log.Infof("post successfully submitted") + + m.schedulePoSt(context.TODO(), ts) + }() + return nil +} + +func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) { + ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts) + if err != nil { + return nil, nil, xerrors.Errorf("failed to get proving period end for miner: %w", err) + } + + if ts.Height() > ppe { + log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height()) + return nil, nil, nil + } + + sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts) + if err != nil { + return nil, nil, xerrors.Errorf("failed to get proving set for miner: %w", err) + } + + r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math + if err != nil { + return nil, nil, xerrors.Errorf("failed to get chain randomness for post: %w", err) + } + + sourceTs, err := m.api.ChainGetTipSetByHeight(ctx, ppe-build.PoSTChallangeTime, ts) + if err != nil { + return nil, nil, xerrors.Errorf("failed to get post start tipset: %w", err) + } + + ret := make(chan error, 1) + go func() { + log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", ts.Height()-(ts.Height()-ppe+build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height()) + var faults []uint64 + proof, err := m.secst.RunPoSt(ctx, sset, r, faults) + if err != nil { + ret <- xerrors.Errorf("running post failed: %w", err) + return + } + + log.Infof("submitting PoSt pLen=%d", len(proof)) + + params := &actors.SubmitPoStParams{ + Proof: proof, + DoneSet: types.BitFieldFromSet(sectorIdList(sset)), + } + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + ret <- xerrors.Errorf("could not serialize submit post parameters: %w", err) + return + } + + msg := &types.Message{ + To: m.maddr, + From: m.worker, + Method: actors.MAMethods.SubmitPoSt, + Params: enc, + Value: types.NewInt(0), + GasLimit: types.NewInt(100000 /* i dont know help */), + GasPrice: types.NewInt(1), + } + + smsg, err := m.api.MpoolPushMessage(ctx, msg) + if err != nil { + ret <- xerrors.Errorf("pushing message to mpool: %w", err) + return + } + + // make sure it succeeds... + _, err = m.api.ChainWaitMsg(ctx, smsg.Cid()) + if err != nil { + return + } + // TODO: check receipt + + m.schedulePoSt(ctx, nil) + }() + + return ret, sourceTs.MinTicketBlock(), nil +} + +func sectorIdList(si []*api.SectorInfo) []uint64 { + out := make([]uint64, len(si)) + for i, s := range si { + out[i] = s.SectorID + } + return out } func (m *Miner) runPreflightChecks(ctx context.Context) error { - worker, err := m.getWorkerAddr(ctx) + worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil) if err != nil { return err } @@ -193,23 +350,3 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error { log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker) return nil } - -func (m *Miner) getWorkerAddr(ctx context.Context) (address.Address, error) { - msg := &types.Message{ - To: m.maddr, - From: m.maddr, // it doesnt like it if we dont give it a from... probably should fix that - Method: actors.MAMethods.GetWorkerAddr, - Params: nil, - } - - recpt, err := m.api.StateCall(ctx, msg, nil) - if err != nil { - return address.Undef, errors.Wrapf(err, "calling getWorker(%s)", m.maddr) - } - - if recpt.ExitCode != 0 { - return address.Undef, fmt.Errorf("failed to call getWorker(%s): return %d", m.maddr, recpt.ExitCode) - } - - return address.NewFromBytes(recpt.Return) -} diff --git a/storage/sector/store.go b/storage/sector/store.go index 3d18e77ad..975688c8d 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -2,12 +2,14 @@ package sector import ( "context" + "golang.org/x/xerrors" "io" "io/ioutil" "os" "sync" "time" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" logging "github.com/ipfs/go-log" @@ -40,7 +42,7 @@ func (s *Store) Service() { } func (s *Store) poll() { - log.Info("polling for sealed sectors...") + log.Debug("polling for sealed sectors...") // get a list of sectors to poll s.lk.Lock() @@ -162,6 +164,30 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect return s.sb.SealStatus(sector) } +func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) { + sbsi := make([]sectorbuilder.SectorInfo, len(sectors)) + for k, sector := range sectors { + var commR [sectorbuilder.CommLen]byte + if copy(commR[:], sector.CommR) != sectorbuilder.CommLen { + return nil, xerrors.Errorf("commR too short, %d bytes", len(sector.CommR)) + } + + sbsi[k] = sectorbuilder.SectorInfo{ + SectorID: sector.SectorID, + CommR: commR, + } + } + + ssi := sectorbuilder.NewSortedSectorInfo(sbsi) + + var seed [sectorbuilder.CommLen]byte + if copy(seed[:], r) != sectorbuilder.CommLen { + return nil, xerrors.Errorf("random seed too short, %d bytes", len(r)) + } + + return s.sb.GeneratePoSt(ssi, seed, faults) +} + func (s *Store) Stop() { close(s.closeCh) }