deals: Cleanup client a bit

This commit is contained in:
Łukasz Magiera 2019-08-07 21:48:53 +02:00 committed by whyrusleeping
parent 25dbdd761a
commit 821e03bcd7
3 changed files with 76 additions and 67 deletions

View File

@ -2,10 +2,7 @@ package deals
import (
"context"
"io"
"io/ioutil"
"math"
"os"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid"
@ -16,6 +13,7 @@ import (
logging "github.com/ipfs/go-log"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
@ -101,46 +99,92 @@ func (c *Client) Run() {
}()
}
func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) {
// TODO: Eww
func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) {
root, err := c.dag.Get(ctx, data)
if err != nil {
log.Errorf("failed to get file root for deal: %s", err)
return cid.Undef, err
return nil, 0, err
}
n, err := unixfile.NewUnixfsFile(ctx, c.dag, root)
if err != nil {
log.Errorf("cannot open unixfs file: %s", err)
return cid.Undef, err
return nil, 0, err
}
uf, ok := n.(files.File)
if !ok {
// TODO: we probably got directory, how should we handle this in unixfs mode?
return cid.Undef, xerrors.New("unsupported unixfs type")
return nil, 0, xerrors.New("unsupported unixfs type")
}
size, err := uf.Size()
if err != nil {
return cid.Cid{}, err
return nil, 0, err
}
f, err := ioutil.TempFile(os.TempDir(), "commP-temp-")
var commP [sectorbuilder.CommitmentBytesLen]byte
err = withTemp(uf, func(f string) error {
commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size))
return err
})
return commP[:], size, err
}
func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error {
log.Info("Sending deal proposal")
msg, err := cbor.DumpObject(proposal)
if err != nil {
return cid.Undef, err
return err
}
if _, err := io.Copy(f, uf); err != nil {
return cid.Undef, err
}
if err := f.Close(); err != nil {
return cid.Undef, err
}
commP, err := sectorbuilder.GeneratePieceCommitment(f.Name(), uint64(size))
sig, err := c.w.Sign(from, msg)
if err != nil {
return cid.Undef, err
return err
}
if err := os.Remove(f.Name()); err != nil {
signedProposal := &SignedStorageDealProposal{
Proposal: proposal,
Signature: sig,
}
return cborrpc.WriteCborRPC(s, signedProposal)
}
func (c *Client) waitAccept(s inet.Stream, proposal StorageDealProposal, minerID peer.ID) (ClientDeal, error) {
log.Info("Waiting for response")
var resp SignedStorageDealResponse
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read StorageDealResponse message", "error", err)
return ClientDeal{}, err
}
// TODO: verify signature
if resp.Response.State != Accepted {
return ClientDeal{}, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State)
}
proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
if err != nil {
return ClientDeal{}, err
}
if resp.Response.Proposal != proposalNd.Cid() {
return ClientDeal{}, xerrors.New("miner responded to a wrong proposal")
}
return ClientDeal{
ProposalCid: proposalNd.Cid(),
Status: DealResolvingMiner,
Miner: minerID,
}, nil
}
func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) {
commP, size, err := c.commP(ctx, data)
if err != nil {
return cid.Undef, err
}
@ -151,7 +195,7 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn
PieceRef: data.String(),
SerializationMode: SerializationUnixFs,
CommP: commP[:],
Size: 6,
Size: uint64(size),
TotalPrice: totalPrice,
Duration: blocksDuration,
Payment: PaymentInfo{
@ -168,57 +212,22 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn
if err != nil {
return cid.Undef, err
}
defer s.Close() // TODO: not too soon?
defer s.Reset() // TODO: handle other updates
log.Info("Sending deal proposal")
msg, err := cbor.DumpObject(proposal)
if err != nil {
if err := c.sendProposal(s, proposal, from); err != nil {
return cid.Undef, err
}
sig, err := c.w.Sign(from, msg)
deal, err := c.waitAccept(s, proposal, minerID)
if err != nil {
return cid.Undef, err
}
signedProposal := &SignedStorageDealProposal{
Proposal: proposal,
Signature: sig,
}
log.Info("DEAL ACCEPTED!")
if err := cborrpc.WriteCborRPC(s, signedProposal); err != nil {
return cid.Undef, err
}
log.Info("Reading response")
var resp SignedStorageDealResponse
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read StorageDealResponse message", "error", err)
return cid.Undef, err
}
// TODO: verify signature
if resp.Response.State != Accepted {
return cid.Undef, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State)
}
log.Info("Registering deal")
proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
if err != nil {
return cid.Undef, err
}
deal := ClientDeal{
ProposalCid: proposalNd.Cid(),
Status: DealResolvingMiner,
Miner: minerID,
}
c.incoming <- deal
return proposalNd.Cid(), nil
// TODO: actually care about what happens with the deal after it was accepted
//c.incoming <- deal
return deal.ProposalCid, nil
}
func (c *Client) Stop() {

View File

@ -59,7 +59,7 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) error {
Proposal: deal.ProposalCid,
})
if err != nil {
return err
log.Warnf("Sending deal response failed: %s", err)
}
root, err := h.dag.Get(ctx, deal.Ref)

View File

@ -21,7 +21,7 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) {
cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l)
}
log.Error("Deal %s failed: %s", id, cerr)
log.Errorf("deal %s failed: %s", id, cerr)
err := h.sendSignedResponse(StorageDealResponse{
State: Failed,
@ -76,7 +76,7 @@ func (h *Handler) sendSignedResponse(resp StorageDealResponse) error {
if len(def) != 1 {
// NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor
// TODO: implement with GetWorker on the miner actor
return xerrors.Errorf("Expected only 1 address in wallet, got %d", len(def))
return xerrors.Errorf("expected only 1 address in wallet, got %d", len(def))
}
sig, err := h.w.Sign(def[0], msg)