Merge pull request #103 from filecoin-project/feat/msg-wait
Make message wait work, use it for storage miner init
This commit is contained in:
commit
f80050307c
@ -7,7 +7,7 @@ import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
|
||||
@ -81,7 +81,6 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
|
||||
|
||||
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
||||
defer s.Close()
|
||||
log.Info("handling block sync request")
|
||||
|
||||
var req BlockSyncRequest
|
||||
if err := cborrpc.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
|
||||
@ -185,16 +184,16 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe
|
||||
}
|
||||
|
||||
type BlockSync struct {
|
||||
bswap exchange.Interface
|
||||
bserv bserv.BlockService
|
||||
newStream NewStreamFunc
|
||||
|
||||
syncPeersLk sync.Mutex
|
||||
syncPeers map[peer.ID]struct{}
|
||||
}
|
||||
|
||||
func NewBlockSyncClient(bswap exchange.Interface, h host.Host) *BlockSync {
|
||||
func NewBlockSyncClient(bserv bserv.BlockService, h host.Host) *BlockSync {
|
||||
return &BlockSync{
|
||||
bswap: bswap,
|
||||
bserv: bserv,
|
||||
newStream: h.NewStream,
|
||||
syncPeers: make(map[peer.ID]struct{}),
|
||||
}
|
||||
@ -379,7 +378,7 @@ func cidArrsEqual(a, b []cid.Cid) bool {
|
||||
}
|
||||
|
||||
func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
|
||||
sb, err := bs.bswap.GetBlock(ctx, c)
|
||||
sb, err := bs.bserv.GetBlock(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -396,10 +395,7 @@ func (bs *BlockSync) AddPeer(p peer.ID) {
|
||||
func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
|
||||
out := make([]*types.SignedMessage, len(cids))
|
||||
|
||||
resp, err := bs.bswap.GetBlocks(context.TODO(), cids)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := bs.bserv.GetBlocks(context.TODO(), cids)
|
||||
|
||||
m := make(map[cid.Cid]int)
|
||||
for i, c := range cids {
|
||||
|
@ -39,11 +39,31 @@ type ChainStore struct {
|
||||
}
|
||||
|
||||
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
|
||||
return &ChainStore{
|
||||
cs := &ChainStore{
|
||||
bs: bs,
|
||||
ds: ds,
|
||||
bestTips: pubsub.New(64),
|
||||
}
|
||||
|
||||
hcnf := func(rev, app []*types.TipSet) error {
|
||||
for _, r := range rev {
|
||||
cs.bestTips.Pub(&HeadChange{
|
||||
Type: HCRevert,
|
||||
Val: r,
|
||||
}, "headchange")
|
||||
}
|
||||
for _, r := range app {
|
||||
cs.bestTips.Pub(&HeadChange{
|
||||
Type: HCApply,
|
||||
Val: r,
|
||||
}, "headchange")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf)
|
||||
|
||||
return cs
|
||||
}
|
||||
|
||||
func (cs *ChainStore) Load() error {
|
||||
@ -157,7 +177,9 @@ func (cs *ChainStore) PutTipSet(ts *FullTipSet) error {
|
||||
}
|
||||
}
|
||||
|
||||
cs.MaybeTakeHeavierTipSet(ts.TipSet())
|
||||
if err := cs.MaybeTakeHeavierTipSet(ts.TipSet()); err != nil {
|
||||
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -170,7 +192,9 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
|
||||
return err
|
||||
}
|
||||
for _, hcf := range cs.headChangeNotifs {
|
||||
hcf(revert, apply)
|
||||
if err := hcf(revert, apply); err != nil {
|
||||
return errors.Wrap(err, "head change func errored (BAD)")
|
||||
}
|
||||
}
|
||||
log.Infof("New heaviest tipset! %s", ts.Cids())
|
||||
cs.heaviest = ts
|
||||
@ -327,7 +351,9 @@ func (cs *ChainStore) AddBlock(b *types.BlockHeader) error {
|
||||
}
|
||||
|
||||
ts, _ := types.NewTipSet([]*types.BlockHeader{b})
|
||||
cs.MaybeTakeHeavierTipSet(ts)
|
||||
if err := cs.MaybeTakeHeavierTipSet(ts); err != nil {
|
||||
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
||||
for {
|
||||
msg, err := bsub.Next(ctx)
|
||||
if err != nil {
|
||||
fmt.Println("error from block subscription: ", err)
|
||||
log.Error("error from block subscription: ", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
||||
log.Errorf("failed to fetch all messages for block received over pubusb: %s", err)
|
||||
return
|
||||
}
|
||||
fmt.Println("inform new block over pubsub")
|
||||
log.Info("inform new block over pubsub")
|
||||
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
|
||||
Header: blk.Header,
|
||||
Messages: msgs,
|
||||
|
@ -321,7 +321,9 @@ func (syncer *Syncer) SyncBootstrap() {
|
||||
|
||||
head := blockSet[len(blockSet)-1]
|
||||
log.Errorf("Finished syncing! new head: %s", head.Cids())
|
||||
syncer.store.MaybeTakeHeavierTipSet(selectedHead)
|
||||
if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil {
|
||||
log.Errorf("MaybeTakeHeavierTipSet failed: %s", err)
|
||||
}
|
||||
syncer.head = head
|
||||
syncer.syncMode = CaughtUp
|
||||
}
|
||||
@ -486,7 +488,9 @@ func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error {
|
||||
return errors.Wrap(err, "validate tipset failed")
|
||||
}
|
||||
|
||||
syncer.store.PutTipSet(ts)
|
||||
if err := syncer.store.PutTipSet(ts); err != nil {
|
||||
return errors.Wrap(err, "PutTipSet failed in SyncCaughtUp")
|
||||
}
|
||||
}
|
||||
|
||||
if err := syncer.store.PutTipSet(maybeHead); err != nil {
|
||||
|
@ -204,7 +204,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess
|
||||
defer span.End()
|
||||
|
||||
st := vm.cstate
|
||||
st.Snapshot()
|
||||
if err := st.Snapshot(); err != nil {
|
||||
return nil, xerrors.Errorf("snapshot failed: %w", err)
|
||||
}
|
||||
|
||||
fromActor, err := st.GetActor(msg.From)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("from actor not found: %w", err)
|
||||
@ -253,7 +256,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess
|
||||
|
||||
if errcode = aerrors.RetCode(err); errcode != 0 {
|
||||
// revert all state changes since snapshot
|
||||
st.Revert()
|
||||
if err := st.Revert(); err != nil {
|
||||
return nil, xerrors.Errorf("revert state failed: %w", err)
|
||||
}
|
||||
|
||||
gascost := types.BigMul(vmctx.GasUsed(), msg.GasPrice)
|
||||
if err := DeductFunds(fromActor, gascost); err != nil {
|
||||
panic("invariant violated: " + err.Error())
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-lotus/build"
|
||||
"github.com/filecoin-project/go-lotus/chain/actors"
|
||||
"github.com/filecoin-project/go-lotus/chain/address"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/filecoin-project/go-lotus/chain/wallet"
|
||||
lcli "github.com/filecoin-project/go-lotus/cli"
|
||||
@ -155,9 +156,18 @@ var initCmd = &cli.Command{
|
||||
|
||||
log.Infof("Waiting for confirmation")
|
||||
|
||||
// TODO: Wait
|
||||
mw, err := api.ChainWaitMsg(ctx, signed.Cid())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create actors and stuff
|
||||
addr, err := address.NewFromBytes(mw.Receipt.Return)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: persist this address in the storage-miner repo
|
||||
log.Infof("New storage miners address is: %s", addr)
|
||||
|
||||
// TODO: Point to setting storage price, maybe do it interactively or something
|
||||
log.Info("Storage miner successfully created, you can now start it with 'lotus-storage-miner run'")
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"github.com/ipfs/go-filestore"
|
||||
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
||||
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-datastore"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
@ -190,6 +191,7 @@ func Online() Option {
|
||||
Override(new(blockstore.GCLocker), blockstore.NewGCLocker),
|
||||
Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore),
|
||||
Override(new(exchange.Interface), modules.Bitswap),
|
||||
Override(new(bserv.BlockService), bserv.New),
|
||||
Override(new(ipld.DAGService), testing.MemoryClientDag),
|
||||
|
||||
// Filecoin services
|
||||
|
@ -61,7 +61,15 @@ func (a *FullNodeAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet)
|
||||
}
|
||||
|
||||
func (a *FullNodeAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
|
||||
panic("TODO")
|
||||
blkcid, recpt, err := a.Chain.WaitForMessage(ctx, msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &api.MsgWait{
|
||||
InBlock: blkcid,
|
||||
Receipt: *recpt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) {
|
||||
@ -88,6 +96,9 @@ func (a *FullNodeAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.Mpool.Add(smsg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return a.PubSub.Publish("/fil/messages", msgb)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user