From ccf359d057ddbf75b552b53c6f2b45edfe965bb1 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 16 Dec 2019 19:17:46 -0800 Subject: [PATCH] feat(retrieval): extract retrievalmarket Extract retrieval market and modify shared types --- chain/deals/client.go | 8 +- go.mod | 3 +- go.sum | 7 +- node/builder.go | 6 +- node/impl/client/client.go | 15 +- node/modules/client.go | 5 +- node/modules/services.go | 9 +- node/modules/storageminer.go | 9 +- retrieval/cbor-gen/main.go | 20 -- retrieval/discovery/discovery.go | 15 - retrieval/discovery/local.go | 54 ---- retrieval/impl/cbor_gen.go | 476 ------------------------------- retrieval/impl/client.go | 389 ------------------------- retrieval/impl/provider.go | 323 --------------------- retrieval/impl/run_cbor_gen.go | 33 --- retrieval/impl/types.go | 67 ----- retrieval/impl/verify.go | 141 --------- retrieval/types.go | 364 ----------------------- retrievaladapter/client.go | 20 +- retrievaladapter/converters.go | 45 +++ retrievaladapter/provider.go | 35 ++- 21 files changed, 127 insertions(+), 1917 deletions(-) delete mode 100644 retrieval/cbor-gen/main.go delete mode 100644 retrieval/discovery/discovery.go delete mode 100644 retrieval/discovery/local.go delete mode 100644 retrieval/impl/cbor_gen.go delete mode 100644 retrieval/impl/client.go delete mode 100644 retrieval/impl/provider.go delete mode 100644 retrieval/impl/run_cbor_gen.go delete mode 100644 retrieval/impl/types.go delete mode 100644 retrieval/impl/verify.go delete mode 100644 retrieval/types.go create mode 100644 retrievaladapter/converters.go diff --git a/chain/deals/client.go b/chain/deals/client.go index 879e408e2..02c9bcce8 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -11,15 +11,15 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-components/retrievalmarket" + "github.com/filecoin-project/go-fil-components/retrievalmarket/discovery" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" - "github.com/filecoin-project/lotus/retrieval/discovery" - "github.com/filecoin-project/lotus/storagemarket" + "github.com/filecoin-project/lotus/storagemarket" ) var log = logging.Logger("deals") diff --git a/go.mod b/go.mod index ee7c4d175..665c367c4 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce + github.com/filecoin-project/go-fil-components v0.0.0-20200110033229-671d79221133 github.com/filecoin-project/go-paramfetch v0.0.1 github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254 github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5 @@ -29,7 +30,7 @@ require ( github.com/ipfs/go-bitswap v0.1.8 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c - github.com/ipfs/go-car v0.0.2 + github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1 github.com/ipfs/go-cid v0.0.4 github.com/ipfs/go-datastore v0.3.1 github.com/ipfs/go-ds-badger2 v0.0.0-20200108185345-7f650e6b2521 diff --git a/go.sum b/go.sum index 589a5d544..dc1b26e42 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY= github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww= +github.com/filecoin-project/go-fil-components v0.0.0-20200110033229-671d79221133 h1:/L916kY3hyq8w18rLO9VMSHqw25/9pwRB3nVW6b+Sm4= +github.com/filecoin-project/go-fil-components v0.0.0-20200110033229-671d79221133/go.mod h1:7M0YUI2CSVmqEmXNeXNq5L/pjk7C1Q5ifhirfMADD/k= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= @@ -120,6 +122,7 @@ github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyC github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254 h1:4IvlPad82JaNBtqh8fEAUIKWv8I3tguAJjGvUyHNZS4= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= +github.com/filecoin-project/go-statestore v0.0.0-20191219195854-7a95521e8f15/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5 h1:NZXq90YlfakSmB2/84dGr0AVmKYFA97+yyViBIgTFbk= github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -210,8 +213,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= -github.com/ipfs/go-car v0.0.2 h1:j02lzgeijorstzoMl3nQmvvb8wjJUVCiOAl8XEwYMCQ= -github.com/ipfs/go-car v0.0.2/go.mod h1:60pzeu308k5kVFHzq0HIi2kPtITgor+1ll1xuGk5JwQ= +github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1 h1:Nq8xEW+2KZq7IkRlkOh0rTEUI8FgunhMoLj5EMkJzbQ= +github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/node/builder.go b/node/builder.go index dcce895b7..d263e04fa 100644 --- a/node/builder.go +++ b/node/builder.go @@ -19,6 +19,8 @@ import ( "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/go-fil-components/retrievalmarket" + "github.com/filecoin-project/go-fil-components/retrievalmarket/discovery" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/blocksync" @@ -43,8 +45,6 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/paych" "github.com/filecoin-project/lotus/peermgr" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" - "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storagemarket" @@ -222,7 +222,7 @@ func Online() Option { Override(RunPeerMgrKey, modules.RunPeerMgr), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), - Override(new(*discovery.Local), discovery.NewLocal), + Override(new(*discovery.Local), modules.NewLocalDiscovery), Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver), Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 58f250f67..a8b7ff17f 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -25,6 +25,7 @@ import ( "go.uber.org/fx" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-components/retrievalmarket" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" @@ -32,7 +33,7 @@ import ( "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" + "github.com/filecoin-project/lotus/retrievaladapter" "github.com/filecoin-project/lotus/storagemarket" ) @@ -164,7 +165,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe out[k] = api.QueryOffer{ Root: root, Size: queryResponse.Size, - MinPrice: queryResponse.PieceRetrievalPrice(), + MinPrice: retrievaladapter.FromSharedTokenAmount(queryResponse.PieceRetrievalPrice()), Miner: p.Address, // TODO: check MinerPeerID: p.ID, } @@ -292,9 +293,13 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path }) a.Retrieval.Retrieve( - ctx, order.Root.Bytes(), retrievalmarket.Params{ - PricePerByte: types.BigDiv(order.Total, types.NewInt(order.Size)), - }, order.Total, order.MinerPeerID, order.Client, order.Miner) + ctx, + order.Root.Bytes(), + retrievalmarket.NewParamsV0(types.BigDiv(order.Total, types.NewInt(order.Size)).Int, 0, 0), + retrievaladapter.ToSharedTokenAmount(order.Total), + order.MinerPeerID, + order.Client, + order.Miner) select { case <-ctx.Done(): return xerrors.New("Retrieval Timed Out") diff --git a/node/modules/client.go b/node/modules/client.go index 6047df7af..67c6322ab 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -6,11 +6,12 @@ import ( "path/filepath" "reflect" + "github.com/filecoin-project/go-fil-components/retrievalmarket" + retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/paych" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" - retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" + "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" graphsync "github.com/ipfs/go-graphsync/impl" diff --git a/node/modules/services.go b/node/modules/services.go index bd0978085..bc99d49f3 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -8,15 +8,16 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" + "github.com/filecoin-project/go-fil-components/retrievalmarket" + "github.com/filecoin-project/go-fil-components/retrievalmarket/discovery" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/node/hello" + "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/peermgr" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" - "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storagemarket" ) @@ -81,6 +82,10 @@ func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c storagemarket.Sto }) } +func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local { + return discovery.NewLocal(ds) +} + func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver { return discovery.Multi(l) } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index d868414d9..876c2f368 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -25,6 +25,8 @@ import ( "github.com/filecoin-project/go-address" dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" + "github.com/filecoin-project/go-fil-components/retrievalmarket" + retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl" "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" @@ -35,8 +37,7 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" - retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" + "github.com/filecoin-project/lotus/retrievaladapter" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" @@ -265,6 +266,6 @@ func SealTicketGen(api api.FullNode) storage.TicketFn { // RetrievalProvider creates a new retrieval provider attached to the provider blockstore func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider { - adapter := retrievaladapter.NewRetrievalProviderNode(full) - return retrievalimpl.NewProvider(sblks, adapter) + adapter := retrievaladapter.NewRetrievalProviderNode(sblks, full) + return retrievalimpl.NewProvider(adapter) } diff --git a/retrieval/cbor-gen/main.go b/retrieval/cbor-gen/main.go deleted file mode 100644 index cc3fe9887..000000000 --- a/retrieval/cbor-gen/main.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "fmt" - "os" - - retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" -) - -// main func has ONE JOB -func main() { - fmt.Print("Generating Cbor Marshal/Unmarshal...") - - if err := retrievalimpl.RunCborGen(); err != nil { - fmt.Println("Failed: ") - fmt.Println(err) - os.Exit(1) - } - fmt.Println("Done.") -} diff --git a/retrieval/discovery/discovery.go b/retrieval/discovery/discovery.go deleted file mode 100644 index 93da56b60..000000000 --- a/retrieval/discovery/discovery.go +++ /dev/null @@ -1,15 +0,0 @@ -package discovery - -import ( - cbor "github.com/ipfs/go-ipld-cbor" - - retrievalmarket "github.com/filecoin-project/lotus/retrieval" -) - -func init() { - cbor.RegisterCborType(retrievalmarket.RetrievalPeer{}) -} - -func Multi(r retrievalmarket.PeerResolver) retrievalmarket.PeerResolver { // TODO: actually support multiple mechanisms - return r -} diff --git a/retrieval/discovery/local.go b/retrieval/discovery/local.go deleted file mode 100644 index c777a0828..000000000 --- a/retrieval/discovery/local.go +++ /dev/null @@ -1,54 +0,0 @@ -package discovery - -import ( - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - dshelp "github.com/ipfs/go-ipfs-ds-help" - cbor "github.com/ipfs/go-ipld-cbor" - logging "github.com/ipfs/go-log/v2" - - "github.com/filecoin-project/lotus/node/modules/dtypes" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" -) - -var log = logging.Logger("ret-discovery") - -type Local struct { - ds datastore.Datastore -} - -func NewLocal(ds dtypes.MetadataDS) *Local { - return &Local{ds: namespace.Wrap(ds, datastore.NewKey("/deals/local"))} -} - -func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error { - // TODO: allow multiple peers here - // (implement an util for tracking map[thing][]otherThing, use in sectorBlockstore too) - - log.Warn("Tracking multiple retrieval peers not implemented") - - entry, err := cbor.DumpObject(peer) - if err != nil { - return err - } - - return l.ds.Put(dshelp.CidToDsKey(cid), entry) -} - -func (l *Local) GetPeers(data cid.Cid) ([]retrievalmarket.RetrievalPeer, error) { - entry, err := l.ds.Get(dshelp.CidToDsKey(data)) - if err == datastore.ErrNotFound { - return []retrievalmarket.RetrievalPeer{}, nil - } - if err != nil { - return nil, err - } - var peer retrievalmarket.RetrievalPeer - if err := cbor.DecodeInto(entry, &peer); err != nil { - return nil, err - } - return []retrievalmarket.RetrievalPeer{peer}, nil -} - -var _ retrievalmarket.PeerResolver = &Local{} diff --git a/retrieval/impl/cbor_gen.go b/retrieval/impl/cbor_gen.go deleted file mode 100644 index 220577089..000000000 --- a/retrieval/impl/cbor_gen.go +++ /dev/null @@ -1,476 +0,0 @@ -package retrievalimpl - -import ( - "fmt" - "io" - - cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" -) - -// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. - -var _ = xerrors.Errorf - -func (t *RetParams) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct) - if err := t.Unixfs0.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *RetParams) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Unixfs0 = new(Unixfs0Offer) - if err := t.Unixfs0.UnmarshalCBOR(br); err != nil { - return err - } - } - - } - return nil -} - -func (t *OldQuery) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Piece (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Piece); err != nil { - return xerrors.Errorf("failed to write cid field t.Piece: %w", err) - } - - return nil -} - -func (t *OldQuery) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Piece (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Piece: %w", err) - } - - t.Piece = c - - } - return nil -} - -func (t *OldQueryResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{131}); err != nil { - return err - } - - // t.Status (retrievalimpl.OldQueryResponseStatus) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil { - return err - } - - // t.Size (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil { - return err - } - - // t.MinPrice (types.BigInt) (struct) - if err := t.MinPrice.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 3 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Status (retrievalimpl.OldQueryResponseStatus) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Status = OldQueryResponseStatus(extra) - // t.Size (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Size = uint64(extra) - // t.MinPrice (types.BigInt) (struct) - - { - - if err := t.MinPrice.UnmarshalCBOR(br); err != nil { - return err - } - - } - return nil -} - -func (t *Unixfs0Offer) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.Offset (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Offset))); err != nil { - return err - } - - // t.Size (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil { - return err - } - return nil -} - -func (t *Unixfs0Offer) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Offset (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Offset = uint64(extra) - // t.Size (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Size = uint64(extra) - return nil -} - -func (t *OldDealProposal) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{131}); err != nil { - return err - } - - // t.Payment (api.PaymentInfo) (struct) - if err := t.Payment.MarshalCBOR(w); err != nil { - return err - } - - // t.Ref (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Ref); err != nil { - return xerrors.Errorf("failed to write cid field t.Ref: %w", err) - } - - // t.Params (retrievalimpl.RetParams) (struct) - if err := t.Params.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *OldDealProposal) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 3 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Payment (api.PaymentInfo) (struct) - - { - - if err := t.Payment.UnmarshalCBOR(br); err != nil { - return err - } - - } - // t.Ref (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Ref: %w", err) - } - - t.Ref = c - - } - // t.Params (retrievalimpl.RetParams) (struct) - - { - - if err := t.Params.UnmarshalCBOR(br); err != nil { - return err - } - - } - return nil -} - -func (t *OldDealResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.Status (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil { - return err - } - - // t.Message (string) (string) - if len(t.Message) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Message was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Message)); err != nil { - return err - } - return nil -} - -func (t *OldDealResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Status (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.Status = uint64(extra) - // t.Message (string) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Message = string(sval) - } - return nil -} - -func (t *Block) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.Prefix ([]uint8) (slice) - if len(t.Prefix) > cbg.ByteArrayMaxLen { - return xerrors.Errorf("Byte array in field t.Prefix was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Prefix)))); err != nil { - return err - } - if _, err := w.Write(t.Prefix); err != nil { - return err - } - - // t.Data ([]uint8) (slice) - if len(t.Data) > cbg.ByteArrayMaxLen { - return xerrors.Errorf("Byte array in field t.Data was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Data)))); err != nil { - return err - } - if _, err := w.Write(t.Data); err != nil { - return err - } - return nil -} - -func (t *Block) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Prefix ([]uint8) (slice) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - - if extra > cbg.ByteArrayMaxLen { - return fmt.Errorf("t.Prefix: byte array too large (%d)", extra) - } - if maj != cbg.MajByteString { - return fmt.Errorf("expected byte array") - } - t.Prefix = make([]byte, extra) - if _, err := io.ReadFull(br, t.Prefix); err != nil { - return err - } - // t.Data ([]uint8) (slice) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - - if extra > cbg.ByteArrayMaxLen { - return fmt.Errorf("t.Data: byte array too large (%d)", extra) - } - if maj != cbg.MajByteString { - return fmt.Errorf("expected byte array") - } - t.Data = make([]byte, extra) - if _, err := io.ReadFull(br, t.Data); err != nil { - return err - } - return nil -} diff --git a/retrieval/impl/client.go b/retrieval/impl/client.go deleted file mode 100644 index 5087a893d..000000000 --- a/retrieval/impl/client.go +++ /dev/null @@ -1,389 +0,0 @@ -package retrievalimpl - -import ( - "context" - "reflect" - "sync" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" - logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - cbg "github.com/whyrusleeping/cbor-gen" - "golang.org/x/xerrors" - - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/types" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" -) - -var log = logging.Logger("retrieval") - -type client struct { - h host.Host - bs blockstore.Blockstore - node retrievalmarket.RetrievalClientNode - // The parameters should be replaced by RetrievalClientNode - - nextDealLk sync.Mutex - nextDealID retrievalmarket.DealID - subscribersLk sync.RWMutex - subscribers []retrievalmarket.ClientSubscriber -} - -// NewClient creates a new retrieval client -func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient { - return &client{h: h, bs: bs, node: node} -} - -// V0 - -// TODO: Implement for retrieval provider V0 epic -// https://github.com/filecoin-project/go-retrieval-market-project/issues/12 -func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer { - panic("not implemented") -} - -// TODO: Update to match spec for V0 epic -// https://github.com/filecoin-project/go-retrieval-market-project/issues/8 -func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) { - cid, err := cid.Cast(pieceCID) - if err != nil { - log.Warn(err) - return retrievalmarket.QueryResponseUndefined, err - } - - s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID) - if err != nil { - log.Warn(err) - return retrievalmarket.QueryResponseUndefined, err - } - defer s.Close() - - err = cborutil.WriteCborRPC(s, &OldQuery{ - Piece: cid, - }) - if err != nil { - log.Warn(err) - return retrievalmarket.QueryResponseUndefined, err - } - - var oldResp OldQueryResponse - if err := oldResp.UnmarshalCBOR(s); err != nil { - log.Warn(err) - return retrievalmarket.QueryResponseUndefined, err - } - - resp := retrievalmarket.QueryResponse{ - Status: retrievalmarket.QueryResponseStatus(oldResp.Status), - Size: oldResp.Size, - MinPricePerByte: types.BigDiv(oldResp.MinPrice, types.NewInt(oldResp.Size)), - } - return resp, nil -} - -// TODO: Update to match spec for V0 Epic: -// https://github.com/filecoin-project/go-retrieval-market-project/issues/9 -func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds types.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID { - /* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces - -- it will be replaced when we do the V0 implementation of the module */ - c.nextDealLk.Lock() - c.nextDealID++ - dealID := c.nextDealID - c.nextDealLk.Unlock() - - dealState := retrievalmarket.ClientDealState{ - DealProposal: retrievalmarket.DealProposal{ - PieceCID: pieceCID, - ID: dealID, - Params: params, - }, - Status: retrievalmarket.DealStatusFailed, - Sender: miner, - } - - go func() { - evt := retrievalmarket.ClientEventError - converted, err := cid.Cast(pieceCID) - - if err == nil { - err = c.retrieveUnixfs(ctx, converted, types.BigDiv(totalFunds, params.PricePerByte).Uint64(), totalFunds, miner, clientWallet, minerWallet) - if err == nil { - evt = retrievalmarket.ClientEventComplete - dealState.Status = retrievalmarket.DealStatusCompleted - } - } - - c.notifySubscribers(evt, dealState) - }() - - return dealID -} - -// unsubscribeAt returns a function that removes an item from the subscribers list by comparing -// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order. -// Subsequent, repeated calls to the func with the same Subscriber are a no-op. -func (c *client) unsubscribeAt(sub retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe { - return func() { - c.subscribersLk.Lock() - defer c.subscribersLk.Unlock() - curLen := len(c.subscribers) - for i, el := range c.subscribers { - if reflect.ValueOf(sub) == reflect.ValueOf(el) { - c.subscribers[i] = c.subscribers[curLen-1] - c.subscribers = c.subscribers[:curLen-1] - return - } - } - } -} - -func (c *client) notifySubscribers(evt retrievalmarket.ClientEvent, ds retrievalmarket.ClientDealState) { - c.subscribersLk.RLock() - defer c.subscribersLk.RUnlock() - for _, cb := range c.subscribers { - cb(evt, ds) - } -} - -func (c *client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe { - c.subscribersLk.Lock() - c.subscribers = append(c.subscribers, subscriber) - c.subscribersLk.Unlock() - - return c.unsubscribeAt(subscriber) -} - -// V1 -func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount types.BigInt) error { - panic("not implemented") -} - -func (c *client) CancelDeal(id retrievalmarket.DealID) error { - panic("not implemented") -} - -func (c *client) RetrievalStatus(id retrievalmarket.DealID) { - panic("not implemented") -} - -func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDealState { - panic("not implemented") -} - -type clientStream struct { - node retrievalmarket.RetrievalClientNode - stream network.Stream - peeker cbg.BytePeeker - - root cid.Cid - size types.BigInt - offset uint64 - - paych retrievalmarket.Address - lane uint64 - total types.BigInt - transferred types.BigInt - - windowSize uint64 // how much we "trust" the peer - verifier BlockVerifier - bs blockstore.Blockstore -} - -/* This is the old retrieval code that is NOT spec compliant */ - -// C > S -// -// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size -// -// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)} -// < Resp{Accept} -// < ..(Intermediate Block) -// < ..Blocks -// < ..(Intermediate Block) -// < ..Blocks -// > DealProposal(...) -// < ... -func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr retrievalmarket.Address) error { - s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID) - if err != nil { - return err - } - defer s.Close() - - initialOffset := uint64(0) // TODO: Check how much data we have locally - // TODO: Support in handler - // TODO: Allow client to specify this - - paych, err := c.node.GetOrCreatePaymentChannel(ctx, client, minerAddr, total) - if err != nil { - return xerrors.Errorf("getting payment channel: %w", err) - } - lane, err := c.node.AllocateLane(paych) - if err != nil { - return xerrors.Errorf("allocating payment lane: %w", err) - } - - cst := clientStream{ - node: c.node, - stream: s, - peeker: cbg.GetPeeker(s), - - root: root, - size: types.NewInt(size), - offset: initialOffset, - - paych: paych, - lane: lane, - total: total, - transferred: types.NewInt(0), - - windowSize: build.UnixfsChunkSize, - verifier: &UnixFs0Verifier{Root: root}, - bs: c.bs, - } - - for cst.offset != size+initialOffset { - toFetch := cst.windowSize - if toFetch+cst.offset > size { - toFetch = size - cst.offset - } - log.Infof("Retrieve %dB @%d", toFetch, cst.offset) - - err := cst.doOneExchange(ctx, toFetch) - if err != nil { - return xerrors.Errorf("retrieval exchange: %w", err) - } - - cst.offset += toFetch - } - log.Info("RETRIEVE SUCCESSFUL") - return nil -} - -func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) error { - payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size) - - payment, err := cst.setupPayment(ctx, payAmount) - if err != nil { - return xerrors.Errorf("setting up retrieval payment: %w", err) - } - - deal := &OldDealProposal{ - Payment: payment, - Ref: cst.root, - Params: RetParams{ - Unixfs0: &Unixfs0Offer{ - Offset: cst.offset, - Size: toFetch, - }, - }, - } - - if err := cborutil.WriteCborRPC(cst.stream, deal); err != nil { - return err - } - - var resp OldDealResponse - if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil { - log.Error(err) - return err - } - - if resp.Status != Accepted { - cst.windowSize = build.UnixfsChunkSize - // TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases) - - if resp.Status == Error { - return xerrors.Errorf("storage deal error: %s", resp.Message) - } - if resp.Status == Rejected { - return xerrors.Errorf("storage deal rejected: %s", resp.Message) - } - return xerrors.New("storage deal response had no Accepted section") - } - - log.Info("Retrieval accepted, fetching blocks") - - return cst.fetchBlocks(toFetch) - - // TODO: maybe increase miner window size after success -} - -func (cst *clientStream) fetchBlocks(toFetch uint64) error { - blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize - - for i := uint64(0); i < blocksToFetch; { - log.Infof("block %d of %d", i+1, blocksToFetch) - - var block Block - if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil { - return xerrors.Errorf("reading fetchBlock response: %w", err) - } - - dataBlocks, err := cst.consumeBlockMessage(block) - if err != nil { - return xerrors.Errorf("consuming retrieved blocks: %w", err) - } - - i += dataBlocks - } - - return nil -} - -func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) { - prefix, err := cid.PrefixFromBytes(block.Prefix) - if err != nil { - return 0, err - } - - cid, err := prefix.Sum(block.Data) - - blk, err := blocks.NewBlockWithCid(block.Data, cid) - if err != nil { - return 0, err - } - - internal, err := cst.verifier.Verify(context.TODO(), blk) - if err != nil { - log.Warnf("block verify failed: %s", err) - return 0, err - } - - // TODO: Smarter out, maybe add to filestore automagically - // (Also, persist intermediate nodes) - err = cst.bs.Put(blk) - if err != nil { - log.Warnf("block write failed: %s", err) - return 0, err - } - - if internal { - return 0, nil - } - - return 1, nil -} - -func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) { - amount := types.BigAdd(cst.transferred, toSend) - - sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane) - if err != nil { - return api.PaymentInfo{}, err - } - - cst.transferred = amount - - return api.PaymentInfo{ - Channel: cst.paych, - ChannelMessage: nil, - Vouchers: []*types.SignedVoucher{sv}, - }, nil -} diff --git a/retrieval/impl/provider.go b/retrieval/impl/provider.go deleted file mode 100644 index 9e7254b60..000000000 --- a/retrieval/impl/provider.go +++ /dev/null @@ -1,323 +0,0 @@ -package retrievalimpl - -import ( - "context" - "io" - "reflect" - "sync" - - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-merkledag" - unixfile "github.com/ipfs/go-unixfs/file" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/types" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" - "github.com/filecoin-project/lotus/storage/sectorblocks" -) - -// RetrMinerAPI are the node functions needed by a retrieval provider -type RetrMinerAPI interface { - PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error) -} - -type provider struct { - - // TODO: Replace with RetrievalProviderNode & FileStore for https://github.com/filecoin-project/go-retrieval-market-project/issues/9 - sectorBlocks *sectorblocks.SectorBlocks - - // TODO: Replace with RetrievalProviderNode for - // https://github.com/filecoin-project/go-retrieval-market-project/issues/4 - node retrievalmarket.RetrievalProviderNode - - pricePerByte retrievalmarket.BigInt - - subscribersLk sync.RWMutex - subscribers []retrievalmarket.ProviderSubscriber -} - -// NewProvider returns a new retrieval provider -func NewProvider(sblks *sectorblocks.SectorBlocks, node retrievalmarket.RetrievalProviderNode) retrievalmarket.RetrievalProvider { - return &provider{ - sectorBlocks: sblks, - node: node, - - pricePerByte: types.NewInt(2), // TODO: allow setting - } -} - -// Start begins listening for deals on the given host -func (p *provider) Start(host host.Host) { - host.SetStreamHandler(retrievalmarket.QueryProtocolID, p.handleQueryStream) - host.SetStreamHandler(retrievalmarket.ProtocolID, p.handleDealStream) -} - -// V0 -// SetPricePerByte sets the price per byte a miner charges for retrievals -func (p *provider) SetPricePerByte(price retrievalmarket.BigInt) { - p.pricePerByte = price -} - -// SetPaymentInterval sets the maximum number of bytes a a provider will send before -// requesting further payment, and the rate at which that value increases -// TODO: Implement for https://github.com/filecoin-project/go-retrieval-market-project/issues/7 -func (p *provider) SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) { - panic("not implemented") -} - -// unsubscribeAt returns a function that removes an item from the subscribers list by comparing -// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order. -// Subsequent, repeated calls to the func with the same Subscriber are a no-op. -func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe { - return func() { - p.subscribersLk.Lock() - defer p.subscribersLk.Unlock() - curLen := len(p.subscribers) - for i, el := range p.subscribers { - if reflect.ValueOf(sub) == reflect.ValueOf(el) { - p.subscribers[i] = p.subscribers[curLen-1] - p.subscribers = p.subscribers[:curLen-1] - return - } - } - } -} - -func (p *provider) notifySubscribers(evt retrievalmarket.ProviderEvent, ds retrievalmarket.ProviderDealState) { - p.subscribersLk.RLock() - defer p.subscribersLk.RUnlock() - for _, cb := range p.subscribers { - cb(evt, ds) - } -} - -// SubscribeToEvents listens for events that happen related to client retrievals -// TODO: Implement updates as part of https://github.com/filecoin-project/go-retrieval-market-project/issues/7 -func (p *provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe { - p.subscribersLk.Lock() - p.subscribers = append(p.subscribers, subscriber) - p.subscribersLk.Unlock() - - return p.unsubscribeAt(subscriber) -} - -// V1 -func (p *provider) SetPricePerUnseal(price retrievalmarket.BigInt) { - panic("not implemented") -} - -func (p *provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarket.ProviderDealState { - panic("not implemented") -} - -func writeErr(stream network.Stream, err error) { - log.Errorf("Retrieval deal error: %+v", err) - _ = cborutil.WriteCborRPC(stream, &OldDealResponse{ - Status: Error, - Message: err.Error(), - }) -} - -// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/8 -func (p *provider) handleQueryStream(stream network.Stream) { - defer stream.Close() - - var query OldQuery - if err := cborutil.ReadCborRPC(stream, &query); err != nil { - writeErr(stream, err) - return - } - - size, err := p.sectorBlocks.GetSize(query.Piece) - if err != nil && err != sectorblocks.ErrNotFound { - log.Errorf("Retrieval query: GetRefs: %s", err) - return - } - - answer := &OldQueryResponse{ - Status: Unavailable, - } - if err == nil { - answer.Status = Available - - // TODO: get price, look for already unsealed ref to reduce work - answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), p.pricePerByte) - answer.Size = uint64(size) // TODO: verify on intermediate - } - - if err := cborutil.WriteCborRPC(stream, answer); err != nil { - log.Errorf("Retrieval query: WriteCborRPC: %s", err) - return - } -} - -type handlerDeal struct { - p *provider - stream network.Stream - - ufsr sectorblocks.UnixfsReader - open cid.Cid - at uint64 - size uint64 -} - -// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/7 -func (p *provider) handleDealStream(stream network.Stream) { - defer stream.Close() - - hnd := &handlerDeal{ - p: p, - - stream: stream, - } - - var err error - more := true - - for more { - more, err = hnd.handleNext() // TODO: 'more' bool - if err != nil { - writeErr(stream, err) - return - } - } - -} - -func (hnd *handlerDeal) handleNext() (bool, error) { - var deal OldDealProposal - if err := cborutil.ReadCborRPC(hnd.stream, &deal); err != nil { - if err == io.EOF { // client sent all deals - err = nil - } - return false, err - } - - if deal.Params.Unixfs0 == nil { - return false, xerrors.New("unknown deal type") - } - - unixfs0 := deal.Params.Unixfs0 - - if len(deal.Payment.Vouchers) != 1 { - return false, xerrors.Errorf("expected one signed voucher, got %d", len(deal.Payment.Vouchers)) - } - - expPayment := types.BigMul(hnd.p.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size)) - if _, err := hnd.p.node.SavePaymentVoucher(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil { - return false, xerrors.Errorf("processing retrieval payment: %w", err) - } - - // If the file isn't open (new deal stream), isn't the right file, or isn't - // at the right offset, (re)open it - if hnd.open != deal.Ref || hnd.at != unixfs0.Offset { - log.Infof("opening file for sending (open '%s') (@%d, want %d)", deal.Ref, hnd.at, unixfs0.Offset) - if err := hnd.openFile(deal); err != nil { - return false, err - } - } - - if unixfs0.Offset+unixfs0.Size > hnd.size { - return false, xerrors.Errorf("tried to read too much %d+%d > %d", unixfs0.Offset, unixfs0.Size, hnd.size) - } - - err := hnd.accept(deal) - if err != nil { - return false, err - } - return true, nil -} - -func (hnd *handlerDeal) openFile(deal OldDealProposal) error { - unixfs0 := deal.Params.Unixfs0 - - if unixfs0.Offset != 0 { - // TODO: Implement SeekBlock (like ReadBlock) in go-unixfs - return xerrors.New("sending merkle proofs for nonzero offset not supported yet") - } - hnd.at = unixfs0.Offset - - bstore := hnd.p.sectorBlocks.SealedBlockstore(func() error { - return nil // TODO: approve unsealing based on amount paid - }) - - ds := merkledag.NewDAGService(blockservice.New(bstore, nil)) - rootNd, err := ds.Get(context.TODO(), deal.Ref) - if err != nil { - return err - } - - fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd) - if err != nil { - return err - } - - var ok bool - hnd.ufsr, ok = fsr.(sectorblocks.UnixfsReader) - if !ok { - return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Ref) - } - - isize, err := hnd.ufsr.Size() - if err != nil { - return err - } - hnd.size = uint64(isize) - - hnd.open = deal.Ref - - return nil -} - -func (hnd *handlerDeal) accept(deal OldDealProposal) error { - unixfs0 := deal.Params.Unixfs0 - - resp := &OldDealResponse{ - Status: Accepted, - } - if err := cborutil.WriteCborRPC(hnd.stream, resp); err != nil { - log.Errorf("Retrieval query: Write Accepted resp: %s", err) - return err - } - - blocksToSend := (unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize - for i := uint64(0); i < blocksToSend; { - data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO()) - if err != nil { - return err - } - - log.Infof("sending block for a deal: %s", nd.Cid()) - - if offset != unixfs0.Offset { - return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", unixfs0.Offset, offset) - } - - /*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too) - writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data))) - return - }*/ - - block := &Block{ - Prefix: nd.Cid().Prefix().Bytes(), - Data: nd.RawData(), - } - - if err := cborutil.WriteCborRPC(hnd.stream, block); err != nil { - return err - } - - if len(data) > 0 { // don't count internal nodes - hnd.at += uint64(len(data)) - i++ - } - } - - return nil -} diff --git a/retrieval/impl/run_cbor_gen.go b/retrieval/impl/run_cbor_gen.go deleted file mode 100644 index 8dbd6f19c..000000000 --- a/retrieval/impl/run_cbor_gen.go +++ /dev/null @@ -1,33 +0,0 @@ -package retrievalimpl - -import ( - "fmt" - "os" - - cborgen "github.com/whyrusleeping/cbor-gen" -) - -func RunCborGen() error { - genName := "./impl/cbor_gen.go" - reName := "./impl/cbor_gen_old.go" - if err := os.Rename(genName, reName); err != nil { - return fmt.Errorf("could not rename %s to %s", genName, reName) - } - if err := cborgen.WriteTupleEncodersToFile( - genName, - "retrievalimpl", - RetParams{}, - OldQuery{}, - OldQueryResponse{}, - Unixfs0Offer{}, - OldDealProposal{}, - OldDealResponse{}, - Block{}, - ); err != nil { - return err - } - if err := os.Remove(reName); err != nil { - return err - } - return nil -} diff --git a/retrieval/impl/types.go b/retrieval/impl/types.go deleted file mode 100644 index 3af74f993..000000000 --- a/retrieval/impl/types.go +++ /dev/null @@ -1,67 +0,0 @@ -package retrievalimpl - -import ( - "github.com/filecoin-project/lotus/api" - "github.com/ipfs/go-cid" - - "github.com/filecoin-project/lotus/chain/types" -) - -/* These types are all the types provided by Lotus, which diverge even from -spec V0 -- prior to the "update to spec epic", we are using these types internally -and switching to spec at the boundaries of the module */ - -type OldQueryResponseStatus uint64 - -const ( - Available OldQueryResponseStatus = iota - Unavailable -) - -const ( - Accepted = iota - Error - Rejected - Unsealing -) - -type OldQuery struct { - Piece cid.Cid - // TODO: payment -} - -type OldQueryResponse struct { - Status OldQueryResponseStatus - - Size uint64 // TODO: spec - // TODO: unseal price (+spec) - // TODO: sectors to unseal - // TODO: address to send money for the deal? - MinPrice types.BigInt -} - -type Unixfs0Offer struct { - Offset uint64 - Size uint64 -} - -type RetParams struct { - Unixfs0 *Unixfs0Offer -} - -type OldDealProposal struct { - Payment api.PaymentInfo - - Ref cid.Cid - Params RetParams -} - -type OldDealResponse struct { - Status uint64 - Message string -} - -type Block struct { // TODO: put in spec - Prefix []byte // TODO: fix cid.Prefix marshaling somehow - Data []byte -} diff --git a/retrieval/impl/verify.go b/retrieval/impl/verify.go deleted file mode 100644 index 892876ef5..000000000 --- a/retrieval/impl/verify.go +++ /dev/null @@ -1,141 +0,0 @@ -package retrievalimpl - -import ( - "context" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" - "github.com/ipfs/go-unixfs" - pb "github.com/ipfs/go-unixfs/pb" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/build" -) - -type BlockVerifier interface { - Verify(context.Context, blocks.Block) (internal bool, err error) -} - -type OptimisticVerifier struct { -} - -func (o *OptimisticVerifier) Verify(context.Context, blocks.Block) (bool, error) { - // It's probably fine - return false, nil -} - -type UnixFs0Verifier struct { - Root cid.Cid - rootBlk blocks.Block - - expect int - seen int - - sub *UnixFs0Verifier -} - -func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block) (last bool, internal bool, err error) { - if b.sub != nil { - // TODO: check links here (iff b.sub.sub == nil) - - subLast, internal, err := b.sub.verify(ctx, blk) - if err != nil { - return false, false, err - } - if subLast { - b.sub = nil - b.seen++ - } - - return b.seen == b.expect, internal, nil - } - - if b.seen >= b.expect { // this is probably impossible - return false, false, xerrors.New("unixfs verifier: too many nodes in level") - } - - links, err := b.checkInternal(blk) - if err != nil { - return false, false, err - } - - if links > 0 { // TODO: check if all links are intermediate (or all aren't) - if links > build.UnixfsLinksPerLevel { - return false, false, xerrors.New("unixfs verifier: too many links in intermediate node") - } - - if b.seen+1 == b.expect && links != build.UnixfsLinksPerLevel { - return false, false, xerrors.New("unixfs verifier: too few nodes in level") - } - - b.sub = &UnixFs0Verifier{ - Root: blk.Cid(), - rootBlk: blk, - expect: links, - } - - // don't mark as seen yet - return false, true, nil - } - - b.seen++ - return b.seen == b.expect, false, nil -} - -func (b *UnixFs0Verifier) checkInternal(blk blocks.Block) (int, error) { - nd, err := ipld.Decode(blk) - if err != nil { - log.Warnf("IPLD Decode failed: %s", err) - return 0, err - } - - // TODO: check size - switch nd := nd.(type) { - case *merkledag.ProtoNode: - fsn, err := unixfs.FSNodeFromBytes(nd.Data()) - if err != nil { - log.Warnf("unixfs.FSNodeFromBytes failed: %s", err) - return 0, err - } - if fsn.Type() != pb.Data_File { - return 0, xerrors.New("internal nodes must be a file") - } - if len(fsn.Data()) > 0 { - return 0, xerrors.New("internal node with data") - } - if len(nd.Links()) == 0 { - return 0, xerrors.New("internal node with no links") - } - return len(nd.Links()), nil - - case *merkledag.RawNode: - return 0, nil - default: - return 0, xerrors.New("verifier: unknown node type") - } -} - -func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block) (bool, error) { - // root is special - if b.rootBlk == nil { - if !b.Root.Equals(blk.Cid()) { - return false, xerrors.Errorf("unixfs verifier: root block CID didn't match: valid %s, got %s", b.Root, blk.Cid()) - } - b.rootBlk = blk - links, err := b.checkInternal(blk) - if err != nil { - return false, err - } - - b.expect = links - return links != 0, nil - } - - _, internal, err := b.verify(ctx, blk) - return internal, err -} - -var _ BlockVerifier = &OptimisticVerifier{} -var _ BlockVerifier = &UnixFs0Verifier{} diff --git a/retrieval/types.go b/retrieval/types.go deleted file mode 100644 index f82279a9d..000000000 --- a/retrieval/types.go +++ /dev/null @@ -1,364 +0,0 @@ -package retrievalmarket - -import ( - "context" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" -) - -// type aliases -// TODO: Remove and use native types or extract for -// https://github.com/filecoin-project/go-retrieval-market-project/issues/5 - -// BigInt is used for token amounts in retrieval deals -type BigInt = types.BigInt - -// Address is an address in the filecoin network -type Address = address.Address - -// SignedVoucher is a signed payment voucher -type SignedVoucher = types.SignedVoucher - -// ProtocolID is the protocol for proposing / responding to retrieval deals -const ProtocolID = "/fil/retrieval/0.0.1" - -// QueryProtocolID is the protocol for querying infromation about retrieval -// deal parameters -const QueryProtocolID = "/fil/retrieval/qry/0.0.1" // TODO: spec - -// Unsubscribe is a function that unsubscribes a subscriber for either the -// client or the provider -type Unsubscribe func() - -// ClientDealState is the current state of a deal from the point of view -// of a retrieval client -type ClientDealState struct { - DealProposal - Status DealStatus - Sender peer.ID - TotalReceived uint64 - FundsSpent BigInt -} - -// ClientEvent is an event that occurs in a deal lifecycle on the client -type ClientEvent uint64 - -const ( - // ClientEventOpen indicates a deal was initiated - ClientEventOpen ClientEvent = iota - - // ClientEventFundsExpended indicates a deal has run out of funds in the payment channel - // forcing the client to add more funds to continue the deal - ClientEventFundsExpended // when totalFunds is expended - - // ClientEventProgress indicates more data was received for a retrieval - ClientEventProgress - - // ClientEventError indicates an error occurred during a deal - ClientEventError - - // ClientEventComplete indicates a deal has completed - ClientEventComplete -) - -// ClientSubscriber is a callback that is registered to listen for retrieval events -type ClientSubscriber func(event ClientEvent, state ClientDealState) - -// RetrievalClient is a client interface for making retrieval deals -type RetrievalClient interface { - // V0 - - // Find Providers finds retrieval providers who may be storing a given piece - FindProviders(pieceCID []byte) []RetrievalPeer - - // Query asks a provider for information about a piece it is storing - Query( - ctx context.Context, - p RetrievalPeer, - pieceCID []byte, - params QueryParams, - ) (QueryResponse, error) - - // Retrieve retrieves all or part of a piece with the given retrieval parameters - Retrieve( - ctx context.Context, - pieceCID []byte, - params Params, - totalFunds BigInt, - miner peer.ID, - clientWallet Address, - minerWallet Address, - ) DealID - - // SubscribeToEvents listens for events that happen related to client retrievals - SubscribeToEvents(subscriber ClientSubscriber) Unsubscribe - - // V1 - AddMoreFunds(id DealID, amount BigInt) error - CancelDeal(id DealID) error - RetrievalStatus(id DealID) - ListDeals() map[DealID]ClientDealState -} - -// RetrievalClientNode are the node depedencies for a RetrevalClient -type RetrievalClientNode interface { - - // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist - // between a client and a miner and insures the client has the given amount of funds available in the channel - GetOrCreatePaymentChannel(ctx context.Context, clientAddress Address, minerAddress Address, clientFundsAvailable BigInt) (Address, error) - - // Allocate late creates a lane within a payment channel so that calls to - // CreatePaymentVoucher will automatically make vouchers only for the difference - // in total - AllocateLane(paymentChannel Address) (uint64, error) - - // CreatePaymentVoucher creates a new payment voucher in the given lane for a - // given payment channel so that all the payment vouchers in the lane add up - // to the given amount (so the payment voucher will be for the difference) - CreatePaymentVoucher(ctx context.Context, paymentChannel Address, amount BigInt, lane uint64) (*SignedVoucher, error) -} - -// ProviderDealState is the current state of a deal from the point of view -// of a retrieval provider -type ProviderDealState struct { - DealProposal - Status DealStatus - Receiver peer.ID - TotalSent uint64 - FundsReceived BigInt -} - -// ProviderEvent is an event that occurs in a deal lifecycle on the provider -type ProviderEvent uint64 - -const ( - - // ProviderEventOpen indicates a new deal was received from a client - ProviderEventOpen ProviderEvent = iota - - // ProviderEventProgress indicates more data was sent to a client - ProviderEventProgress - - // ProviderEventError indicates an error occurred in processing a deal for a client - ProviderEventError - - // ProviderEventComplete indicates a retrieval deal was completed for a client - ProviderEventComplete -) - -// ProviderDealID is a unique identifier for a deal on a provider -- it is -// a combination of DealID set by the client and the peer ID of the client -type ProviderDealID struct { - From peer.ID - ID DealID -} - -// ProviderSubscriber is a callback that is registered to listen for retrieval events on a provider -type ProviderSubscriber func(event ProviderEvent, state ProviderDealState) - -// RetrievalProvider is an interface by which a provider configures their -// retrieval operations and monitors deals received and process -type RetrievalProvider interface { - // Start begins listening for deals on the given host - Start(host.Host) - - // V0 - - // SetPricePerByte sets the price per byte a miner charges for retrievals - SetPricePerByte(price BigInt) - - // SetPaymentInterval sets the maximum number of bytes a a provider will send before - // requesting further payment, and the rate at which that value increases - SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) - - // SubscribeToEvents listens for events that happen related to client retrievals - SubscribeToEvents(subscriber ProviderSubscriber) Unsubscribe - - // V1 - SetPricePerUnseal(price BigInt) - ListDeals() map[ProviderDealID]ProviderDealState -} - -// RetrievalProviderNode are the node depedencies for a RetrevalProvider -type RetrievalProviderNode interface { - SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *SignedVoucher, proof []byte, expectedAmount BigInt) (BigInt, error) -} - -// PeerResolver is an interface for looking up providers that may have a piece -type PeerResolver interface { - GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel -} - -// RetrievalPeer is a provider address/peer.ID pair (everything needed to make -// deals for with a miner) -type RetrievalPeer struct { - Address Address - ID peer.ID // optional -} - -// QueryResponseStatus indicates whether a queried piece is available -type QueryResponseStatus uint64 - -const ( - // QueryResponseAvailable indicates a provider has a piece and is prepared to - // return it - QueryResponseAvailable QueryResponseStatus = iota - - // QueryResponseUnavailable indicates a provider either does not have or cannot - // serve the queried piece to the client - QueryResponseUnavailable -) - -// QueryItemStatus (V1) indicates whether the requested part of a piece (payload or selector) -// is available for retrieval -type QueryItemStatus uint64 - -const ( - // QueryItemAvailable indicates requested part of the piece is available to be - // served - QueryItemAvailable QueryItemStatus = iota - - // QueryItemUnavailable indicates the piece either does not contain the requested - // item or it cannot be served - QueryItemUnavailable - - // QueryItemUnknown indicates the provider cannot determine if the given item - // is part of the requested piece (for example, if the piece is sealed and the - // miner does not maintain a payload CID index) - QueryItemUnknown -) - -// QueryParams indicate what specific information about a piece that a retrieval -// client is interested in, as well as specific parameters the client is seeking -// for the retrieval deal -type QueryParams struct { - PayloadCID cid.Cid // optional, query if miner has this cid in this piece. some miners may not be able to respond. - Selector ipld.Node // optional, query if miner has this cid in this piece. some miners may not be able to respond. - MaxPricePerByte BigInt // optional, tell miner uninterested if more expensive than this - MinPaymentInterval uint64 // optional, tell miner uninterested unless payment interval is greater than this - MinPaymentIntervalIncrease uint64 // optional, tell miner uninterested unless payment interval increase is greater than this -} - -// Query is a query to a given provider to determine information about a piece -// they may have available for retrieval -type Query struct { - PieceCID []byte // V0 - // QueryParams // V1 -} - -// QueryResponse is a miners response to a given retrieval query -type QueryResponse struct { - Status QueryResponseStatus - //PayloadCIDFound QueryItemStatus // V1 - if a PayloadCid was requested, the result - //SelectorFound QueryItemStatus // V1 - if a Selector was requested, the result - - Size uint64 // Total size of piece in bytes - //ExpectedPayloadSize uint64 // V1 - optional, if PayloadCID + selector are specified and miner knows, can offer an expected size - - PaymentAddress Address // address to send funds to -- may be different than miner addr - MinPricePerByte BigInt - MaxPaymentInterval uint64 - MaxPaymentIntervalIncrease uint64 -} - -// QueryResponseUndefined is an empty QueryResponse -var QueryResponseUndefined = QueryResponse{} - -// PieceRetrievalPrice is the total price to retrieve the piece (size * MinPricePerByte) -func (qr QueryResponse) PieceRetrievalPrice() BigInt { - return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.Size)) -} - -// PayloadRetrievalPrice is the expected price to retrieve just the given payload -// & selector (V1) -//func (qr QueryResponse) PayloadRetrievalPrice() BigInt { -// return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.ExpectedPayloadSize)) -//} - -// DealStatus is the status of a retrieval deal returned by a provider -// in a DealResponse -type DealStatus uint64 - -const ( - // DealStatusAccepted means a deal has been accepted by a provider - // and its is ready to proceed with retrieval - DealStatusAccepted DealStatus = iota - - // DealStatusFailed indicates something went wrong during a retrieval - DealStatusFailed - - // DealStatusRejected indicates the provider rejected a client's deal proposal - // for some reason - DealStatusRejected - - // DealStatusUnsealing indicates the provider is currently unsealing the sector - // needed to serve the retrieval deal - DealStatusUnsealing - - // DealStatusFundsNeeded indicates the provider is awaiting a payment voucher to - // continue processing the deal - DealStatusFundsNeeded - - // DealStatusOngoing indicates the provider is continuing to process a deal - DealStatusOngoing - - // DealStatusFundsNeededLastPayment indicates the provider is awaiting funds for - // a final payment in order to complete a deal - DealStatusFundsNeededLastPayment - - // DealStatusCompleted indicates a deal is complete - DealStatusCompleted - - // DealStatusDealNotFound indicates an update was received for a deal that could - // not be identified - DealStatusDealNotFound -) - -// Params are the parameters requested for a retrieval deal proposal -type Params struct { - //PayloadCID cid.Cid // V1 - //Selector ipld.Node // V1 - PricePerByte BigInt - PaymentInterval uint64 - PaymentIntervalIncrease uint64 -} - -// DealID is an identifier for a retrieval deal (unique to a client) -type DealID uint64 - -// DealProposal is a proposal for a new retrieval deal -type DealProposal struct { - PieceCID []byte - ID DealID - Params -} - -// Block is an IPLD block in bitswap format -type Block struct { - Prefix []byte - Data []byte -} - -// DealResponse is a response to a retrieval deal proposal -type DealResponse struct { - Status DealStatus - ID DealID - - // payment required to proceed - PaymentOwed BigInt - - Message string - Blocks []Block // V0 only -} - -// DealPayment is a payment for an in progress retrieval deal -type DealPayment struct { - ID DealID - PaymentChannel address.Address - PaymentVoucher *types.SignedVoucher -} diff --git a/retrievaladapter/client.go b/retrievaladapter/client.go index ff7d2f0fb..ae62632bd 100644 --- a/retrievaladapter/client.go +++ b/retrievaladapter/client.go @@ -3,9 +3,13 @@ package retrievaladapter import ( "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-components/retrievalmarket" + retrievaltoken "github.com/filecoin-project/go-fil-components/shared/tokenamount" + retrievaltypes "github.com/filecoin-project/go-fil-components/shared/types" + payapi "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/paych" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" ) type retrievalClientNode struct { @@ -21,21 +25,25 @@ func NewRetrievalClientNode(pmgr *paych.Manager, payapi payapi.PaychAPI) retriev // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist // between a client and a miner and insures the client has the given amount of funds available in the channel -func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress retrievalmarket.Address, minerAddress retrievalmarket.Address, clientFundsAvailable retrievalmarket.BigInt) (retrievalmarket.Address, error) { - paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable) +func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable retrievaltoken.TokenAmount) (address.Address, error) { + paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, FromSharedTokenAmount(clientFundsAvailable)) return paych, err } // Allocate late creates a lane within a payment channel so that calls to // CreatePaymentVoucher will automatically make vouchers only for the difference // in total -func (rcn *retrievalClientNode) AllocateLane(paymentChannel retrievalmarket.Address) (uint64, error) { +func (rcn *retrievalClientNode) AllocateLane(paymentChannel address.Address) (uint64, error) { return rcn.pmgr.AllocateLane(paymentChannel) } // CreatePaymentVoucher creates a new payment voucher in the given lane for a // given payment channel so that all the payment vouchers in the lane add up // to the given amount (so the payment voucher will be for the difference) -func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel retrievalmarket.Address, amount retrievalmarket.BigInt, lane uint64) (*retrievalmarket.SignedVoucher, error) { - return rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, amount, lane) +func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount retrievaltoken.TokenAmount, lane uint64) (*retrievaltypes.SignedVoucher, error) { + voucher, err := rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, FromSharedTokenAmount(amount), lane) + if err != nil { + return nil, err + } + return ToSharedSignedVoucher(voucher) } diff --git a/retrievaladapter/converters.go b/retrievaladapter/converters.go new file mode 100644 index 000000000..069c3ea34 --- /dev/null +++ b/retrievaladapter/converters.go @@ -0,0 +1,45 @@ +package retrievaladapter + +import ( + "bytes" + sharedamount "github.com/filecoin-project/go-fil-components/shared/tokenamount" + sharedtypes "github.com/filecoin-project/go-fil-components/shared/types" + "github.com/filecoin-project/lotus/chain/types" +) + + +func FromSharedTokenAmount(in sharedamount.TokenAmount) types.BigInt { + return types.BigInt{Int: in.Int} +} + +func ToSharedTokenAmount(in types.BigInt) sharedamount.TokenAmount { + return sharedamount.TokenAmount{Int: in.Int} +} + +func ToSharedSignedVoucher(in *types.SignedVoucher) (*sharedtypes.SignedVoucher, error) { + var encoded bytes.Buffer + err := in.MarshalCBOR(&encoded) + if err != nil { + return nil, err + } + var out sharedtypes.SignedVoucher + err = out.UnmarshalCBOR(&encoded) + if err != nil { + return nil, err + } + return &out, nil +} + +func FromSharedSignedVoucher(in *sharedtypes.SignedVoucher) (*types.SignedVoucher, error) { + var encoded bytes.Buffer + err := in.MarshalCBOR(&encoded) + if err != nil { + return nil, err + } + var out types.SignedVoucher + err = out.UnmarshalCBOR(&encoded) + if err != nil { + return nil, err + } + return &out, nil +} diff --git a/retrievaladapter/provider.go b/retrievaladapter/provider.go index 06757593b..fffd2c7b5 100644 --- a/retrievaladapter/provider.go +++ b/retrievaladapter/provider.go @@ -4,20 +4,43 @@ import ( "context" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-components/retrievalmarket" + retrievaltoken "github.com/filecoin-project/go-fil-components/shared/tokenamount" + retrievaltypes "github.com/filecoin-project/go-fil-components/shared/types" "github.com/filecoin-project/lotus/api" - retrievalmarket "github.com/filecoin-project/lotus/retrieval" + "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" ) type retrievalProviderNode struct { - full api.FullNode + sectorBlocks *sectorblocks.SectorBlocks + full api.FullNode } // NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the // Lotus Node -func NewRetrievalProviderNode(full api.FullNode) retrievalmarket.RetrievalProviderNode { - return &retrievalProviderNode{full} +func NewRetrievalProviderNode(sectorBlocks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProviderNode { + return &retrievalProviderNode{sectorBlocks, full} } -func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievalmarket.SignedVoucher, proof []byte, expectedAmount retrievalmarket.BigInt) (retrievalmarket.BigInt, error) { - return rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount) +func (rpn *retrievalProviderNode) GetPieceSize(pieceCid []byte) (uint64, error) { + asCid, err := cid.Cast(pieceCid) + if err != nil { + return 0, err + } + return rpn.sectorBlocks.GetSize(asCid) +} + +func (rpn *retrievalProviderNode) SealedBlockstore(approveUnseal func() error) blockstore.Blockstore { + return rpn.sectorBlocks.SealedBlockstore(approveUnseal) +} + +func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievaltypes.SignedVoucher, proof []byte, expectedAmount retrievaltoken.TokenAmount) (retrievaltoken.TokenAmount, error) { + localVoucher, err := FromSharedSignedVoucher(voucher) + if err != nil { + return retrievaltoken.FromInt(0), err + } + added, err := rpn.full.PaychVoucherAdd(ctx, paymentChannel, localVoucher, proof, FromSharedTokenAmount(expectedAmount)) + return ToSharedTokenAmount(added), err }