WIP: post scheduling

This commit is contained in:
whyrusleeping 2019-09-17 15:43:47 -07:00
parent f3854a4826
commit 11040f105b
6 changed files with 116 additions and 11 deletions

View File

@ -120,8 +120,11 @@ func (sm *StateManager) computeTipSetState(cids []cid.Cid) (cid.Cid, error) {
return vmi.Flush(ctx)
}
func (sm *StateManager) GetActor(addr address.Address) (*types.Actor, error) {
ts := sm.cs.GetHeaviestTipSet()
func (sm *StateManager) GetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
if ts == nil {
ts = sm.cs.GetHeaviestTipSet()
}
stcid, err := sm.TipSetState(ts.Cids())
if err != nil {
return nil, xerrors.Errorf("tipset state: %w", err)
@ -136,8 +139,8 @@ func (sm *StateManager) GetActor(addr address.Address) (*types.Actor, error) {
return state.GetActor(addr)
}
func (sm *StateManager) GetBalance(addr address.Address) (types.BigInt, error) {
act, err := sm.GetActor(addr)
func (sm *StateManager) GetBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
act, err := sm.GetActor(addr, ts)
if err != nil {
return types.BigInt{}, xerrors.Errorf("get actor: %w", err)
}
@ -149,8 +152,8 @@ func (sm *StateManager) ChainStore() *store.ChainStore {
return sm.cs
}
func (sm *StateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}) (*types.Actor, error) {
act, err := sm.GetActor(a)
func (sm *StateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) {
act, err := sm.GetActor(a, ts)
if err != nil {
return nil, err
}

View File

@ -2,12 +2,16 @@ package stmgr
import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
amt "github.com/filecoin-project/go-amt-ipld"
cid "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)
@ -122,3 +126,51 @@ func GetMinerPeerID(ctx context.Context, sm *StateManager, ts *types.TipSet, mad
return peer.IDFromBytes(recp.Return)
}
func GetMinerProvingPeriodEnd(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (uint64, error) {
var mas actors.StorageMinerActorState
_, err := sm.LoadActorState(ctx, maddr, &mas, ts)
if err != nil {
return 0, xerrors.Errorf("failed to load miner actor state: %w", err)
}
return mas.ProvingPeriodEnd, nil
}
type SectorSetEntry struct {
SectorID uint64
CommR []byte
CommD []byte
}
func GetMinerProvingSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]SectorSetEntry, error) {
var mas actors.StorageMinerActorState
_, err := sm.LoadActorState(ctx, maddr, &mas, ts)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
bs := amt.WrapBlockstore(sm.ChainStore().Blockstore())
a, err := amt.LoadAMT(bs, mas.ProvingSet)
if err != nil {
return nil, err
}
var sset []SectorSetEntry
if err := a.ForEach(func(i uint64, v *cbg.Deferred) error {
var comms [][]byte
if err := cbor.DecodeInto(v.Raw, &comms); err != nil {
return err
}
sset = append(sset, SectorSetEntry{
SectorID: i,
CommR: comms[0],
CommD: comms[1],
})
return nil
}); err != nil {
return nil, err
}
return sset, nil
}

View File

@ -35,6 +35,7 @@ type ChainStore struct {
heaviest *types.TipSet
bestTips *pubsub.PubSub
pubLk sync.Mutex
tstLk sync.Mutex
tipsets map[uint64][]cid.Cid
@ -51,6 +52,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
}
hcnf := func(rev, app []*types.TipSet) error {
cs.pubLk.Lock()
defer cs.pubLk.Unlock()
for _, r := range rev {
cs.bestTips.Pub(&HeadChange{
Type: HCRevert,
@ -122,8 +125,9 @@ func (cs *ChainStore) SubNewTips() chan *types.TipSet {
}
const (
HCRevert = "revert"
HCApply = "apply"
HCRevert = "revert"
HCApply = "apply"
HCCurrent = "current"
)
type HeadChange struct {
@ -132,8 +136,17 @@ type HeadChange struct {
}
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
cs.pubLk.Lock()
subch := cs.bestTips.Sub("headchange")
head := cs.GetHeaviestTipSet()
cs.pubLk.Unlock()
out := make(chan *HeadChange, 16)
out <- &HeadChange{
Type: HCCurrent,
Val: head,
}
go func() {
defer close(out)
for {

2
go.mod
View File

@ -6,7 +6,7 @@ require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
github.com/BurntSushi/toml v0.3.1
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/filecoin-project/go-amt-ipld v0.0.0-20190917010905-40ffeec492ae
github.com/filecoin-project/go-amt-ipld v0.0.0-20190917221444-2ed85149c65d
github.com/filecoin-project/go-bls-sigs v0.0.0-20190718224239-4bc4b8a7bbf8
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543
github.com/filecoin-project/go-sectorbuilder v0.0.0-00010101000000-000000000000

2
go.sum
View File

@ -70,6 +70,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/filecoin-project/go-amt-ipld v0.0.0-20190917010905-40ffeec492ae h1:rSg6wenxKdXby0piY57Vv5gOJR6Eibqq/4PxEk6KjvE=
github.com/filecoin-project/go-amt-ipld v0.0.0-20190917010905-40ffeec492ae/go.mod h1:lKjJYPg2kwbav5f78i5YA8kGccnZn18IySbpneXvaQs=
github.com/filecoin-project/go-amt-ipld v0.0.0-20190917221444-2ed85149c65d h1:fAJ40dcN0kpSFfdTssa1kLxlDImSEZy8e1d7a32tyBY=
github.com/filecoin-project/go-amt-ipld v0.0.0-20190917221444-2ed85149c65d/go.mod h1:lKjJYPg2kwbav5f78i5YA8kGccnZn18IySbpneXvaQs=
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 h1:aMJGfgqe1QDhAVwxRg5fjCRF533xHidiKsugk7Vvzug=
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543/go.mod h1:mjrHv1cDGJWDlGmC0eDc1E5VJr8DmL9XMUcaFwiuKg8=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=

View File

@ -170,7 +170,42 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
}
func (m *Miner) runPoSt(ctx context.Context) {
log.Warning("dont care about posts yet")
notifs, err := m.api.ChainNotify(ctx)
if err != nil {
// TODO: this is probably 'crash the node' level serious
log.Errorf("POST ROUTINE FAILED: failed to get chain notifications stream: %s", err)
return
}
curhead := <-notifs
if curhead.Type != store.HCCurrent {
// TODO: this is probably 'crash the node' level serious
log.Warning("expected to get current best tipset from chain notifications stream")
return
}
postCtx, cancel := context.WithCancel(ctx)
postWaitCh, onBlock := m.maybeDoPost(postCtx, curhead)
for {
select {
case <-ctx.Done():
case ch, ok := <-notifs:
if !ok {
log.Warning("chain notifications stream terminated")
// TODO: attempt to restart it if the context isnt cancelled
return
}
if ch.Type == store.HCApply {
m.maybeDoPost(ch.Val)
}
}
}
}
func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader) {
}
func (m *Miner) runPreflightChecks(ctx context.Context) error {