Merge pull request #187 from filecoin-project/feat/client-deal-store

Deal client improvements
This commit is contained in:
Łukasz Magiera 2019-09-13 22:50:32 +02:00 committed by GitHub
commit 08b6d34adc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 829 additions and 290 deletions

View File

@ -94,7 +94,6 @@ dist-clean:
.PHONY: dist-clean .PHONY: dist-clean
type-gen: type-gen:
rm -f ./chain/types/cbor_gen.go
go run ./gen/main.go go run ./gen/main.go
print-%: print-%:

View File

@ -2,7 +2,6 @@ package api
import ( import (
"context" "context"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore" "github.com/ipfs/go-filestore"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
@ -86,6 +85,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore // ClientImport imports file under the specified path into filestore
ClientImport(ctx context.Context, path string) (cid.Cid, error) ClientImport(ctx context.Context, path string) (cid.Cid, error)
ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error)
ClientListDeals(ctx context.Context) ([]DealInfo, error)
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now) ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now)
ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error
@ -107,10 +107,11 @@ type FullNode interface {
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error)
PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, error) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error)
PaychList(context.Context) ([]address.Address, error) PaychList(context.Context) ([]address.Address, error)
PaychStatus(context.Context, address.Address) (*PaychStatus, error) PaychStatus(context.Context, address.Address) (*PaychStatus, error)
PaychClose(context.Context, address.Address) (cid.Cid, error) PaychClose(context.Context, address.Address) (cid.Cid, error)
PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error)
PaychVoucherCheckValid(context.Context, address.Address, *types.SignedVoucher) error PaychVoucherCheckValid(context.Context, address.Address, *types.SignedVoucher) error
PaychVoucherCheckSpendable(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error) PaychVoucherCheckSpendable(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error)
PaychVoucherCreate(context.Context, address.Address, types.BigInt, uint64) (*types.SignedVoucher, error) PaychVoucherCreate(context.Context, address.Address, types.BigInt, uint64) (*types.SignedVoucher, error)
@ -160,6 +161,19 @@ type Import struct {
Size uint64 Size uint64
} }
type DealInfo struct {
ProposalCid cid.Cid
State DealState
Miner address.Address
PieceRef cid.Cid
CommP []byte
Size uint64
TotalPrice types.BigInt
Duration uint64
}
type MsgWait struct { type MsgWait struct {
InBlock cid.Cid InBlock cid.Cid
Receipt types.MessageReceipt Receipt types.MessageReceipt
@ -194,6 +208,17 @@ type PaychStatus struct {
Direction PCHDir Direction PCHDir
} }
type ChannelInfo struct {
Channel address.Address
ChannelMessage cid.Cid
}
type PaymentInfo struct {
Channel address.Address
ChannelMessage *cid.Cid
Voucher *types.SignedVoucher
}
type MinerPower struct { type MinerPower struct {
MinerPower types.BigInt MinerPower types.BigInt
TotalPower types.BigInt TotalPower types.BigInt

View File

@ -70,6 +70,7 @@ type FullNodeStruct struct {
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"`
ClientListDeals func(ctx context.Context) ([]DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"` ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@ -80,17 +81,18 @@ type FullNodeStruct struct {
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"`
PaychCreate func(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, error) `perm:"sign"` PaychCreate func(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"` PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
PaychStatus func(context.Context, address.Address) (*PaychStatus, error) `perm:"read"` PaychStatus func(context.Context, address.Address) (*PaychStatus, error) `perm:"read"`
PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"` PaychClose func(context.Context, address.Address) (cid.Cid, error) `perm:"sign"`
PaychVoucherCheck func(context.Context, *types.SignedVoucher) error `perm:"read"` PaychNewPayment func(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) `perm:"sign"`
PaychVoucherCheckValid func(context.Context, address.Address, *types.SignedVoucher) error `perm:"read"` PaychVoucherCheck func(context.Context, *types.SignedVoucher) error `perm:"read"`
PaychVoucherCheckSpendable func(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error) `perm:"read"` PaychVoucherCheckValid func(context.Context, address.Address, *types.SignedVoucher) error `perm:"read"`
PaychVoucherAdd func(context.Context, address.Address, *types.SignedVoucher, []byte) error `perm:"write"` PaychVoucherCheckSpendable func(context.Context, address.Address, *types.SignedVoucher, []byte, []byte) (bool, error) `perm:"read"`
PaychVoucherCreate func(context.Context, address.Address, types.BigInt, uint64) (*types.SignedVoucher, error) `perm:"sign"` PaychVoucherAdd func(context.Context, address.Address, *types.SignedVoucher, []byte) error `perm:"write"`
PaychVoucherList func(context.Context, address.Address) ([]*types.SignedVoucher, error) `perm:"write"` PaychVoucherCreate func(context.Context, address.Address, types.BigInt, uint64) (*types.SignedVoucher, error) `perm:"sign"`
PaychVoucherSubmit func(context.Context, address.Address, *types.SignedVoucher) (cid.Cid, error) `perm:"sign"` PaychVoucherList func(context.Context, address.Address) ([]*types.SignedVoucher, error) `perm:"write"`
PaychVoucherSubmit func(context.Context, address.Address, *types.SignedVoucher) (cid.Cid, error) `perm:"sign"`
} }
} }
@ -168,6 +170,10 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, mine
return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration) return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration)
} }
func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]DealInfo, error) {
return c.Internal.ClientListDeals(ctx)
}
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error { func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error {
return c.Internal.ClientRetrieve(ctx, order, path) return c.Internal.ClientRetrieve(ctx, order, path)
} }
@ -288,7 +294,7 @@ func (c *FullNodeStruct) StateReadState(ctx context.Context, act *types.Actor, t
return c.Internal.StateReadState(ctx, act, ts) return c.Internal.StateReadState(ctx, act, ts)
} }
func (c *FullNodeStruct) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, error) { func (c *FullNodeStruct) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychCreate(ctx, from, to, amt) return c.Internal.PaychCreate(ctx, from, to, amt)
} }
@ -324,6 +330,10 @@ func (c *FullNodeStruct) PaychClose(ctx context.Context, a address.Address) (cid
return c.Internal.PaychClose(ctx, a) return c.Internal.PaychClose(ctx, a)
} }
func (c *FullNodeStruct) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*PaymentInfo, error) {
return c.Internal.PaychNewPayment(ctx, from, to, amount, extra, tl, minClose)
}
func (c *FullNodeStruct) PaychVoucherSubmit(ctx context.Context, ch address.Address, sv *types.SignedVoucher) (cid.Cid, error) { func (c *FullNodeStruct) PaychVoucherSubmit(ctx context.Context, ch address.Address, sv *types.SignedVoucher) (cid.Cid, error) {
return c.Internal.PaychVoucherSubmit(ctx, ch, sv) return c.Internal.PaychVoucherSubmit(ctx, ch, sv)
} }

View File

@ -6,6 +6,24 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
type DealState int
const (
DealUnknown = DealState(iota)
DealRejected
DealAccepted
DealStarted
DealFailed
DealStaged
DealSealing
DealComplete
// Client specific
DealError // deal failed with an unexpected error
DealExpired
)
// TODO: check if this exists anywhere else // TODO: check if this exists anywhere else
type MultiaddrSlice []ma.Multiaddr type MultiaddrSlice []ma.Multiaddr

View File

@ -17,7 +17,7 @@ type PaymentChannelActor struct{}
type PaymentInfo struct { type PaymentInfo struct {
PayChActor address.Address PayChActor address.Address
Payer address.Address Payer address.Address
ChannelMessage cid.Cid ChannelMessage *cid.Cid
Vouchers []*types.SignedVoucher Vouchers []*types.SignedVoucher
} }

View File

@ -2195,7 +2195,7 @@ func (t *PaymentInfo) MarshalCBOR(w io.Writer) error {
} }
// t.t.ChannelMessage (cid.Cid) // t.t.ChannelMessage (cid.Cid)
if err := cbg.WriteCid(w, t.ChannelMessage); err != nil { if err := cbg.WriteCid(w, *t.ChannelMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.ChannelMessage: %w", err) return xerrors.Errorf("failed to write cid field t.ChannelMessage: %w", err)
} }
@ -2251,7 +2251,7 @@ func (t *PaymentInfo) UnmarshalCBOR(r io.Reader) error {
if err != nil { if err != nil {
return xerrors.Errorf("failed to read cid field t.ChannelMessage: %w", err) return xerrors.Errorf("failed to read cid field t.ChannelMessage: %w", err)
} }
t.ChannelMessage = c t.ChannelMessage = &c
} }
// t.t.Vouchers ([]*types.SignedVoucher) // t.t.Vouchers ([]*types.SignedVoucher)

View File

@ -4,47 +4,44 @@ import (
"context" "context"
"math" "math"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/retrieval/discovery" "github.com/filecoin-project/go-lotus/retrieval/discovery"
) )
func init() { func init() {
cbor.RegisterCborType(ClientDeal{}) cbor.RegisterCborType(ClientDeal{})
cbor.RegisterCborType(actors.PieceInclVoucherData{}) // TODO: USE CBORGEN!
cbor.RegisterCborType(types.SignedVoucher{})
cbor.RegisterCborType(types.ModVerifyParams{})
cbor.RegisterCborType(types.Signature{})
cbor.RegisterCborType(actors.PaymentInfo{})
cbor.RegisterCborType(actors.InclusionProof{})
} }
var log = logging.Logger("deals") var log = logging.Logger("deals")
const ProtocolID = "/fil/storage/mk/1.0.0"
type DealStatus int
const (
DealResolvingMiner = DealStatus(iota)
)
type ClientDeal struct { type ClientDeal struct {
ProposalCid cid.Cid ProposalCid cid.Cid
Status DealStatus Proposal StorageDealProposal
State api.DealState
Miner peer.ID Miner peer.ID
s inet.Stream
} }
type Client struct { type Client struct {
@ -54,14 +51,22 @@ type Client struct {
dag dtypes.ClientDAG dag dtypes.ClientDAG
discovery *discovery.Local discovery *discovery.Local
deals StateStore deals ClientStateStore
conns map[cid.Cid]inet.Stream
incoming chan ClientDeal incoming chan ClientDeal
updated chan clientDealUpdate
stop chan struct{} stop chan struct{}
stopped chan struct{} stopped chan struct{}
} }
type clientDealUpdate struct {
newState api.DealState
id cid.Cid
err error
}
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client { func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client {
c := &Client{ c := &Client{
cs: cs, cs: cs,
@ -70,9 +75,11 @@ func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.Me
dag: dag, dag: dag,
discovery: discovery, discovery: discovery,
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, deals: ClientStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}},
conns: map[cid.Cid]inet.Stream{},
incoming: make(chan ClientDeal, 16), incoming: make(chan ClientDeal, 16),
updated: make(chan clientDealUpdate, 16),
stop: make(chan struct{}), stop: make(chan struct{}),
stopped: make(chan struct{}), stopped: make(chan struct{}),
@ -81,21 +88,16 @@ func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.Me
return c return c
} }
func (c *Client) Run() { func (c *Client) Run(ctx context.Context) {
go func() { go func() {
defer close(c.stopped) defer close(c.stopped)
for { for {
select { select {
case deal := <-c.incoming: case deal := <-c.incoming:
log.Info("incoming deal") c.onIncoming(deal)
case update := <-c.updated:
// TODO: track in datastore c.onUpdated(ctx, update)
if err := c.deals.Begin(deal.ProposalCid, deal); err != nil {
log.Errorf("deal state begin failed: %s", err)
continue
}
case <-c.stop: case <-c.stop:
return return
} }
@ -103,87 +105,59 @@ func (c *Client) Run() {
}() }()
} }
func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) { func (c *Client) onIncoming(deal ClientDeal) {
root, err := c.dag.Get(ctx, data) log.Info("incoming deal")
if err != nil {
log.Errorf("failed to get file root for deal: %s", err) if _, ok := c.conns[deal.ProposalCid]; ok {
return nil, 0, err log.Errorf("tracking deal connection: already tracking connection for deal %s", deal.ProposalCid)
return
}
c.conns[deal.ProposalCid] = deal.s
if err := c.deals.Begin(deal.ProposalCid, deal); err != nil {
// We may have re-sent the proposal
log.Errorf("deal tracking failed: %s", err)
c.failDeal(deal.ProposalCid, err)
return
} }
n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) go func() {
if err != nil { c.updated <- clientDealUpdate{
log.Errorf("cannot open unixfs file: %s", err) newState: api.DealUnknown,
return nil, 0, err id: deal.ProposalCid,
} err: nil,
}
}()
}
uf, ok := n.(files.File) func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
if !ok { log.Infof("Deal %s updated state to %d", update.id, update.newState)
// TODO: we probably got directory, how should we handle this in unixfs mode? var deal ClientDeal
return nil, 0, xerrors.New("unsupported unixfs type") err := c.deals.MutateClient(update.id, func(d *ClientDeal) error {
} d.State = update.newState
deal = *d
size, err := uf.Size() return nil
if err != nil {
return nil, 0, err
}
var commP [sectorbuilder.CommitmentBytesLen]byte
err = withTemp(uf, func(f string) error {
commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size))
return err
}) })
return commP[:], size, err if update.err != nil {
} log.Errorf("deal %s failed: %s", update.id, update.err)
c.failDeal(update.id, update.err)
func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error { return
log.Info("Sending deal proposal") }
msg, err := cbor.DumpObject(proposal)
if err != nil { if err != nil {
return err c.failDeal(update.id, err)
} return
sig, err := c.w.Sign(context.TODO(), from, msg)
if err != nil {
return err
} }
signedProposal := &SignedStorageDealProposal{ switch update.newState {
Proposal: proposal, case api.DealUnknown: // new
Signature: sig, c.handle(ctx, deal, c.new, api.DealAccepted)
case api.DealAccepted:
c.handle(ctx, deal, c.accepted, api.DealStaged)
case api.DealStaged:
c.handle(ctx, deal, c.staged, api.DealSealing)
case api.DealSealing:
c.handle(ctx, deal, c.sealing, api.DealComplete)
} }
return cborrpc.WriteCborRPC(s, signedProposal)
}
func (c *Client) waitAccept(s inet.Stream, proposal StorageDealProposal, minerID peer.ID) (ClientDeal, error) {
log.Info("Waiting for response")
var resp SignedStorageDealResponse
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read StorageDealResponse message", "error", err)
return ClientDeal{}, err
}
// TODO: verify signature
if resp.Response.State != Accepted {
return ClientDeal{}, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State)
}
proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
if err != nil {
return ClientDeal{}, err
}
if resp.Response.Proposal != proposalNd.Cid() {
return ClientDeal{}, xerrors.New("miner responded to a wrong proposal")
}
return ClientDeal{
ProposalCid: proposalNd.Cid(),
Status: DealResolvingMiner,
Miner: minerID,
}, nil
} }
type ClientDealProposal struct { type ClientDealProposal struct {
@ -228,21 +202,27 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
defer s.Reset() // TODO: handle other updates
if err := c.sendProposal(s, proposal, p.ClientAddress); err != nil { if err := c.sendProposal(s, proposal, p.ClientAddress); err != nil {
return cid.Undef, err return cid.Undef, err
} }
deal, err := c.waitAccept(s, proposal, p.MinerID) proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
log.Info("DEAL ACCEPTED!") deal := ClientDeal{
ProposalCid: proposalNd.Cid(),
Proposal: proposal,
State: api.DealUnknown,
Miner: p.MinerID,
s: s,
}
// TODO: actually care about what happens with the deal after it was accepted // TODO: actually care about what happens with the deal after it was accepted
//c.incoming <- deal c.incoming <- deal
// TODO: start tracking after the deal is sealed // TODO: start tracking after the deal is sealed
return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{ return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{
@ -251,6 +231,10 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie
}) })
} }
func (c *Client) List() ([]ClientDeal, error) {
return c.deals.ListClient()
}
func (c *Client) Stop() { func (c *Client) Stop() {
close(c.stop) close(c.stop)
<-c.stopped <-c.stopped

View File

@ -0,0 +1,103 @@
package deals
import (
"context"
"github.com/filecoin-project/go-lotus/api"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
)
type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next api.DealState) {
go func() {
err := cb(ctx, deal)
if err != nil {
next = api.DealError
}
select {
case c.updated <- clientDealUpdate{
newState: next,
id: deal.ProposalCid,
err: err,
}:
case <-c.stop:
}
}()
}
func (c *Client) new(ctx context.Context, deal ClientDeal) error {
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != api.DealAccepted {
return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
}
log.Info("DEAL ACCEPTED!")
return nil
}
func (c *Client) accepted(ctx context.Context, deal ClientDeal) error {
/* data transfer happens */
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != api.DealStaged {
return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State)
}
log.Info("DEAL STAGED!")
return nil
}
func (c *Client) staged(ctx context.Context, deal ClientDeal) error {
/* miner seals our data, hopefully */
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != api.DealSealing {
return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State)
}
log.Info("DEAL SEALED!")
ok, err := sectorbuilder.VerifyPieceInclusionProof(build.SectorSize, deal.Proposal.Size, deal.Proposal.CommP, resp.CommD, resp.PieceInclusionProof.ProofElements)
if err != nil {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s: %w", deal.ProposalCid, err)
}
if !ok {
return xerrors.Errorf("verifying piece inclusion proof in staged deal %s failed", deal.ProposalCid)
}
return nil
}
func (c *Client) sealing(ctx context.Context, deal ClientDeal) error {
resp, err := c.readStorageDealResp(deal)
if err != nil {
return err
}
if resp.State != api.DealComplete {
return xerrors.Errorf("deal wasn't complete (State=%d)", resp.State)
}
// TODO: look for the commit message on chain, negotiate better payment vouchers
log.Info("DEAL COMPLETE!!")
return nil
}

107
chain/deals/client_utils.go Normal file
View File

@ -0,0 +1,107 @@
package deals
import (
"context"
"runtime"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor"
unixfile "github.com/ipfs/go-unixfs/file"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
)
func (c *Client) failDeal(id cid.Cid, cerr error) {
if cerr == nil {
_, f, l, _ := runtime.Caller(1)
cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l)
}
s, ok := c.conns[id]
if ok {
_ = s.Reset()
delete(c.conns, id)
}
// TODO: store in some sort of audit log
log.Errorf("deal %s failed: %s", id, cerr)
}
func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) {
root, err := c.dag.Get(ctx, data)
if err != nil {
log.Errorf("failed to get file root for deal: %s", err)
return nil, 0, err
}
n, err := unixfile.NewUnixfsFile(ctx, c.dag, root)
if err != nil {
log.Errorf("cannot open unixfs file: %s", err)
return nil, 0, err
}
uf, ok := n.(files.File)
if !ok {
// TODO: we probably got directory, how should we handle this in unixfs mode?
return nil, 0, xerrors.New("unsupported unixfs type")
}
size, err := uf.Size()
if err != nil {
return nil, 0, err
}
var commP [sectorbuilder.CommitmentBytesLen]byte
err = withTemp(uf, func(f string) error {
commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size))
return err
})
return commP[:], size, err
}
func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error {
log.Info("Sending deal proposal")
msg, err := cbor.DumpObject(proposal)
if err != nil {
return err
}
sig, err := c.w.Sign(context.TODO(), from, msg)
if err != nil {
return err
}
signedProposal := &SignedStorageDealProposal{
Proposal: proposal,
Signature: sig,
}
return cborrpc.WriteCborRPC(s, signedProposal)
}
func (c *Client) readStorageDealResp(deal ClientDeal) (*StorageDealResponse, error) {
s, ok := c.conns[deal.ProposalCid]
if !ok {
// TODO: Try to re-establish the connection using query protocol
return nil, xerrors.Errorf("no connection to miner")
}
var resp SignedStorageDealResponse
if err := cborrpc.ReadCborRPC(s, &resp); err != nil {
log.Errorw("failed to read StorageDealResponse message", "error", err)
return nil, err
}
// TODO: verify signature
if resp.Response.Proposal != deal.ProposalCid {
return nil, xerrors.New("miner responded to a wrong proposal")
}
return &resp.Response, nil
}

View File

@ -2,19 +2,20 @@ package deals
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
"math" "math"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
) )
func init() { func init() {
@ -25,11 +26,11 @@ type MinerDeal struct {
Client peer.ID Client peer.ID
Proposal StorageDealProposal Proposal StorageDealProposal
ProposalCid cid.Cid ProposalCid cid.Cid
State DealState State api.DealState
Ref cid.Cid Ref cid.Cid
SectorID uint64 // Set when State >= Staged SectorID uint64 // Set when State >= DealStaged
s inet.Stream s inet.Stream
} }
@ -44,19 +45,19 @@ type Handler struct {
// TODO: GC // TODO: GC
dag dtypes.StagingDAG dag dtypes.StagingDAG
deals StateStore deals MinerStateStore
conns map[cid.Cid]inet.Stream conns map[cid.Cid]inet.Stream
actor address.Address actor address.Address
incoming chan MinerDeal incoming chan MinerDeal
updated chan dealUpdate updated chan minerDealUpdate
stop chan struct{} stop chan struct{}
stopped chan struct{} stopped chan struct{}
} }
type dealUpdate struct { type minerDealUpdate struct {
newState DealState newState api.DealState
id cid.Cid id cid.Cid
err error err error
mut func(*MinerDeal) mut func(*MinerDeal)
@ -82,13 +83,13 @@ func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtyp
conns: map[cid.Cid]inet.Stream{}, conns: map[cid.Cid]inet.Stream{},
incoming: make(chan MinerDeal), incoming: make(chan MinerDeal),
updated: make(chan dealUpdate), updated: make(chan minerDealUpdate),
stop: make(chan struct{}), stop: make(chan struct{}),
stopped: make(chan struct{}), stopped: make(chan struct{}),
actor: minerAddress, actor: minerAddress,
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, deals: MinerStateStore{StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}},
}, nil }, nil
} }
@ -101,9 +102,9 @@ func (h *Handler) Run(ctx context.Context) {
for { for {
select { select {
case deal := <-h.incoming: // Accepted case deal := <-h.incoming: // DealAccepted
h.onIncoming(deal) h.onIncoming(deal)
case update := <-h.updated: // Staged case update := <-h.updated: // DealStaged
h.onUpdated(ctx, update) h.onUpdated(ctx, update)
case <-h.stop: case <-h.stop:
return return
@ -125,15 +126,15 @@ func (h *Handler) onIncoming(deal MinerDeal) {
} }
go func() { go func() {
h.updated <- dealUpdate{ h.updated <- minerDealUpdate{
newState: Accepted, newState: api.DealAccepted,
id: deal.ProposalCid, id: deal.ProposalCid,
err: nil, err: nil,
} }
}() }()
} }
func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) { func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) {
log.Infof("Deal %s updated state to %d", update.id, update.newState) log.Infof("Deal %s updated state to %d", update.id, update.newState)
if update.err != nil { if update.err != nil {
log.Errorf("deal %s failed: %s", update.id, update.err) log.Errorf("deal %s failed: %s", update.id, update.err)
@ -155,12 +156,12 @@ func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) {
} }
switch update.newState { switch update.newState {
case Accepted: case api.DealAccepted:
h.handle(ctx, deal, h.accept, Staged) h.handle(ctx, deal, h.accept, api.DealStaged)
case Staged: case api.DealStaged:
h.handle(ctx, deal, h.staged, Sealing) h.handle(ctx, deal, h.staged, api.DealSealing)
case Sealing: case api.DealSealing:
h.handle(ctx, deal, h.sealing, Complete) h.handle(ctx, deal, h.sealing, api.DealComplete)
} }
} }
@ -180,7 +181,7 @@ func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDea
Client: s.Conn().RemotePeer(), Client: s.Conn().RemotePeer(),
Proposal: proposal, Proposal: proposal,
ProposalCid: proposalNd.Cid(), ProposalCid: proposalNd.Cid(),
State: Unknown, State: api.DealUnknown,
Ref: ref, Ref: ref,

View File

@ -3,6 +3,7 @@ package deals
import ( import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
@ -16,13 +17,13 @@ import (
"github.com/filecoin-project/go-lotus/storage/sectorblocks" "github.com/filecoin-project/go-lotus/storage/sectorblocks"
) )
type handlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, next DealState) { func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next api.DealState) {
go func() { go func() {
mut, err := cb(ctx, deal) mut, err := cb(ctx, deal)
select { select {
case h.updated <- dealUpdate{ case h.updated <- minerDealUpdate{
newState: next, newState: next,
id: deal.ProposalCid, id: deal.ProposalCid,
err: err, err: err,
@ -106,6 +107,13 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
return nil, xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode) return nil, xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode)
} }
if deal.Proposal.Payment.ChannelMessage != nil {
log.Info("waiting for channel message to appear on chain")
if _, err := h.full.ChainWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil {
return nil, xerrors.Errorf("waiting for paych message: %w", err)
}
}
if err := h.validateVouchers(ctx, deal); err != nil { if err := h.validateVouchers(ctx, deal); err != nil {
return nil, err return nil, err
} }
@ -118,7 +126,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
log.Info("fetching data for a deal") log.Info("fetching data for a deal")
err := h.sendSignedResponse(StorageDealResponse{ err := h.sendSignedResponse(StorageDealResponse{
State: Accepted, State: api.DealAccepted,
Message: "", Message: "",
Proposal: deal.ProposalCid, Proposal: deal.ProposalCid,
}) })
@ -133,7 +141,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
err := h.sendSignedResponse(StorageDealResponse{ err := h.sendSignedResponse(StorageDealResponse{
State: Staged, State: api.DealStaged,
Proposal: deal.ProposalCid, Proposal: deal.ProposalCid,
}) })
if err != nil { if err != nil {
@ -234,9 +242,10 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
} }
err = h.sendSignedResponse(StorageDealResponse{ err = h.sendSignedResponse(StorageDealResponse{
State: Sealing, State: api.DealSealing,
Proposal: deal.ProposalCid, Proposal: deal.ProposalCid,
PieceInclusionProof: ip, PieceInclusionProof: ip,
CommD: status.CommD[:],
}) })
if err != nil { if err != nil {
log.Warnf("Sending deal response failed: %s", err) log.Warnf("Sending deal response failed: %s", err)

View File

@ -2,6 +2,7 @@ package deals
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/api"
"runtime" "runtime"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
@ -28,14 +29,14 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) {
log.Errorf("deal %s failed: %s", id, cerr) log.Errorf("deal %s failed: %s", id, cerr)
err := h.sendSignedResponse(StorageDealResponse{ err := h.sendSignedResponse(StorageDealResponse{
State: Failed, State: api.DealFailed,
Message: cerr.Error(), Message: cerr.Error(),
Proposal: id, Proposal: id,
}) })
s, ok := h.conns[id] s, ok := h.conns[id]
if ok { if ok {
_ = s.Close() _ = s.Reset()
delete(h.conns, id) delete(h.conns, id)
} }

View File

@ -3,6 +3,7 @@ package deals
import ( import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
@ -18,8 +19,7 @@ func (st *StateStore) Begin(i cid.Cid, state interface{}) error {
return err return err
} }
if has { if has {
// TODO: uncomment after deals work return xerrors.Errorf("Already tracking state for %s", i)
//return xerrors.Errorf("Already tracking state for %s", i)
} }
b, err := cbor.DumpObject(state) b, err := cbor.DumpObject(state)
@ -42,48 +42,6 @@ func (st *StateStore) End(i cid.Cid) error {
return st.ds.Delete(k) return st.ds.Delete(k)
} }
// When this gets used anywhere else, migrate to reflect
func (st *StateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
return st.mutate(i, minerMutator(mutator))
}
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal MinerDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
func (st *StateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
return st.mutate(i, clientMutator(mutator))
}
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal ClientDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error { func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error {
k := datastore.NewKey(i.String()) k := datastore.NewKey(i.String())
has, err := st.ds.Has(k) has, err := st.ds.Has(k)
@ -106,3 +64,78 @@ func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) er
return st.ds.Put(k, mutated) return st.ds.Put(k, mutated)
} }
type MinerStateStore struct {
StateStore
}
func (st *MinerStateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error {
return st.mutate(i, minerMutator(mutator))
}
func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal MinerDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
type ClientStateStore struct {
StateStore
}
func (st *ClientStateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error {
return st.mutate(i, clientMutator(mutator))
}
func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) {
return func(in []byte) ([]byte, error) {
var deal ClientDeal
err := cbor.DecodeInto(in, &deal)
if err != nil {
return nil, err
}
if err := m(&deal); err != nil {
return nil, err
}
return cbor.DumpObject(deal)
}
}
func (st *ClientStateStore) ListClient() ([]ClientDeal, error) {
var out []ClientDeal
res, err := st.ds.Query(query.Query{})
if err != nil {
return nil, err
}
defer res.Close()
for {
res, ok := res.NextSync()
if !ok {
break
}
var deal ClientDeal
err := cbor.DecodeInto(res.Value, &deal)
if err != nil {
return nil, err
}
out = append(out, deal)
}
return out, nil
}

View File

@ -1,6 +1,7 @@
package deals package deals
import ( import (
"github.com/filecoin-project/go-lotus/api"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
@ -19,6 +20,8 @@ func init() {
cbor.RegisterCborType(SignedStorageDealResponse{}) cbor.RegisterCborType(SignedStorageDealResponse{})
} }
const ProtocolID = "/fil/storage/mk/1.0.0"
type SerializationMode string type SerializationMode string
const ( const (
@ -27,19 +30,6 @@ const (
SerializationIPLD = "IPLD" SerializationIPLD = "IPLD"
) )
type DealState int
const (
Unknown = iota
Rejected
Accepted
Started
Failed
Staged
Sealing
Complete
)
type StorageDealProposal struct { type StorageDealProposal struct {
PieceRef cid.Cid // TODO: port to spec PieceRef cid.Cid // TODO: port to spec
SerializationMode SerializationMode SerializationMode SerializationMode
@ -69,16 +59,17 @@ type PieceInclusionProof struct {
} }
type StorageDealResponse struct { type StorageDealResponse struct {
State DealState State api.DealState
// Rejected / Accepted / Failed / Staged // DealRejected / DealAccepted / DealFailed / DealStaged
Message string Message string
Proposal cid.Cid Proposal cid.Cid
// Sealing // DealSealing
PieceInclusionProof PieceInclusionProof PieceInclusionProof PieceInclusionProof
CommD []byte // TODO: not in spec
// Complete // DealComplete
SectorCommitMessage *cid.Cid SectorCommitMessage *cid.Cid
} }

View File

@ -6,11 +6,10 @@ import (
"reflect" "reflect"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
actors "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/actors/aerrors" "github.com/filecoin-project/go-lotus/chain/actors/aerrors"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
) )
@ -161,7 +160,12 @@ func DumpActorState(code cid.Cid, b []byte) (interface{}, error) {
} }
rv := reflect.New(typ) rv := reflect.New(typ)
if err := cbor.DecodeInto(b, rv.Interface()); err != nil { um, ok := rv.Interface().(cbg.CBORUnmarshaler)
if !ok {
return nil, xerrors.New("state type does not implement CBORUnmarshaler")
}
if err := um.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
return nil, err return nil, err
} }

View File

@ -48,12 +48,12 @@ var paychCreateCmd = &cli.Command{
ctx := ReqContext(cctx) ctx := ReqContext(cctx)
addr, err := api.PaychCreate(ctx, from, to, amt) info, err := api.PaychCreate(ctx, from, to, amt)
if err != nil { if err != nil {
return err return err
} }
fmt.Println(addr.String()) fmt.Println(info.Channel.String())
return nil return nil
}, },
} }

View File

@ -48,6 +48,13 @@
display: inline-block; display: inline-block;
} }
.Client {
background: #f9be77;
user-select: text;
font-family: monospace;
display: inline-block;
}
.CristalScroll { .CristalScroll {
display: flex; display: flex;
min-width: 100%; min-width: 100%;

View File

@ -1,5 +1,20 @@
import React from 'react'; import React from 'react';
import Cristal from 'react-cristal' import Cristal from 'react-cristal'
import Address from "./Address";
const dealStates = [
"Unknown",
"Rejected",
"Accepted",
"Started",
"Failed",
"Staged",
"Sealing",
"Complete",
"Error",
"Expired"
]
class Client extends React.Component { class Client extends React.Component {
constructor(props) { constructor(props) {
@ -9,16 +24,28 @@ class Client extends React.Component {
kbs: 1, kbs: 1,
blocks: 12, blocks: 12,
total: 36000, total: 36000,
miner: "t0101" miner: "t0101",
deals: []
} }
} }
componentDidMount() {
this.getDeals()
setInterval(this.getDeals, 1325)
}
getDeals = async () => {
let deals = await this.props.client.call('Filecoin.ClientListDeals', [])
this.setState({deals})
}
update = (name) => (e) => this.setState({ [name]: e.target.value }); update = (name) => (e) => this.setState({ [name]: e.target.value });
makeDeal = async () => { makeDeal = async () => {
let file = await this.props.pondClient.call('Pond.CreateRandomFile', [this.state.kbs * 1000]) // 1024 won't fit in 1k blocks :( let file = await this.props.pondClient.call('Pond.CreateRandomFile', [this.state.kbs * 1000]) // 1024 won't fit in 1k blocks :(
let cid = await this.props.client.call('Filecoin.ClientImport', [file]) let cid = await this.props.client.call('Filecoin.ClientImport', [file])
let dealcid = await this.props.client.call('Filecoin.ClientStartDeal', [cid, this.state.miner, `${Math.round(this.state.total / this.state.blocks)}`, this.state.blocks]) let dealcid = await this.props.client.call('Filecoin.ClientStartDeal', [cid, this.state.miner, `${Math.round(this.state.total / this.state.blocks)}`, Number(this.state.blocks)])
console.log("deal cid: ", dealcid) console.log("deal cid: ", dealcid)
} }
@ -37,8 +64,23 @@ class Client extends React.Component {
<button onClick={this.makeDeal}>Deal!</button> <button onClick={this.makeDeal}>Deal!</button>
</div> </div>
let deals = this.state.deals.map((deal, i) => <div key={i}>
<ul>
<li>{i}. Proposal: {deal.ProposalCid['/'].substr(0, 18)}... <Address nobalance={true} client={this.props.client} addr={deal.Miner} mountWindow={this.props.mountWindow}/>: <b>{dealStates[deal.State]}</b>
<ul>
<li>Data: {deal.PieceRef['/']}, <b>{deal.Size}</b>B; Duration: <b>{deal.Duration}</b>Blocks</li>
<li>Total: <b>{deal.TotalPrice}</b>FIL; Per Block: <b>{Math.round(deal.TotalPrice / deal.Duration * 100) / 100}</b>FIL; PerMbyteByteBlock: <b>{Math.round(deal.TotalPrice / deal.Duration / (deal.Size / 1000000) * 100) / 100}</b>FIL</li>
</ul>
</li>
</ul>
</div>)
return <Cristal title={"Client - Node " + this.props.node.ID}> return <Cristal title={"Client - Node " + this.props.node.ID}>
<div>{dealMaker}</div> <div className="Client">
<div>{dealMaker}</div>
<div>{deals}</div>
</div>
</Cristal> </Cristal>
} }
} }

View File

@ -50,6 +50,8 @@ func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet, ti
} }
func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
// TODO: consider using event system for this, expose confidence
blkcid, recpt, err := a.Chain.WaitForMessage(ctx, msg) blkcid, recpt, err := a.Chain.WaitForMessage(ctx, msg)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -86,26 +86,14 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
total := types.BigMul(price, types.NewInt(blocksDuration)) total := types.BigMul(price, types.NewInt(blocksDuration))
// TODO: at least ping the miner before creating paych / locking the money // TODO: at least ping the miner before creating paych / locking the money
paych, paychMsg, err := a.paychCreate(ctx, self, miner, total) extra := &types.ModVerifyParams{
if err != nil { Actor: miner,
return nil, err Method: actors.MAMethods.PaymentVerifyInclusion,
Data: voucherData,
} }
head := a.Chain.GetHeaviestTipSet() head := a.Chain.GetHeaviestTipSet()
payment, err := a.PaychNewPayment(ctx, self, miner, total, extra, head.Height()+blocksDuration, head.Height()+blocksDuration)
voucher := types.SignedVoucher{ // TODO: split into smaller payments
TimeLock: head.Height() + blocksDuration,
Extra: &types.ModVerifyParams{
Actor: miner,
Method: actors.MAMethods.PaymentVerifyInclusion,
Data: voucherData,
},
Lane: 0, // TODO: some api to make this easy
Amount: total,
MinCloseHeight: head.Height() + blocksDuration, // TODO: some way to start this after initial piece inclusion by actor? Using actors.PieceInclVoucherData?
}
sv, err := a.paychVoucherCreate(ctx, paych, voucher)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -115,10 +103,10 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
TotalPrice: total, TotalPrice: total,
Duration: blocksDuration, Duration: blocksDuration,
Payment: actors.PaymentInfo{ Payment: actors.PaymentInfo{
PayChActor: paych, PayChActor: payment.Channel,
Payer: self, Payer: self,
ChannelMessage: paychMsg, ChannelMessage: payment.ChannelMessage,
Vouchers: []*types.SignedVoucher{sv}, Vouchers: []*types.SignedVoucher{payment.Voucher},
}, },
MinerAddress: miner, MinerAddress: miner,
ClientAddress: self, ClientAddress: self,
@ -130,6 +118,31 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
return &c, err return &c, err
} }
func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
deals, err := a.DealClient.List()
if err != nil {
return nil, err
}
out := make([]api.DealInfo, len(deals))
for k, v := range deals {
out[k] = api.DealInfo{
ProposalCid: v.ProposalCid,
State: v.State,
Miner: v.Proposal.MinerAddress,
PieceRef: v.Proposal.PieceRef,
CommP: v.Proposal.CommP,
Size: v.Proposal.Size,
TotalPrice: v.Proposal.TotalPrice,
Duration: v.Proposal.Duration,
}
}
return out, nil
}
func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag // TODO: check if we have the ENTIRE dag

View File

@ -25,20 +25,15 @@ type PaychAPI struct {
PaychMgr *paych.Manager PaychMgr *paych.Manager
} }
func (a *PaychAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, error) { func (a *PaychAPI) PaychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
act, _, err := a.paychCreate(ctx, from, to, amt)
return act, err
}
func (a *PaychAPI) paychCreate(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) {
params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to}) params, aerr := actors.SerializeParams(&actors.PCAConstructorParams{To: to})
if aerr != nil { if aerr != nil {
return address.Undef, cid.Undef, aerr return nil, aerr
} }
nonce, err := a.MpoolGetNonce(ctx, from) nonce, err := a.MpoolGetNonce(ctx, from)
if err != nil { if err != nil {
return address.Undef, cid.Undef, err return nil, err
} }
enc, err := actors.SerializeParams(&actors.ExecParams{ enc, err := actors.SerializeParams(&actors.ExecParams{
@ -57,44 +52,116 @@ func (a *PaychAPI) paychCreate(ctx context.Context, from, to address.Address, am
GasPrice: types.NewInt(0), GasPrice: types.NewInt(0),
} }
ser, err := msg.Serialize() smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil { if err != nil {
return address.Undef, cid.Undef, err return nil, err
}
sig, err := a.WalletSign(ctx, from, ser)
if err != nil {
return address.Undef, cid.Undef, err
}
smsg := &types.SignedMessage{
Message: *msg,
Signature: *sig,
} }
if err := a.MpoolPush(ctx, smsg); err != nil { if err := a.MpoolPush(ctx, smsg); err != nil {
return address.Undef, cid.Undef, err return nil, err
} }
mwait, err := a.ChainWaitMsg(ctx, smsg.Cid()) mcid := smsg.Cid()
mwait, err := a.ChainWaitMsg(ctx, mcid)
if err != nil { if err != nil {
return address.Undef, cid.Undef, err return nil, err
} }
if mwait.Receipt.ExitCode != 0 { if mwait.Receipt.ExitCode != 0 {
return address.Undef, cid.Undef, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode) return nil, fmt.Errorf("payment channel creation failed (exit code %d)", mwait.Receipt.ExitCode)
} }
paychaddr, err := address.NewFromBytes(mwait.Receipt.Return) paychaddr, err := address.NewFromBytes(mwait.Receipt.Return)
if err != nil { if err != nil {
return address.Undef, cid.Undef, err return nil, err
} }
if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil { if err := a.PaychMgr.TrackOutboundChannel(ctx, paychaddr); err != nil {
return address.Undef, cid.Undef, err return nil, err
} }
return paychaddr, msg.Cid(), nil return &api.ChannelInfo{
Channel: paychaddr,
ChannelMessage: mcid,
}, nil
}
func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address, amount types.BigInt, extra *types.ModVerifyParams, tl uint64, minClose uint64) (*api.PaymentInfo, error) {
ch, err := a.PaychMgr.OutboundChanTo(from, to)
if err != nil {
return nil, err
}
var chMsg *cid.Cid
if ch == address.Undef {
// don't have matching channel, open new
// TODO: this should be more atomic
chInfo, err := a.PaychCreate(ctx, from, to, amount)
if err != nil {
return nil, err
}
ch = chInfo.Channel
chMsg = &chInfo.ChannelMessage
} else {
// already have chanel to the destination, add funds, and open a new lane
// TODO: track free funds in channel
nonce, err := a.MpoolGetNonce(ctx, from)
if err != nil {
return nil, err
}
msg := &types.Message{
To: ch,
From: from,
Value: amount,
Nonce: nonce,
Method: 0,
GasLimit: types.NewInt(1000000),
GasPrice: types.NewInt(0),
}
smsg, err := a.WalletSignMessage(ctx, from, msg)
if err != nil {
return nil, err
}
if err := a.MpoolPush(ctx, smsg); err != nil {
return nil, err
}
mwait, err := a.ChainWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, err
}
if mwait.Receipt.ExitCode != 0 {
return nil, fmt.Errorf("voucher channel creation failed: adding funds (exit code %d)", mwait.Receipt.ExitCode)
}
}
lane, err := a.PaychMgr.AllocateLane(ch)
if err != nil {
return nil, err
}
sv, err := a.paychVoucherCreate(ctx, ch, types.SignedVoucher{
Amount: amount,
Lane: lane,
Extra: extra,
TimeLock: tl,
MinCloseHeight: minClose,
})
if err != nil {
return nil, err
}
return &api.PaymentInfo{
Channel: ch,
ChannelMessage: chMsg,
Voucher: sv,
}, nil
} }
func (a *PaychAPI) PaychList(ctx context.Context) ([]address.Address, error) { func (a *PaychAPI) PaychList(ctx context.Context) ([]address.Address, error) {
@ -107,7 +174,7 @@ func (a *PaychAPI) PaychStatus(ctx context.Context, pch address.Address) (*api.P
return nil, err return nil, err
} }
return &api.PaychStatus{ return &api.PaychStatus{
ControlAddr: ci.ControlAddr, ControlAddr: ci.Control,
Direction: api.PCHDir(ci.Direction), Direction: api.PCHDir(ci.Direction),
}, nil }, nil
} }
@ -118,14 +185,14 @@ func (a *PaychAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Ci
return cid.Undef, err return cid.Undef, err
} }
nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr) nonce, err := a.MpoolGetNonce(ctx, ci.Control)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
msg := &types.Message{ msg := &types.Message{
To: addr, To: addr,
From: ci.ControlAddr, From: ci.Control,
Value: types.NewInt(0), Value: types.NewInt(0),
Method: actors.PCAMethods.Close, Method: actors.PCAMethods.Close,
Nonce: nonce, Nonce: nonce,
@ -134,7 +201,7 @@ func (a *PaychAPI) PaychClose(ctx context.Context, addr address.Address) (cid.Ci
GasPrice: types.NewInt(0), GasPrice: types.NewInt(0),
} }
smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg) smsg, err := a.WalletSignMessage(ctx, ci.Control, msg)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
@ -192,7 +259,7 @@ func (a *PaychAPI) paychVoucherCreate(ctx context.Context, pch address.Address,
return nil, err return nil, err
} }
sig, err := a.WalletSign(ctx, ci.ControlAddr, vb) sig, err := a.WalletSign(ctx, ci.Control, vb)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -226,7 +293,7 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
return cid.Undef, err return cid.Undef, err
} }
nonce, err := a.MpoolGetNonce(ctx, ci.ControlAddr) nonce, err := a.MpoolGetNonce(ctx, ci.Control)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
@ -243,7 +310,7 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
} }
msg := &types.Message{ msg := &types.Message{
From: ci.ControlAddr, From: ci.Control,
To: ch, To: ch,
Value: types.NewInt(0), Value: types.NewInt(0),
Nonce: nonce, Nonce: nonce,
@ -253,7 +320,7 @@ func (a *PaychAPI) PaychVoucherSubmit(ctx context.Context, ch address.Address, s
GasPrice: types.NewInt(0), GasPrice: types.NewInt(0),
} }
smsg, err := a.WalletSignMessage(ctx, ci.ControlAddr, msg) smsg, err := a.WalletSignMessage(ctx, ci.Control, msg)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }

View File

@ -59,10 +59,12 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu
go sub.HandleIncomingMessages(ctx, mpool, msgsub) go sub.HandleIncomingMessages(ctx, mpool, msgsub)
} }
func RunDealClient(lc fx.Lifecycle, c *deals.Client) { func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(context.Context) error { OnStart: func(context.Context) error {
c.Run() c.Run(ctx)
return nil return nil
}, },
OnStop: func(context.Context) error { OnStop: func(context.Context) error {

View File

@ -3,6 +3,8 @@ package paych
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"strconv"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
@ -26,16 +28,38 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store) *Manager {
} }
} }
func maxLaneFromState(st *actors.PaymentChannelActorState) (uint64, error) {
maxLane := uint64(math.MaxUint64)
for lane := range st.LaneStates {
ilane, err := strconv.ParseUint(lane, 10, 64)
if err != nil {
return 0, err
}
if ilane+1 > maxLane+1 {
maxLane = ilane
}
}
return maxLane, nil
}
func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error { func (pm *Manager) TrackInboundChannel(ctx context.Context, ch address.Address) error {
_, st, err := pm.loadPaychState(ctx, ch) _, st, err := pm.loadPaychState(ctx, ch)
if err != nil { if err != nil {
return err return err
} }
maxLane, err := maxLaneFromState(st)
if err != nil {
return err
}
return pm.store.TrackChannel(&ChannelInfo{ return pm.store.TrackChannel(&ChannelInfo{
Channel: ch, Channel: ch,
Direction: DirInbound, Control: st.To,
ControlAddr: st.To, Target: st.From,
Direction: DirInbound,
NextLane: maxLane + 1,
}) })
} }
@ -45,10 +69,18 @@ func (pm *Manager) TrackOutboundChannel(ctx context.Context, ch address.Address)
return err return err
} }
maxLane, err := maxLaneFromState(st)
if err != nil {
return err
}
return pm.store.TrackChannel(&ChannelInfo{ return pm.store.TrackChannel(&ChannelInfo{
Channel: ch, Channel: ch,
Direction: DirOutbound, Control: st.From,
ControlAddr: st.From, Target: st.To,
Direction: DirOutbound,
NextLane: maxLane + 1,
}) })
} }
@ -197,12 +229,25 @@ func (pm *Manager) AddVoucher(ctx context.Context, ch address.Address, sv *types
return pm.store.AddVoucher(ch, sv, proof) return pm.store.AddVoucher(ch, sv, proof)
} }
func (pm *Manager) AllocateLane(ch address.Address) (uint64, error) {
return pm.store.AllocateLane(ch)
}
func (pm *Manager) ListVouchers(ctx context.Context, ch address.Address) ([]*VoucherInfo, error) { func (pm *Manager) ListVouchers(ctx context.Context, ch address.Address) ([]*VoucherInfo, error) {
// TODO: just having a passthrough method like this feels odd. Seems like // TODO: just having a passthrough method like this feels odd. Seems like
// there should be some filtering we're doing here // there should be some filtering we're doing here
return pm.store.VouchersForPaych(ch) return pm.store.VouchersForPaych(ch)
} }
func (pm *Manager) OutboundChanTo(from, to address.Address) (address.Address, error) {
return pm.store.findChan(func(ci *ChannelInfo) bool {
if ci.Direction != DirOutbound {
return false
}
return ci.Control == from && ci.Target == to
})
}
func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) { func (pm *Manager) NextNonceForLane(ctx context.Context, ch address.Address, lane uint64) (uint64, error) {
vouchers, err := pm.store.VouchersForPaych(ch) vouchers, err := pm.store.VouchersForPaych(ch)
if err != nil { if err != nil {

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
@ -24,6 +25,8 @@ func init() {
} }
type Store struct { type Store struct {
lk sync.Mutex // TODO: this can be split per paych
ds datastore.Batching ds datastore.Batching
} }
@ -45,10 +48,13 @@ type VoucherInfo struct {
} }
type ChannelInfo struct { type ChannelInfo struct {
Channel address.Address Channel address.Address
ControlAddr address.Address Control address.Address
Direction int Target address.Address
Vouchers []*VoucherInfo
Direction int
Vouchers []*VoucherInfo
NextLane uint64
} }
func dskeyForChannel(addr address.Address) datastore.Key { func dskeyForChannel(addr address.Address) datastore.Key {
@ -86,6 +92,9 @@ func (ps *Store) getChannelInfo(addr address.Address) (*ChannelInfo, error) {
} }
func (ps *Store) TrackChannel(ch *ChannelInfo) error { func (ps *Store) TrackChannel(ch *ChannelInfo) error {
ps.lk.Lock()
defer ps.lk.Unlock()
_, err := ps.getChannelInfo(ch.Channel) _, err := ps.getChannelInfo(ch.Channel)
switch err { switch err {
default: default:
@ -98,10 +107,14 @@ func (ps *Store) TrackChannel(ch *ChannelInfo) error {
} }
func (ps *Store) ListChannels() ([]address.Address, error) { func (ps *Store) ListChannels() ([]address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
res, err := ps.ds.Query(dsq.Query{KeysOnly: true}) res, err := ps.ds.Query(dsq.Query{KeysOnly: true})
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer res.Close()
var out []address.Address var out []address.Address
for { for {
@ -125,7 +138,51 @@ func (ps *Store) ListChannels() ([]address.Address, error) {
return out, nil return out, nil
} }
func (ps *Store) findChan(filter func(*ChannelInfo) bool) (address.Address, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
res, err := ps.ds.Query(dsq.Query{})
if err != nil {
return address.Undef, err
}
defer res.Close()
var ci ChannelInfo
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return address.Undef, err
}
if err := cbor.DecodeInto(res.Value, &ci); err != nil {
return address.Undef, err
}
if !filter(&ci) {
continue
}
addr, err := address.NewFromString(strings.TrimPrefix(res.Key, "/"))
if err != nil {
return address.Undef, xerrors.Errorf("failed reading paych key (%q) from datastore: %w", res.Key, err)
}
return addr, nil
}
return address.Undef, nil
}
func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof []byte) error { func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof []byte) error {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch) ci, err := ps.getChannelInfo(ch)
if err != nil { if err != nil {
return err return err
@ -159,9 +216,28 @@ func (ps *Store) AddVoucher(ch address.Address, sv *types.SignedVoucher, proof [
Proof: proof, Proof: proof,
}) })
if ci.NextLane <= sv.Lane {
ci.NextLane = sv.Lane + 1
}
return ps.putChannelInfo(ci) return ps.putChannelInfo(ci)
} }
func (ps *Store) AllocateLane(ch address.Address) (uint64, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
ci, err := ps.getChannelInfo(ch)
if err != nil {
return 0, err
}
out := ci.NextLane
ci.NextLane++
return out, ps.putChannelInfo(ci)
}
func (ps *Store) VouchersForPaych(ch address.Address) ([]*VoucherInfo, error) { func (ps *Store) VouchersForPaych(ch address.Address) ([]*VoucherInfo, error) {
ci, err := ps.getChannelInfo(ch) ci, err := ps.getChannelInfo(ch)
if err != nil { if err != nil {

View File

@ -198,7 +198,7 @@ func (m *Miner) getWorkerAddr(ctx context.Context) (address.Address, error) {
To: m.maddr, To: m.maddr,
From: m.maddr, // it doesnt like it if we dont give it a from... probably should fix that From: m.maddr, // it doesnt like it if we dont give it a from... probably should fix that
Method: actors.MAMethods.GetWorkerAddr, Method: actors.MAMethods.GetWorkerAddr,
Params: actors.EmptyStructCBOR, Params: nil,
} }
recpt, err := m.api.StateCall(ctx, msg, nil) recpt, err := m.api.StateCall(ctx, msg, nil)