deals: Better client deal tracking

This commit is contained in:
Łukasz Magiera 2019-09-10 14:35:43 +02:00
parent bfd4624712
commit 72a406ec7e
8 changed files with 305 additions and 115 deletions

View File

@ -4,25 +4,20 @@ import (
"context"
"math"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
@ -33,18 +28,13 @@ func init() {
var log = logging.Logger("deals")
const ProtocolID = "/fil/storage/mk/1.0.0"
type DealStatus int
const (
DealResolvingMiner = DealStatus(iota)
)
type ClientDeal struct {
ProposalCid cid.Cid
Status DealStatus
Proposal StorageDealProposal
State DealState
Miner peer.ID
s inet.Stream
}
type Client struct {
@ -55,13 +45,21 @@ type Client struct {
discovery *discovery.Local
deals StateStore
conns map[cid.Cid]inet.Stream
incoming chan ClientDeal
updated chan clientDealUpdate
stop chan struct{}
stopped chan struct{}
}
type clientDealUpdate struct {
newState DealState
id cid.Cid
err error
}
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client {
c := &Client{
cs: cs,
@ -71,8 +69,10 @@ func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.Me
discovery: discovery,
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))},
conns: map[cid.Cid]inet.Stream{},
incoming: make(chan ClientDeal, 16),
updated: make(chan clientDealUpdate, 16),
stop: make(chan struct{}),
stopped: make(chan struct{}),
@ -81,21 +81,16 @@ func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.Me
return c
}
func (c *Client) Run() {
func (c *Client) Run(ctx context.Context) {
go func() {
defer close(c.stopped)
for {
select {
case deal := <-c.incoming:
log.Info("incoming deal")
// TODO: track in datastore
if err := c.deals.Begin(deal.ProposalCid, deal); err != nil {
log.Errorf("deal state begin failed: %s", err)
continue
}
c.onIncoming(deal)
case update := <-c.updated:
c.onUpdated(ctx, update)
case <-c.stop:
return
}
@ -103,87 +98,59 @@ func (c *Client) Run() {
}()
}
func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) {
root, err := c.dag.Get(ctx, data)
if err != nil {
log.Errorf("failed to get file root for deal: %s", err)
return nil, 0, err
func (c *Client) onIncoming(deal ClientDeal) {
log.Info("incoming deal")
if _, ok := c.conns[deal.ProposalCid]; ok {
log.Errorf("tracking deal connection: already tracking connection for deal %s", deal.ProposalCid)
return
}
c.conns[deal.ProposalCid] = deal.s
if err := c.deals.Begin(deal.ProposalCid, deal); err != nil {
// We may have re-sent the proposal
log.Errorf("deal tracking failed: %s", err)
c.failDeal(deal.ProposalCid, err)
return
}
n, err := unixfile.NewUnixfsFile(ctx, c.dag, root)
if err != nil {
log.Errorf("cannot open unixfs file: %s", err)
return nil, 0, err
}
go func() {
c.updated <- clientDealUpdate{
newState: Unknown,
id: deal.ProposalCid,
err: nil,
}
}()
}
uf, ok := n.(files.File)
if !ok {
// TODO: we probably got directory, how should we handle this in unixfs mode?
return nil, 0, xerrors.New("unsupported unixfs type")
func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
log.Infof("Deal %s updated state to %d", update.id, update.newState)
if update.err != nil {
log.Errorf("deal %s failed: %s", update.id, update.err)
c.failDeal(update.id, update.err)
return
}
size, err := uf.Size()
if err != nil {
return nil, 0, err
}
var commP [sectorbuilder.CommitmentBytesLen]byte
err = withTemp(uf, func(f string) error {
commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size))
return err
var deal ClientDeal
err := c.deals.MutateClient(update.id, func(d *ClientDeal) error {
d.State = update.newState
deal = *d
return nil
})
return commP[:], size, err
}
func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error {
log.Info("Sending deal proposal")
msg, err := cbor.DumpObject(proposal)
if err != nil {
return err
}
sig, err := c.w.Sign(context.TODO(), from, msg)
if err != nil {
return err
c.failDeal(update.id, err)
return
}
signedProposal := &SignedStorageDealProposal{
Proposal: proposal,
Signature: sig,
switch update.newState {
case Unknown: // new
c.handle(ctx, deal, c.new, Accepted)
case Accepted:
c.handle(ctx, deal, c.accepted, Staged)
case Staged:
c.handle(ctx, deal, c.staged, Sealing)
case Sealing:
c.handle(ctx, deal, c.sealing, Complete)
}
return cborrpc.WriteCborRPC(s, signedProposal)
}
func (c *Client) waitAccept(s inet.Stream, proposal StorageDealProposal, minerID peer.ID) (ClientDeal, error) {
log.Info("Waiting for response")
var resp SignedStorageDealResponse
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read StorageDealResponse message", "error", err)
return ClientDeal{}, err
}
// TODO: verify signature
if resp.Response.State != Accepted {
return ClientDeal{}, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State)
}
proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
if err != nil {
return ClientDeal{}, err
}
if resp.Response.Proposal != proposalNd.Cid() {
return ClientDeal{}, xerrors.New("miner responded to a wrong proposal")
}
return ClientDeal{
ProposalCid: proposalNd.Cid(),
Status: DealResolvingMiner,
Miner: minerID,
}, nil
}
type ClientDealProposal struct {
@ -228,21 +195,27 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie
if err != nil {
return cid.Undef, err
}
defer s.Reset() // TODO: handle other updates
if err := c.sendProposal(s, proposal, p.ClientAddress); err != nil {
return cid.Undef, err
}
deal, err := c.waitAccept(s, proposal, p.MinerID)
proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
if err != nil {
return cid.Undef, err
}
log.Info("DEAL ACCEPTED!")
deal := ClientDeal{
ProposalCid: proposalNd.Cid(),
Proposal: proposal,
State: Unknown,
Miner: p.MinerID,
s: s,
}
// TODO: actually care about what happens with the deal after it was accepted
//c.incoming <- deal
c.incoming <- deal
// TODO: start tracking after the deal is sealed
return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{

View File

@ -0,0 +1,99 @@
package deals
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
)
type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next DealState) {
go func() {
err := cb(ctx, deal)
select {
case c.updated <- clientDealUpdate{
newState: next,
id: deal.ProposalCid,
err: err,
}:
case <-c.stop:
}
}()
}
func (c *Client) new(ctx context.Context, deal ClientDeal) error {
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != Accepted {
return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
}
log.Info("DEAL ACCEPTED!")
return nil
}
func (c *Client) accepted(ctx context.Context, deal ClientDeal) error {
/* data transfer happens */
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != Staged {
return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State)
}
log.Info("DEAL STAGED!")
return nil
}
func (c *Client) staged(ctx context.Context, deal ClientDeal) error {
/* miner seals our data, hopefully */
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != Sealing {
return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State)
}
log.Info("DEAL SEALED!")
ok, err := sectorbuilder.VerifyPieceInclusionProof(build.SectorSize, deal.Proposal.Size, deal.Proposal.CommP, resp.CommD, resp.PieceInclusionProof.ProofElements)
if err != nil {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s: %w", deal.ProposalCid, err)
}
if !ok {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s failed", deal.ProposalCid)
}
return nil
}
func (c *Client) sealing(ctx context.Context, deal ClientDeal) error {
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != Complete {
return xerrors.Errorf("deal wasn't complete (State=%d)", resp.State)
}
// TODO: look for the commit message on chain, negotiate better payment vouchers
log.Info("DEAL COMPLETE!!")
return nil
}

111
chain/deals/client_utils.go Normal file
View File

@ -0,0 +1,111 @@
package deals
import (
"context"
"runtime"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor"
unixfile "github.com/ipfs/go-unixfs/file"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
)
func (c *Client) failDeal(id cid.Cid, cerr error) {
if err := c.deals.End(id); err != nil {
log.Warnf("deals.End: %s", err)
}
if cerr == nil {
_, f, l, _ := runtime.Caller(1)
cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l)
}
s, ok := c.conns[id]
if ok {
_ = s.Reset()
delete(c.conns, id)
}
// TODO: store in some sort of audit log
log.Errorf("deal %s failed: %s", id, cerr)
}
func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) {
root, err := c.dag.Get(ctx, data)
if err != nil {
log.Errorf("failed to get file root for deal: %s", err)
return nil, 0, err
}
n, err := unixfile.NewUnixfsFile(ctx, c.dag, root)
if err != nil {
log.Errorf("cannot open unixfs file: %s", err)
return nil, 0, err
}
uf, ok := n.(files.File)
if !ok {
// TODO: we probably got directory, how should we handle this in unixfs mode?
return nil, 0, xerrors.New("unsupported unixfs type")
}
size, err := uf.Size()
if err != nil {
return nil, 0, err
}
var commP [sectorbuilder.CommitmentBytesLen]byte
err = withTemp(uf, func(f string) error {
commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size))
return err
})
return commP[:], size, err
}
func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error {
log.Info("Sending deal proposal")
msg, err := cbor.DumpObject(proposal)
if err != nil {
return err
}
sig, err := c.w.Sign(context.TODO(), from, msg)
if err != nil {
return err
}
signedProposal := &SignedStorageDealProposal{
Proposal: proposal,
Signature: sig,
}
return cborrpc.WriteCborRPC(s, signedProposal)
}
func (c *Client) readStorageDealResp(deal ClientDeal) (*StorageDealResponse, error) {
s, ok := c.conns[deal.ProposalCid]
if !ok {
// TODO: Try to re-establish the connection using query protocol
return nil, xerrors.Errorf("no connection to miner")
}
var resp SignedStorageDealResponse
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read StorageDealResponse message", "error", err)
return nil, err
}
// TODO: verify signature
if resp.Response.Proposal != deal.ProposalCid {
return nil, xerrors.New("miner responded to a wrong proposal")
}
return &resp.Response, nil
}

View File

@ -2,19 +2,20 @@ package deals
import (
"context"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
"math"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
)
func init() {
@ -50,12 +51,12 @@ type Handler struct {
actor address.Address
incoming chan MinerDeal
updated chan dealUpdate
updated chan minerDealUpdate
stop chan struct{}
stopped chan struct{}
}
type dealUpdate struct {
type minerDealUpdate struct {
newState DealState
id cid.Cid
err error
@ -82,7 +83,7 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp
conns: map[cid.Cid]inet.Stream{},
incoming: make(chan MinerDeal),
updated: make(chan dealUpdate),
updated: make(chan minerDealUpdate),
stop: make(chan struct{}),
stopped: make(chan struct{}),
@ -125,7 +126,7 @@ func (h *Handler) onIncoming(deal MinerDeal) {
}
go func() {
h.updated <- dealUpdate{
h.updated <- minerDealUpdate{
newState: Accepted,
id: deal.ProposalCid,
err: nil,
@ -133,7 +134,7 @@ func (h *Handler) onIncoming(deal MinerDeal) {
}()
}
func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) {
func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) {
log.Infof("Deal %s updated state to %d", update.id, update.newState)
if update.err != nil {
log.Errorf("deal %s failed: %s", update.id, update.err)

View File

@ -16,13 +16,13 @@ import (
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
)
type handlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, next DealState) {
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next DealState) {
go func() {
mut, err := cb(ctx, deal)
select {
case h.updated <- dealUpdate{
case h.updated <- minerDealUpdate{
newState: next,
id: deal.ProposalCid,
err: err,
@ -237,6 +237,7 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
State: Sealing,
Proposal: deal.ProposalCid,
PieceInclusionProof: ip,
CommD: status.CommD[:],
})
if err != nil {
log.Warnf("Sending deal response failed: %s", err)

View File

@ -35,7 +35,7 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) {
s, ok := h.conns[id]
if ok {
_ = s.Close()
_ = s.Reset()
delete(h.conns, id)
}

View File

@ -19,6 +19,8 @@ func init() {
cbor.RegisterCborType(SignedStorageDealResponse{})
}
const ProtocolID = "/fil/storage/mk/1.0.0"
type SerializationMode string
const (
@ -77,6 +79,7 @@ type StorageDealResponse struct {
// Sealing
PieceInclusionProof PieceInclusionProof
CommD []byte // TODO: not in spec
// Complete
SectorCommitMessage *cid.Cid

View File

@ -59,10 +59,12 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
func RunDealClient(lc fx.Lifecycle, c *deals.Client) {
func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
c.Run()
c.Run(ctx)
return nil
},
OnStop: func(context.Context) error {