deals: Use real data in client
This commit is contained in:
parent
c79cb7bf31
commit
7423327b26
@ -2,6 +2,7 @@ package deals
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
@ -10,8 +11,10 @@ import (
|
|||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -24,6 +27,10 @@ import (
|
|||||||
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cbor.RegisterCborType(ClientDeal{})
|
||||||
|
}
|
||||||
|
|
||||||
var log = logging.Logger("deals")
|
var log = logging.Logger("deals")
|
||||||
|
|
||||||
const ProtocolID = "/fil/storage/mk/1.0.0"
|
const ProtocolID = "/fil/storage/mk/1.0.0"
|
||||||
@ -44,6 +51,7 @@ type Client struct {
|
|||||||
cs *store.ChainStore
|
cs *store.ChainStore
|
||||||
h host.Host
|
h host.Host
|
||||||
w *wallet.Wallet
|
w *wallet.Wallet
|
||||||
|
dag dtypes.ClientDAG
|
||||||
|
|
||||||
deals StateStore
|
deals StateStore
|
||||||
|
|
||||||
@ -53,11 +61,12 @@ type Client struct {
|
|||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS) *Client {
|
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG) *Client {
|
||||||
c := &Client{
|
c := &Client{
|
||||||
cs: cs,
|
cs: cs,
|
||||||
h: h,
|
h: h,
|
||||||
w: w,
|
w: w,
|
||||||
|
dag: dag,
|
||||||
|
|
||||||
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))},
|
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))},
|
||||||
|
|
||||||
@ -94,12 +103,29 @@ func (c *Client) Run() {
|
|||||||
|
|
||||||
func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) {
|
func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) {
|
||||||
// TODO: Eww
|
// TODO: Eww
|
||||||
|
root, err := c.dag.Get(ctx, data)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get file root for deal: %s", err)
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := unixfile.NewUnixfsFile(ctx, c.dag, root)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("cannot open unixfs file: %s", err)
|
||||||
|
return cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
uf, ok := n.(files.File)
|
||||||
|
if !ok {
|
||||||
|
// TODO: we probably got directory, how should we handle this in unixfs mode?
|
||||||
|
return cid.Undef, xerrors.New("unsupported unixfs type")
|
||||||
|
}
|
||||||
|
|
||||||
f, err := ioutil.TempFile(os.TempDir(), "commP-temp-")
|
f, err := ioutil.TempFile(os.TempDir(), "commP-temp-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
_, err = f.Write([]byte("hello\n"))
|
if _, err := io.Copy(f, uf); err != nil {
|
||||||
if err != nil {
|
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
if err := f.Close(); err != nil {
|
if err := f.Close(); err != nil {
|
||||||
@ -117,8 +143,8 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn
|
|||||||
|
|
||||||
// TODO: use data
|
// TODO: use data
|
||||||
proposal := StorageDealProposal{
|
proposal := StorageDealProposal{
|
||||||
PieceRef: "bafkqabtimvwgy3yk", // identity 'hello\n'
|
PieceRef: data.String(),
|
||||||
SerializationMode: SerializationRaw,
|
SerializationMode: SerializationUnixFs,
|
||||||
CommP: commP[:],
|
CommP: commP[:],
|
||||||
Size: 6,
|
Size: 6,
|
||||||
TotalPrice: totalPrice,
|
TotalPrice: totalPrice,
|
||||||
|
@ -2,6 +2,7 @@ package deals
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -20,6 +21,10 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cbor.RegisterCborType(MinerDeal{})
|
||||||
|
}
|
||||||
|
|
||||||
type MinerDeal struct {
|
type MinerDeal struct {
|
||||||
Client peer.ID
|
Client peer.ID
|
||||||
Proposal StorageDealProposal
|
Proposal StorageDealProposal
|
||||||
@ -41,21 +46,38 @@ type Handler struct {
|
|||||||
|
|
||||||
incoming chan MinerDeal
|
incoming chan MinerDeal
|
||||||
|
|
||||||
|
actor address.Address
|
||||||
|
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) *Handler {
|
func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) (*Handler, error) {
|
||||||
|
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
minerAddress, err := address.NewFromBytes(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &Handler{
|
return &Handler{
|
||||||
w: w,
|
w: w,
|
||||||
|
sb: sb,
|
||||||
dag: dag,
|
dag: dag,
|
||||||
|
|
||||||
|
incoming: make(chan MinerDeal),
|
||||||
|
|
||||||
|
actor: minerAddress,
|
||||||
|
|
||||||
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))},
|
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))},
|
||||||
}
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) Run(ctx context.Context) {
|
func (h *Handler) Run(ctx context.Context) {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer log.Error("quitting deal handler loop")
|
||||||
defer close(h.stopped)
|
defer close(h.stopped)
|
||||||
fetched := make(chan cid.Cid)
|
fetched := make(chan cid.Cid)
|
||||||
|
|
||||||
@ -67,7 +89,7 @@ func (h *Handler) Run(ctx context.Context) {
|
|||||||
if err := h.deals.Begin(deal.ProposalCid, deal); err != nil {
|
if err := h.deals.Begin(deal.ProposalCid, deal); err != nil {
|
||||||
// TODO: This can happen when client re-sends proposal
|
// TODO: This can happen when client re-sends proposal
|
||||||
log.Errorf("deal tracking failed: %s", err)
|
log.Errorf("deal tracking failed: %s", err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(id cid.Cid) {
|
go func(id cid.Cid) {
|
||||||
@ -104,7 +126,7 @@ func (h *Handler) Run(ctx context.Context) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: fail deal
|
// TODO: fail deal
|
||||||
log.Errorf("failed to get file root for deal: %s", err)
|
log.Errorf("failed to get file root for deal: %s", err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: abstract this away into ReadSizeCloser + implement different modes
|
// TODO: abstract this away into ReadSizeCloser + implement different modes
|
||||||
@ -112,7 +134,7 @@ func (h *Handler) Run(ctx context.Context) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: fail deal
|
// TODO: fail deal
|
||||||
log.Errorf("cannot open unixfs file: %s", err)
|
log.Errorf("cannot open unixfs file: %s", err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
f, ok := n.(files.File)
|
f, ok := n.(files.File)
|
||||||
@ -127,7 +149,7 @@ func (h *Handler) Run(ctx context.Context) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to get file size: %s", err)
|
log.Errorf("failed to get file size: %s", err)
|
||||||
// TODO: fail deal
|
// TODO: fail deal
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: can we use pipes?
|
// TODO: can we use pipes?
|
||||||
@ -135,7 +157,7 @@ func (h *Handler) Run(ctx context.Context) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: fail deal
|
// TODO: fail deal
|
||||||
log.Errorf("AddPiece failed: %s", err)
|
log.Errorf("AddPiece failed: %s", err)
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warnf("New Sector: %d", sectorID)
|
log.Warnf("New Sector: %d", sectorID)
|
||||||
@ -162,6 +184,12 @@ func (h *Handler) HandleStream(s inet.Stream) {
|
|||||||
// TODO: Validate proposal maybe
|
// TODO: Validate proposal maybe
|
||||||
// (and signature, obviously)
|
// (and signature, obviously)
|
||||||
|
|
||||||
|
if proposal.Proposal.MinerAddress != h.actor {
|
||||||
|
log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress)
|
||||||
|
// TODO: send error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
switch proposal.Proposal.SerializationMode {
|
switch proposal.Proposal.SerializationMode {
|
||||||
//case SerializationRaw:
|
//case SerializationRaw:
|
||||||
//case SerializationIPLD:
|
//case SerializationIPLD:
|
||||||
@ -175,6 +203,7 @@ func (h *Handler) HandleStream(s inet.Stream) {
|
|||||||
// TODO: Review: Not signed?
|
// TODO: Review: Not signed?
|
||||||
proposalNd, err := cbor.WrapObject(proposal.Proposal, math.MaxUint64, -1)
|
proposalNd, err := cbor.WrapObject(proposal.Proposal, math.MaxUint64, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,12 +218,27 @@ func (h *Handler) HandleStream(s inet.Stream) {
|
|||||||
log.Errorw("failed to serialize response message", "error", err)
|
log.Errorw("failed to serialize response message", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sig, err := h.w.Sign(proposal.Proposal.MinerAddress, msg)
|
|
||||||
|
def, err := h.w.ListAddrs()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(def) != 1 {
|
||||||
|
// NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor
|
||||||
|
// TODO: implement with GetWorker on the miner actor
|
||||||
|
log.Errorf("Expected only 1 address in wallet, got %d", len(def))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sig, err := h.w.Sign(def[0], msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorw("failed to sign response message", "error", err)
|
log.Errorw("failed to sign response message", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info("accepting deal")
|
||||||
|
|
||||||
signedResponse := &SignedStorageDealResponse{
|
signedResponse := &SignedStorageDealResponse{
|
||||||
Response: response,
|
Response: response,
|
||||||
Signature: sig,
|
Signature: sig,
|
||||||
@ -206,9 +250,12 @@ func (h *Handler) HandleStream(s inet.Stream) {
|
|||||||
|
|
||||||
ref, err := cid.Parse(proposal.Proposal.PieceRef)
|
ref, err := cid.Parse(proposal.Proposal.PieceRef)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info("processing deal")
|
||||||
|
|
||||||
h.incoming <- MinerDeal{
|
h.incoming <- MinerDeal{
|
||||||
Client: s.Conn().RemotePeer(),
|
Client: s.Conn().RemotePeer(),
|
||||||
Proposal: proposal.Proposal,
|
Proposal: proposal.Proposal,
|
||||||
@ -218,3 +265,8 @@ func (h *Handler) HandleStream(s inet.Stream) {
|
|||||||
Ref: ref,
|
Ref: ref,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) Stop() {
|
||||||
|
close(h.stop)
|
||||||
|
<-h.stopped
|
||||||
|
}
|
||||||
|
@ -89,7 +89,7 @@ type StorageDealResponse struct {
|
|||||||
PieceInclusionProof PieceInclusionProof
|
PieceInclusionProof PieceInclusionProof
|
||||||
|
|
||||||
// Complete
|
// Complete
|
||||||
SectorCommitMessage cid.Cid
|
SectorCommitMessage *cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
type SignedStorageDealResponse struct {
|
type SignedStorageDealResponse struct {
|
||||||
|
@ -38,6 +38,9 @@ func (w *Wallet) Sign(addr address.Address, msg []byte) (*types.Signature, error
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if ki == nil {
|
||||||
|
return nil, xerrors.Errorf("signing using key '%s': %w", addr.String(), repo.ErrKeyNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
switch ki.Type {
|
switch ki.Type {
|
||||||
case types.KTSecp256k1:
|
case types.KTSecp256k1:
|
||||||
|
@ -103,8 +103,20 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
|
|||||||
return sm, nil
|
return sm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleDeals(h host.Host, handler *deals.Handler) {
|
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Handler) {
|
||||||
h.SetStreamHandler(deals.ProtocolID, handler.HandleStream)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(context.Context) error {
|
||||||
|
h.Run(ctx)
|
||||||
|
host.SetStreamHandler(deals.ProtocolID, h.HandleStream)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
OnStop: func(context.Context) error {
|
||||||
|
h.Stop()
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) {
|
func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user