Make message wait work and use it for the storage-miner init

This commit is contained in:
whyrusleeping 2019-07-27 22:35:32 -07:00
parent fa5e27d7b9
commit 0c67d66198
9 changed files with 71 additions and 23 deletions

View File

@ -7,7 +7,7 @@ import (
"math/rand"
"sync"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
bserv "github.com/ipfs/go-blockservice"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
@ -81,7 +81,6 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
defer s.Close()
log.Info("handling block sync request")
var req BlockSyncRequest
if err := cborrpc.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
@ -185,16 +184,16 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe
}
type BlockSync struct {
bswap exchange.Interface
bserv bserv.BlockService
newStream NewStreamFunc
syncPeersLk sync.Mutex
syncPeers map[peer.ID]struct{}
}
func NewBlockSyncClient(bswap exchange.Interface, h host.Host) *BlockSync {
func NewBlockSyncClient(bserv bserv.BlockService, h host.Host) *BlockSync {
return &BlockSync{
bswap: bswap,
bserv: bserv,
newStream: h.NewStream,
syncPeers: make(map[peer.ID]struct{}),
}
@ -379,7 +378,7 @@ func cidArrsEqual(a, b []cid.Cid) bool {
}
func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
sb, err := bs.bswap.GetBlock(ctx, c)
sb, err := bs.bserv.GetBlock(ctx, c)
if err != nil {
return nil, err
}
@ -396,10 +395,7 @@ func (bs *BlockSync) AddPeer(p peer.ID) {
func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))
resp, err := bs.bswap.GetBlocks(context.TODO(), cids)
if err != nil {
return nil, err
}
resp := bs.bserv.GetBlocks(context.TODO(), cids)
m := make(map[cid.Cid]int)
for i, c := range cids {

View File

@ -39,11 +39,31 @@ type ChainStore struct {
}
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
return &ChainStore{
cs := &ChainStore{
bs: bs,
ds: ds,
bestTips: pubsub.New(64),
}
hcnf := func(rev, app []*types.TipSet) error {
for _, r := range rev {
cs.bestTips.Pub(&HeadChange{
Type: HCRevert,
Val: r,
}, "headchange")
}
for _, r := range app {
cs.bestTips.Pub(&HeadChange{
Type: HCApply,
Val: r,
}, "headchange")
}
return nil
}
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf)
return cs
}
func (cs *ChainStore) Load() error {
@ -157,7 +177,9 @@ func (cs *ChainStore) PutTipSet(ts *FullTipSet) error {
}
}
cs.MaybeTakeHeavierTipSet(ts.TipSet())
if err := cs.MaybeTakeHeavierTipSet(ts.TipSet()); err != nil {
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet")
}
return nil
}
@ -170,7 +192,9 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
return err
}
for _, hcf := range cs.headChangeNotifs {
hcf(revert, apply)
if err := hcf(revert, apply); err != nil {
return errors.Wrap(err, "head change func errored (BAD)")
}
}
log.Infof("New heaviest tipset! %s", ts.Cids())
cs.heaviest = ts
@ -327,7 +351,9 @@ func (cs *ChainStore) AddBlock(b *types.BlockHeader) error {
}
ts, _ := types.NewTipSet([]*types.BlockHeader{b})
cs.MaybeTakeHeavierTipSet(ts)
if err := cs.MaybeTakeHeavierTipSet(ts); err != nil {
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed")
}
return nil
}

View File

@ -17,7 +17,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
for {
msg, err := bsub.Next(ctx)
if err != nil {
fmt.Println("error from block subscription: ", err)
log.Error("error from block subscription: ", err)
continue
}
@ -33,7 +33,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
log.Errorf("failed to fetch all messages for block received over pubusb: %s", err)
return
}
fmt.Println("inform new block over pubsub")
log.Info("inform new block over pubsub")
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header,
Messages: msgs,

View File

@ -321,7 +321,9 @@ func (syncer *Syncer) SyncBootstrap() {
head := blockSet[len(blockSet)-1]
log.Errorf("Finished syncing! new head: %s", head.Cids())
syncer.store.MaybeTakeHeavierTipSet(selectedHead)
if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil {
log.Errorf("MaybeTakeHeavierTipSet failed: %s", err)
}
syncer.head = head
syncer.syncMode = CaughtUp
}
@ -486,7 +488,9 @@ func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error {
return errors.Wrap(err, "validate tipset failed")
}
syncer.store.PutTipSet(ts)
if err := syncer.store.PutTipSet(ts); err != nil {
return errors.Wrap(err, "PutTipSet failed in SyncCaughtUp")
}
}
if err := syncer.store.PutTipSet(maybeHead); err != nil {

View File

@ -204,7 +204,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess
defer span.End()
st := vm.cstate
st.Snapshot()
if err := st.Snapshot(); err != nil {
return nil, xerrors.Errorf("snapshot failed: %w", err)
}
fromActor, err := st.GetActor(msg.From)
if err != nil {
return nil, xerrors.Errorf("from actor not found: %w", err)
@ -253,7 +256,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess
if errcode = aerrors.RetCode(err); errcode != 0 {
// revert all state changes since snapshot
st.Revert()
if err := st.Revert(); err != nil {
return nil, xerrors.Errorf("revert state failed: %w", err)
}
gascost := types.BigMul(vmctx.GasUsed(), msg.GasPrice)
if err := DeductFunds(fromActor, gascost); err != nil {
panic("invariant violated: " + err.Error())

View File

@ -7,6 +7,7 @@ import (
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
lcli "github.com/filecoin-project/go-lotus/cli"
@ -155,9 +156,18 @@ var initCmd = &cli.Command{
log.Infof("Waiting for confirmation")
// TODO: Wait
mw, err := api.ChainWaitMsg(ctx, signed.Cid())
if err != nil {
return err
}
// create actors and stuff
addr, err := address.NewFromBytes(mw.Receipt.Return)
if err != nil {
return err
}
// TODO: persist this address in the storage-miner repo
log.Infof("New storage miners address is: %s", addr)
// TODO: Point to setting storage price, maybe do it interactively or something
log.Info("Storage miner successfully created, you can now start it with 'lotus-storage-miner run'")

1
go.mod
View File

@ -61,6 +61,7 @@ require (
github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d

View File

@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-filestore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format"
@ -190,6 +191,7 @@ func Online() Option {
Override(new(blockstore.GCLocker), blockstore.NewGCLocker),
Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore),
Override(new(exchange.Interface), modules.Bitswap),
Override(new(bserv.BlockService), bserv.New),
Override(new(ipld.DAGService), testing.MemoryClientDag),
// Filecoin services

View File

@ -68,7 +68,7 @@ func (a *FullNodeAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWa
return &api.MsgWait{
InBlock: blkcid,
Receipt: recpt,
Receipt: *recpt,
}, nil
}
@ -96,6 +96,9 @@ func (a *FullNodeAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage)
if err != nil {
return err
}
if err := a.Mpool.Add(smsg); err != nil {
return err
}
return a.PubSub.Publish("/fil/messages", msgb)
}