diff --git a/chain/blocksync.go b/chain/blocksync.go index ebbe963d6..808d325db 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -7,7 +7,7 @@ import ( "math/rand" "sync" - exchange "github.com/ipfs/go-ipfs-exchange-interface" + bserv "github.com/ipfs/go-blockservice" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" @@ -81,7 +81,6 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { func (bss *BlockSyncService) HandleStream(s inet.Stream) { defer s.Close() - log.Info("handling block sync request") var req BlockSyncRequest if err := cborrpc.ReadCborRPC(bufio.NewReader(s), &req); err != nil { @@ -185,16 +184,16 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe } type BlockSync struct { - bswap exchange.Interface + bserv bserv.BlockService newStream NewStreamFunc syncPeersLk sync.Mutex syncPeers map[peer.ID]struct{} } -func NewBlockSyncClient(bswap exchange.Interface, h host.Host) *BlockSync { +func NewBlockSyncClient(bserv bserv.BlockService, h host.Host) *BlockSync { return &BlockSync{ - bswap: bswap, + bserv: bserv, newStream: h.NewStream, syncPeers: make(map[peer.ID]struct{}), } @@ -379,7 +378,7 @@ func cidArrsEqual(a, b []cid.Cid) bool { } func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) { - sb, err := bs.bswap.GetBlock(ctx, c) + sb, err := bs.bserv.GetBlock(ctx, c) if err != nil { return nil, err } @@ -396,10 +395,7 @@ func (bs *BlockSync) AddPeer(p peer.ID) { func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage, error) { out := make([]*types.SignedMessage, len(cids)) - resp, err := bs.bswap.GetBlocks(context.TODO(), cids) - if err != nil { - return nil, err - } + resp := bs.bserv.GetBlocks(context.TODO(), cids) m := make(map[cid.Cid]int) for i, c := range cids { diff --git a/chain/store/store.go b/chain/store/store.go index 0275066f8..e8629e0a6 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -39,11 +39,31 @@ type ChainStore struct { } func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { - return &ChainStore{ + cs := &ChainStore{ bs: bs, ds: ds, bestTips: pubsub.New(64), } + + hcnf := func(rev, app []*types.TipSet) error { + for _, r := range rev { + cs.bestTips.Pub(&HeadChange{ + Type: HCRevert, + Val: r, + }, "headchange") + } + for _, r := range app { + cs.bestTips.Pub(&HeadChange{ + Type: HCApply, + Val: r, + }, "headchange") + } + return nil + } + + cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf) + + return cs } func (cs *ChainStore) Load() error { @@ -157,7 +177,9 @@ func (cs *ChainStore) PutTipSet(ts *FullTipSet) error { } } - cs.MaybeTakeHeavierTipSet(ts.TipSet()) + if err := cs.MaybeTakeHeavierTipSet(ts.TipSet()); err != nil { + return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet") + } return nil } @@ -170,7 +192,9 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error { return err } for _, hcf := range cs.headChangeNotifs { - hcf(revert, apply) + if err := hcf(revert, apply); err != nil { + return errors.Wrap(err, "head change func errored (BAD)") + } } log.Infof("New heaviest tipset! %s", ts.Cids()) cs.heaviest = ts @@ -327,7 +351,9 @@ func (cs *ChainStore) AddBlock(b *types.BlockHeader) error { } ts, _ := types.NewTipSet([]*types.BlockHeader{b}) - cs.MaybeTakeHeavierTipSet(ts) + if err := cs.MaybeTakeHeavierTipSet(ts); err != nil { + return errors.Wrap(err, "MaybeTakeHeavierTipSet failed") + } return nil } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 24b5c2aca..c10d6f2c8 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -17,7 +17,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha for { msg, err := bsub.Next(ctx) if err != nil { - fmt.Println("error from block subscription: ", err) + log.Error("error from block subscription: ", err) continue } @@ -33,7 +33,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha log.Errorf("failed to fetch all messages for block received over pubusb: %s", err) return } - fmt.Println("inform new block over pubsub") + log.Info("inform new block over pubsub") s.InformNewBlock(msg.GetFrom(), &types.FullBlock{ Header: blk.Header, Messages: msgs, diff --git a/chain/sync.go b/chain/sync.go index 84837d471..a4cbc629f 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -321,7 +321,9 @@ func (syncer *Syncer) SyncBootstrap() { head := blockSet[len(blockSet)-1] log.Errorf("Finished syncing! new head: %s", head.Cids()) - syncer.store.MaybeTakeHeavierTipSet(selectedHead) + if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil { + log.Errorf("MaybeTakeHeavierTipSet failed: %s", err) + } syncer.head = head syncer.syncMode = CaughtUp } @@ -486,7 +488,9 @@ func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error { return errors.Wrap(err, "validate tipset failed") } - syncer.store.PutTipSet(ts) + if err := syncer.store.PutTipSet(ts); err != nil { + return errors.Wrap(err, "PutTipSet failed in SyncCaughtUp") + } } if err := syncer.store.PutTipSet(maybeHead); err != nil { diff --git a/chain/vm/vm.go b/chain/vm/vm.go index ec15e2db8..e4fea4d54 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -204,7 +204,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess defer span.End() st := vm.cstate - st.Snapshot() + if err := st.Snapshot(); err != nil { + return nil, xerrors.Errorf("snapshot failed: %w", err) + } + fromActor, err := st.GetActor(msg.From) if err != nil { return nil, xerrors.Errorf("from actor not found: %w", err) @@ -253,7 +256,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess if errcode = aerrors.RetCode(err); errcode != 0 { // revert all state changes since snapshot - st.Revert() + if err := st.Revert(); err != nil { + return nil, xerrors.Errorf("revert state failed: %w", err) + } + gascost := types.BigMul(vmctx.GasUsed(), msg.GasPrice) if err := DeductFunds(fromActor, gascost); err != nil { panic("invariant violated: " + err.Error()) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index fabd04fdc..c625f4620 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -7,6 +7,7 @@ import ( "github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/wallet" lcli "github.com/filecoin-project/go-lotus/cli" @@ -155,9 +156,18 @@ var initCmd = &cli.Command{ log.Infof("Waiting for confirmation") - // TODO: Wait + mw, err := api.ChainWaitMsg(ctx, signed.Cid()) + if err != nil { + return err + } - // create actors and stuff + addr, err := address.NewFromBytes(mw.Receipt.Return) + if err != nil { + return err + } + + // TODO: persist this address in the storage-miner repo + log.Infof("New storage miners address is: %s", addr) // TODO: Point to setting storage price, maybe do it interactively or something log.Info("Storage miner successfully created, you can now start it with 'lotus-storage-miner run'") diff --git a/go.mod b/go.mod index e806e5c6d..3cdb78661 100644 --- a/go.mod +++ b/go.mod @@ -61,6 +61,7 @@ require ( github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 github.com/smartystreets/assertions v1.0.1 // indirect github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect + github.com/stretchr/objx v0.1.1 // indirect github.com/stretchr/testify v1.3.0 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d diff --git a/node/builder.go b/node/builder.go index 0aa7a119b..da79b06a8 100644 --- a/node/builder.go +++ b/node/builder.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-filestore" exchange "github.com/ipfs/go-ipfs-exchange-interface" + bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" ipld "github.com/ipfs/go-ipld-format" @@ -190,6 +191,7 @@ func Online() Option { Override(new(blockstore.GCLocker), blockstore.NewGCLocker), Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore), Override(new(exchange.Interface), modules.Bitswap), + Override(new(bserv.BlockService), bserv.New), Override(new(ipld.DAGService), testing.MemoryClientDag), // Filecoin services diff --git a/node/impl/full.go b/node/impl/full.go index f334345c7..e08233248 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -68,7 +68,7 @@ func (a *FullNodeAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWa return &api.MsgWait{ InBlock: blkcid, - Receipt: recpt, + Receipt: *recpt, }, nil } @@ -96,6 +96,9 @@ func (a *FullNodeAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) if err != nil { return err } + if err := a.Mpool.Add(smsg); err != nil { + return err + } return a.PubSub.Publish("/fil/messages", msgb) }