implement looking in the past for messages

This commit is contained in:
whyrusleeping 2019-10-08 14:51:34 +09:00 committed by Łukasz Magiera
parent 84985ef96f
commit 14c4a8bee6
15 changed files with 195 additions and 99 deletions

View File

@ -51,7 +51,6 @@ type FullNode interface {
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error)
ChainGetParentReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error)
@ -124,6 +123,7 @@ type FullNode interface {
StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error)
StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error)
StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychList(context.Context) ([]address.Address, error)

View File

@ -41,7 +41,6 @@ type FullNodeStruct struct {
ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"`
ChainGetParentReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"`
@ -92,6 +91,7 @@ type FullNodeStruct struct {
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"`
StatePledgeCollateral func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"`
StateWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
@ -235,10 +235,6 @@ func (c *FullNodeStruct) ChainGetRandomness(ctx context.Context, pts *types.TipS
return c.Internal.ChainGetRandomness(ctx, pts, ticks, lb)
}
func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) {
return c.Internal.ChainWaitMsg(ctx, msgc)
}
func (c *FullNodeStruct) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) {
return c.Internal.ChainGetTipSetByHeight(ctx, h, ts)
}
@ -354,6 +350,10 @@ func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, ts *types.Ti
return c.Internal.StatePledgeCollateral(ctx, ts)
}
func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) {
return c.Internal.StateWaitMsg(ctx, msgc)
}
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, ensureFunds)
}

View File

@ -153,7 +153,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
if deal.Proposal.Payment.ChannelMessage != nil {
log.Info("waiting for channel message to appear on chain")
if _, err := h.full.ChainWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil {
if _, err := h.full.StateWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil {
return nil, xerrors.Errorf("waiting for paych message: %w", err)
}
}

View File

@ -2,6 +2,7 @@ package events
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/types"

View File

@ -2,6 +2,7 @@ package stmgr
import (
"context"
"fmt"
"sync"
amt "github.com/filecoin-project/go-amt-ipld"
@ -274,3 +275,151 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres
copy(pubk[:], kaddr.Payload())
return pubk, nil
}
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
msg, err := sm.cs.GetCMessage(mcid)
if err != nil {
return nil, nil, fmt.Errorf("failed to load message: %w", err)
}
tsub := sm.cs.SubHeadChanges(ctx)
head, ok := <-tsub
if !ok {
return nil, nil, fmt.Errorf("SubHeadChanges stream was invalid")
}
if len(head) != 1 {
return nil, nil, fmt.Errorf("SubHeadChanges first entry should have been one item")
}
if head[0].Type != store.HCCurrent {
return nil, nil, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type)
}
r, err := sm.tipsetExecutedMessage(head[0].Val, mcid)
if err != nil {
return nil, nil, err
}
if r != nil {
return head[0].Val, r, nil
}
var backTs *types.TipSet
var backRcp *types.MessageReceipt
backSearchWait := make(chan struct{})
go func() {
fts, r, err := sm.searchBackForMsg(ctx, head[0].Val, msg)
if err != nil {
log.Warnf("failed to look back through chain for message: %w", err)
return
}
backTs = fts
backRcp = r
close(backSearchWait)
}()
for {
select {
case notif, ok := <-tsub:
if !ok {
return nil, nil, ctx.Err()
}
for _, val := range notif {
switch val.Type {
case store.HCRevert:
continue
case store.HCApply:
r, err := sm.tipsetExecutedMessage(val.Val, mcid)
if err != nil {
return nil, nil, err
}
if r != nil {
return val.Val, r, nil
}
}
}
case <-backSearchWait:
if backTs != nil {
return backTs, backRcp, nil
}
backSearchWait = nil
case <-ctx.Done():
return nil, nil, ctx.Err()
}
}
}
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m store.ChainMsg) (*types.TipSet, *types.MessageReceipt, error) {
cur := from
for {
select {
case <-ctx.Done():
return nil, nil, nil
default:
}
act, err := sm.GetActor(m.VMMessage().From, cur)
if err != nil {
return nil, nil, err
}
if act.Nonce < m.VMMessage().Nonce {
// nonce on chain is before message nonce we're looking for, its
// not going to be here
return nil, nil, nil
}
ts, err := sm.cs.LoadTipSet(cur.Parents())
if err != nil {
return nil, nil, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
}
r, err := sm.tipsetExecutedMessage(ts, m.Cid())
if err != nil {
return nil, nil, fmt.Errorf("checking for message execution during lookback: %w", err)
}
if r != nil {
return ts, r, nil
}
if ts.Height() == 0 {
// it ain't here!
return nil, nil, nil
}
cur = ts
}
}
func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*types.MessageReceipt, error) {
// The genesis block did not execute any messages
if ts.Height() == 0 {
return nil, nil
}
pts, err := sm.cs.LoadTipSet(ts.Parents())
if err != nil {
return nil, err
}
cm, err := sm.cs.MessagesForTipset(pts)
if err != nil {
return nil, err
}
for i, m := range cm {
if m.Cid() == msg {
return sm.cs.GetParentReceipt(ts.Blocks()[0], i)
}
}
return nil, nil
}

View File

@ -463,6 +463,15 @@ func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) {
return types.DecodeBlock(genb.RawData())
}
func (cs *ChainStore) GetCMessage(c cid.Cid) (ChainMsg, error) {
m, err := cs.GetMessage(c)
if err == nil {
return m, nil
}
return cs.GetSignedMessage(c)
}
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
sb, err := cs.bs.Get(c)
if err != nil {
@ -642,71 +651,6 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.Signe
return msgs, nil
}
func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) {
tsub := cs.SubHeadChanges(ctx)
head := cs.GetHeaviestTipSet()
r, err := cs.tipsetExecutedMessage(head, mcid)
if err != nil {
return nil, nil, err
}
if r != nil {
return head, r, nil
}
for {
select {
case notif, ok := <-tsub:
if !ok {
return nil, nil, ctx.Err()
}
for _, val := range notif {
switch val.Type {
case HCRevert:
continue
case HCApply:
r, err := cs.tipsetExecutedMessage(val.Val, mcid)
if err != nil {
return nil, nil, err
}
if r != nil {
return val.Val, r, nil
}
}
}
case <-ctx.Done():
return nil, nil, ctx.Err()
}
}
}
func (cs *ChainStore) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*types.MessageReceipt, error) {
// The genesis block did not execute any messages
if ts.Height() == 0 {
return nil, nil
}
pts, err := cs.LoadTipSet(ts.Parents())
if err != nil {
return nil, err
}
cm, err := cs.MessagesForTipset(pts)
if err != nil {
return nil, err
}
for i, m := range cm {
if m.Cid() == msg {
return cs.GetParentReceipt(ts.Blocks()[0], i)
}
}
return nil, nil
}
func (cs *ChainStore) Blockstore() blockstore.Blockstore {
return cs.bs
}

View File

@ -83,7 +83,7 @@ var createMinerCmd = &cli.Command{
return xerrors.Errorf("failed to push signed message: %w", err)
}
mwait, err := api.ChainWaitMsg(ctx, smsg.Cid())
mwait, err := api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return xerrors.Errorf("failed waiting for message inclusion: %w", err)
}

View File

@ -350,7 +350,7 @@ var paychVoucherSubmitCmd = &cli.Command{
return err
}
mwait, err := api.ChainWaitMsg(ctx, mcid)
mwait, err := api.StateWaitMsg(ctx, mcid)
if err != nil {
return err
}

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"crypto/rand"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/ipfs/go-datastore"
@ -232,7 +233,7 @@ func configureStorageMiner(ctx context.Context, api api.FullNode, addr address.A
}
log.Info("Waiting for message: ", smsg.Cid())
ret, err := api.ChainWaitMsg(ctx, smsg.Cid())
ret, err := api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
@ -303,7 +304,7 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID, c
log.Infof("Pushed StorageMarket.CreateStorageMiner, %s to Mpool", signed.Cid())
log.Infof("Waiting for confirmation")
mw, err := api.ChainWaitMsg(ctx, signed.Cid())
mw, err := api.StateWaitMsg(ctx, signed.Cid())
if err != nil {
return address.Undef, err
}

View File

@ -46,20 +46,6 @@ func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet, ti
return a.Chain.GetRandomness(ctx, pts.Cids(), tickets, int64(lb))
}
func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
// TODO: consider using event system for this, expose confidence
ts, recpt, err := a.Chain.WaitForMessage(ctx, msg)
if err != nil {
return nil, err
}
return &api.MsgWait{
Receipt: *recpt,
TipSet: ts,
}, nil
}
func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) {
return a.Chain.GetBlock(msg)
}

View File

@ -193,3 +193,17 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, addr address.Address, p
return &out, nil
}
func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
// TODO: consider using event system for this, expose confidence
ts, recpt, err := a.StateManager.WaitForMessage(ctx, msg)
if err != nil {
return nil, err
}
return &api.MsgWait{
Receipt: *recpt,
TipSet: ts,
}, nil
}

View File

@ -4,10 +4,11 @@ import (
"bytes"
"context"
"fmt"
"golang.org/x/xerrors"
"math"
"strconv"
"golang.org/x/xerrors"
logging "github.com/ipfs/go-log"
"go.uber.org/fx"
@ -25,7 +26,7 @@ type ManagerApi struct {
full.MpoolAPI
full.WalletAPI
full.ChainAPI
full.StateAPI
}
type Manager struct {
@ -34,7 +35,7 @@ type Manager struct {
mpool full.MpoolAPI
wallet full.WalletAPI
chain full.ChainAPI
state full.StateAPI
}
func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manager {
@ -44,7 +45,7 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manage
mpool: api.MpoolAPI,
wallet: api.WalletAPI,
chain: api.ChainAPI,
state: api.StateAPI,
}
}

View File

@ -44,7 +44,7 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
// TODO: wait outside the store lock!
// (tricky because we need to setup channel tracking before we know it's address)
mwait, err := pm.chain.ChainWaitMsg(ctx, mcid)
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
if err != nil {
return address.Undef, cid.Undef, err
}
@ -85,7 +85,7 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres
return err
}
mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
mwait, err := pm.state.StateWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
if err != nil {
return err
}

View File

@ -54,11 +54,11 @@ type storageMinerApi interface {
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
ChainHead(context.Context) (*types.TipSet, error)
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
@ -161,7 +161,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
}
go func() {
_, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
_, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return
}

View File

@ -149,7 +149,7 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro
log.Infof("Waiting for post %s to appear on chain", smsg.Cid())
// make sure it succeeds...
rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
rec, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}