Merge pull request #193 from filecoin-project/feat/ask-query

implement ask query protocol
This commit is contained in:
Whyrusleeping 2019-09-17 02:44:05 +10:00 committed by GitHub
commit c31151684b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 369 additions and 19 deletions

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
cbor "github.com/ipfs/go-ipld-cbor"
@ -89,6 +90,7 @@ type FullNode interface {
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now)
ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error)
// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)

View File

@ -72,6 +72,7 @@ type FullNodeStruct struct {
ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"`
ClientListDeals func(ctx context.Context) ([]DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@ -178,6 +179,10 @@ func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrde
return c.Internal.ClientRetrieve(ctx, order, path)
}
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
return c.Internal.ClientQueryAsk(ctx, p, miner)
}
func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
return c.Internal.MpoolPending(ctx, ts)
}

163
chain/deals/asks.go Normal file
View File

@ -0,0 +1,163 @@
package deals
import (
"context"
"time"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
datastore "github.com/ipfs/go-datastore"
cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"
)
func (h *Handler) SetPrice(p types.BigInt, ttlsecs int64) error {
h.askLk.Lock()
defer h.askLk.Unlock()
var seqno uint64
if h.ask != nil {
seqno = h.ask.Ask.SeqNo + 1
}
now := time.Now().Unix()
ask := &types.StorageAsk{
Price: p,
Timestamp: now,
Expiry: now + ttlsecs,
Miner: h.actor,
SeqNo: seqno,
MinPieceSize: h.minPieceSize,
}
ssa, err := h.signAsk(ask)
if err != nil {
return err
}
return h.saveAsk(ssa)
}
func (h *Handler) getAsk(m address.Address) *types.SignedStorageAsk {
h.askLk.Lock()
defer h.askLk.Unlock()
if m != h.actor {
return nil
}
return h.ask
}
func (h *Handler) HandleAskStream(s inet.Stream) {
defer s.Close()
var ar AskRequest
if err := cborrpc.ReadCborRPC(s, &ar); err != nil {
log.Errorf("failed to read AskRequest from incoming stream: %s", err)
return
}
resp := h.processAskRequest(&ar)
if err := cborrpc.WriteCborRPC(s, resp); err != nil {
log.Errorf("failed to write ask response: %s", err)
return
}
}
func (h *Handler) processAskRequest(ar *AskRequest) *AskResponse {
return &AskResponse{
Ask: h.getAsk(ar.Miner),
}
}
var bestAskKey = datastore.NewKey("latest-ask")
func (h *Handler) tryLoadAsk() error {
h.askLk.Lock()
defer h.askLk.Unlock()
err := h.loadAsk()
if err != nil {
if xerrors.Is(err, datastore.ErrNotFound) {
log.Warn("no previous ask found, miner will not accept deals until a price is set")
return nil
}
return err
}
return nil
}
func (h *Handler) loadAsk() error {
askb, err := h.ds.Get(datastore.NewKey("latest-ask"))
if err != nil {
return xerrors.Errorf("failed to load most recent ask from disk: %w", err)
}
var ssa types.SignedStorageAsk
if err := cbor.DecodeInto(askb, &ssa); err != nil {
return err
}
h.ask = &ssa
return nil
}
func (h *Handler) signAsk(a *types.StorageAsk) (*types.SignedStorageAsk, error) {
b, err := cbor.DumpObject(a)
if err != nil {
return nil, err
}
worker, err := h.getWorker(h.actor)
if err != nil {
return nil, xerrors.Errorf("failed to get worker to sign ask: %w", err)
}
sig, err := h.full.WalletSign(context.TODO(), worker, b)
if err != nil {
return nil, err
}
return &types.SignedStorageAsk{
Ask: a,
Signature: sig,
}, nil
}
func (h *Handler) saveAsk(a *types.SignedStorageAsk) error {
b, err := cbor.DumpObject(a)
if err != nil {
return err
}
if err := h.ds.Put(bestAskKey, b); err != nil {
return err
}
h.ask = a
return nil
}
func (c *Client) checkAskSignature(ask *types.SignedStorageAsk) error {
tss, err := c.sm.TipSetState(c.sm.ChainStore().GetHeaviestTipSet().Cids())
if err != nil {
return xerrors.Errorf("failed to get tipsetstate to query for miner worker: %w", err)
}
w, err := stmgr.GetMinerWorker(context.TODO(), c.sm, tss, ask.Ask.Miner)
if err != nil {
return xerrors.Errorf("failed to get worker for miner in ask", err)
}
sigb, err := cbor.DumpObject(ask.Ask)
if err != nil {
return xerrors.Errorf("failed to re-serialize ask")
}
return ask.Signature.Verify(w, sigb)
}

View File

@ -12,13 +12,15 @@ import (
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
@ -45,7 +47,7 @@ type ClientDeal struct {
}
type Client struct {
cs *store.ChainStore
sm *stmgr.StateManager
h host.Host
w *wallet.Wallet
dag dtypes.ClientDAG
@ -67,9 +69,9 @@ type clientDealUpdate struct {
err error
}
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client {
func NewClient(sm *stmgr.StateManager, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client {
c := &Client{
cs: cs,
sm: sm,
h: h,
w: w,
dag: dag,
@ -231,6 +233,39 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie
})
}
func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) {
s, err := c.h.NewStream(ctx, p, AskProtocolID)
if err != nil {
return nil, err
}
req := &AskRequest{
Miner: a,
}
if err := cborrpc.WriteCborRPC(s, req); err != nil {
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}
var out AskResponse
if err := cborrpc.ReadCborRPC(s, &out); err != nil {
return nil, xerrors.Errorf("failed to read ask response: %w", err)
}
if out.Ask == nil {
return nil, xerrors.Errorf("got no ask back")
}
if out.Ask.Ask.Miner != a {
return nil, xerrors.Errorf("got back ask for wrong miner")
}
if err := c.checkAskSignature(out.Ask); err != nil {
return nil, xerrors.Errorf("ask was not properly signed")
}
return out.Ask, nil
}
func (c *Client) List() ([]ClientDeal, error) {
return c.deals.ListClient()
}

View File

@ -3,13 +3,15 @@ package deals
import (
"context"
"math"
"sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
@ -37,6 +39,10 @@ type MinerDeal struct {
type Handler struct {
pricePerByteBlock types.BigInt // how much we want for storing one byte for one block
minPieceSize uint64
ask *types.SignedStorageAsk
askLk sync.Mutex
secst *sectorblocks.SectorBlocks
full api.FullNode
@ -46,6 +52,8 @@ type Handler struct {
dag dtypes.StagingDAG
deals MinerStateStore
ds dtypes.MetadataDS
conns map[cid.Cid]inet.Stream
actor address.Address
@ -73,12 +81,13 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp
return nil, err
}
return &Handler{
h := &Handler{
secst: secst,
dag: dag,
full: fullNode,
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
minPieceSize: 1,
conns: map[cid.Cid]inet.Stream{},
@ -90,7 +99,22 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp
actor: minerAddress,
deals: MinerStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}},
}, nil
ds: ds,
}
if err := h.tryLoadAsk(); err != nil {
return nil, err
}
if h.ask == nil {
// TODO: we should be fine with this state, and just say it means 'not actively accepting deals'
// for now... lets just set a price
if err := h.SetPrice(types.NewInt(3), 1000000); err != nil {
return nil, xerrors.Errorf("failed setting a default price: %w", err)
}
}
return h, nil
}
func (h *Handler) Run(ctx context.Context) {

View File

@ -2,9 +2,10 @@ package deals
import (
"context"
"github.com/filecoin-project/go-lotus/api"
"runtime"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
@ -73,16 +74,7 @@ func (h *Handler) sendSignedResponse(resp StorageDealResponse) error {
return xerrors.Errorf("serializing response: %w", err)
}
getworker := &types.Message{
To: h.actor,
From: h.actor,
Method: actors.MAMethods.GetWorkerAddr,
}
r, err := h.full.StateCall(context.TODO(), getworker, nil)
if err != nil {
return xerrors.Errorf("getting worker address: %w", err)
}
worker, err := address.NewFromBytes(r.Return)
worker, err := h.getWorker(h.actor)
if err != nil {
return err
}
@ -105,3 +97,21 @@ func (h *Handler) sendSignedResponse(resp StorageDealResponse) error {
}
return err
}
func (h *Handler) getWorker(miner address.Address) (address.Address, error) {
getworker := &types.Message{
To: miner,
From: miner,
Method: actors.MAMethods.GetWorkerAddr,
}
r, err := h.full.StateCall(context.TODO(), getworker, nil)
if err != nil {
return address.Undef, xerrors.Errorf("getting worker address: %w", err)
}
if r.ExitCode != 0 {
return address.Undef, xerrors.Errorf("getWorker call failed: %d", r.ExitCode)
}
return address.NewFromBytes(r.Return)
}

View File

@ -18,9 +18,13 @@ func init() {
cbor.RegisterCborType(StorageDealResponse{})
cbor.RegisterCborType(SignedStorageDealResponse{})
cbor.RegisterCborType(AskRequest{})
cbor.RegisterCborType(AskResponse{})
}
const ProtocolID = "/fil/storage/mk/1.0.0"
const AskProtocolID = "/fil/storage/ask/1.0.0"
type SerializationMode string
@ -78,3 +82,11 @@ type SignedStorageDealResponse struct {
Signature *types.Signature
}
type AskRequest struct {
Miner address.Address
}
type AskResponse struct {
Ask *types.SignedStorageAsk
}

25
chain/types/ask.go Normal file
View File

@ -0,0 +1,25 @@
package types
import (
"github.com/filecoin-project/go-lotus/chain/address"
cbor "github.com/ipfs/go-ipld-cbor"
)
func init() {
cbor.RegisterCborType(SignedStorageAsk{})
cbor.RegisterCborType(StorageAsk{})
}
type SignedStorageAsk struct {
Ask *StorageAsk
Signature *Signature
}
type StorageAsk struct {
Price BigInt
MinPieceSize uint64
Miner address.Address
Timestamp int64
Expiry int64
SeqNo uint64
}

View File

@ -5,9 +5,11 @@ import (
"strconv"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
actors "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
)
@ -21,6 +23,7 @@ var clientCmd = &cli.Command{
clientDealCmd,
clientFindCmd,
clientRetrieveCmd,
clientQueryAskCmd,
},
}
@ -206,3 +209,69 @@ var clientRetrieveCmd = &cli.Command{
return err
},
}
var clientQueryAskCmd = &cli.Command{
Name: "query-ask",
Usage: "find a miners ask",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "specify peer ID of node to make query against",
},
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
fmt.Println("Usage: query-ask [address]")
return nil
}
maddr, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
api, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
ctx := ReqContext(cctx)
var pid peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.IDFromString(pidstr)
if err != nil {
return err
}
pid = p
} else {
ret, err := api.StateCall(ctx, &types.Message{
To: maddr,
From: maddr,
Method: actors.MAMethods.GetPeerID,
}, nil)
if err != nil {
return xerrors.Errorf("failed to get peerID for miner: %w", err)
}
if ret.ExitCode != 0 {
return fmt.Errorf("call to GetPeerID was unsuccesful (exit code %d)", ret.ExitCode)
}
p, err := peer.IDFromBytes(ret.Return)
if err != nil {
return err
}
pid = p
}
ask, err := api.ClientQueryAsk(ctx, pid, maddr)
if err != nil {
return err
}
fmt.Printf("Ask: %s\n", maddr)
fmt.Printf("Price: %s\n", ask.Ask.Price)
return nil
},
}

View File

@ -251,3 +251,7 @@ func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder
return outFile.Close()
}
func (a *ClientAPI) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
return a.DealClient.QueryAsk(ctx, p, miner)
}

View File

@ -104,6 +104,7 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *de
OnStart: func(context.Context) error {
h.Run(ctx)
host.SetStreamHandler(deals.ProtocolID, h.HandleStream)
host.SetStreamHandler(deals.AskProtocolID, h.HandleAskStream)
return nil
},
OnStop: func(context.Context) error {