Merge pull request #1152 from filecoin-project/feat/update-markets
Update go-fil-markets to spec compliance, add CAR file support
This commit is contained in:
commit
d7548ab2f7
@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-filestore"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -157,7 +158,7 @@ type Import struct {
|
||||
|
||||
type DealInfo struct {
|
||||
ProposalCid cid.Cid
|
||||
State DealState
|
||||
State storagemarket.StorageDealStatus
|
||||
Provider address.Address
|
||||
|
||||
PieceRef []byte // cid bytes
|
||||
@ -239,7 +240,8 @@ type QueryOffer struct {
|
||||
|
||||
Size uint64
|
||||
MinPrice types.BigInt
|
||||
|
||||
PaymentInterval uint64
|
||||
PaymentIntervalIncrease uint64
|
||||
Miner address.Address
|
||||
MinerPeerID peer.ID
|
||||
}
|
||||
@ -249,7 +251,8 @@ func (o *QueryOffer) Order(client address.Address) RetrievalOrder {
|
||||
Root: o.Root,
|
||||
Size: o.Size,
|
||||
Total: o.MinPrice,
|
||||
|
||||
PaymentInterval: o.PaymentInterval,
|
||||
PaymentIntervalIncrease: o.PaymentIntervalIncrease,
|
||||
Client: client,
|
||||
|
||||
Miner: o.Miner,
|
||||
@ -263,7 +266,8 @@ type RetrievalOrder struct {
|
||||
Size uint64
|
||||
// TODO: support offset
|
||||
Total types.BigInt
|
||||
|
||||
PaymentInterval uint64
|
||||
PaymentIntervalIncrease uint64
|
||||
Client address.Address
|
||||
Miner address.Address
|
||||
MinerPeerID peer.ID
|
||||
|
@ -13,7 +13,7 @@ import (
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
@ -42,7 +42,7 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
|
||||
data := make([]byte, 1000)
|
||||
data := make([]byte, 800)
|
||||
rand.New(rand.NewSource(5)).Read(data)
|
||||
|
||||
r := bytes.NewReader(data)
|
||||
@ -88,17 +88,17 @@ loop:
|
||||
t.Fatal(err)
|
||||
}
|
||||
switch di.State {
|
||||
case api.DealRejected:
|
||||
case storagemarket.StorageDealProposalRejected:
|
||||
t.Fatal("deal rejected")
|
||||
case api.DealFailed:
|
||||
case storagemarket.StorageDealFailing:
|
||||
t.Fatal("deal failed")
|
||||
case api.DealError:
|
||||
case storagemarket.StorageDealError:
|
||||
t.Fatal("deal errored")
|
||||
case api.DealComplete:
|
||||
case storagemarket.StorageDealActive:
|
||||
fmt.Println("COMPLETE", di)
|
||||
break loop
|
||||
}
|
||||
fmt.Println("Deal state: ", api.DealStates[di.State])
|
||||
fmt.Println("Deal state: ", storagemarket.DealStates[di.State])
|
||||
time.Sleep(time.Second / 2)
|
||||
}
|
||||
|
||||
|
30
api/types.go
30
api/types.go
@ -6,36 +6,6 @@ import (
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
type DealState = uint64
|
||||
|
||||
const (
|
||||
DealUnknown = DealState(iota)
|
||||
DealRejected // Provider didn't like the proposal
|
||||
DealAccepted // Proposal accepted, data moved
|
||||
DealStaged // Data put into the sector
|
||||
DealSealing // Data in process of being sealed
|
||||
|
||||
DealFailed
|
||||
DealComplete
|
||||
|
||||
// Internal
|
||||
|
||||
DealError // deal failed with an unexpected error
|
||||
|
||||
DealNoUpdate = DealUnknown
|
||||
)
|
||||
|
||||
var DealStates = []string{
|
||||
"DealUnknown",
|
||||
"DealRejected",
|
||||
"DealAccepted",
|
||||
"DealStaged",
|
||||
"DealSealing",
|
||||
"DealFailed",
|
||||
"DealComplete",
|
||||
"DealError",
|
||||
}
|
||||
|
||||
// TODO: check if this exists anywhere else
|
||||
type MultiaddrSlice []ma.Multiaddr
|
||||
|
||||
|
@ -13,7 +13,7 @@ import (
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
lapi "github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
actors "github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
@ -359,7 +359,7 @@ var clientListDeals = &cli.Command{
|
||||
w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
|
||||
fmt.Fprintf(w, "DealCid\tProvider\tState\tPieceRef\tSize\tPrice\tDuration\n")
|
||||
for _, d := range deals {
|
||||
fmt.Fprintf(w, "%s\t%s\t%s\t%x\t%d\t%s\t%d\n", d.ProposalCid, d.Provider, lapi.DealStates[d.State], d.PieceRef, d.Size, d.PricePerEpoch, d.Duration)
|
||||
fmt.Fprintf(w, "%s\t%s\t%s\t%x\t%d\t%s\t%d\n", d.ProposalCid, d.Provider, storagemarket.DealStates[d.State], d.PieceRef, d.Size, d.PricePerEpoch, d.Duration)
|
||||
}
|
||||
return w.Flush()
|
||||
},
|
||||
|
@ -293,7 +293,7 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin
|
||||
MinerDeal: storagemarket.MinerDeal{
|
||||
Proposal: *proposal,
|
||||
ProposalCid: proposalCid,
|
||||
State: lapi.DealComplete,
|
||||
State: storagemarket.StorageDealActive,
|
||||
Ref: proposalCid, // TODO: This is super wrong, but there
|
||||
// are no params for CommP CIDs, we can't recover unixfs cid easily,
|
||||
// and this isn't even used after the deal enters Complete state
|
||||
|
6
go.mod
6
go.mod
@ -18,7 +18,7 @@ require (
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce
|
||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8
|
||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200124235616-d94a1cf0beaa
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1
|
||||
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200203173614-42d67726bb62
|
||||
github.com/filecoin-project/go-statestore v0.1.0
|
||||
@ -32,7 +32,7 @@ require (
|
||||
github.com/ipfs/go-bitswap v0.1.8
|
||||
github.com/ipfs/go-block-format v0.0.2
|
||||
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
|
||||
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291
|
||||
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896
|
||||
github.com/ipfs/go-cid v0.0.4
|
||||
github.com/ipfs/go-datastore v0.3.1
|
||||
github.com/ipfs/go-ds-badger2 v0.0.0-20200123200730-d75eb2678a5d
|
||||
@ -83,7 +83,7 @@ require (
|
||||
github.com/multiformats/go-varint v0.0.2
|
||||
github.com/opentracing/opentracing-go v1.1.0
|
||||
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
|
||||
github.com/prometheus/common v0.2.0
|
||||
github.com/prometheus/common v0.4.0
|
||||
github.com/stretchr/testify v1.4.0
|
||||
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
|
||||
github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8
|
||||
|
23
go.sum
23
go.sum
@ -111,8 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY=
|
||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww=
|
||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 h1:g3oodvSz+Ou+ObwcVBB2wyt8SHdWpwzMiNJ19U1zZNA=
|
||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
|
||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200124235616-d94a1cf0beaa h1:45LCzmHF2NZuYYFOmJ/MQ5SRDXw4QApQvT4BJzVysV8=
|
||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200124235616-d94a1cf0beaa/go.mod h1:hbYlEmbOg9QwhZ71B724oAgXQ0wnoWHj8S+33q9lrm8=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
||||
@ -177,6 +177,8 @@ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmv
|
||||
github.com/gxed/pubsub v0.0.0-20180201040156-26ebdf44f824/go.mod h1:OiEWyHgK+CWrmOlVquHaIK1vhpUJydC9m0Je6mhaiNE=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c h1:+MSf4NEnLCYZoAgK6fqwc7NH88nM8haFSxKGUGIG3vA=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 h1:vQqOW42RRM5LoM/1K5dK940VipLqpH8lEVGrMz+mNjU=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
|
||||
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
|
||||
@ -211,9 +213,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR
|
||||
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
|
||||
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0=
|
||||
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
|
||||
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
|
||||
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291 h1:Yy0dcFWw8oDV/WJ4S/rkMQRWnJ3tGr9EbgDDv2JhVQw=
|
||||
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291/go.mod h1:AG6sBpd2PWMccpAG7XLFBBQ/4rfBEtzUNeO2GSMesYk=
|
||||
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 h1:l8gnU1VBhftugMKzfh+n7nuDhOw3X1iqfrA33GVBMMY=
|
||||
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
|
||||
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||
@ -305,7 +306,6 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD
|
||||
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
|
||||
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
|
||||
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
|
||||
github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
|
||||
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785 h1:fASnkvtR+SmB2y453RxmDD3Uvd4LonVUgFGk9JoDaZs=
|
||||
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5 h1:lSip43rAdyGA+yRQuy6ju0ucZkWpYc1F2CTQtZTVW/4=
|
||||
@ -410,10 +410,13 @@ github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFT
|
||||
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
|
||||
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
|
||||
github.com/libp2p/go-libp2p-host v0.0.1/go.mod h1:qWd+H1yuU0m5CwzAkvbSjqKairayEHdR5MMl7Cwa7Go=
|
||||
github.com/libp2p/go-libp2p-host v0.0.3 h1:BB/1Z+4X0rjKP5lbQTmjEjLbDVbrcmLOlA6QDsN5/j4=
|
||||
github.com/libp2p/go-libp2p-host v0.0.3/go.mod h1:Y/qPyA6C8j2coYyos1dfRm0I8+nvd4TGrDGt4tA7JR8=
|
||||
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
|
||||
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
|
||||
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5 h1:KG/KNYL2tYzXAfMvQN5K1aAGTYSYUMJ1prgYa2/JI1E=
|
||||
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
|
||||
github.com/libp2p/go-libp2p-interface-pnet v0.0.1 h1:7GnzRrBTJHEsofi1ahFdPN9Si6skwXQE9UqR2S+Pkh8=
|
||||
github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1 h1:IH6NQuoUv5w5e1O8Jc3KyVDtr0rNd0G9aaADpLI1xVo=
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1/go.mod h1:1kj2Rk5pX3/0RwqMm9AMNCT7DzcMHYhgDN5VTi+cY0M=
|
||||
@ -422,6 +425,7 @@ github.com/libp2p/go-libp2p-kbucket v0.2.0/go.mod h1:JNymBToym3QXKBMKGy3m29+xprg
|
||||
github.com/libp2p/go-libp2p-loggables v0.0.1/go.mod h1:lDipDlBNYbpyqyPX/KcoO+eq0sJYEVR2JgOexcivchg=
|
||||
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
|
||||
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
|
||||
github.com/libp2p/go-libp2p-metrics v0.0.1 h1:yumdPC/P2VzINdmcKZd0pciSUCpou+s0lwYCjBbzQZU=
|
||||
github.com/libp2p/go-libp2p-metrics v0.0.1/go.mod h1:jQJ95SXXA/K1VZi13h52WZMa9ja78zjyy5rspMsC/08=
|
||||
github.com/libp2p/go-libp2p-mplex v0.1.1/go.mod h1:KUQWpGkCzfV7UIpi8SKsAVxyBgz1c9R5EvxgnwLsb/I=
|
||||
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
|
||||
@ -431,6 +435,7 @@ github.com/libp2p/go-libp2p-nat v0.0.4/go.mod h1:N9Js/zVtAXqaeT99cXgTV9e75KpnWCv
|
||||
github.com/libp2p/go-libp2p-nat v0.0.5 h1:/mH8pXFVKleflDL1YwqMg27W9GD8kjEx7NY0P6eGc98=
|
||||
github.com/libp2p/go-libp2p-nat v0.0.5/go.mod h1:1qubaE5bTZMJE+E/uu2URroMbzdubFz1ChgiN79yKPE=
|
||||
github.com/libp2p/go-libp2p-net v0.0.1/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c=
|
||||
github.com/libp2p/go-libp2p-net v0.0.2 h1:qP06u4TYXfl7uW/hzqPhlVVTSA2nw1B/bHBJaUnbh6M=
|
||||
github.com/libp2p/go-libp2p-net v0.0.2/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c=
|
||||
github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFxecf9Gt03cKxm2f/Q=
|
||||
github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLKcKF72EAMQ=
|
||||
@ -447,6 +452,7 @@ github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1c
|
||||
github.com/libp2p/go-libp2p-peerstore v0.1.4 h1:d23fvq5oYMJ/lkkbO4oTwBp/JP+I/1m5gZJobNXCE/k=
|
||||
github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs=
|
||||
github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s=
|
||||
github.com/libp2p/go-libp2p-protocol v0.1.0 h1:HdqhEyhg0ToCaxgMhnOmUO8snQtt/kQlcjVk3UoJU3c=
|
||||
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.3 h1:qJRnRnM7Z4xnHb4i6EBb3DKQXRPgtFWlKP4AmfJudLQ=
|
||||
github.com/libp2p/go-libp2p-pubsub v0.2.3/go.mod h1:Jscj3fk23R5mCrOwb625xjVs5ZEyTZcx/OlTwMDqU+g=
|
||||
@ -515,6 +521,7 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza
|
||||
github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4=
|
||||
github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs=
|
||||
github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14=
|
||||
github.com/libp2p/go-stream-muxer v0.1.0 h1:3ToDXUzx8pDC6RfuOzGsUYP5roMDthbUKRdMRRhqAqY=
|
||||
github.com/libp2p/go-stream-muxer v0.1.0/go.mod h1:8JAVsjeRBCWwPoZeH0W1imLOcriqXJyFvB0mR4A04sQ=
|
||||
github.com/libp2p/go-stream-muxer-multistream v0.1.1/go.mod h1:zmGdfkQ1AzOECIAcccoL8L//laqawOsO03zX8Sa+eGw=
|
||||
github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg=
|
||||
@ -554,6 +561,7 @@ github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+tw
|
||||
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
|
||||
github.com/miekg/dns v1.1.12 h1:WMhc1ik4LNkTg8U9l3hI1LvxKmIL+f1+WV/SZtCbDDA=
|
||||
github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
|
||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
|
||||
@ -647,6 +655,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
|
||||
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU=
|
||||
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
|
||||
github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM=
|
||||
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
|
||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
@ -728,6 +738,7 @@ github.com/whyrusleeping/go-smux-yamux v2.0.8+incompatible/go.mod h1:6qHUzBXUbB9
|
||||
github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA=
|
||||
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
|
||||
github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
|
||||
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 h1:Y1/FEOpaCpD21WxrmfeIYCFPuVPRCY2XZTWzTNHGw30=
|
||||
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
|
||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
|
||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
|
||||
|
@ -1,68 +1,92 @@
|
||||
import React from 'react';
|
||||
import Address from "./Address";
|
||||
import Window from "./Window";
|
||||
import Fil from "./Fil";
|
||||
import React from 'react'
|
||||
import Address from './Address'
|
||||
import Window from './Window'
|
||||
import Fil from './Fil'
|
||||
|
||||
const dealStates = [
|
||||
"Unknown",
|
||||
"Rejected",
|
||||
"Accepted",
|
||||
"Staged",
|
||||
"Sealing",
|
||||
"Failed",
|
||||
"Complete",
|
||||
"Error",
|
||||
]
|
||||
'Unknown',
|
||||
'ProposalNotFound',
|
||||
'ProposalRejected',
|
||||
'ProposalAccepted',
|
||||
'Staged',
|
||||
'Sealing',
|
||||
'ProposalSigned',
|
||||
'Published',
|
||||
'Committed',
|
||||
'Active',
|
||||
'Failing',
|
||||
'Recovering',
|
||||
'Expired',
|
||||
'NotFound',
|
||||
|
||||
'Validating',
|
||||
'Transferring',
|
||||
'VerifyData',
|
||||
'Publishing',
|
||||
'Error'
|
||||
]
|
||||
|
||||
class Client extends React.Component {
|
||||
constructor(props) {
|
||||
super(props)
|
||||
|
||||
this.state = {
|
||||
miners: ["t0101"],
|
||||
ask: {Price: "1000000000"}, // 2x min default ask to account for bin packing (could also do the math correctly below, but..)
|
||||
miners: ['t0101'],
|
||||
ask: { Price: '1000000000' }, // 2x min default ask to account for bin packing (could also do the math correctly below, but..)
|
||||
|
||||
kbs: 1,
|
||||
blocks: 12,
|
||||
total: 36000,
|
||||
miner: "t0101",
|
||||
miner: 't0101',
|
||||
|
||||
deals: [],
|
||||
|
||||
blockDelay: 10,
|
||||
blockDelay: 10
|
||||
}
|
||||
}
|
||||
|
||||
async componentDidMount() {
|
||||
let ver = await this.props.client.call('Filecoin.Version', [])
|
||||
this.setState({blockDelay: ver.BlockDelay})
|
||||
this.setState({ blockDelay: ver.BlockDelay })
|
||||
|
||||
this.getDeals()
|
||||
setInterval(this.getDeals, 1325)
|
||||
}
|
||||
|
||||
getDeals = async () => {
|
||||
let miners = await this.props.client.call('Filecoin.StateListMiners', [null])
|
||||
let miners = await this.props.client.call('Filecoin.StateListMiners', [
|
||||
null
|
||||
])
|
||||
let deals = await this.props.client.call('Filecoin.ClientListDeals', [])
|
||||
miners.sort()
|
||||
this.setState({deals, miners})
|
||||
this.setState({ deals, miners })
|
||||
}
|
||||
|
||||
update = (name) => (e) => this.setState({ [name]: e.target.value });
|
||||
update = name => e => this.setState({ [name]: e.target.value })
|
||||
|
||||
makeDeal = async () => {
|
||||
let perBlk = this.state.ask.Price * this.state.kbs * 1000 / (1 << 30) * 2
|
||||
let perBlk =
|
||||
((this.state.ask.Price * this.state.kbs * 1000) / (1 << 30)) * 2
|
||||
|
||||
let file = await this.props.pondClient.call('Pond.CreateRandomFile', [this.state.kbs * 1000]) // 1024 won't fit in 1k blocks :(
|
||||
let file = await this.props.pondClient.call('Pond.CreateRandomFile', [
|
||||
this.state.kbs * 1000
|
||||
]) // 1024 won't fit in 1k blocks :(
|
||||
let cid = await this.props.client.call('Filecoin.ClientImport', [file])
|
||||
let dealcid = await this.props.client.call('Filecoin.ClientStartDeal', [cid, this.state.miner, `${Math.round(perBlk)}`, Number(this.state.blocks)])
|
||||
console.log("deal cid: ", dealcid)
|
||||
let dealcid = await this.props.client.call('Filecoin.ClientStartDeal', [
|
||||
cid,
|
||||
this.state.miner,
|
||||
`${Math.round(perBlk)}`,
|
||||
Number(this.state.blocks)
|
||||
])
|
||||
console.log('deal cid: ', dealcid)
|
||||
}
|
||||
|
||||
retrieve = (deal) => async () => {
|
||||
retrieve = deal => async () => {
|
||||
console.log(deal)
|
||||
let client = await this.props.client.call('Filecoin.WalletDefaultAddress', [])
|
||||
let client = await this.props.client.call(
|
||||
'Filecoin.WalletDefaultAddress',
|
||||
[]
|
||||
)
|
||||
|
||||
let order = {
|
||||
Root: deal.PieceRef,
|
||||
@ -74,7 +98,10 @@ class Client extends React.Component {
|
||||
Miner: deal.Miner
|
||||
}
|
||||
|
||||
await this.props.client.call('Filecoin.ClientRetrieve', [order, '/dev/null'])
|
||||
await this.props.client.call('Filecoin.ClientRetrieve', [
|
||||
order,
|
||||
'/dev/null'
|
||||
])
|
||||
}
|
||||
|
||||
render() {
|
||||
@ -82,41 +109,111 @@ class Client extends React.Component {
|
||||
let total = perBlk * this.state.blocks
|
||||
let days = (this.state.blocks * this.state.blockDelay) / 60 / 60 / 24
|
||||
|
||||
let dealMaker = <div hidden={!this.props.pondClient}>
|
||||
let dealMaker = (
|
||||
<div hidden={!this.props.pondClient}>
|
||||
<div>
|
||||
<span>Make Deal: </span>
|
||||
<select>{this.state.miners.map(m => <option key={m} value={m}>{m}</option>)}</select>
|
||||
<span> Ask: <b><Fil>{this.state.ask.Price}</Fil></b> Fil/Byte/Block</span>
|
||||
<select>
|
||||
{this.state.miners.map(m => (
|
||||
<option key={m} value={m}>
|
||||
{m}
|
||||
</option>
|
||||
))}
|
||||
</select>
|
||||
<span>
|
||||
{' '}
|
||||
Ask:{' '}
|
||||
<b>
|
||||
<Fil>{this.state.ask.Price}</Fil>
|
||||
</b>{' '}
|
||||
Fil/Byte/Block
|
||||
</span>
|
||||
</div>
|
||||
<div>
|
||||
Data Size: <input type="text" placeholder="KBs" defaultValue={1} onChange={this.update("kbs")} style={{width: "5em"}}/>KB;
|
||||
Duration:<input type="text" placeholder="blocks" defaultValue={12} onChange={this.update("blocks")} style={{width: "5em"}}/>Blocks
|
||||
Data Size:{' '}
|
||||
<input
|
||||
type="text"
|
||||
placeholder="KBs"
|
||||
defaultValue={1}
|
||||
onChange={this.update('kbs')}
|
||||
style={{ width: '5em' }}
|
||||
/>
|
||||
KB; Duration:
|
||||
<input
|
||||
type="text"
|
||||
placeholder="blocks"
|
||||
defaultValue={12}
|
||||
onChange={this.update('blocks')}
|
||||
style={{ width: '5em' }}
|
||||
/>
|
||||
Blocks
|
||||
</div>
|
||||
<div>
|
||||
Total: <Fil>{total}</Fil>; {days} Days
|
||||
</div>
|
||||
<button onClick={this.makeDeal}>Deal!</button>
|
||||
</div>
|
||||
)
|
||||
|
||||
let deals = this.state.deals.map((deal, i) => <div key={i}>
|
||||
let deals = this.state.deals.map((deal, i) => (
|
||||
<div key={i}>
|
||||
<ul>
|
||||
<li>{i}. Proposal: {deal.ProposalCid['/'].substr(0, 18)}... <Address nobalance={true} client={this.props.client} addr={deal.Provider} mountWindow={this.props.mountWindow}/>: <b>{dealStates[deal.State]}</b>
|
||||
{dealStates[deal.State] === 'Complete' ? <span> <a href="#" onClick={this.retrieve(deal)}>[Retrieve]</a></span> : <span/> }
|
||||
<li>
|
||||
{i}. Proposal: {deal.ProposalCid['/'].substr(0, 18)}...{' '}
|
||||
<Address
|
||||
nobalance={true}
|
||||
client={this.props.client}
|
||||
addr={deal.Provider}
|
||||
mountWindow={this.props.mountWindow}
|
||||
/>
|
||||
: <b>{dealStates[deal.State]}</b>
|
||||
{dealStates[deal.State] === 'Complete' ? (
|
||||
<span>
|
||||
|
||||
<a href="#" onClick={this.retrieve(deal)}>
|
||||
[Retrieve]
|
||||
</a>
|
||||
</span>
|
||||
) : (
|
||||
<span />
|
||||
)}
|
||||
<ul>
|
||||
<li>Data: {deal.PieceRef['/']}, <b>{deal.Size}</b>B; Duration: <b>{deal.Duration}</b>Blocks</li>
|
||||
<li>Total: <b>{deal.TotalPrice}</b>FIL; Per Block: <b>{Math.round(deal.TotalPrice / deal.Duration * 100) / 100}</b>FIL; PerMbyteByteBlock: <b>{Math.round(deal.TotalPrice / deal.Duration / (deal.Size / 1000000) * 100) / 100}</b>FIL</li>
|
||||
<li>
|
||||
Data: {deal.PieceRef['/']}, <b>{deal.Size}</b>B; Duration:{' '}
|
||||
<b>{deal.Duration}</b>Blocks
|
||||
</li>
|
||||
<li>
|
||||
Total: <b>{deal.TotalPrice}</b>FIL; Per Block:{' '}
|
||||
<b>
|
||||
{Math.round((deal.TotalPrice / deal.Duration) * 100) / 100}
|
||||
</b>
|
||||
FIL; PerMbyteByteBlock:{' '}
|
||||
<b>
|
||||
{Math.round(
|
||||
(deal.TotalPrice / deal.Duration / (deal.Size / 1000000)) *
|
||||
100
|
||||
) / 100}
|
||||
</b>
|
||||
FIL
|
||||
</li>
|
||||
</ul>
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
))
|
||||
|
||||
</div>)
|
||||
|
||||
return <Window title={"Client - Node " + this.props.node.ID} onClose={this.props.onClose} initialSize={{width: 600, height: 400}}>
|
||||
return (
|
||||
<Window
|
||||
title={'Client - Node ' + this.props.node.ID}
|
||||
onClose={this.props.onClose}
|
||||
initialSize={{ width: 600, height: 400 }}
|
||||
>
|
||||
<div className="Client">
|
||||
<div>{dealMaker}</div>
|
||||
<div>{deals}</div>
|
||||
</div>
|
||||
</Window>
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,40 +2,36 @@ package retrievaladapter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
"io"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
retrievaltoken "github.com/filecoin-project/go-fil-markets/shared/tokenamount"
|
||||
retrievaltypes "github.com/filecoin-project/go-fil-markets/shared/types"
|
||||
"github.com/filecoin-project/go-sectorbuilder"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/markets/utils"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
)
|
||||
|
||||
type retrievalProviderNode struct {
|
||||
sectorBlocks *sectorblocks.SectorBlocks
|
||||
miner *storage.Miner
|
||||
sb sectorbuilder.Interface
|
||||
full api.FullNode
|
||||
}
|
||||
|
||||
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
|
||||
// Lotus Node
|
||||
func NewRetrievalProviderNode(sectorBlocks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProviderNode {
|
||||
return &retrievalProviderNode{sectorBlocks, full}
|
||||
func NewRetrievalProviderNode(miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode) retrievalmarket.RetrievalProviderNode {
|
||||
return &retrievalProviderNode{miner, sb, full}
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) GetPieceSize(pieceCid []byte) (uint64, error) {
|
||||
asCid, err := cid.Cast(pieceCid)
|
||||
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uint64, offset uint64, length uint64) (io.ReadCloser, error) {
|
||||
si, err := rpn.miner.GetSectorInfo(sectorID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return nil, err
|
||||
}
|
||||
return rpn.sectorBlocks.GetSize(asCid)
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) SealedBlockstore(approveUnseal func() error) blockstore.Blockstore {
|
||||
return rpn.sectorBlocks.SealedBlockstore(approveUnseal)
|
||||
return rpn.sb.ReadPieceFromSealedSector(ctx, sectorID, offset, length, si.Ticket.TicketBytes, si.CommD)
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievaltypes.SignedVoucher, proof []byte, expectedAmount retrievaltoken.TokenAmount) (retrievaltoken.TokenAmount, error) {
|
||||
|
@ -221,6 +221,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
|
||||
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb storagemarket.DealSectorCommittedCallback) error {
|
||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||
sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts)
|
||||
|
||||
if err != nil {
|
||||
// TODO: This may be fine for some errors
|
||||
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||
|
@ -5,10 +5,10 @@ package storageadapter
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
unixfile "github.com/ipfs/go-unixfs/file"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
@ -16,11 +16,13 @@ import (
|
||||
sharedtypes "github.com/filecoin-project/go-fil-markets/shared/types"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/events"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/padreader"
|
||||
"github.com/filecoin-project/lotus/markets/utils"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/storage/sealing"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
@ -33,6 +35,7 @@ type ProviderNodeAdapter struct {
|
||||
dag dtypes.StagingDAG
|
||||
|
||||
secb *sectorblocks.SectorBlocks
|
||||
ev *events.Events
|
||||
}
|
||||
|
||||
func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
|
||||
@ -40,6 +43,7 @@ func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBloc
|
||||
FullNode: full,
|
||||
dag: dag,
|
||||
secb: secb,
|
||||
ev: events.NewEvents(context.TODO(), full),
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,35 +98,9 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark
|
||||
return storagemarket.DealID(resp.DealIDs[0]), smsg.Cid(), nil
|
||||
}
|
||||
|
||||
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, piecePath string) (uint64, error) {
|
||||
root, err := n.dag.Get(ctx, deal.Ref)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("failed to get file root for deal: %s", err)
|
||||
}
|
||||
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize uint64, pieceData io.Reader) (uint64, error) {
|
||||
|
||||
// TODO: abstract this away into ReadSizeCloser + implement different modes
|
||||
node, err := unixfile.NewUnixfsFile(ctx, n.dag, root)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("cannot open unixfs file: %s", err)
|
||||
}
|
||||
|
||||
uf, ok := node.(sectorblocks.UnixfsReader)
|
||||
if !ok {
|
||||
// we probably got directory, unsupported for now
|
||||
return 0, xerrors.Errorf("unsupported unixfs file type")
|
||||
}
|
||||
|
||||
// TODO: uf.Size() is user input, not trusted
|
||||
// This won't be useful / here after we migrate to putting CARs into sectors
|
||||
size, err := uf.Size()
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("getting unixfs file size: %w", err)
|
||||
}
|
||||
if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize {
|
||||
return 0, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
|
||||
}
|
||||
|
||||
sectorID, err := n.secb.AddUnixfsPiece(ctx, uf, deal.DealID)
|
||||
sectorID, err := n.secb.AddPiece(ctx, pieceSize, pieceData, deal.DealID)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("AddPiece failed: %s", err)
|
||||
}
|
||||
@ -206,4 +184,117 @@ func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Addre
|
||||
return utils.ToSharedBalance(bal), nil
|
||||
}
|
||||
|
||||
func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context, dealID uint64) (sectorID uint64, offset uint64, length uint64, err error) {
|
||||
|
||||
refs, err := n.secb.GetRefs(dealID)
|
||||
if err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
if len(refs) == 0 {
|
||||
return 0, 0, 0, xerrors.New("no sector information for deal ID")
|
||||
}
|
||||
|
||||
// TODO: better strategy (e.g. look for already unsealed)
|
||||
var best api.SealedRef
|
||||
var bestSi sealing.SectorInfo
|
||||
for _, r := range refs {
|
||||
si, err := n.secb.Miner.GetSectorInfo(r.SectorID)
|
||||
if err != nil {
|
||||
return 0, 0, 0, xerrors.Errorf("getting sector info: %w", err)
|
||||
}
|
||||
if si.State == api.Proving {
|
||||
best = r
|
||||
bestSi = si
|
||||
break
|
||||
}
|
||||
}
|
||||
if bestSi.State == api.UndefinedSectorState {
|
||||
return 0, 0, 0, xerrors.New("no sealed sector found")
|
||||
}
|
||||
return best.SectorID, best.Offset, best.Size, nil
|
||||
}
|
||||
|
||||
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID uint64, cb storagemarket.DealSectorCommittedCallback) error {
|
||||
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
|
||||
sd, err := n.StateMarketStorageDeal(ctx, dealID, ts)
|
||||
|
||||
if err != nil {
|
||||
// TODO: This may be fine for some errors
|
||||
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||
}
|
||||
|
||||
if sd.ActivationEpoch > 0 {
|
||||
cb(nil)
|
||||
return true, false, nil
|
||||
}
|
||||
|
||||
return false, true, nil
|
||||
}
|
||||
|
||||
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
cb(xerrors.Errorf("handling applied event: %w", err))
|
||||
}
|
||||
}()
|
||||
|
||||
if msg == nil {
|
||||
log.Error("timed out waiting for deal activation... what now?")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
sd, err := n.StateMarketStorageDeal(ctx, dealID, ts)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
|
||||
}
|
||||
|
||||
if sd.ActivationEpoch == 0 {
|
||||
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height())
|
||||
}
|
||||
|
||||
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.ActivationEpoch)
|
||||
|
||||
cb(nil)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
revert := func(ctx context.Context, ts *types.TipSet) error {
|
||||
log.Warn("deal activation reverted; TODO: actually handle this!")
|
||||
// TODO: Just go back to DealSealing?
|
||||
return nil
|
||||
}
|
||||
|
||||
matchEvent := func(msg *types.Message) (bool, error) {
|
||||
if msg.To != provider {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if msg.Method != actors.MAMethods.ProveCommitSector {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
var params actors.SectorProveCommitInfo
|
||||
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var found bool
|
||||
for _, did := range params.DealIDs {
|
||||
if did == dealID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return found, nil
|
||||
}
|
||||
|
||||
if err := n.ev.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil {
|
||||
return xerrors.Errorf("failed to set up called handler")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}
|
||||
|
@ -259,6 +259,7 @@ func Online() Option {
|
||||
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
|
||||
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
||||
Override(new(*deals.ProviderRequestValidator), modules.NewProviderRequestValidator),
|
||||
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
|
||||
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
|
||||
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter),
|
||||
Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator),
|
||||
|
@ -1,7 +1,6 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
@ -155,7 +154,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
|
||||
|
||||
out := make([]api.QueryOffer, len(peers))
|
||||
for k, p := range peers {
|
||||
queryResponse, err := a.Retrieval.Query(ctx, p, root.Bytes(), retrievalmarket.QueryParams{})
|
||||
queryResponse, err := a.Retrieval.Query(ctx, p, root, retrievalmarket.QueryParams{})
|
||||
if err != nil {
|
||||
out[k] = api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
|
||||
} else {
|
||||
@ -163,6 +162,8 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
|
||||
Root: root,
|
||||
Size: queryResponse.Size,
|
||||
MinPrice: utils.FromSharedTokenAmount(queryResponse.PieceRetrievalPrice()),
|
||||
PaymentInterval: queryResponse.MaxPaymentInterval,
|
||||
PaymentIntervalIncrease: queryResponse.MaxPaymentIntervalIncrease,
|
||||
Miner: p.Address, // TODO: check
|
||||
MinerPeerID: p.ID,
|
||||
}
|
||||
@ -279,10 +280,10 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
|
||||
retrievalResult := make(chan error, 1)
|
||||
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
|
||||
if bytes.Equal(state.PieceCID, order.Root.Bytes()) {
|
||||
if state.PayloadCID.Equals(order.Root) {
|
||||
switch event {
|
||||
case retrievalmarket.ClientEventError:
|
||||
retrievalResult <- xerrors.New("Retrieval Error")
|
||||
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
|
||||
case retrievalmarket.ClientEventComplete:
|
||||
retrievalResult <- nil
|
||||
}
|
||||
@ -291,8 +292,8 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
|
||||
|
||||
a.Retrieval.Retrieve(
|
||||
ctx,
|
||||
order.Root.Bytes(),
|
||||
retrievalmarket.NewParamsV0(types.BigDiv(order.Total, types.NewInt(order.Size)).Int, 0, 0),
|
||||
order.Root,
|
||||
retrievalmarket.NewParamsV0(types.BigDiv(order.Total, types.NewInt(order.Size)).Int, order.PaymentInterval, order.PaymentIntervalIncrease),
|
||||
utils.ToSharedTokenAmount(order.Total),
|
||||
order.MinerPeerID,
|
||||
order.Client,
|
||||
@ -302,7 +303,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
|
||||
return xerrors.New("Retrieval Timed Out")
|
||||
case err := <-retrievalResult:
|
||||
if err != nil {
|
||||
return xerrors.Errorf("RetrieveUnixfs: %w", err)
|
||||
return xerrors.Errorf("Retrieve: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-sectorbuilder"
|
||||
"github.com/filecoin-project/go-sectorbuilder/fs"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/apistruct"
|
||||
"github.com/filecoin-project/lotus/lib/tarutil"
|
||||
@ -226,7 +227,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
|
||||
}
|
||||
|
||||
for k, v := range refs {
|
||||
out[k.String()] = v
|
||||
out[strconv.FormatUint(k, 10)] = v
|
||||
}
|
||||
|
||||
return out, nil
|
||||
|
@ -5,10 +5,12 @@ import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
|
||||
"github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
||||
graphsyncimpl "github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
||||
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
|
||||
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
||||
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||
@ -109,12 +111,17 @@ func NewClientRequestValidator(deals dtypes.ClientDealStore) *storageimpl.Client
|
||||
return storageimpl.NewClientRequestValidator(deals)
|
||||
}
|
||||
|
||||
func StorageClient(h host.Host, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) storagemarket.StorageClient {
|
||||
return storageimpl.NewClient(h, dag, dataTransfer, discovery, deals, scn)
|
||||
func StorageClient(h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) {
|
||||
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return storageimpl.NewClient(h, ibs, store, dataTransfer, discovery, deals, scn), nil
|
||||
}
|
||||
|
||||
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
||||
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient {
|
||||
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver) retrievalmarket.RetrievalClient {
|
||||
adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi)
|
||||
return retrievalimpl.NewClient(h, bs, adapter)
|
||||
network := rmnet.NewFromLibp2pHost(h)
|
||||
return retrievalimpl.NewClient(network, bs, adapter, resolver)
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package dtypes
|
||||
|
||||
import (
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-filestore"
|
||||
@ -9,7 +11,6 @@ import (
|
||||
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
|
||||
"github.com/filecoin-project/go-data-transfer"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
)
|
||||
|
||||
@ -34,6 +35,7 @@ type ClientDealStore *statestore.StateStore
|
||||
type ClientDataTransfer datatransfer.Manager
|
||||
|
||||
type ProviderDealStore *statestore.StateStore
|
||||
type ProviderPieceStore piecestore.PieceStore
|
||||
|
||||
// ProviderDataTransfer is a data transfer manager for the provider
|
||||
type ProviderDataTransfer datatransfer.Manager
|
||||
|
@ -7,8 +7,11 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
||||
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
||||
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||
@ -43,7 +46,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/sealing"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
||||
@ -135,7 +137,11 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
|
||||
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(context.Context) error {
|
||||
m.Start(host)
|
||||
m.Start()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(context.Context) error {
|
||||
m.Stop()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
@ -176,6 +182,12 @@ func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore {
|
||||
return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||
}
|
||||
|
||||
// NewProviderPieceStore creates a statestore for storing metadata about pieces
|
||||
// shared by the storage and retrieval providers
|
||||
func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore {
|
||||
return piecestore.NewPieceStore(ds)
|
||||
}
|
||||
|
||||
// StagingBlockstore creates a blockstore for staging blocks for a miner
|
||||
// in a storage deal, prior to sealing
|
||||
func StagingBlockstore(r repo.LockedRepo) (dtypes.StagingBlockstore, error) {
|
||||
@ -280,12 +292,21 @@ func NewProviderRequestValidator(deals dtypes.ProviderDealStore) *storageimpl.Pr
|
||||
return storageimpl.NewProviderRequestValidator(deals)
|
||||
}
|
||||
|
||||
func StorageProvider(ds dtypes.MetadataDS, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
|
||||
return storageimpl.NewProvider(ds, dag, dataTransfer, spn)
|
||||
func StorageProvider(ds dtypes.MetadataDS, ibs dtypes.StagingBlockstore, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
|
||||
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return storageimpl.NewProvider(ds, ibs, store, pieceStore, dataTransfer, spn)
|
||||
}
|
||||
|
||||
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
|
||||
func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider {
|
||||
adapter := retrievaladapter.NewRetrievalProviderNode(sblks, full)
|
||||
return retrievalimpl.NewProvider(adapter)
|
||||
func RetrievalProvider(h host.Host, miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) {
|
||||
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sb, full)
|
||||
address, err := minerAddrFromDS(ds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
network := rmnet.NewFromLibp2pHost(h)
|
||||
return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs), nil
|
||||
}
|
||||
|
@ -3,23 +3,19 @@ package sectorblocks
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
"github.com/ipfs/go-datastore/query"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
files "github.com/ipfs/go-ipfs-files"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/ipfs/go-unixfs"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-cbor-util"
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/lib/padreader"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
@ -33,16 +29,28 @@ const (
|
||||
)
|
||||
|
||||
var dsPrefix = datastore.NewKey("/sealedblocks")
|
||||
var imBlocksPrefix = datastore.NewKey("/intermediate")
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
func DealIDToDsKey(dealID uint64) datastore.Key {
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
size := binary.PutUvarint(buf, dealID)
|
||||
return dshelp.NewKeyFromBinary(buf[:size])
|
||||
}
|
||||
|
||||
func DsKeyToDealID(key datastore.Key) (uint64, error) {
|
||||
buf, err := dshelp.BinaryFromDsKey(key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
dealID, _ := binary.Uvarint(buf)
|
||||
return dealID, nil
|
||||
}
|
||||
|
||||
type SectorBlocks struct {
|
||||
*storage.Miner
|
||||
sb sectorbuilder.Interface
|
||||
|
||||
intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore
|
||||
|
||||
keys datastore.Batching
|
||||
keyLk sync.Mutex
|
||||
}
|
||||
@ -51,36 +59,17 @@ func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb sectorbuilde
|
||||
sbc := &SectorBlocks{
|
||||
Miner: miner,
|
||||
sb: sb,
|
||||
|
||||
intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)),
|
||||
|
||||
keys: namespace.Wrap(ds, dsPrefix),
|
||||
}
|
||||
|
||||
return sbc
|
||||
}
|
||||
|
||||
type UnixfsReader interface {
|
||||
files.File
|
||||
|
||||
// ReadBlock reads data from a single unixfs block. Data is nil
|
||||
// for intermediate nodes
|
||||
ReadBlock(context.Context) (data []byte, offset uint64, nd ipld.Node, err error)
|
||||
}
|
||||
|
||||
type refStorer struct {
|
||||
blockReader UnixfsReader
|
||||
writeRef func(cid cid.Cid, offset uint64, size uint64) error
|
||||
intermediate blockstore.Blockstore
|
||||
|
||||
remaining []byte
|
||||
}
|
||||
|
||||
func (st *SectorBlocks) writeRef(cid cid.Cid, sectorID uint64, offset uint64, size uint64) error {
|
||||
func (st *SectorBlocks) writeRef(dealID uint64, sectorID uint64, offset uint64, size uint64) error {
|
||||
st.keyLk.Lock() // TODO: make this multithreaded
|
||||
defer st.keyLk.Unlock()
|
||||
|
||||
v, err := st.keys.Get(dshelp.CidToDsKey(cid))
|
||||
v, err := st.keys.Get(DealIDToDsKey(dealID))
|
||||
if err == datastore.ErrNotFound {
|
||||
err = nil
|
||||
}
|
||||
@ -105,80 +94,25 @@ func (st *SectorBlocks) writeRef(cid cid.Cid, sectorID uint64, offset uint64, si
|
||||
if err != nil {
|
||||
return xerrors.Errorf("serializing refs: %w", err)
|
||||
}
|
||||
return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow
|
||||
return st.keys.Put(DealIDToDsKey(dealID), newRef) // TODO: batch somehow
|
||||
}
|
||||
|
||||
func (r *refStorer) Read(p []byte) (n int, err error) {
|
||||
offset := 0
|
||||
if len(r.remaining) > 0 {
|
||||
offset += len(r.remaining)
|
||||
read := copy(p, r.remaining)
|
||||
if read == len(r.remaining) {
|
||||
r.remaining = nil
|
||||
} else {
|
||||
r.remaining = r.remaining[read:]
|
||||
}
|
||||
return read, nil
|
||||
}
|
||||
|
||||
for {
|
||||
data, offset, nd, err := r.blockReader.ReadBlock(context.TODO())
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return 0, io.EOF
|
||||
}
|
||||
return 0, xerrors.Errorf("reading block: %w", err)
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
// TODO: batch
|
||||
// TODO: GC
|
||||
if err := r.intermediate.Put(nd); err != nil {
|
||||
return 0, xerrors.Errorf("storing intermediate node: %w", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.writeRef(nd.Cid(), offset, uint64(len(data))); err != nil {
|
||||
return 0, xerrors.Errorf("writing ref: %w", err)
|
||||
}
|
||||
|
||||
read := copy(p, data)
|
||||
if read < len(data) {
|
||||
r.remaining = data[read:]
|
||||
}
|
||||
// TODO: read multiple
|
||||
return read, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (st *SectorBlocks) AddUnixfsPiece(ctx context.Context, r UnixfsReader, dealID uint64) (sectorID uint64, err error) {
|
||||
size, err := r.Size()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
func (st *SectorBlocks) AddPiece(ctx context.Context, size uint64, r io.Reader, dealID uint64) (sectorID uint64, err error) {
|
||||
|
||||
sectorID, pieceOffset, err := st.Miner.AllocatePiece(padreader.PaddedSize(uint64(size)))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
refst := &refStorer{
|
||||
blockReader: r,
|
||||
writeRef: func(cid cid.Cid, offset uint64, size uint64) error {
|
||||
offset += pieceOffset
|
||||
|
||||
return st.writeRef(cid, sectorID, offset, size)
|
||||
},
|
||||
intermediate: st.intermediate,
|
||||
err = st.writeRef(dealID, sectorID, pieceOffset, size)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
pr, psize := padreader.New(refst, uint64(size))
|
||||
|
||||
return sectorID, st.Miner.SealPiece(ctx, psize, pr, sectorID, dealID)
|
||||
return sectorID, st.Miner.SealPiece(ctx, size, r, sectorID, dealID)
|
||||
}
|
||||
|
||||
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
|
||||
func (st *SectorBlocks) List() (map[uint64][]api.SealedRef, error) {
|
||||
res, err := st.keys.Query(query.Query{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -189,9 +123,9 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := map[cid.Cid][]api.SealedRef{}
|
||||
out := map[uint64][]api.SealedRef{}
|
||||
for _, ent := range ents {
|
||||
refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key))
|
||||
dealID, err := DsKeyToDealID(datastore.RawKey(ent.Key))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -201,14 +135,14 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out[refCid] = refs.Refs
|
||||
out[dealID] = refs.Refs
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors
|
||||
ent, err := st.keys.Get(dshelp.CidToDsKey(k))
|
||||
func (st *SectorBlocks) GetRefs(dealID uint64) ([]api.SealedRef, error) { // TODO: track local sectors
|
||||
ent, err := st.keys.Get(DealIDToDsKey(dealID))
|
||||
if err == datastore.ErrNotFound {
|
||||
err = ErrNotFound
|
||||
}
|
||||
@ -224,42 +158,16 @@ func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: t
|
||||
return refs.Refs, nil
|
||||
}
|
||||
|
||||
func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) {
|
||||
blk, err := st.intermediate.Get(k)
|
||||
if err == blockstore.ErrNotFound {
|
||||
refs, err := st.GetRefs(k)
|
||||
func (st *SectorBlocks) GetSize(dealID uint64) (uint64, error) {
|
||||
refs, err := st.GetRefs(dealID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return uint64(refs[0].Size), nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
nd, err := ipld.Decode(blk)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
fsn, err := unixfs.ExtractFSNode(nd)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return fsn.FileSize(), nil
|
||||
}
|
||||
|
||||
func (st *SectorBlocks) Has(k cid.Cid) (bool, error) {
|
||||
func (st *SectorBlocks) Has(dealID uint64) (bool, error) {
|
||||
// TODO: ensure sector is still there
|
||||
return st.keys.Has(dshelp.CidToDsKey(k))
|
||||
}
|
||||
|
||||
func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore {
|
||||
return &SectorBlockStore{
|
||||
intermediate: st.intermediate,
|
||||
sectorBlocks: st,
|
||||
approveUnseal: approveUnseal,
|
||||
}
|
||||
return st.keys.Has(DealIDToDsKey(dealID))
|
||||
}
|
||||
|
@ -1,127 +0,0 @@
|
||||
package sectorblocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/storage/sealing"
|
||||
)
|
||||
|
||||
var log = logging.Logger("sectorblocks")
|
||||
|
||||
type SectorBlockStore struct {
|
||||
intermediate blockstore.Blockstore
|
||||
sectorBlocks *SectorBlocks
|
||||
|
||||
approveUnseal func() error
|
||||
}
|
||||
|
||||
func (s *SectorBlockStore) DeleteBlock(cid.Cid) error {
|
||||
panic("not supported")
|
||||
}
|
||||
func (s *SectorBlockStore) GetSize(cid.Cid) (int, error) {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (s *SectorBlockStore) Put(blocks.Block) error {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (s *SectorBlockStore) PutMany([]blocks.Block) error {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (s *SectorBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (s *SectorBlockStore) HashOnRead(enabled bool) {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) {
|
||||
has, err := s.intermediate.Has(c)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if has {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return s.sectorBlocks.Has(c)
|
||||
}
|
||||
|
||||
func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
|
||||
val, err := s.intermediate.Get(c)
|
||||
if err == nil {
|
||||
return val, nil
|
||||
}
|
||||
if err != blockstore.ErrNotFound {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
refs, err := s.sectorBlocks.GetRefs(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(refs) == 0 {
|
||||
return nil, blockstore.ErrNotFound
|
||||
}
|
||||
|
||||
// TODO: better strategy (e.g. look for already unsealed)
|
||||
var best api.SealedRef
|
||||
var bestSi sealing.SectorInfo
|
||||
for _, r := range refs {
|
||||
si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting sector info: %w", err)
|
||||
}
|
||||
if si.State == api.Proving {
|
||||
best = r
|
||||
bestSi = si
|
||||
break
|
||||
}
|
||||
}
|
||||
if bestSi.State == api.UndefinedSectorState {
|
||||
return nil, xerrors.New("no sealed sector found")
|
||||
}
|
||||
|
||||
log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size)
|
||||
|
||||
r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector(
|
||||
context.TODO(),
|
||||
best.SectorID,
|
||||
best.Offset,
|
||||
best.Size,
|
||||
bestSi.Ticket.TicketBytes,
|
||||
bestSi.CommD,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("unsealing block: %w", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
data, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("reading block data: %w", err)
|
||||
}
|
||||
if uint64(len(data)) != best.Size {
|
||||
return nil, xerrors.Errorf("got wrong amount of data: %d != !d", len(data), best.Size)
|
||||
}
|
||||
|
||||
b, err := blocks.NewBlockWithCid(data, c)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("sbs get (%d[%d:%d]): %w", best.SectorID, best.Offset, best.Offset+best.Size, err)
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
var _ blockstore.Blockstore = &SectorBlockStore{}
|
Loading…
Reference in New Issue
Block a user