Start implementing storage deals

This commit is contained in:
Łukasz Magiera 2019-08-01 19:12:41 +02:00 committed by whyrusleeping
parent 5b63e6aefc
commit ad9e433232
6 changed files with 171 additions and 6 deletions

View File

@ -117,6 +117,8 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore // ClientImport imports file under the specified path into filestore
ClientImport(ctx context.Context, path string) (cid.Cid, error) ClientImport(ctx context.Context, path string) (cid.Cid, error)
ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, blocksDuration uint64) error
// ClientUnimport removes references to the specified file from filestore // ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string) //ClientUnimport(path string)

View File

@ -64,6 +64,7 @@ type FullNodeStruct struct {
ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"` ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"`
ClientListImports func(ctx context.Context) ([]Import, error) `perm:"read"` ClientListImports func(ctx context.Context) ([]Import, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, blocksDuration uint64) error `perm:"admin"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@ -128,6 +129,10 @@ func (c *FullNodeStruct) ClientImport(ctx context.Context, path string) (cid.Cid
return c.Internal.ClientImport(ctx, path) return c.Internal.ClientImport(ctx, path)
} }
func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, blocksDuration uint64) error {
return c.Internal.ClientStartDeal(ctx, data, miner, blocksDuration)
}
func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
return c.Internal.MpoolPending(ctx, ts) return c.Internal.MpoolPending(ctx, ts)
} }

129
chain/deals/client.go Normal file
View File

@ -0,0 +1,129 @@
package deals
import (
"context"
"sync/atomic"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/vm"
)
var log = logging.Logger("deals")
type DealStatus int
const (
DealResolvingMiner = DealStatus(iota)
)
type Deal struct {
ID uint64
Status DealStatus
}
type Client struct {
cs *store.ChainStore
next uint64
deals map[uint64]Deal
incoming chan Deal
stop chan struct{}
stopped chan struct{}
}
func NewClient(cs *store.ChainStore) *Client {
c := &Client{
cs: cs,
deals: map[uint64]Deal{},
incoming: make(chan Deal, 16),
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
return c
}
func (c *Client) Run() {
go func() {
defer close(c.stopped)
for {
select {
case deal := <-c.incoming:
log.Info("incoming deal")
c.deals[deal.ID] = deal
case <-c.stop:
return
}
}
}()
}
func (c *Client) Start(ctx context.Context, data cid.Cid, miner address.Address, blocksDuration uint64) (uint64, error) {
// Getting PeerID
// TODO: Is there a nicer way?
ts := c.cs.GetHeaviestTipSet()
state, err := c.cs.TipSetState(ts.Cids())
if err != nil {
return 0, err
}
vmi, err := vm.NewVM(state, ts.Height(), ts.Blocks()[0].Miner, c.cs)
if err != nil {
return 0, xerrors.Errorf("failed to set up vm: %w", err)
}
msg := &types.Message{
To: miner,
Method: actors.MAMethods.GetPeerID,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(10000000000),
}
// TODO: maybe just use the invoker directly?
r, err := vmi.ApplyMessage(ctx, msg)
if err != nil {
return 0, err
}
if r.ExitCode != 0 {
panic("TODO: do we error here?")
}
pid, err := peer.IDFromBytes(r.Return)
if err != nil {
return 0, err
}
log.Warnf("miner pid:%s", pid)
id := atomic.AddUint64(&c.next, 1)
deal := Deal{
ID: id,
Status: DealResolvingMiner,
}
c.incoming <- deal
return id, nil
}
func (c *Client) Stop() {
close(c.stop)
<-c.stopped
}

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/deals"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/chain/wallet"
@ -71,6 +72,8 @@ const (
HandleIncomingBlocksKey HandleIncomingBlocksKey
HandleIncomingMessagesKey HandleIncomingMessagesKey
RunDealClientKey
// daemon // daemon
ExtractApiKey ExtractApiKey
@ -207,6 +210,9 @@ func Online() Option {
Override(RunHelloKey, modules.RunHello), Override(RunHelloKey, modules.RunHello),
Override(RunBlockSyncKey, modules.RunBlockSync), Override(RunBlockSyncKey, modules.RunBlockSync),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(new(*deals.Client), deals.NewClient),
Override(RunDealClientKey, modules.RunDealClient),
), ),
// Storage miner // Storage miner

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/deals"
"github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/state" "github.com/filecoin-project/go-lotus/chain/state"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
@ -34,12 +35,18 @@ type FullNodeAPI struct {
CommonAPI CommonAPI
DealClient *deals.Client
Chain *store.ChainStore Chain *store.ChainStore
PubSub *pubsub.PubSub PubSub *pubsub.PubSub
Mpool *chain.MessagePool Mpool *chain.MessagePool
Wallet *wallet.Wallet Wallet *wallet.Wallet
} }
func (a *FullNodeAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, blocksDuration uint64) error {
_, err := a.DealClient.Start(ctx, data, miner, blocksDuration)
return err
}
func (a *FullNodeAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { func (a *FullNodeAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) {
return a.Chain.SubHeadChanges(ctx), nil return a.Chain.SubHeadChanges(ctx), nil
} }

View File

@ -1,12 +1,15 @@
package modules package modules
import ( import (
"context"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx" "go.uber.org/fx"
"github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/deals"
"github.com/filecoin-project/go-lotus/chain/sub" "github.com/filecoin-project/go-lotus/chain/sub"
"github.com/filecoin-project/go-lotus/node/hello" "github.com/filecoin-project/go-lotus/node/hello"
"github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/modules/helpers"
@ -53,3 +56,16 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu
go sub.HandleIncomingMessages(ctx, mpool, msgsub) go sub.HandleIncomingMessages(ctx, mpool, msgsub)
} }
func RunDealClient(lc fx.Lifecycle, c *deals.Client) {
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
c.Run()
return nil
},
OnStop: func(context.Context) error {
c.Stop()
return nil
},
})
}