Merge pull request #205 from filecoin-project/feat/post-schedule

Implement  PoSt scheduler
This commit is contained in:
Łukasz Magiera 2019-09-20 16:57:31 +02:00 committed by GitHub
commit 07f97c693e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1003 additions and 544 deletions

View File

@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
@ -45,14 +44,18 @@ type FullNode interface {
Common Common
// chain // chain
ChainNotify(context.Context) (<-chan *store.HeadChange, error)
// ChainNotify returns channel with chain head updates
// First message is guaranteed to be of len == 1, and type == 'current'
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error) ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error) ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error)
ChainGetBlockReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error) ChainGetBlockReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
// messages // messages
@ -68,7 +71,7 @@ type FullNode interface {
MinerRegister(context.Context, address.Address) error MinerRegister(context.Context, address.Address) error
MinerUnregister(context.Context, address.Address) error MinerUnregister(context.Context, address.Address) error
MinerAddresses(context.Context) ([]address.Address, error) MinerAddresses(context.Context) ([]address.Address, error)
MinerCreateBlock(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*chain.BlockMsg, error) MinerCreateBlock(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*types.BlockMsg, error)
// // UX ? // // UX ?
@ -101,17 +104,19 @@ type FullNode interface {
//ClientListAsks() []Ask //ClientListAsks() []Ask
StateMinerSectors(context.Context, address.Address) ([]*SectorInfo, error)
StateMinerProvingSet(context.Context, address.Address) ([]*SectorInfo, error)
StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error)
// if tipset is nil, we'll use heaviest // if tipset is nil, we'll use heaviest
StateCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) StateCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error)
StateReplay(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) StateReplay(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error)
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error)
StateMinerSectors(context.Context, address.Address) ([]*SectorInfo, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*SectorInfo, error)
StateMinerPower(context.Context, address.Address, *types.TipSet) (MinerPower, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error)
StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychList(context.Context) ([]address.Address, error) PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error) PaychStatus(context.Context, address.Address) (*PaychStatus, error)

View File

@ -5,7 +5,6 @@ import (
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
@ -39,14 +38,15 @@ type FullNodeStruct struct {
CommonStruct CommonStruct
Internal struct { Internal struct {
ChainNotify func(context.Context) (<-chan *store.HeadChange, error) `perm:"read"` ChainNotify func(context.Context) (<-chan []*store.HeadChange, error) `perm:"read"`
ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"` ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"` ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"` ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"`
ChainGetBlockReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` ChainGetBlockReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"`
ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
@ -55,7 +55,7 @@ type FullNodeStruct struct {
MinerRegister func(context.Context, address.Address) error `perm:"admin"` MinerRegister func(context.Context, address.Address) error `perm:"admin"`
MinerUnregister func(context.Context, address.Address) error `perm:"admin"` MinerUnregister func(context.Context, address.Address) error `perm:"admin"`
MinerAddresses func(context.Context) ([]address.Address, error) `perm:"write"` MinerAddresses func(context.Context) ([]address.Address, error) `perm:"write"`
MinerCreateBlock func(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*chain.BlockMsg, error) `perm:"write"` MinerCreateBlock func(context.Context, address.Address, *types.TipSet, []*types.Ticket, types.ElectionProof, []*types.SignedMessage, uint64) (*types.BlockMsg, error) `perm:"write"`
WalletNew func(context.Context, string) (address.Address, error) `perm:"write"` WalletNew func(context.Context, string) (address.Address, error) `perm:"write"`
WalletHas func(context.Context, address.Address) (bool, error) `perm:"write"` WalletHas func(context.Context, address.Address) (bool, error) `perm:"write"`
@ -76,10 +76,11 @@ type FullNodeStruct struct {
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) `perm:"read"` ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerProvingSet func(context.Context, address.Address, *types.TipSet) ([]*SectorInfo, error) `perm:"read"`
StateMinerPower func(context.Context, address.Address, *types.TipSet) (MinerPower, error) `perm:"read"` StateMinerPower func(context.Context, address.Address, *types.TipSet) (MinerPower, error) `perm:"read"`
StateMinerWorker func(context.Context, address.Address, *types.TipSet) (address.Address, error) `perm:"read"` StateMinerWorker func(context.Context, address.Address, *types.TipSet) (address.Address, error) `perm:"read"`
StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"`
StateMinerProvingPeriodEnd func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"`
StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"`
StateReplay func(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) `perm:"read"` StateReplay func(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error) `perm:"read"`
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
@ -211,11 +212,11 @@ func (c *FullNodeStruct) MinerAddresses(ctx context.Context) ([]address.Address,
return c.Internal.MinerAddresses(ctx) return c.Internal.MinerAddresses(ctx)
} }
func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *types.TipSet, tickets []*types.Ticket, eproof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*chain.BlockMsg, error) { func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *types.TipSet, tickets []*types.Ticket, eproof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*types.BlockMsg, error) {
return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs, ts) return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs, ts)
} }
func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
return c.Internal.ChainSubmitBlock(ctx, blk) return c.Internal.ChainSubmitBlock(ctx, blk)
} }
@ -231,6 +232,10 @@ func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWa
return c.Internal.ChainWaitMsg(ctx, msgc) return c.Internal.ChainWaitMsg(ctx, msgc)
} }
func (c *FullNodeStruct) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) {
return c.Internal.ChainGetTipSetByHeight(ctx, h, ts)
}
func (c *FullNodeStruct) WalletNew(ctx context.Context, typ string) (address.Address, error) { func (c *FullNodeStruct) WalletNew(ctx context.Context, typ string) (address.Address, error) {
return c.Internal.WalletNew(ctx, typ) return c.Internal.WalletNew(ctx, typ)
} }
@ -275,7 +280,7 @@ func (c *FullNodeStruct) ChainGetBlockReceipts(ctx context.Context, b cid.Cid) (
return c.Internal.ChainGetBlockReceipts(ctx, b) return c.Internal.ChainGetBlockReceipts(ctx, b)
} }
func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
return c.Internal.ChainNotify(ctx) return c.Internal.ChainNotify(ctx)
} }
@ -283,8 +288,8 @@ func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Add
return c.Internal.StateMinerSectors(ctx, addr) return c.Internal.StateMinerSectors(ctx, addr)
} }
func (c *FullNodeStruct) StateMinerProvingSet(ctx context.Context, addr address.Address) ([]*SectorInfo, error) { func (c *FullNodeStruct) StateMinerProvingSet(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*SectorInfo, error) {
return c.Internal.StateMinerProvingSet(ctx, addr) return c.Internal.StateMinerProvingSet(ctx, addr, ts)
} }
func (c *FullNodeStruct) StateMinerPower(ctx context.Context, a address.Address, ts *types.TipSet) (MinerPower, error) { func (c *FullNodeStruct) StateMinerPower(ctx context.Context, a address.Address, ts *types.TipSet) (MinerPower, error) {
@ -298,6 +303,9 @@ func (c *FullNodeStruct) StateMinerWorker(ctx context.Context, m address.Address
func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) { func (c *FullNodeStruct) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) {
return c.Internal.StateMinerPeerID(ctx, m, ts) return c.Internal.StateMinerPeerID(ctx, m, ts)
} }
func (c *FullNodeStruct) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) {
return c.Internal.StateMinerProvingPeriodEnd(ctx, actor, ts)
}
func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) { func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) {
return c.Internal.StateCall(ctx, msg, ts) return c.Internal.StateCall(ctx, msg, ts)

View File

@ -1,5 +1,7 @@
package build package build
const BlockDelay = 5 // Seconds
const BlockDelay = 3
// Seconds
const AllowableClockDrift = BlockDelay * 2 const AllowableClockDrift = BlockDelay * 2

View File

@ -7,14 +7,20 @@ const UnixfsLinksPerLevel = 1024
const SectorSize = 1024 const SectorSize = 1024
// Blocks
const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours
// Blocks
const DealVoucherSkewLimit = 10 const DealVoucherSkewLimit = 10
// Blocks
const ForkLengthThreshold = 20 const ForkLengthThreshold = 20
// Blocks
const RandomnessLookback = 20 const RandomnessLookback = 20
const ProvingPeriodDuration = 2 * 60 // an hour, for now // Blocks
const PoSTChallangeTime = 1 * 60 const ProvingPeriodDuration = 10
const PoSTChallangeTime = 5
// TODO: Move other important consts here // TODO: Move other important consts here

View File

@ -33,13 +33,11 @@ type StorageMinerActorState struct {
DePledgeTime types.BigInt DePledgeTime types.BigInt
// All sectors this miner has committed. // All sectors this miner has committed.
Sectors cid.Cid // TODO: Using a HAMT for now, needs to be an AMT once we implement it Sectors cid.Cid
SectorSetSize uint64 // TODO: the AMT should be able to tell us how many items are in it. This field won't be needed at that point
// Sectors this miner is currently mining. It is only updated // Sectors this miner is currently mining. It is only updated
// when a PoSt is submitted (not as each new sector commitment is added). // when a PoSt is submitted (not as each new sector commitment is added).
ProvingSet cid.Cid ProvingSet cid.Cid
ProvingSetSize uint64
// Faulty sectors reported since last SubmitPost, // Faulty sectors reported since last SubmitPost,
// up to the current proving period's challenge time. // up to the current proving period's challenge time.
@ -268,9 +266,13 @@ func (sma StorageMinerActor) CommitSector(act *types.Actor, vmctx types.VMContex
// //
// Note: Proving period is a function of sector size; small sectors take less // Note: Proving period is a function of sector size; small sectors take less
// time to prove than large sectors do. Sector size is selected when pledging. // time to prove than large sectors do. Sector size is selected when pledging.
if self.ProvingSetSize == 0 { pss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet)
if lerr != nil {
return nil, aerrors.Escalate(lerr, "could not load proving set node")
}
if pss.Count == 0 {
self.ProvingSet = self.Sectors self.ProvingSet = self.Sectors
self.ProvingSetSize = self.SectorSetSize
self.ProvingPeriodEnd = vmctx.BlockHeight() + build.ProvingPeriodDuration self.ProvingPeriodEnd = vmctx.BlockHeight() + build.ProvingPeriodDuration
} }
@ -357,7 +359,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
pss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet) pss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet)
if lerr != nil { if lerr != nil {
return nil, aerrors.Escalate(lerr, "could not load sector set node") return nil, aerrors.Escalate(lerr, "could not load proving set node")
} }
var sectorInfos []sectorbuilder.SectorInfo var sectorInfos []sectorbuilder.SectorInfo
@ -400,7 +402,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
ss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet) ss, lerr := amt.LoadAMT(types.WrapStorage(vmctx.Storage()), self.ProvingSet)
if lerr != nil { if lerr != nil {
return nil, aerrors.Escalate(lerr, "could not load sector set node") return nil, aerrors.Escalate(lerr, "could not load proving set node")
} }
if err := ss.BatchDelete(params.DoneSet.All()); err != nil { if err := ss.BatchDelete(params.DoneSet.All()); err != nil {
@ -416,7 +418,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
} }
oldPower := self.Power oldPower := self.Power
self.Power = types.BigMul(types.NewInt(self.ProvingSetSize-uint64(len(faults))), self.Power = types.BigMul(types.NewInt(pss.Count-uint64(len(faults))),
mi.SectorSize) mi.SectorSize)
enc, err := SerializeParams(&UpdateStorageParams{Delta: types.BigSub(self.Power, oldPower)}) enc, err := SerializeParams(&UpdateStorageParams{Delta: types.BigSub(self.Power, oldPower)})
@ -430,7 +432,7 @@ func (sma StorageMinerActor) SubmitPoSt(act *types.Actor, vmctx types.VMContext,
} }
self.ProvingSet = self.Sectors self.ProvingSet = self.Sectors
self.ProvingSetSize = self.SectorSetSize self.ProvingPeriodEnd = nextProvingPeriodEnd
self.NextDoneSet = params.DoneSet self.NextDoneSet = params.DoneSet
c, err := vmctx.Storage().Put(self) c, err := vmctx.Storage().Put(self)

View File

@ -51,7 +51,8 @@ func setupVMTestEnv(t *testing.T) (*vm.VM, []address.Address) {
cs := store.NewChainStore(bs, nil) cs := store.NewChainStore(bs, nil)
vm, err := vm.NewVM(stateroot, 1, maddr, cs) // TODO: should probabaly mock out the randomness bit, nil works for now
vm, err := vm.NewVM(stateroot, 1, nil, maddr, cs)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -197,7 +197,7 @@ func (t *StorageMinerActorState) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull) _, err := w.Write(cbg.CborNull)
return err return err
} }
if _, err := w.Write([]byte{142}); err != nil { if _, err := w.Write([]byte{140}); err != nil {
return err return err
} }
@ -223,22 +223,12 @@ func (t *StorageMinerActorState) MarshalCBOR(w io.Writer) error {
return xerrors.Errorf("failed to write cid field t.Sectors: %w", err) return xerrors.Errorf("failed to write cid field t.Sectors: %w", err)
} }
// t.t.SectorSetSize (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorSetSize)); err != nil {
return err
}
// t.t.ProvingSet (cid.Cid) // t.t.ProvingSet (cid.Cid)
if err := cbg.WriteCid(w, t.ProvingSet); err != nil { if err := cbg.WriteCid(w, t.ProvingSet); err != nil {
return xerrors.Errorf("failed to write cid field t.ProvingSet: %w", err) return xerrors.Errorf("failed to write cid field t.ProvingSet: %w", err)
} }
// t.t.ProvingSetSize (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.ProvingSetSize)); err != nil {
return err
}
// t.t.CurrentFaultSet (types.BitField) // t.t.CurrentFaultSet (types.BitField)
if err := t.CurrentFaultSet.MarshalCBOR(w); err != nil { if err := t.CurrentFaultSet.MarshalCBOR(w); err != nil {
return err return err
@ -287,7 +277,7 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array") return fmt.Errorf("cbor input should be of type array")
} }
if extra != 14 { if extra != 12 {
return fmt.Errorf("cbor input had wrong number of fields") return fmt.Errorf("cbor input had wrong number of fields")
} }
@ -333,16 +323,6 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error {
t.Sectors = c t.Sectors = c
} }
// t.t.SectorSetSize (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorSetSize = extra
// t.t.ProvingSet (cid.Cid) // t.t.ProvingSet (cid.Cid)
{ {
@ -355,16 +335,6 @@ func (t *StorageMinerActorState) UnmarshalCBOR(r io.Reader) error {
t.ProvingSet = c t.ProvingSet = c
} }
// t.t.ProvingSetSize (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.ProvingSetSize = extra
// t.t.CurrentFaultSet (types.BitField) // t.t.CurrentFaultSet (types.BitField)
{ {
@ -759,7 +729,20 @@ func (t *SubmitPoStParams) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull) _, err := w.Write(cbg.CborNull)
return err return err
} }
if _, err := w.Write([]byte{128}); err != nil { if _, err := w.Write([]byte{130}); err != nil {
return err
}
// t.t.Proof ([]uint8)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Proof)))); err != nil {
return err
}
if _, err := w.Write(t.Proof); err != nil {
return err
}
// t.t.DoneSet (types.BitField)
if err := t.DoneSet.MarshalCBOR(w); err != nil {
return err return err
} }
return nil return nil
@ -776,10 +759,36 @@ func (t *SubmitPoStParams) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array") return fmt.Errorf("cbor input should be of type array")
} }
if extra != 0 { if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields") return fmt.Errorf("cbor input had wrong number of fields")
} }
// t.t.Proof ([]uint8)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("array too large")
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.Proof = make([]byte, extra)
if _, err := io.ReadFull(br, t.Proof); err != nil {
return err
}
// t.t.DoneSet (types.BitField)
{
if err := t.DoneSet.UnmarshalCBOR(br); err != nil {
return err
}
}
return nil return nil
} }

View File

@ -158,7 +158,7 @@ func NewHarness(t *testing.T, options ...HarnessOpt) *Harness {
t.Fatal(err) t.Fatal(err)
} }
h.cs = store.NewChainStore(h.bs, nil) h.cs = store.NewChainStore(h.bs, nil)
h.vm, err = vm.NewVM(stateroot, 1, h.HI.Miner, h.cs) h.vm, err = vm.NewVM(stateroot, 1, nil, h.HI.Miner, h.cs)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -472,109 +472,3 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
return nil return nil
} }
func (t *BlockMsg) MarshalCBOR(w io.Writer) error {
if _, err := w.Write([]byte{131}); err != nil {
return err
}
// t.t.Header (types.BlockHeader)
if err := t.Header.MarshalCBOR(w); err != nil {
return err
}
// t.t.BlsMessages ([]cid.Cid)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMessages)))); err != nil {
return err
}
for _, v := range t.BlsMessages {
if err := cbg.WriteCid(w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.BlsMessages: %w", err)
}
}
// t.t.SecpkMessages ([]cid.Cid)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMessages)))); err != nil {
return err
}
for _, v := range t.SecpkMessages {
if err := cbg.WriteCid(w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.SecpkMessages: %w", err)
}
}
return nil
}
func (t *BlockMsg) UnmarshalCBOR(br io.Reader) error {
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.Header (types.BlockHeader)
t.Header = new(types.BlockHeader)
if err := t.Header.UnmarshalCBOR(br); err != nil {
return err
}
// t.t.BlsMessages ([]cid.Cid)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("array too large")
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.BlsMessages = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("reading cid field t.BlsMessages failed: %w", err)
}
t.BlsMessages[i] = c
}
// t.t.SecpkMessages ([]cid.Cid)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("array too large")
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.SecpkMessages = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("reading cid field t.SecpkMessages failed: %w", err)
}
t.SecpkMessages[i] = c
}
return nil
}

View File

@ -1,12 +1,17 @@
package events package events
import ( import (
"context"
"sync" "sync"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
) )
@ -23,30 +28,32 @@ type heightHandler struct {
revert RevertHandler revert RevertHandler
} }
type eventChainStore interface { type eventApi interface {
SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
GetHeaviestTipSet() *types.TipSet ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
} }
type Events struct { type Events struct {
cs eventChainStore api eventApi
tsc *tipSetCache tsc *tipSetCache
lk sync.Mutex lk sync.Mutex
ready sync.WaitGroup
readyOnce sync.Once
heightEvents heightEvents
calledEvents calledEvents
} }
func NewEvents(cs eventChainStore) *Events { func NewEvents(ctx context.Context, api eventApi) *Events {
gcConfidence := 2 * build.ForkLengthThreshold gcConfidence := 2 * build.ForkLengthThreshold
tsc := newTSCache(gcConfidence) tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight)
e := &Events{ e := &Events{
cs: cs, api: api,
tsc: tsc, tsc: tsc,
@ -60,7 +67,7 @@ func NewEvents(cs eventChainStore) *Events {
}, },
calledEvents: calledEvents{ calledEvents: calledEvents{
cs: cs, cs: api,
tsc: tsc, tsc: tsc,
gcConfidence: uint64(gcConfidence), gcConfidence: uint64(gcConfidence),
@ -72,14 +79,82 @@ func NewEvents(cs eventChainStore) *Events {
}, },
} }
_ = e.tsc.add(cs.GetHeaviestTipSet()) e.ready.Add(1)
cs.SubscribeHeadChanges(e.headChange)
go e.listenHeadChanges(ctx)
e.ready.Wait()
// TODO: cleanup/gc goroutine // TODO: cleanup/gc goroutine
return e return e
} }
func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
if ctx.Err() != nil {
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
time.Sleep(time.Second)
log.Info("restarting listenHeadChanges")
}
}
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
cur, ok := <-notifs // TODO: timeout?
if !ok {
return xerrors.Errorf("notification channel closed")
}
if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}
if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
if err := e.tsc.add(cur[0].Val); err != nil {
log.Warn("tsc.add: adding current tipset failed: %w", err)
}
e.readyOnce.Do(func() {
e.ready.Done()
})
for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}
if err := e.headChange(rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}
}
return nil
}
func (e *Events) headChange(rev, app []*types.TipSet) error { func (e *Events) headChange(rev, app []*types.TipSet) error {
if len(app) == 0 { if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset") return xerrors.New("events.headChange expected at least one applied tipset")

View File

@ -1,6 +1,7 @@
package events package events
import ( import (
"context"
"math" "math"
"sync" "sync"
@ -53,7 +54,7 @@ type queuedEvent struct {
} }
type calledEvents struct { type calledEvents struct {
cs eventChainStore cs eventApi
tsc *tipSetCache tsc *tipSetCache
gcConfidence uint64 gcConfidence uint64
@ -81,9 +82,6 @@ type callTuple struct {
} }
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev { for _, ts := range rev {
e.handleReverts(ts) e.handleReverts(ts)
} }
@ -235,14 +233,15 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
seen := map[cid.Cid]struct{}{} seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() { for _, tsb := range ts.Blocks() {
bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb)
msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil { if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates // this is quite bad, but probably better than missing all the other updates
continue continue
} }
for _, m := range bmsgs { for _, m := range msgs.BlsMessages {
_, ok := seen[m.Cid()] _, ok := seen[m.Cid()]
if ok { if ok {
continue continue
@ -252,7 +251,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
consume(m) consume(m)
} }
for _, m := range smsgs { for _, m := range msgs.SecpkMessages {
_, ok := seen[m.Message.Cid()] _, ok := seen[m.Message.Cid()]
if ok { if ok {
continue continue

View File

@ -20,10 +20,7 @@ type heightEvents struct {
} }
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
e.lk.Lock() // highest tipset is always the first (see api.ReorgOps)
defer e.lk.Unlock()
// highest tipset is always the first (see cs.ReorgOps)
newH := app[0].Height() newH := app[0].Height()
for _, ts := range rev { for _, ts := range rev {

View File

@ -1,8 +1,12 @@
package events package events
import ( import (
"context"
"fmt" "fmt"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/store"
"testing" "testing"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
@ -30,8 +34,13 @@ type fakeCS struct {
tsc *tipSetCache tsc *tipSetCache
msgs map[cid.Cid]fakeMsg msgs map[cid.Cid]fakeMsg
blkMsgs map[cid.Cid]cid.Cid
sub func(rev, app []*types.TipSet) error sub func(rev, app []*types.TipSet)
}
func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) {
panic("Not Implemented")
} }
func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet { func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
@ -50,24 +59,44 @@ func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
return ts return ts
} }
func (fcs *fakeCS) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) { func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) {
if fcs.sub != nil { out := make(chan []*store.HeadChange, 1)
fcs.t.Fatal("sub should be nil") out <- []*store.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}}
fcs.sub = func(rev, app []*types.TipSet) {
notif := make([]*store.HeadChange, len(rev)+len(app))
for i, r := range rev {
notif[i] = &store.HeadChange{
Type: store.HCRevert,
Val: r,
}
}
for i, r := range app {
notif[i+len(rev)] = &store.HeadChange{
Type: store.HCApply,
Val: r,
} }
fcs.sub = f
}
func (fcs *fakeCS) GetHeaviestTipSet() *types.TipSet {
return fcs.tsc.best()
}
func (fcs *fakeCS) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
ms, ok := fcs.msgs[b.Messages]
if ok {
return ms.bmsgs, ms.smsgs, nil
} }
return []*types.Message{}, []*types.SignedMessage{}, nil out <- notif
}
return out, nil
}
func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) {
messages, ok := fcs.blkMsgs[blk]
if !ok {
return &api.BlockMessages{}, nil
}
ms, ok := fcs.msgs[messages]
if !ok {
return &api.BlockMessages{}, nil
}
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
} }
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid { func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
@ -102,32 +131,36 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow
for i := 0; i < app; i++ { for i := 0; i < app; i++ {
fcs.h++ fcs.h++
mc, _ := msgs[i] mc, hasMsgs := msgs[i]
if mc == cid.Undef { if !hasMsgs {
mc = dummyCid mc = dummyCid
} }
ts := makeTs(fcs.t, fcs.h, mc) ts := makeTs(fcs.t, fcs.h, mc)
require.NoError(fcs.t, fcs.tsc.add(ts)) require.NoError(fcs.t, fcs.tsc.add(ts))
if hasMsgs {
fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc
}
apps[app-i-1] = ts apps[app-i-1] = ts
} }
err := fcs.sub(revs, apps) fcs.sub(revs, apps)
require.NoError(fcs.t, err) time.Sleep(100 * time.Millisecond) // TODO: :c
} }
var _ eventChainStore = &fakeCS{} var _ eventApi = &fakeCS{}
func TestAt(t *testing.T) { func TestAt(t *testing.T) {
fcs := &fakeCS{ fcs := &fakeCS{
t: t, t: t,
h: 1, h: 1,
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs) events := NewEvents(context.Background(), fcs)
var applied bool var applied bool
var reverted bool var reverted bool
@ -178,17 +211,82 @@ func TestAt(t *testing.T) {
require.Equal(t, false, reverted) require.Equal(t, false, reverted)
} }
func TestAtStart(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 5, nil) // 6
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 5, nil) // 11
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtStartConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 10, nil) // 11
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 11, int(curH))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestCalled(t *testing.T) { func TestCalled(t *testing.T) {
fcs := &fakeCS{ fcs := &fakeCS{
t: t, t: t,
h: 1, h: 1,
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold), blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs) events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123") t0123, err := address.NewFromString("t0123")
require.NoError(t, err) require.NoError(t, err)
@ -381,11 +479,12 @@ func TestCalledTimeout(t *testing.T) {
h: 1, h: 1,
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold), blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs) events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123") t0123, err := address.NewFromString("t0123")
require.NoError(t, err) require.NoError(t, err)
@ -420,11 +519,12 @@ func TestCalledTimeout(t *testing.T) {
h: 1, h: 1,
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold), blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events = NewEvents(fcs) events = NewEvents(context.Background(), fcs)
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil return true, true, nil
@ -453,11 +553,12 @@ func TestCalledOrder(t *testing.T) {
h: 1, h: 1,
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold), blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs) events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123") t0123, err := address.NewFromString("t0123")
require.NoError(t, err) require.NoError(t, err)

View File

@ -1,24 +1,31 @@
package events package events
import ( import (
"context"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
) )
type tsByHFunc func(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
// tipSetCache implements a simple ring-buffer cache to keep track of recent // tipSetCache implements a simple ring-buffer cache to keep track of recent
// tipsets // tipsets
type tipSetCache struct { type tipSetCache struct {
cache []*types.TipSet cache []*types.TipSet
start int start int
len int len int
storage tsByHFunc
} }
func newTSCache(cap int) *tipSetCache { func newTSCache(cap int, storage tsByHFunc) *tipSetCache {
return &tipSetCache{ return &tipSetCache{
cache: make([]*types.TipSet, cap), cache: make([]*types.TipSet, cap),
start: 0, start: 0,
len: 0, len: 0,
storage: storage,
} }
} }
@ -63,15 +70,14 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH) return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
} }
tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height() clen := len(tsc.cache)
tailH := tsc.cache[((tsc.start-tsc.len+1)%clen+clen)%clen].Height()
if height < tailH { if height < tailH {
// TODO: we can try to walk parents, but that shouldn't happen in return tsc.storage(context.TODO(), height, tsc.cache[tailH])
// practice, so it's probably not worth implementing
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH)
} }
return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil return tsc.cache[(int(height-tailH+1)%clen+clen)%clen], nil
} }
func (tsc *tipSetCache) best() *types.TipSet { func (tsc *tipSetCache) best() *types.TipSet {

View File

@ -360,7 +360,7 @@ type mca struct {
} }
func (mca mca) ChainGetRandomness(ctx context.Context, pts *types.TipSet, ticks []*types.Ticket, lb int) ([]byte, error) { func (mca mca) ChainGetRandomness(ctx context.Context, pts *types.TipSet, ticks []*types.Ticket, lb int) ([]byte, error) {
return mca.sm.ChainStore().GetRandomness(ctx, pts, ticks, lb) return mca.sm.ChainStore().GetRandomness(ctx, pts.Cids(), ticks, int64(lb))
} }
func (mca mca) StateMinerPower(ctx context.Context, maddr address.Address, ts *types.TipSet) (api.MinerPower, error) { func (mca mca) StateMinerPower(ctx context.Context, maddr address.Address, ts *types.TipSet) (api.MinerPower, error) {

View File

@ -27,7 +27,8 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wal
height := parents.Height() + uint64(len(tickets)) height := parents.Height() + uint64(len(tickets))
vmi, err := vm.NewVM(st, height, miner, sm.ChainStore()) r := vm.NewChainRand(sm.ChainStore(), parents.Cids(), parents.Height(), tickets)
vmi, err := vm.NewVM(st, height, r, miner, sm.ChainStore())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -169,7 +169,7 @@ func mustEnc(i cbg.CBORMarshaler) []byte {
} }
func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid, gmcfg *GenMinerCfg) (cid.Cid, error) { func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid, gmcfg *GenMinerCfg) (cid.Cid, error) {
vm, err := vm.NewVM(sroot, 0, actors.NetworkAddress, cs) vm, err := vm.NewVM(sroot, 0, nil, actors.NetworkAddress, cs)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to create NewVM: %w", err) return cid.Undef, xerrors.Errorf("failed to create NewVM: %w", err)
} }

View File

@ -122,7 +122,7 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
return mset.nextNonce, nil return mset.nextNonce, nil
} }
act, err := mp.sm.GetActor(addr) act, err := mp.sm.GetActor(addr, nil)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -12,8 +12,8 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
func (sm *StateManager) CallRaw(ctx context.Context, msg *types.Message, bstate cid.Cid, bheight uint64) (*types.MessageReceipt, error) { func (sm *StateManager) CallRaw(ctx context.Context, msg *types.Message, bstate cid.Cid, r vm.Rand, bheight uint64) (*types.MessageReceipt, error) {
vmi, err := vm.NewVM(bstate, bheight, actors.NetworkAddress, sm.cs) vmi, err := vm.NewVM(bstate, bheight, r, actors.NetworkAddress, sm.cs)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to set up vm: %w", err) return nil, xerrors.Errorf("failed to set up vm: %w", err)
} }
@ -58,7 +58,9 @@ func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types.
return nil, err return nil, err
} }
return sm.CallRaw(ctx, msg, state, ts.Height()) r := vm.NewChainRand(sm.cs, ts.Cids(), ts.Height(), nil)
return sm.CallRaw(ctx, msg, state, r, ts.Height())
} }
var errHaltExecution = fmt.Errorf("halt") var errHaltExecution = fmt.Errorf("halt")

View File

@ -77,7 +77,14 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl
return cid.Undef, xerrors.Errorf("recursive TipSetState failed: %w", err) return cid.Undef, xerrors.Errorf("recursive TipSetState failed: %w", err)
} }
vmi, err := vm.NewVM(pstate, blks[0].Height, address.Undef, sm.cs) cids := make([]cid.Cid, len(blks))
for i, v := range blks {
cids[i] = v.Cid()
}
r := vm.NewChainRand(sm.cs, cids, blks[0].Height, nil)
vmi, err := vm.NewVM(pstate, blks[0].Height, r, address.Undef, sm.cs)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("instantiating VM failed: %w", err) return cid.Undef, xerrors.Errorf("instantiating VM failed: %w", err)
} }
@ -131,8 +138,11 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl
return vmi.Flush(ctx) return vmi.Flush(ctx)
} }
func (sm *StateManager) GetActor(addr address.Address) (*types.Actor, error) { func (sm *StateManager) GetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
ts := sm.cs.GetHeaviestTipSet() if ts == nil {
ts = sm.cs.GetHeaviestTipSet()
}
stcid, err := sm.TipSetState(ts.Cids()) stcid, err := sm.TipSetState(ts.Cids())
if err != nil { if err != nil {
return nil, xerrors.Errorf("tipset state: %w", err) return nil, xerrors.Errorf("tipset state: %w", err)
@ -147,8 +157,8 @@ func (sm *StateManager) GetActor(addr address.Address) (*types.Actor, error) {
return state.GetActor(addr) return state.GetActor(addr)
} }
func (sm *StateManager) GetBalance(addr address.Address) (types.BigInt, error) { func (sm *StateManager) GetBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
act, err := sm.GetActor(addr) act, err := sm.GetActor(addr, ts)
if err != nil { if err != nil {
return types.BigInt{}, xerrors.Errorf("get actor: %w", err) return types.BigInt{}, xerrors.Errorf("get actor: %w", err)
} }
@ -160,8 +170,8 @@ func (sm *StateManager) ChainStore() *store.ChainStore {
return sm.cs return sm.cs
} }
func (sm *StateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}) (*types.Actor, error) { func (sm *StateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) {
act, err := sm.GetActor(a) act, err := sm.GetActor(a, ts)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -2,12 +2,18 @@ package stmgr
import ( import (
"context" "context"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
amt "github.com/filecoin-project/go-amt-ipld"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
@ -16,7 +22,7 @@ func GetMinerWorker(ctx context.Context, sm *StateManager, st cid.Cid, maddr add
To: maddr, To: maddr,
From: maddr, From: maddr,
Method: actors.MAMethods.GetWorkerAddr, Method: actors.MAMethods.GetWorkerAddr,
}, st, 0) }, st, nil, 0)
if err != nil { if err != nil {
return address.Undef, xerrors.Errorf("callRaw failed: %w", err) return address.Undef, xerrors.Errorf("callRaw failed: %w", err)
} }
@ -42,7 +48,7 @@ func GetMinerOwner(ctx context.Context, sm *StateManager, st cid.Cid, maddr addr
To: maddr, To: maddr,
From: maddr, From: maddr,
Method: actors.MAMethods.GetOwner, Method: actors.MAMethods.GetOwner,
}, st, 0) }, st, nil, 0)
if err != nil { if err != nil {
return address.Undef, xerrors.Errorf("callRaw failed: %w", err) return address.Undef, xerrors.Errorf("callRaw failed: %w", err)
} }
@ -122,3 +128,59 @@ func GetMinerPeerID(ctx context.Context, sm *StateManager, ts *types.TipSet, mad
return peer.IDFromBytes(recp.Return) return peer.IDFromBytes(recp.Return)
} }
func GetMinerProvingPeriodEnd(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) {
var mas actors.StorageMinerActorState
_, err := sm.LoadActorState(ctx, maddr, &mas, ts)
if err != nil {
return 0, xerrors.Errorf("failed to load miner actor state: %w", err)
}
return mas.ProvingPeriodEnd, nil
}
func GetMinerProvingSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]*api.SectorInfo, error) {
var mas actors.StorageMinerActorState
_, err := sm.LoadActorState(ctx, maddr, &mas, ts)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
return LoadSectorsFromSet(ctx, sm.ChainStore().Blockstore(), mas.ProvingSet)
}
func GetMinerSectorSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]*api.SectorInfo, error) {
var mas actors.StorageMinerActorState
_, err := sm.LoadActorState(ctx, maddr, &mas, ts)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
return LoadSectorsFromSet(ctx, sm.ChainStore().Blockstore(), mas.Sectors)
}
func LoadSectorsFromSet(ctx context.Context, bs blockstore.Blockstore, ssc cid.Cid) ([]*api.SectorInfo, error) {
blks := amt.WrapBlockstore(bs)
a, err := amt.LoadAMT(blks, ssc)
if err != nil {
return nil, err
}
var sset []*api.SectorInfo
if err := a.ForEach(func(i uint64, v *cbg.Deferred) error {
var comms [][]byte
if err := cbor.DecodeInto(v.Raw, &comms); err != nil {
return err
}
sset = append(sset, &api.SectorInfo{
SectorID: i,
CommR: comms[0],
CommD: comms[1],
})
return nil
}); err != nil {
return nil, err
}
return sset, nil
}

View File

@ -5,6 +5,7 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/go-lotus/build"
"sync" "sync"
amt "github.com/filecoin-project/go-amt-ipld" amt "github.com/filecoin-project/go-amt-ipld"
@ -35,6 +36,7 @@ type ChainStore struct {
heaviest *types.TipSet heaviest *types.TipSet
bestTips *pubsub.PubSub bestTips *pubsub.PubSub
pubLk sync.Mutex
tstLk sync.Mutex tstLk sync.Mutex
tipsets map[uint64][]cid.Cid tipsets map[uint64][]cid.Cid
@ -51,18 +53,25 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
} }
hcnf := func(rev, app []*types.TipSet) error { hcnf := func(rev, app []*types.TipSet) error {
for _, r := range rev { cs.pubLk.Lock()
cs.bestTips.Pub(&HeadChange{ defer cs.pubLk.Unlock()
notif := make([]*HeadChange, len(rev)+len(app))
for i, r := range rev {
notif[i] = &HeadChange{
Type: HCRevert, Type: HCRevert,
Val: r, Val: r,
}, "headchange")
} }
for _, r := range app { }
cs.bestTips.Pub(&HeadChange{ for i, r := range app {
notif[i+len(rev)] = &HeadChange{
Type: HCApply, Type: HCApply,
Val: r, Val: r,
}, "headchange")
} }
}
cs.bestTips.Pub(notif, "headchange")
return nil return nil
} }
@ -109,21 +118,10 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error {
return nil return nil
} }
func (cs *ChainStore) SubNewTips() chan *types.TipSet {
subch := cs.bestTips.Sub("best")
out := make(chan *types.TipSet)
go func() {
defer close(out)
for val := range subch {
out <- val.(*types.TipSet)
}
}()
return out
}
const ( const (
HCRevert = "revert" HCRevert = "revert"
HCApply = "apply" HCApply = "apply"
HCCurrent = "current"
) )
type HeadChange struct { type HeadChange struct {
@ -131,9 +129,18 @@ type HeadChange struct {
Val *types.TipSet Val *types.TipSet
} }
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange { func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*HeadChange {
cs.pubLk.Lock()
subch := cs.bestTips.Sub("headchange") subch := cs.bestTips.Sub("headchange")
out := make(chan *HeadChange, 16) head := cs.GetHeaviestTipSet()
cs.pubLk.Unlock()
out := make(chan []*HeadChange, 16)
out <- []*HeadChange{{
Type: HCCurrent,
Val: head,
}}
go func() { go func() {
defer close(out) defer close(out)
for { for {
@ -143,8 +150,11 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
log.Warn("chain head sub exit loop") log.Warn("chain head sub exit loop")
return return
} }
if len(out) > 0 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}
select { select {
case out <- val.(*HeadChange): case out <- val.([]*HeadChange):
case <-ctx.Done(): case <-ctx.Done():
} }
case <-ctx.Done(): case <-ctx.Done():
@ -597,10 +607,11 @@ func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid
for { for {
select { select {
case val, ok := <-tsub: case notif, ok := <-tsub:
if !ok { if !ok {
return cid.Undef, nil, ctx.Err() return cid.Undef, nil, ctx.Err()
} }
for _, val := range notif {
switch val.Type { switch val.Type {
case HCRevert: case HCRevert:
continue continue
@ -613,6 +624,7 @@ func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (cid.Cid
return bc, r, nil return bc, r, nil
} }
} }
}
case <-ctx.Done(): case <-ctx.Done():
return cid.Undef, nil, ctx.Err() return cid.Undef, nil, ctx.Err()
} }
@ -690,31 +702,36 @@ func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
return NewFullTipSet(out), nil return NewFullTipSet(out), nil
} }
func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tickets []*types.Ticket, lb int) ([]byte, error) { func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, tickets []*types.Ticket, lb int64) ([]byte, error) {
if lb < len(tickets) { if lb < 0 {
return nil, fmt.Errorf("negative lookback parameters are not valid (got %d)", lb)
}
lt := int64(len(tickets))
if lb < lt {
log.Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug") log.Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug")
t := tickets[len(tickets)-(1+lb)] t := tickets[lt-(1+lb)]
return t.VDFResult, nil return t.VDFResult, nil
} }
nv := lb - len(tickets) nv := lb - lt
nextCids := pts.Cids()
for { for {
nts, err := cs.LoadTipSet(nextCids) nts, err := cs.LoadTipSet(blks)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mtb := nts.MinTicketBlock() mtb := nts.MinTicketBlock()
if nv < len(mtb.Tickets) { lt := int64(len(mtb.Tickets))
t := mtb.Tickets[len(mtb.Tickets)-(1+nv)] if nv < lt {
t := mtb.Tickets[lt-(1+nv)]
log.Infof("Returning randomness: H:%d, t:%d, mtb:%s", nts.Height(), lt-(1+nv), mtb.Cid())
return t.VDFResult, nil return t.VDFResult, nil
} }
nv -= len(mtb.Tickets) nv -= lt
// special case for lookback behind genesis block // special case for lookback behind genesis block
// TODO(spec): this is not in the spec, need to sync that // TODO(spec): this is not in the spec, need to sync that
@ -723,13 +740,40 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, pts *types.TipSet, tick
t := mtb.Tickets[0] t := mtb.Tickets[0]
rval := t.VDFResult rval := t.VDFResult
for i := 0; i < nv; i++ { for i := int64(0); i < nv; i++ {
h := sha256.Sum256(rval) h := sha256.Sum256(rval)
rval = h[:] rval = h[:]
} }
return rval, nil return rval, nil
} }
nextCids = mtb.Parents blks = mtb.Parents
}
}
func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
if h > ts.Height() {
return nil, xerrors.Errorf("looking for tipset with height less than start point")
}
if ts.Height()-h > build.ForkLengthThreshold {
log.Warnf("expensive call to GetTipsetByHeight, seeking %d levels", ts.Height()-h)
}
for {
mtb := ts.MinTicketBlock()
if h >= ts.Height()-uint64(len(mtb.Tickets)) {
return ts, nil
}
pts, err := cs.LoadTipSet(ts.Parents())
if err != nil {
return nil, err
}
ts = pts
} }
} }

View File

@ -2,6 +2,7 @@ package sub
import ( import (
"context" "context"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -23,7 +24,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
continue continue
} }
blk, err := chain.DecodeBlockMsg(msg.GetData()) blk, err := types.DecodeBlockMsg(msg.GetData())
if err != nil { if err != nil {
log.Error("got invalid block over pubsub: ", err) log.Error("got invalid block over pubsub: ", err)
continue continue

View File

@ -313,7 +313,7 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet)
for _, b := range fts.Blocks { for _, b := range fts.Blocks {
if err := syncer.ValidateBlock(ctx, b); err != nil { if err := syncer.ValidateBlock(ctx, b); err != nil {
return err return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
} }
} }
return nil return nil
@ -422,7 +422,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return xerrors.Errorf("validating block tickets failed: %w", err) return xerrors.Errorf("validating block tickets failed: %w", err)
} }
rand, err := syncer.sm.ChainStore().GetRandomness(ctx, baseTs, h.Tickets, build.RandomnessLookback) rand, err := syncer.sm.ChainStore().GetRandomness(ctx, baseTs.Cids(), h.Tickets, build.RandomnessLookback)
if err != nil { if err != nil {
return xerrors.Errorf("failed to get randomness for verifying election proof: %w", err) return xerrors.Errorf("failed to get randomness for verifying election proof: %w", err)
} }
@ -440,7 +440,8 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return xerrors.Errorf("miner created a block but was not a winner") return xerrors.Errorf("miner created a block but was not a winner")
} }
vmi, err := vm.NewVM(stateroot, h.Height, h.Miner, syncer.store) r := vm.NewChainRand(syncer.store, baseTs.Cids(), baseTs.Height(), h.Tickets)
vmi, err := vm.NewVM(stateroot, h.Height, r, h.Miner, syncer.store)
if err != nil { if err != nil {
return xerrors.Errorf("failed to instantiate VM: %w", err) return xerrors.Errorf("failed to instantiate VM: %w", err)
} }

View File

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
@ -105,8 +104,8 @@ func (tu *syncTestUtil) mineNewBlock(src int) {
} }
} }
func fblkToBlkMsg(fb *types.FullBlock) *chain.BlockMsg { func fblkToBlkMsg(fb *types.FullBlock) *types.BlockMsg {
out := &chain.BlockMsg{ out := &types.BlockMsg{
Header: fb.Header, Header: fb.Header,
} }
@ -224,11 +223,13 @@ func (tu *syncTestUtil) waitUntilSync(from, to int) {
} }
// TODO: some sort of timeout? // TODO: some sort of timeout?
for c := range hc { for n := range hc {
for _, c := range n {
if c.Val.Equals(target) { if c.Val.Equals(target) {
return return
} }
} }
}
} }
/* /*

View File

@ -107,13 +107,12 @@ func (bi *BigInt) MarshalCBOR(w io.Writer) error {
return zero.MarshalCBOR(w) return zero.MarshalCBOR(w)
} }
tag := uint64(2)
if bi.Sign() < 0 { if bi.Sign() < 0 {
// right now we don't support negative integers. tag = 3
// In the spec, everything is listed as a Uint.
return fmt.Errorf("BigInt does not support negative integers")
} }
header := cbg.CborEncodeMajorType(cbg.MajTag, 2) header := cbg.CborEncodeMajorType(cbg.MajTag, tag)
if _, err := w.Write(header); err != nil { if _, err := w.Write(header); err != nil {
return err return err
} }
@ -138,10 +137,12 @@ func (bi *BigInt) UnmarshalCBOR(br io.Reader) error {
return err return err
} }
if maj != cbg.MajTag && extra != 2 { if maj != cbg.MajTag && extra != 2 && extra != 3 {
return fmt.Errorf("cbor input for big int was not a tagged big int") return fmt.Errorf("cbor input for big int was not a tagged big int")
} }
minus := extra & 1
maj, extra, err = cbg.CborReadHeader(br) maj, extra, err = cbg.CborReadHeader(br)
if err != nil { if err != nil {
return err return err
@ -161,6 +162,9 @@ func (bi *BigInt) UnmarshalCBOR(br io.Reader) error {
} }
bi.Int = big.NewInt(0).SetBytes(buf) bi.Int = big.NewInt(0).SetBytes(buf)
if minus > 0 {
bi.Int.Neg(bi.Int)
}
return nil return nil
} }

View File

@ -17,6 +17,14 @@ func NewBitField() BitField {
return BitField{bits: make(map[uint64]struct{})} return BitField{bits: make(map[uint64]struct{})}
} }
func BitFieldFromSet(setBits []uint64) BitField {
res := BitField{bits: make(map[uint64]struct{})}
for _, b := range setBits {
res.bits[b] = struct{}{}
}
return res
}
// Set ...s bit in the BitField // Set ...s bit in the BitField
func (bf BitField) Set(bit uint64) { func (bf BitField) Set(bit uint64) {
bf.bits[bit] = struct{}{} bf.bits[bit] = struct{}{}

View File

@ -1,15 +1,13 @@
package chain package types
import ( import (
"bytes" "bytes"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/go-lotus/chain/types"
) )
type BlockMsg struct { type BlockMsg struct {
Header *types.BlockHeader Header *BlockHeader
BlsMessages []cid.Cid BlsMessages []cid.Cid
SecpkMessages []cid.Cid SecpkMessages []cid.Cid
} }

View File

@ -1197,3 +1197,128 @@ func (t *MessageReceipt) UnmarshalCBOR(r io.Reader) error {
} }
return nil return nil
} }
func (t *BlockMsg) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{131}); err != nil {
return err
}
// t.t.Header (types.BlockHeader)
if err := t.Header.MarshalCBOR(w); err != nil {
return err
}
// t.t.BlsMessages ([]cid.Cid)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMessages)))); err != nil {
return err
}
for _, v := range t.BlsMessages {
if err := cbg.WriteCid(w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.BlsMessages: %w", err)
}
}
// t.t.SecpkMessages ([]cid.Cid)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMessages)))); err != nil {
return err
}
for _, v := range t.SecpkMessages {
if err := cbg.WriteCid(w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.SecpkMessages: %w", err)
}
}
return nil
}
func (t *BlockMsg) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.Header (types.BlockHeader)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.Header = new(BlockHeader)
if err := t.Header.UnmarshalCBOR(br); err != nil {
return err
}
}
}
// t.t.BlsMessages ([]cid.Cid)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("array too large")
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.BlsMessages = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("reading cid field t.BlsMessages failed: %w", err)
}
t.BlsMessages[i] = c
}
// t.t.SecpkMessages ([]cid.Cid)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("array too large")
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.SecpkMessages = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("reading cid field t.SecpkMessages failed: %w", err)
}
t.SecpkMessages[i] = c
}
return nil
}

View File

@ -128,3 +128,12 @@ func (ts *TipSet) MinTicketBlock() *BlockHeader {
return min return min
} }
func (ts *TipSet) Contains(oc cid.Cid) bool {
for _, c := range ts.cids {
if c == oc {
return true
}
}
return false
}

View File

@ -67,8 +67,8 @@ func (vmc *VMContext) Message() *types.Message {
} }
func (vmc *VMContext) GetRandomness(height uint64) ([]byte, aerrors.ActorError) { func (vmc *VMContext) GetRandomness(height uint64) ([]byte, aerrors.ActorError) {
relHeight := int(vmc.BlockHeight()) - int(height)
res, err := vmc.vm.cs.GetRandomness(vmc.ctx, vmc.vm.cs.GetHeaviestTipSet(), nil, relHeight) res, err := vmc.vm.rand.GetRandomness(vmc.ctx, int64(height))
if err != nil { if err != nil {
return nil, aerrors.Escalate(err, "could not get randomness") return nil, aerrors.Escalate(err, "could not get randomness")
} }
@ -288,9 +288,10 @@ type VM struct {
blockHeight uint64 blockHeight uint64
blockMiner address.Address blockMiner address.Address
inv *invoker inv *invoker
rand Rand
} }
func NewVM(base cid.Cid, height uint64, maddr address.Address, cs *store.ChainStore) (*VM, error) { func NewVM(base cid.Cid, height uint64, r Rand, maddr address.Address, cs *store.ChainStore) (*VM, error) {
buf := bufbstore.NewBufferedBstore(cs.Blockstore()) buf := bufbstore.NewBufferedBstore(cs.Blockstore())
cst := hamt.CSTFromBstore(buf) cst := hamt.CSTFromBstore(buf)
state, err := state.LoadStateTree(cst, base) state, err := state.LoadStateTree(cst, base)
@ -307,9 +308,35 @@ func NewVM(base cid.Cid, height uint64, maddr address.Address, cs *store.ChainSt
blockHeight: height, blockHeight: height,
blockMiner: maddr, blockMiner: maddr,
inv: newInvoker(), inv: newInvoker(),
rand: r,
}, nil }, nil
} }
type Rand interface {
GetRandomness(ctx context.Context, h int64) ([]byte, error)
}
type chainRand struct {
cs *store.ChainStore
blks []cid.Cid
bh uint64
tickets []*types.Ticket
}
func NewChainRand(cs *store.ChainStore, blks []cid.Cid, bheight uint64, tickets []*types.Ticket) Rand {
return &chainRand{
cs: cs,
blks: blks,
bh: bheight,
tickets: tickets,
}
}
func (cr *chainRand) GetRandomness(ctx context.Context, h int64) ([]byte, error) {
lb := (int64(cr.bh) + int64(len(cr.tickets))) - h
return cr.cs.GetRandomness(ctx, cr.blks, cr.tickets, lb)
}
type ApplyRet struct { type ApplyRet struct {
types.MessageReceipt types.MessageReceipt
ActorErr aerrors.ActorError ActorErr aerrors.ActorError

View File

@ -106,7 +106,7 @@ var stateProvingSetCmd = &cli.Command{
return err return err
} }
sectors, err := api.StateMinerProvingSet(ctx, maddr) sectors, err := api.StateMinerProvingSet(ctx, maddr, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -21,6 +21,7 @@ func main() {
types.Merge{}, types.Merge{},
types.Actor{}, types.Actor{},
types.MessageReceipt{}, types.MessageReceipt{},
types.BlockMsg{},
) )
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -32,7 +33,6 @@ func main() {
chain.BlockSyncRequest{}, chain.BlockSyncRequest{},
chain.BlockSyncResponse{}, chain.BlockSyncResponse{},
chain.BSTipSet{}, chain.BSTipSet{},
chain.BlockMsg{},
) )
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)

View File

@ -17,6 +17,10 @@ type SectorSealingStatus = sectorbuilder.SectorSealingStatus
type StagedSectorMetadata = sectorbuilder.StagedSectorMetadata type StagedSectorMetadata = sectorbuilder.StagedSectorMetadata
type SortedSectorInfo = sectorbuilder.SortedSectorInfo
type SectorInfo = sectorbuilder.SectorInfo
const CommLen = sectorbuilder.CommitmentBytesLen const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct { type SectorBuilder struct {
@ -81,7 +85,7 @@ func (sb *SectorBuilder) GetAllStagedSectors() ([]StagedSectorMetadata, error) {
return sectorbuilder.GetAllStagedSectors(sb.handle) return sectorbuilder.GetAllStagedSectors(sb.handle)
} }
func (sb *SectorBuilder) GeneratePoSt(sectorInfo sectorbuilder.SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
// Wait, this is a blocking method with no way of interrupting it? // Wait, this is a blocking method with no way of interrupting it?
// does it checkpoint itself? // does it checkpoint itself?
return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults) return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults)
@ -107,9 +111,6 @@ func VerifyPieceInclusionProof(sectorSize uint64, pieceSize uint64, commP []byte
return sectorbuilder.VerifyPieceInclusionProof(sectorSize, pieceSize, commPa, commDa, proof) return sectorbuilder.VerifyPieceInclusionProof(sectorSize, pieceSize, commPa, commDa, proof)
} }
type SortedSectorInfo = sectorbuilder.SortedSectorInfo
type SectorInfo = sectorbuilder.SectorInfo
func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo { func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo {
return sectorbuilder.NewSortedSectorInfo(sectors...) return sectorbuilder.NewSortedSectorInfo(sectors...)
} }

View File

@ -84,16 +84,16 @@ class Address extends React.Component {
return info return info
} }
add10k = async () => { add200k = async () => {
[...Array(10).keys()].map(() => async () => await this.props.add1k(this.props.addr)).reduce(async (p, c) => [await p, await c()], Promise.resolve(null)) [...Array(10).keys()].map(() => async () => await this.props.add20k(this.props.addr)).reduce(async (p, c) => [await p, await c()], Promise.resolve(null))
} }
render() { render() {
let add1k = <span/> let add20k = <span/>
if(this.props.add1k) { if(this.props.add20k) {
add1k = <span>&nbsp;<a href="#" onClick={() => this.props.add1k(this.props.addr)}>[+1k]</a></span> add20k = <span>&nbsp;<a href="#" onClick={() => this.props.add20k(this.props.addr)}>[+20k]</a></span>
if (this.props.add10k) { if (this.props.add10k) {
add1k = <span>{add1k}&nbsp;<a href="#" onClick={this.add10k}>[+10k]</a></span> add20k = <span>{add20k}&nbsp;<a href="#" onClick={this.add200k}>[+200k]</a></span>
} }
} }
let addr = truncAddr(this.props.addr, this.props.short ? 12 : 17) let addr = truncAddr(this.props.addr, this.props.short ? 12 : 17)
@ -133,7 +133,7 @@ class Address extends React.Component {
minerInfo = <span>&nbsp;Power: {this.state.minerInfo.MinerPower} ({this.state.minerInfo.MinerPower/this.state.minerInfo.TotalPower*100}%)</span> minerInfo = <span>&nbsp;Power: {this.state.minerInfo.MinerPower} ({this.state.minerInfo.MinerPower/this.state.minerInfo.TotalPower*100}%)</span>
} }
return <span>{addr}{balance}{actInfo}{nonce}{add1k}{transfer}{minerInfo}</span> return <span>{addr}{balance}{actInfo}{nonce}{add20k}{transfer}{minerInfo}</span>
} }
} }

View File

@ -15,7 +15,7 @@ class FullNode extends React.Component {
this.newSecpAddr = this.newSecpAddr.bind(this) this.newSecpAddr = this.newSecpAddr.bind(this)
this.newBLSAddr = this.newBLSAddr.bind(this) this.newBLSAddr = this.newBLSAddr.bind(this)
this.startStorageMiner = this.startStorageMiner.bind(this) this.startStorageMiner = this.startStorageMiner.bind(this)
this.add1k = this.add1k.bind(this) this.add20k = this.add20k.bind(this)
this.explorer = this.explorer.bind(this) this.explorer = this.explorer.bind(this)
this.client = this.client.bind(this) this.client = this.client.bind(this)
this.stop = this.stop.bind(this) this.stop = this.stop.bind(this)
@ -84,8 +84,8 @@ class FullNode extends React.Component {
this.props.spawnStorageNode(this.props.node.Repo, this.props.client) this.props.spawnStorageNode(this.props.node.Repo, this.props.client)
} }
async add1k(to) { async add20k(to) {
await this.props.give1k(to) await this.props.give20k(to)
} }
explorer() { explorer() {
@ -123,14 +123,14 @@ class FullNode extends React.Component {
let storageMine = <a href="#" onClick={this.startStorageMiner} hidden={!this.props.spawnStorageNode}>[Spawn Storage Miner]</a> let storageMine = <a href="#" onClick={this.startStorageMiner} hidden={!this.props.spawnStorageNode}>[Spawn Storage Miner]</a>
let addresses = this.state.addrs.map((addr) => { let addresses = this.state.addrs.map((addr) => {
let line = <Address client={this.props.client} add1k={this.add1k} add10k={true} nonce={true} addr={addr} mountWindow={this.props.mountWindow}/> let line = <Address client={this.props.client} add20k={this.add20k} add10k={true} nonce={true} addr={addr} mountWindow={this.props.mountWindow}/>
if (this.state.defaultAddr === addr) { if (this.state.defaultAddr === addr) {
line = <b>{line}</b> line = <b>{line}</b>
} }
return <div key={addr}>{line}</div> return <div key={addr}>{line}</div>
}) })
let paychannels = this.state.paychs.map((addr, ak) => { let paychannels = this.state.paychs.map((addr, ak) => {
const line = <Address client={this.props.client} add1k={this.add1k} add10k={true} addr={addr} mountWindow={this.props.mountWindow}/> const line = <Address client={this.props.client} add20k={this.add20k} add10k={true} addr={addr} mountWindow={this.props.mountWindow}/>
const vouchers = this.state.vouchers[ak].map(voucher => { const vouchers = this.state.vouchers[ak].map(voucher => {
let extra = <span></span> let extra = <span></span>
if(voucher.Extra) { if(voucher.Extra) {

View File

@ -27,7 +27,7 @@ class NodeList extends React.Component {
this.spawnStorageNode = this.spawnStorageNode.bind(this) this.spawnStorageNode = this.spawnStorageNode.bind(this)
this.connMgr = this.connMgr.bind(this) this.connMgr = this.connMgr.bind(this)
this.consensus = this.consensus.bind(this) this.consensus = this.consensus.bind(this)
this.transfer1kFrom1 = this.transfer1kFrom1.bind(this) this.transfer20kFrom1 = this.transfer20kFrom1.bind(this)
this.getNodes() this.getNodes()
} }
@ -52,7 +52,7 @@ class NodeList extends React.Component {
node={{...node}} node={{...node}}
client={client} client={client}
pondClient={this.props.client} pondClient={this.props.client}
give1k={this.transfer1kFrom1} give20k={this.transfer20kFrom1}
mountWindow={this.props.mountWindow} mountWindow={this.props.mountWindow}
spawnStorageNode={this.spawnStorageNode} spawnStorageNode={this.spawnStorageNode}
stop={this.stopNode(node.ID, onClose)} stop={this.stopNode(node.ID, onClose)}
@ -81,7 +81,7 @@ class NodeList extends React.Component {
this.setState({existingLoaded: true, nodes: nodes}) this.setState({existingLoaded: true, nodes: nodes})
} }
async transfer1kFrom1(to) { async transfer20kFrom1(to) {
const addrss = await this.state.nodes[1].conn.call('Filecoin.WalletList', []) const addrss = await this.state.nodes[1].conn.call('Filecoin.WalletList', [])
const [bestaddr, bal] = await addrss.map(async addr => { const [bestaddr, bal] = await addrss.map(async addr => {
let balance = 0 let balance = 0
@ -96,7 +96,7 @@ class NodeList extends React.Component {
await pushMessage(this.state.nodes[1].conn, bestaddr, { await pushMessage(this.state.nodes[1].conn, bestaddr, {
To: to, To: to,
From: bestaddr, From: bestaddr,
Value: "1000", Value: "20000",
}) })
} }

View File

@ -59,7 +59,6 @@ class StorageNode extends React.Component {
// this.props.onConnect(client, id) // TODO: dedupe connecting part // this.props.onConnect(client, id) // TODO: dedupe connecting part
this.loadInfo()
let updates = setInterval(this.loadInfo, 1050) let updates = setInterval(this.loadInfo, 1050)
client.on('close', () => clearInterval(updates)) client.on('close', () => clearInterval(updates))
}) })
@ -72,7 +71,10 @@ class StorageNode extends React.Component {
const peers = await this.state.client.call("Filecoin.NetPeers", []) const peers = await this.state.client.call("Filecoin.NetPeers", [])
const [actor] = await this.state.client.call("Filecoin.ActorAddresses", []) const [actor] = await this.state.client.call("Filecoin.ActorAddresses", [])
this.setState({version: version, peers: peers.length, actor: actor}) const stActor = await this.props.fullConn.call('Filecoin.StateGetActor', [actor, null])
const actorState = await this.props.fullConn.call('Filecoin.StateReadState', [stActor, null])
this.setState({version: version, peers: peers.length, actor: actor, actorState: actorState})
await this.stagedList() await this.stagedList()
} }
@ -109,6 +111,7 @@ class StorageNode extends React.Component {
</div> </div>
<div> <div>
<Address client={this.props.fullConn} addr={this.state.actor} mountWindow={this.props.mountWindow}/> <Address client={this.props.fullConn} addr={this.state.actor} mountWindow={this.props.mountWindow}/>
<span>&nbsp;<abbr title="Proving period end">PPE:</abbr> <b>{this.state.actorState.State.ProvingPeriodEnd}</b></span>
</div> </div>
<div>{this.state.statusCounts.map((c, i) => <span key={i}>{sealCodes[i]}: {c} | </span>)}</div> <div>{this.state.statusCounts.map((c, i) => <span key={i}>{sealCodes[i]}: {c} | </span>)}</div>
<div> <div>

View File

@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
chain "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/gen"
@ -191,7 +190,7 @@ func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) {
}, nil }, nil
} }
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*chain.BlockMsg, error) { func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
log.Debug("attempting to mine a block on:", base.ts.Cids()) log.Debug("attempting to mine a block on:", base.ts.Cids())
ticket, err := m.scratchTicket(ctx, base) ticket, err := m.scratchTicket(ctx, base)
if err != nil { if err != nil {
@ -288,7 +287,7 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic
}, nil }, nil
} }
func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*chain.BlockMsg, error) { func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.ElectionProof) (*types.BlockMsg, error) {
pending, err := m.api.MpoolPending(context.TODO(), base.ts) pending, err := m.api.MpoolPending(context.TODO(), base.ts)
if err != nil { if err != nil {

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -23,11 +22,11 @@ type ChainAPI struct {
PubSub *pubsub.PubSub PubSub *pubsub.PubSub
} }
func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange, error) {
return a.Chain.SubHeadChanges(ctx), nil return a.Chain.SubHeadChanges(ctx), nil
} }
func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
if err := a.Chain.AddBlock(blk.Header); err != nil { if err := a.Chain.AddBlock(blk.Header); err != nil {
return xerrors.Errorf("AddBlock failed: %w", err) return xerrors.Errorf("AddBlock failed: %w", err)
} }
@ -46,7 +45,7 @@ func (a *ChainAPI) ChainHead(context.Context) (*types.TipSet, error) {
} }
func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet, tickets []*types.Ticket, lb int) ([]byte, error) { func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet, tickets []*types.Ticket, lb int) ([]byte, error) {
return a.Chain.GetRandomness(ctx, pts, tickets, lb) return a.Chain.GetRandomness(ctx, pts.Cids(), tickets, int64(lb))
} }
func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
@ -108,3 +107,7 @@ func (a *ChainAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]*
return out, nil return out, nil
} }
func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) {
return a.Chain.GetTipsetByHeight(ctx, h, ts)
}

View File

@ -2,18 +2,14 @@ package full
import ( import (
"context" "context"
"fmt"
"strconv"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-hamt-ipld" "github.com/ipfs/go-hamt-ipld"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/gen"
@ -38,119 +34,11 @@ type StateAPI struct {
} }
func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) {
ts := a.StateManager.ChainStore().GetHeaviestTipSet() return stmgr.GetMinerSectorSet(ctx, a.StateManager, nil, addr)
stc, err := a.StateManager.TipSetState(ts.Cids())
if err != nil {
return nil, err
}
cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore())
st, err := state.LoadStateTree(cst, stc)
if err != nil {
return nil, err
}
act, err := st.GetActor(addr)
if err != nil {
return nil, err
}
var minerState actors.StorageMinerActorState
if err := cst.Get(ctx, act.Head, &minerState); err != nil {
return nil, err
}
nd, err := hamt.LoadNode(ctx, cst, minerState.Sectors)
if err != nil {
return nil, err
}
var sinfos []*api.SectorInfo
// Note to self: the hamt isnt a great data structure to use here... need to implement the sector set
err = nd.ForEach(ctx, func(k string, val interface{}) error {
sid, err := strconv.ParseUint(k, 10, 64)
if err != nil {
return err
}
bval, ok := val.([]byte)
if !ok {
return fmt.Errorf("expected to get bytes in sector set hamt")
}
var comms [][]byte
if err := cbor.DecodeInto(bval, &comms); err != nil {
return err
}
sinfos = append(sinfos, &api.SectorInfo{
SectorID: sid,
CommR: comms[0],
CommD: comms[1],
})
return nil
})
return sinfos, nil
} }
func (a *StateAPI) StateMinerProvingSet(ctx context.Context, addr address.Address) ([]*api.SectorInfo, error) { func (a *StateAPI) StateMinerProvingSet(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*api.SectorInfo, error) {
ts := a.Chain.GetHeaviestTipSet() return stmgr.GetMinerProvingSet(ctx, a.StateManager, ts, addr)
stc, err := a.StateManager.TipSetState(ts.Cids())
if err != nil {
return nil, err
}
cst := hamt.CSTFromBstore(a.Chain.Blockstore())
st, err := state.LoadStateTree(cst, stc)
if err != nil {
return nil, err
}
act, err := st.GetActor(addr)
if err != nil {
return nil, err
}
var minerState actors.StorageMinerActorState
if err := cst.Get(ctx, act.Head, &minerState); err != nil {
return nil, err
}
nd, err := hamt.LoadNode(ctx, cst, minerState.ProvingSet)
if err != nil {
return nil, err
}
var sinfos []*api.SectorInfo
// Note to self: the hamt isnt a great data structure to use here... need to implement the sector set
err = nd.ForEach(ctx, func(k string, val interface{}) error {
sid, err := strconv.ParseUint(k, 10, 64)
if err != nil {
return err
}
bval, ok := val.([]byte)
if !ok {
return fmt.Errorf("expected to get bytes in sector set hamt")
}
var comms [][]byte
if err := cbor.DecodeInto(bval, &comms); err != nil {
return err
}
sinfos = append(sinfos, &api.SectorInfo{
SectorID: sid,
CommR: comms[0],
CommD: comms[1],
})
return nil
})
return sinfos, nil
} }
func (a *StateAPI) StateMinerPower(ctx context.Context, maddr address.Address, ts *types.TipSet) (api.MinerPower, error) { func (a *StateAPI) StateMinerPower(ctx context.Context, maddr address.Address, ts *types.TipSet) (api.MinerPower, error) {
@ -191,6 +79,10 @@ func (a *StateAPI) StateMinerPeerID(ctx context.Context, m address.Address, ts *
return stmgr.GetMinerPeerID(ctx, a.StateManager, ts, m) return stmgr.GetMinerPeerID(ctx, a.StateManager, ts, m)
} }
func (a *StateAPI) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) {
return stmgr.GetMinerProvingPeriodEnd(ctx, a.StateManager, ts, actor)
}
func (a *StateAPI) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) { func (a *StateAPI) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) {
return a.StateManager.Call(ctx, msg, ts) return a.StateManager.Call(ctx, msg, ts)
} }
@ -260,13 +152,13 @@ func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, ts *typ
} }
// This is on StateAPI because miner.Miner requires this, and MinerAPI requires miner.Miner // This is on StateAPI because miner.Miner requires this, and MinerAPI requires miner.Miner
func (a *StateAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*chain.BlockMsg, error) { func (a *StateAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, ts uint64) (*types.BlockMsg, error) {
fblk, err := gen.MinerCreateBlock(ctx, a.StateManager, a.Wallet, addr, parents, tickets, proof, msgs, ts) fblk, err := gen.MinerCreateBlock(ctx, a.StateManager, a.Wallet, addr, parents, tickets, proof, msgs, ts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var out chain.BlockMsg var out types.BlockMsg
out.Header = fblk.Header out.Header = fblk.Header
for _, msg := range fblk.BlsMessages { for _, msg := range fblk.BlsMessages {
out.BlsMessages = append(out.BlsMessages, msg.Cid()) out.BlsMessages = append(out.BlsMessages, msg.Cid())

View File

@ -32,7 +32,7 @@ func (a *WalletAPI) WalletList(ctx context.Context) ([]address.Address, error) {
} }
func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { func (a *WalletAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) {
return a.StateManager.GetBalance(addr) return a.StateManager.GetBalance(addr, nil)
} }
func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*types.Signature, error) { func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*types.Signature, error) {

View File

@ -223,7 +223,7 @@ func (pm *Manager) CheckVoucherSpendable(ctx context.Context, ch address.Address
func (pm *Manager) loadPaychState(ctx context.Context, ch address.Address) (*types.Actor, *actors.PaymentChannelActorState, error) { func (pm *Manager) loadPaychState(ctx context.Context, ch address.Address) (*types.Actor, *actors.PaymentChannelActorState, error) {
var pcast actors.PaymentChannelActorState var pcast actors.PaymentChannelActorState
act, err := pm.sm.LoadActorState(ctx, ch, &pcast) act, err := pm.sm.LoadActorState(ctx, ch, &pcast, nil)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -2,18 +2,20 @@ package storage
import ( import (
"context" "context"
"fmt" "encoding/base64"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/xerrors"
"sync"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/events"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/lib/sectorbuilder"
@ -23,8 +25,11 @@ import (
var log = logging.Logger("storageminer") var log = logging.Logger("storageminer")
const PoStConfidence = 1
type Miner struct { type Miner struct {
api storageMinerApi api storageMinerApi
events *events.Events
secst *sector.Store secst *sector.Store
commt *commitment.Tracker commt *commitment.Tracker
@ -36,6 +41,9 @@ type Miner struct {
h host.Host h host.Host
ds datastore.Batching ds datastore.Batching
schedLk sync.Mutex
postSched uint64
} }
type storageMinerApi interface { type storageMinerApi interface {
@ -44,21 +52,27 @@ type storageMinerApi interface {
// Call a read only method on actors (no interaction with the chain required) // Call a read only method on actors (no interaction with the chain required)
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
MpoolPush(context.Context, *types.SignedMessage) error MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
MpoolGetNonce(context.Context, address.Address) (uint64, error)
ChainHead(context.Context) (*types.TipSet, error)
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
ChainNotify(context.Context) (<-chan *store.HeadChange, error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error) WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) { func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) {
return &Miner{ return &Miner{
api: api, api: api,
maddr: addr, maddr: addr,
h: h, h: h,
ds: ds, ds: ds,
@ -72,8 +86,15 @@ func (m *Miner) Run(ctx context.Context) error {
return errors.Wrap(err, "miner preflight checks failed") return errors.Wrap(err, "miner preflight checks failed")
} }
m.events = events.NewEvents(ctx, m.api)
ts, err := m.api.ChainHead(ctx)
if err != nil {
return err
}
go m.handlePostingSealedSectors(ctx) go m.handlePostingSealedSectors(ctx)
go m.runPoSt(ctx) go m.schedulePoSt(ctx, ts)
return nil return nil
} }
@ -126,7 +147,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
return errors.Wrap(aerr, "could not serialize commit sector parameters") return errors.Wrap(aerr, "could not serialize commit sector parameters")
} }
msg := types.Message{ msg := &types.Message{
To: m.maddr, To: m.maddr,
From: m.worker, From: m.worker,
Method: actors.MAMethods.CommitSector, Method: actors.MAMethods.CommitSector,
@ -136,45 +157,181 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
GasPrice: types.NewInt(1), GasPrice: types.NewInt(1),
} }
nonce, err := m.api.MpoolGetNonce(ctx, m.worker) smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to get nonce") return errors.Wrap(err, "pushing message to mpool")
}
msg.Nonce = nonce
data, err := msg.Serialize()
if err != nil {
return errors.Wrap(err, "serializing commit sector message")
}
sig, err := m.api.WalletSign(ctx, m.worker, data)
if err != nil {
return errors.Wrap(err, "signing commit sector message")
}
smsg := &types.SignedMessage{
Message: msg,
Signature: *sig,
}
if err := m.api.MpoolPush(ctx, smsg); err != nil {
return errors.Wrap(err, "pushing commit sector message to mpool")
} }
if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil { if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil {
return errors.Wrap(err, "tracking sector commitment") return errors.Wrap(err, "tracking sector commitment")
} }
go func() {
_, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return
}
m.schedulePoSt(ctx, nil)
}()
return nil return nil
} }
func (m *Miner) runPoSt(ctx context.Context) { func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) {
log.Warning("dont care about posts yet") ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs)
if err != nil {
log.Errorf("failed to get proving period end for miner: %s", err)
return
}
if ppe == 0 {
log.Errorf("Proving period end == 0")
// TODO: we probably want to call schedulePoSt after the first commitSector call
return
}
m.schedLk.Lock()
if m.postSched >= ppe {
log.Warnf("schedulePoSt already called for proving period >= %d", m.postSched)
m.schedLk.Unlock()
return
}
m.postSched = ppe
m.schedLk.Unlock()
log.Infof("Scheduling post at height %d", ppe-build.PoSTChallangeTime)
err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert
// TODO: Cancel post
return nil
}, PoStConfidence, ppe-build.PoSTChallangeTime)
if err != nil {
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err)
return
}
}
func (m *Miner) startPost(ts *types.TipSet, curH uint64) error {
log.Info("starting PoSt computation")
head, err := m.api.ChainHead(context.TODO())
if err != nil {
return err
}
postWaitCh, _, err := m.maybeDoPost(context.TODO(), head)
if err != nil {
return err
}
if postWaitCh == nil {
return errors.New("PoSt didn't start")
}
go func() {
err := <-postWaitCh
if err != nil {
log.Errorf("got error back from postWaitCh: %s", err)
return
}
log.Infof("post successfully submitted")
m.schedulePoSt(context.TODO(), ts)
}()
return nil
}
func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) {
ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, ts)
if err != nil {
return nil, nil, xerrors.Errorf("failed to get proving period end for miner: %w", err)
}
if ts.Height() > ppe {
log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height())
return nil, nil, nil
}
sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts)
if err != nil {
return nil, nil, xerrors.Errorf("failed to get proving set for miner: %w", err)
}
r, err := m.api.ChainGetRandomness(ctx, ts, nil, int(int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime))) // TODO: review: check math
if err != nil {
return nil, nil, xerrors.Errorf("failed to get chain randomness for post: %w", err)
}
sourceTs, err := m.api.ChainGetTipSetByHeight(ctx, ppe-build.PoSTChallangeTime, ts)
if err != nil {
return nil, nil, xerrors.Errorf("failed to get post start tipset: %w", err)
}
ret := make(chan error, 1)
go func() {
log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", ts.Height()-(ts.Height()-ppe+build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height())
var faults []uint64
proof, err := m.secst.RunPoSt(ctx, sset, r, faults)
if err != nil {
ret <- xerrors.Errorf("running post failed: %w", err)
return
}
log.Infof("submitting PoSt pLen=%d", len(proof))
params := &actors.SubmitPoStParams{
Proof: proof,
DoneSet: types.BitFieldFromSet(sectorIdList(sset)),
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
ret <- xerrors.Errorf("could not serialize submit post parameters: %w", err)
return
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.SubmitPoSt,
Params: enc,
Value: types.NewInt(0),
GasLimit: types.NewInt(100000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
ret <- xerrors.Errorf("pushing message to mpool: %w", err)
return
}
// make sure it succeeds...
_, err = m.api.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return
}
// TODO: check receipt
m.schedulePoSt(ctx, nil)
}()
return ret, sourceTs.MinTicketBlock(), nil
}
func sectorIdList(si []*api.SectorInfo) []uint64 {
out := make([]uint64, len(si))
for i, s := range si {
out[i] = s.SectorID
}
return out
} }
func (m *Miner) runPreflightChecks(ctx context.Context) error { func (m *Miner) runPreflightChecks(ctx context.Context) error {
worker, err := m.getWorkerAddr(ctx) worker, err := m.api.StateMinerWorker(ctx, m.maddr, nil)
if err != nil { if err != nil {
return err return err
} }
@ -193,23 +350,3 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error {
log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker) log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker)
return nil return nil
} }
func (m *Miner) getWorkerAddr(ctx context.Context) (address.Address, error) {
msg := &types.Message{
To: m.maddr,
From: m.maddr, // it doesnt like it if we dont give it a from... probably should fix that
Method: actors.MAMethods.GetWorkerAddr,
Params: nil,
}
recpt, err := m.api.StateCall(ctx, msg, nil)
if err != nil {
return address.Undef, errors.Wrapf(err, "calling getWorker(%s)", m.maddr)
}
if recpt.ExitCode != 0 {
return address.Undef, fmt.Errorf("failed to call getWorker(%s): return %d", m.maddr, recpt.ExitCode)
}
return address.NewFromBytes(recpt.Return)
}

View File

@ -2,12 +2,14 @@ package sector
import ( import (
"context" "context"
"golang.org/x/xerrors"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"sync" "sync"
"time" "time"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/lib/sectorbuilder"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
@ -40,7 +42,7 @@ func (s *Store) Service() {
} }
func (s *Store) poll() { func (s *Store) poll() {
log.Info("polling for sealed sectors...") log.Debug("polling for sealed sectors...")
// get a list of sectors to poll // get a list of sectors to poll
s.lk.Lock() s.lk.Lock()
@ -162,6 +164,30 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect
return s.sb.SealStatus(sector) return s.sb.SealStatus(sector)
} }
func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) {
sbsi := make([]sectorbuilder.SectorInfo, len(sectors))
for k, sector := range sectors {
var commR [sectorbuilder.CommLen]byte
if copy(commR[:], sector.CommR) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("commR too short, %d bytes", len(sector.CommR))
}
sbsi[k] = sectorbuilder.SectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
ssi := sectorbuilder.NewSortedSectorInfo(sbsi)
var seed [sectorbuilder.CommLen]byte
if copy(seed[:], r) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("random seed too short, %d bytes", len(r))
}
return s.sb.GeneratePoSt(ssi, seed, faults)
}
func (s *Store) Stop() { func (s *Store) Stop() {
close(s.closeCh) close(s.closeCh)
} }