miner: Initial event system integration

This commit is contained in:
Łukasz Magiera 2019-09-18 15:32:21 +02:00
parent 76ce3d9bb2
commit cc82cc9675
5 changed files with 69 additions and 67 deletions

View File

@ -31,6 +31,7 @@ type heightHandler struct {
type eventApi interface { type eventApi interface {
ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
} }
type Events struct { type Events struct {
@ -49,7 +50,7 @@ type Events struct {
func NewEvents(api eventApi) *Events { func NewEvents(api eventApi) *Events {
gcConfidence := 2 * build.ForkLengthThreshold gcConfidence := 2 * build.ForkLengthThreshold
tsc := newTSCache(gcConfidence) tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight)
e := &Events{ e := &Events{
api: api, api: api,

View File

@ -39,6 +39,10 @@ type fakeCS struct {
sub func(rev, app []*types.TipSet) sub func(rev, app []*types.TipSet)
} }
func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) {
panic("Not Implemented")
}
func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet { func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
ts, err := types.NewTipSet([]*types.BlockHeader{ ts, err := types.NewTipSet([]*types.BlockHeader{
{ {
@ -152,7 +156,7 @@ func TestAt(t *testing.T) {
fcs := &fakeCS{ fcs := &fakeCS{
t: t, t: t,
h: 1, h: 1,
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
@ -211,7 +215,7 @@ func TestAtStart(t *testing.T) {
fcs := &fakeCS{ fcs := &fakeCS{
t: t, t: t,
h: 1, h: 1,
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
@ -245,7 +249,7 @@ func TestAtStartConfidence(t *testing.T) {
fcs := &fakeCS{ fcs := &fakeCS{
t: t, t: t,
h: 1, h: 1,
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
@ -278,7 +282,7 @@ func TestCalled(t *testing.T) {
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
@ -476,7 +480,7 @@ func TestCalledTimeout(t *testing.T) {
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
@ -516,7 +520,7 @@ func TestCalledTimeout(t *testing.T) {
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
@ -550,7 +554,7 @@ func TestCalledOrder(t *testing.T) {
msgs: map[cid.Cid]fakeMsg{}, msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2 * build.ForkLengthThreshold), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))

View File

@ -1,24 +1,31 @@
package events package events
import ( import (
"context"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
) )
type tsByHFunc func(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
// tipSetCache implements a simple ring-buffer cache to keep track of recent // tipSetCache implements a simple ring-buffer cache to keep track of recent
// tipsets // tipsets
type tipSetCache struct { type tipSetCache struct {
cache []*types.TipSet cache []*types.TipSet
start int start int
len int len int
storage tsByHFunc
} }
func newTSCache(cap int) *tipSetCache { func newTSCache(cap int, storage tsByHFunc) *tipSetCache {
return &tipSetCache{ return &tipSetCache{
cache: make([]*types.TipSet, cap), cache: make([]*types.TipSet, cap),
start: 0, start: 0,
len: 0, len: 0,
storage: storage,
} }
} }
@ -66,9 +73,7 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height() tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height()
if height < tailH { if height < tailH {
// TODO: we can try to walk parents, but that shouldn't happen in return tsc.storage(context.TODO(), height, tsc.cache[tailH])
// practice, so it's probably not worth implementing
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH)
} }
return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil

View File

@ -109,5 +109,5 @@ func (a *ChainAPI) ChainGetBlockReceipts(ctx context.Context, bcid cid.Cid) ([]*
} }
func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) { func (a *ChainAPI) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) {
panic("NYI") return a.Chain.GetTipsetByHeight(ctx, h, ts)
} }

View File

@ -2,11 +2,10 @@ package storage
import ( import (
"context" "context"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -24,6 +23,8 @@ import (
var log = logging.Logger("storageminer") var log = logging.Logger("storageminer")
const PoStConfidence = 0
type Miner struct { type Miner struct {
api storageMinerApi api storageMinerApi
events *events.Events events *events.Events
@ -53,6 +54,7 @@ type storageMinerApi interface {
MpoolPush(context.Context, *types.SignedMessage) error MpoolPush(context.Context, *types.SignedMessage) error
MpoolGetNonce(context.Context, address.Address) (uint64, error) MpoolGetNonce(context.Context, address.Address) (uint64, error)
ChainHead(context.Context) (*types.TipSet, error)
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
@ -82,8 +84,13 @@ func (m *Miner) Run(ctx context.Context) error {
return errors.Wrap(err, "miner preflight checks failed") return errors.Wrap(err, "miner preflight checks failed")
} }
ts, err := m.api.ChainHead(ctx)
if err != nil {
return err
}
go m.handlePostingSealedSectors(ctx) go m.handlePostingSealedSectors(ctx)
go m.runPoSt(ctx) go m.schedulePoSt(ctx, ts)
return nil return nil
} }
@ -179,70 +186,54 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
return nil return nil
} }
func (m *Miner) runPoSt(ctx context.Context) { func (m *Miner) schedulePoSt(ctx context.Context, baseTs *types.TipSet) {
// TODO: most of this method can probably be replaced by the events module once it works on top of the api ppe, err := m.api.StateMinerProvingPeriodEnd(ctx, m.maddr, baseTs)
notifs, err := m.api.ChainNotify(ctx)
if err != nil { if err != nil {
// TODO: this is probably 'crash the node' level serious log.Errorf("failed to get proving period end for miner: %s", err)
log.Errorf("POST ROUTINE FAILED: failed to get chain notifications stream: %s", err)
return return
} }
curhead := <-notifs if ppe == 0 {
if curhead[0].Type != store.HCCurrent { log.Errorf("Proving period end == 0")
// TODO: this is probably 'crash the node' level serious // TODO: we probably want to call schedulePoSt after the first commitSector call
log.Warning("expected to get current best tipset from chain notifications stream")
return return
} }
postCtx, cancel := context.WithCancel(ctx) log.Infof("Scheduling post at height %d", ppe)
postWaitCh, onBlock, err := m.maybeDoPost(postCtx, curhead[0].Val) // TODO: Should we set confidence to randomness lookback?
err = m.events.ChainAt(m.startPost, func(ts *types.TipSet) error { // Revert
// TODO: Cancel post
return nil
}, PoStConfidence, ppe)
if err != nil { if err != nil {
log.Errorf("initial 'maybeDoPost' call failed: %s", err) // TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err)
return return
} }
}
for { func (m *Miner) startPost(ts *types.TipSet, curH uint64) error {
select { postWaitCh, _, err := m.maybeDoPost(context.TODO(), ts)
case <-ctx.Done():
case notif, ok := <-notifs:
for _, ch := range notif {
if !ok {
log.Warning("chain notifications stream terminated")
// TODO: attempt to restart it if the context isnt cancelled
return
}
switch ch.Type {
case store.HCApply:
postWaitCh, onBlock, err = m.maybeDoPost(postCtx, ch.Val)
if err != nil { if err != nil {
log.Errorf("maybeDoPost failed: %s", err) return err
return
} }
case store.HCRevert:
if onBlock != nil { if postWaitCh == nil {
if ch.Val.Contains(onBlock.Cid()) { return errors.New("PoSt didn't start")
// Our post may now be invalid!
cancel() // probably the right thing to do?
} }
}
case store.HCCurrent: go func() {
log.Warn("got 'current' chain notification in middle of stream") err := <-postWaitCh
} if err != nil {
}
case perr := <-postWaitCh:
if perr != nil {
log.Errorf("got error back from postWaitCh: %s", err) log.Errorf("got error back from postWaitCh: %s", err)
// TODO: what do we even do here?
return return
} }
postWaitCh = nil
onBlock = nil
// yay?
log.Infof("post successfully submitted") log.Infof("post successfully submitted")
}
} m.schedulePoSt(context.TODO(), ts)
}()
return nil
} }
func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) { func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error, *types.BlockHeader, error) {
@ -252,6 +243,7 @@ func (m *Miner) maybeDoPost(ctx context.Context, ts *types.TipSet) (<-chan error
} }
if ppe < ts.Height() { if ppe < ts.Height() {
log.Warnf("skipping post, supplied tipset too high: ppe=%d, ts.H=%d", ppe, ts.Height())
return nil, nil, nil return nil, nil, nil
} }