feat(markets): update to support car files

Upgrades lotus version of go-fil-markets to its head, which supports car files and handles tracking
of pieces, reducing the size of sectorBlocks
This commit is contained in:
hannahhoward 2020-01-24 12:19:52 -08:00
parent eb4b85aea5
commit 71289b58ff
13 changed files with 238 additions and 329 deletions

6
go.mod
View File

@ -18,7 +18,7 @@ require (
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8
github.com/filecoin-project/go-fil-markets v0.0.0-20200124174839-9211de075b61
github.com/filecoin-project/go-paramfetch v0.0.1
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55
github.com/filecoin-project/go-statestore v0.1.0
@ -32,7 +32,7 @@ require (
github.com/ipfs/go-bitswap v0.1.8
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896
github.com/ipfs/go-cid v0.0.4
github.com/ipfs/go-datastore v0.3.1
github.com/ipfs/go-ds-badger2 v0.0.0-20200123200730-d75eb2678a5d
@ -83,7 +83,7 @@ require (
github.com/multiformats/go-varint v0.0.2
github.com/opentracing/opentracing-go v1.1.0
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
github.com/prometheus/common v0.2.0
github.com/prometheus/common v0.4.0
github.com/stretchr/testify v1.4.0
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8

23
go.sum
View File

@ -111,8 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY=
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww=
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 h1:g3oodvSz+Ou+ObwcVBB2wyt8SHdWpwzMiNJ19U1zZNA=
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
github.com/filecoin-project/go-fil-markets v0.0.0-20200124174839-9211de075b61 h1:51j9Csz3r/AeVUtUlPMHfUCOAG4D+uodkzT2ODHgJOk=
github.com/filecoin-project/go-fil-markets v0.0.0-20200124174839-9211de075b61/go.mod h1:hbYlEmbOg9QwhZ71B724oAgXQ0wnoWHj8S+33q9lrm8=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE=
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
@ -177,6 +177,8 @@ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmv
github.com/gxed/pubsub v0.0.0-20180201040156-26ebdf44f824/go.mod h1:OiEWyHgK+CWrmOlVquHaIK1vhpUJydC9m0Je6mhaiNE=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c h1:+MSf4NEnLCYZoAgK6fqwc7NH88nM8haFSxKGUGIG3vA=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 h1:vQqOW42RRM5LoM/1K5dK940VipLqpH8lEVGrMz+mNjU=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
@ -211,9 +213,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0=
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291 h1:Yy0dcFWw8oDV/WJ4S/rkMQRWnJ3tGr9EbgDDv2JhVQw=
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291/go.mod h1:AG6sBpd2PWMccpAG7XLFBBQ/4rfBEtzUNeO2GSMesYk=
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 h1:l8gnU1VBhftugMKzfh+n7nuDhOw3X1iqfrA33GVBMMY=
github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
@ -305,7 +306,6 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785 h1:fASnkvtR+SmB2y453RxmDD3Uvd4LonVUgFGk9JoDaZs=
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5 h1:lSip43rAdyGA+yRQuy6ju0ucZkWpYc1F2CTQtZTVW/4=
@ -410,10 +410,13 @@ github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFT
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-host v0.0.1/go.mod h1:qWd+H1yuU0m5CwzAkvbSjqKairayEHdR5MMl7Cwa7Go=
github.com/libp2p/go-libp2p-host v0.0.3 h1:BB/1Z+4X0rjKP5lbQTmjEjLbDVbrcmLOlA6QDsN5/j4=
github.com/libp2p/go-libp2p-host v0.0.3/go.mod h1:Y/qPyA6C8j2coYyos1dfRm0I8+nvd4TGrDGt4tA7JR8=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5 h1:KG/KNYL2tYzXAfMvQN5K1aAGTYSYUMJ1prgYa2/JI1E=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1 h1:7GnzRrBTJHEsofi1ahFdPN9Si6skwXQE9UqR2S+Pkh8=
github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k=
github.com/libp2p/go-libp2p-kad-dht v0.1.1 h1:IH6NQuoUv5w5e1O8Jc3KyVDtr0rNd0G9aaADpLI1xVo=
github.com/libp2p/go-libp2p-kad-dht v0.1.1/go.mod h1:1kj2Rk5pX3/0RwqMm9AMNCT7DzcMHYhgDN5VTi+cY0M=
@ -422,6 +425,7 @@ github.com/libp2p/go-libp2p-kbucket v0.2.0/go.mod h1:JNymBToym3QXKBMKGy3m29+xprg
github.com/libp2p/go-libp2p-loggables v0.0.1/go.mod h1:lDipDlBNYbpyqyPX/KcoO+eq0sJYEVR2JgOexcivchg=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-metrics v0.0.1 h1:yumdPC/P2VzINdmcKZd0pciSUCpou+s0lwYCjBbzQZU=
github.com/libp2p/go-libp2p-metrics v0.0.1/go.mod h1:jQJ95SXXA/K1VZi13h52WZMa9ja78zjyy5rspMsC/08=
github.com/libp2p/go-libp2p-mplex v0.1.1/go.mod h1:KUQWpGkCzfV7UIpi8SKsAVxyBgz1c9R5EvxgnwLsb/I=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
@ -431,6 +435,7 @@ github.com/libp2p/go-libp2p-nat v0.0.4/go.mod h1:N9Js/zVtAXqaeT99cXgTV9e75KpnWCv
github.com/libp2p/go-libp2p-nat v0.0.5 h1:/mH8pXFVKleflDL1YwqMg27W9GD8kjEx7NY0P6eGc98=
github.com/libp2p/go-libp2p-nat v0.0.5/go.mod h1:1qubaE5bTZMJE+E/uu2URroMbzdubFz1ChgiN79yKPE=
github.com/libp2p/go-libp2p-net v0.0.1/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c=
github.com/libp2p/go-libp2p-net v0.0.2 h1:qP06u4TYXfl7uW/hzqPhlVVTSA2nw1B/bHBJaUnbh6M=
github.com/libp2p/go-libp2p-net v0.0.2/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c=
github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFxecf9Gt03cKxm2f/Q=
github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLKcKF72EAMQ=
@ -447,6 +452,7 @@ github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1c
github.com/libp2p/go-libp2p-peerstore v0.1.4 h1:d23fvq5oYMJ/lkkbO4oTwBp/JP+I/1m5gZJobNXCE/k=
github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs=
github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s=
github.com/libp2p/go-libp2p-protocol v0.1.0 h1:HdqhEyhg0ToCaxgMhnOmUO8snQtt/kQlcjVk3UoJU3c=
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
github.com/libp2p/go-libp2p-pubsub v0.2.3 h1:qJRnRnM7Z4xnHb4i6EBb3DKQXRPgtFWlKP4AmfJudLQ=
github.com/libp2p/go-libp2p-pubsub v0.2.3/go.mod h1:Jscj3fk23R5mCrOwb625xjVs5ZEyTZcx/OlTwMDqU+g=
@ -515,6 +521,7 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza
github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4=
github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs=
github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14=
github.com/libp2p/go-stream-muxer v0.1.0 h1:3ToDXUzx8pDC6RfuOzGsUYP5roMDthbUKRdMRRhqAqY=
github.com/libp2p/go-stream-muxer v0.1.0/go.mod h1:8JAVsjeRBCWwPoZeH0W1imLOcriqXJyFvB0mR4A04sQ=
github.com/libp2p/go-stream-muxer-multistream v0.1.1/go.mod h1:zmGdfkQ1AzOECIAcccoL8L//laqawOsO03zX8Sa+eGw=
github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg=
@ -554,6 +561,7 @@ github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+tw
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.1.12 h1:WMhc1ik4LNkTg8U9l3hI1LvxKmIL+f1+WV/SZtCbDDA=
github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
@ -647,6 +655,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
@ -728,6 +738,7 @@ github.com/whyrusleeping/go-smux-yamux v2.0.8+incompatible/go.mod h1:6qHUzBXUbB9
github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA=
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 h1:Y1/FEOpaCpD21WxrmfeIYCFPuVPRCY2XZTWzTNHGw30=
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=

View File

@ -2,40 +2,36 @@ package retrievaladapter
import (
"context"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"io"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievaltoken "github.com/filecoin-project/go-fil-markets/shared/tokenamount"
retrievaltypes "github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storage"
)
type retrievalProviderNode struct {
sectorBlocks *sectorblocks.SectorBlocks
full api.FullNode
miner *storage.Miner
sb sectorbuilder.Interface
full api.FullNode
}
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node
func NewRetrievalProviderNode(sectorBlocks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{sectorBlocks, full}
func NewRetrievalProviderNode(miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sb, full}
}
func (rpn *retrievalProviderNode) GetPieceSize(pieceCid []byte) (uint64, error) {
asCid, err := cid.Cast(pieceCid)
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uint64, offset uint64, length uint64) (io.ReadCloser, error) {
si, err := rpn.miner.GetSectorInfo(sectorID)
if err != nil {
return 0, err
return nil, err
}
return rpn.sectorBlocks.GetSize(asCid)
}
func (rpn *retrievalProviderNode) SealedBlockstore(approveUnseal func() error) blockstore.Blockstore {
return rpn.sectorBlocks.SealedBlockstore(approveUnseal)
return rpn.sb.ReadPieceFromSealedSector(sectorID, offset, length, si.Ticket.TicketBytes, si.CommD)
}
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievaltypes.SignedVoucher, proof []byte, expectedAmount retrievaltoken.TokenAmount) (retrievaltoken.TokenAmount, error) {

View File

@ -221,6 +221,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb storagemarket.DealSectorCommittedCallback) error {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts)
if err != nil {
// TODO: This may be fine for some errors
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)

View File

@ -5,10 +5,10 @@ package storageadapter
import (
"bytes"
"context"
"io"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
unixfile "github.com/ipfs/go-unixfs/file"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
@ -16,11 +16,13 @@ import (
sharedtypes "github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
@ -33,6 +35,7 @@ type ProviderNodeAdapter struct {
dag dtypes.StagingDAG
secb *sectorblocks.SectorBlocks
ev *events.Events
}
func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
@ -40,6 +43,7 @@ func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBloc
FullNode: full,
dag: dag,
secb: secb,
ev: events.NewEvents(context.TODO(), full),
}
}
@ -94,35 +98,9 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark
return storagemarket.DealID(resp.DealIDs[0]), smsg.Cid(), nil
}
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, piecePath string) (uint64, error) {
root, err := n.dag.Get(ctx, deal.Ref)
if err != nil {
return 0, xerrors.Errorf("failed to get file root for deal: %s", err)
}
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize uint64, pieceData io.Reader) (uint64, error) {
// TODO: abstract this away into ReadSizeCloser + implement different modes
node, err := unixfile.NewUnixfsFile(ctx, n.dag, root)
if err != nil {
return 0, xerrors.Errorf("cannot open unixfs file: %s", err)
}
uf, ok := node.(sectorblocks.UnixfsReader)
if !ok {
// we probably got directory, unsupported for now
return 0, xerrors.Errorf("unsupported unixfs file type")
}
// TODO: uf.Size() is user input, not trusted
// This won't be useful / here after we migrate to putting CARs into sectors
size, err := uf.Size()
if err != nil {
return 0, xerrors.Errorf("getting unixfs file size: %w", err)
}
if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize {
return 0, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
}
sectorID, err := n.secb.AddUnixfsPiece(ctx, uf, deal.DealID)
sectorID, err := n.secb.AddPiece(ctx, pieceSize, pieceData, deal.DealID)
if err != nil {
return 0, xerrors.Errorf("AddPiece failed: %s", err)
}
@ -206,4 +184,117 @@ func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Addre
return utils.ToSharedBalance(bal), nil
}
func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context, dealID uint64) (sectorID uint64, offset uint64, length uint64, err error) {
refs, err := n.secb.GetRefs(dealID)
if err != nil {
return 0, 0, 0, err
}
if len(refs) == 0 {
return 0, 0, 0, xerrors.New("no sector information for deal ID")
}
// TODO: better strategy (e.g. look for already unsealed)
var best api.SealedRef
var bestSi sealing.SectorInfo
for _, r := range refs {
si, err := n.secb.Miner.GetSectorInfo(r.SectorID)
if err != nil {
return 0, 0, 0, xerrors.Errorf("getting sector info: %w", err)
}
if si.State == api.Proving {
best = r
bestSi = si
break
}
}
if bestSi.State == api.UndefinedSectorState {
return 0, 0, 0, xerrors.New("no sealed sector found")
}
return best.SectorID, best.Offset, best.Size, nil
}
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID uint64, cb storagemarket.DealSectorCommittedCallback) error {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := n.StateMarketStorageDeal(ctx, dealID, ts)
if err != nil {
// TODO: This may be fine for some errors
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch > 0 {
cb(nil)
return true, false, nil
}
return false, true, nil
}
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
defer func() {
if err != nil {
cb(xerrors.Errorf("handling applied event: %w", err))
}
}()
if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
}
sd, err := n.StateMarketStorageDeal(ctx, dealID, ts)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch == 0 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height())
}
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.ActivationEpoch)
cb(nil)
return false, nil
}
revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warn("deal activation reverted; TODO: actually handle this!")
// TODO: Just go back to DealSealing?
return nil
}
matchEvent := func(msg *types.Message) (bool, error) {
if msg.To != provider {
return false, nil
}
if msg.Method != actors.MAMethods.ProveCommitSector {
return false, nil
}
var params actors.SectorProveCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, err
}
var found bool
for _, did := range params.DealIDs {
if did == dealID {
found = true
break
}
}
return found, nil
}
if err := n.ev.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler")
}
return nil
}
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}

View File

@ -256,6 +256,7 @@ func Online() Option {
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(*deals.ProviderRequestValidator), modules.NewProviderRequestValidator),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter),
Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator),

View File

@ -1,7 +1,6 @@
package client
import (
"bytes"
"context"
"errors"
"io"
@ -155,7 +154,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
out := make([]api.QueryOffer, len(peers))
for k, p := range peers {
queryResponse, err := a.Retrieval.Query(ctx, p, root.Bytes(), retrievalmarket.QueryParams{})
queryResponse, err := a.Retrieval.Query(ctx, p, root, retrievalmarket.QueryParams{})
if err != nil {
out[k] = api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
} else {
@ -279,7 +278,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
retrievalResult := make(chan error, 1)
unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
if bytes.Equal(state.PieceCID, order.Root.Bytes()) {
if state.PayloadCID.Equals(order.Root) {
switch event {
case retrievalmarket.ClientEventError:
retrievalResult <- xerrors.New("Retrieval Error")
@ -291,7 +290,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
a.Retrieval.Retrieve(
ctx,
order.Root.Bytes(),
order.Root,
retrievalmarket.NewParamsV0(types.BigDiv(order.Total, types.NewInt(order.Size)).Int, 0, 0),
utils.ToSharedTokenAmount(order.Total),
order.MinerPeerID,

View File

@ -7,6 +7,7 @@ import (
"mime"
"net/http"
"os"
"strconv"
"github.com/filecoin-project/lotus/api/apistruct"
@ -210,7 +211,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
}
for k, v := range refs {
out[k.String()] = v
out[strconv.FormatUint(k, 10)] = v
}
return out, nil

View File

@ -5,10 +5,12 @@ import (
"path/filepath"
"reflect"
"github.com/filecoin-project/go-data-transfer/impl/graphsync"
graphsyncimpl "github.com/filecoin-project/go-data-transfer/impl/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
@ -109,12 +111,17 @@ func NewClientRequestValidator(deals dtypes.ClientDealStore) *storageimpl.Client
return storageimpl.NewClientRequestValidator(deals)
}
func StorageClient(h host.Host, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) storagemarket.StorageClient {
return storageimpl.NewClient(h, dag, dataTransfer, discovery, deals, scn)
func StorageClient(h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) {
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
if err != nil {
return nil, err
}
return storageimpl.NewClient(h, ibs, store, dataTransfer, discovery, deals, scn), nil
}
// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient {
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver) retrievalmarket.RetrievalClient {
adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi)
return retrievalimpl.NewClient(h, bs, adapter)
network := rmnet.NewFromLibp2pHost(h)
return retrievalimpl.NewClient(network, bs, adapter, resolver)
}

View File

@ -1,6 +1,8 @@
package dtypes
import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
@ -9,7 +11,6 @@ import (
exchange "github.com/ipfs/go-ipfs-exchange-interface"
ipld "github.com/ipfs/go-ipld-format"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statestore"
)
@ -34,6 +35,7 @@ type ClientDealStore *statestore.StateStore
type ClientDataTransfer datatransfer.Manager
type ProviderDealStore *statestore.StateStore
type ProviderPieceStore piecestore.PieceStore
// ProviderDataTransfer is a data transfer manager for the provider
type ProviderDataTransfer datatransfer.Manager

View File

@ -7,8 +7,11 @@ import (
"github.com/filecoin-project/go-address"
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
@ -42,7 +45,6 @@ import (
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
@ -124,7 +126,11 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) {
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
m.Start(host)
m.Start()
return nil
},
OnStop: func(context.Context) error {
m.Stop()
return nil
},
})
@ -165,6 +171,12 @@ func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore {
return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
}
// NewProviderPieceStore creates a statestore for storing metadata about pieces
// shared by the storage and retrieval providers
func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore {
return piecestore.NewPieceStore(ds)
}
// StagingBlockstore creates a blockstore for staging blocks for a miner
// in a storage deal, prior to sealing
func StagingBlockstore(r repo.LockedRepo) (dtypes.StagingBlockstore, error) {
@ -269,12 +281,21 @@ func NewProviderRequestValidator(deals dtypes.ProviderDealStore) *storageimpl.Pr
return storageimpl.NewProviderRequestValidator(deals)
}
func StorageProvider(ds dtypes.MetadataDS, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
return storageimpl.NewProvider(ds, dag, dataTransfer, spn)
func StorageProvider(ds dtypes.MetadataDS, ibs dtypes.StagingBlockstore, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
if err != nil {
return nil, err
}
return storageimpl.NewProvider(ds, ibs, store, pieceStore, dataTransfer, spn)
}
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider {
adapter := retrievaladapter.NewRetrievalProviderNode(sblks, full)
return retrievalimpl.NewProvider(adapter)
func RetrievalProvider(h host.Host, miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sb, full)
address, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
}
network := rmnet.NewFromLibp2pHost(h)
return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs), nil
}

View File

@ -3,23 +3,19 @@ package sectorblocks
import (
"bytes"
"context"
"encoding/binary"
"errors"
"io"
"sync"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfs"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-cbor-util"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -33,16 +29,28 @@ const (
)
var dsPrefix = datastore.NewKey("/sealedblocks")
var imBlocksPrefix = datastore.NewKey("/intermediate")
var ErrNotFound = errors.New("not found")
func DealIDToDsKey(dealID uint64) datastore.Key {
buf := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(buf, dealID)
return dshelp.NewKeyFromBinary(buf)
}
func DsKeyToDealID(key datastore.Key) (uint64, error) {
buf, err := dshelp.BinaryFromDsKey(key)
if err != nil {
return 0, err
}
dealID, _ := binary.Uvarint(buf)
return dealID, nil
}
type SectorBlocks struct {
*storage.Miner
sb sectorbuilder.Interface
intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore
keys datastore.Batching
keyLk sync.Mutex
}
@ -51,36 +59,17 @@ func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb sectorbuilde
sbc := &SectorBlocks{
Miner: miner,
sb: sb,
intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)),
keys: namespace.Wrap(ds, dsPrefix),
keys: namespace.Wrap(ds, dsPrefix),
}
return sbc
}
type UnixfsReader interface {
files.File
// ReadBlock reads data from a single unixfs block. Data is nil
// for intermediate nodes
ReadBlock(context.Context) (data []byte, offset uint64, nd ipld.Node, err error)
}
type refStorer struct {
blockReader UnixfsReader
writeRef func(cid cid.Cid, offset uint64, size uint64) error
intermediate blockstore.Blockstore
remaining []byte
}
func (st *SectorBlocks) writeRef(cid cid.Cid, sectorID uint64, offset uint64, size uint64) error {
func (st *SectorBlocks) writeRef(dealID uint64, sectorID uint64, offset uint64, size uint64) error {
st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock()
v, err := st.keys.Get(dshelp.CidToDsKey(cid))
v, err := st.keys.Get(DealIDToDsKey(dealID))
if err == datastore.ErrNotFound {
err = nil
}
@ -105,80 +94,22 @@ func (st *SectorBlocks) writeRef(cid cid.Cid, sectorID uint64, offset uint64, si
if err != nil {
return xerrors.Errorf("serializing refs: %w", err)
}
return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow
return st.keys.Put(DealIDToDsKey(dealID), newRef) // TODO: batch somehow
}
func (r *refStorer) Read(p []byte) (n int, err error) {
offset := 0
if len(r.remaining) > 0 {
offset += len(r.remaining)
read := copy(p, r.remaining)
if read == len(r.remaining) {
r.remaining = nil
} else {
r.remaining = r.remaining[read:]
}
return read, nil
}
for {
data, offset, nd, err := r.blockReader.ReadBlock(context.TODO())
if err != nil {
if err == io.EOF {
return 0, io.EOF
}
return 0, xerrors.Errorf("reading block: %w", err)
}
if len(data) == 0 {
// TODO: batch
// TODO: GC
if err := r.intermediate.Put(nd); err != nil {
return 0, xerrors.Errorf("storing intermediate node: %w", err)
}
continue
}
if err := r.writeRef(nd.Cid(), offset, uint64(len(data))); err != nil {
return 0, xerrors.Errorf("writing ref: %w", err)
}
read := copy(p, data)
if read < len(data) {
r.remaining = data[read:]
}
// TODO: read multiple
return read, nil
}
}
func (st *SectorBlocks) AddUnixfsPiece(ctx context.Context, r UnixfsReader, dealID uint64) (sectorID uint64, err error) {
size, err := r.Size()
if err != nil {
return 0, err
}
func (st *SectorBlocks) AddPiece(ctx context.Context, size uint64, r io.Reader, dealID uint64) (sectorID uint64, err error) {
sectorID, pieceOffset, err := st.Miner.AllocatePiece(padreader.PaddedSize(uint64(size)))
if err != nil {
return 0, err
}
refst := &refStorer{
blockReader: r,
writeRef: func(cid cid.Cid, offset uint64, size uint64) error {
offset += pieceOffset
st.writeRef(dealID, sectorID, pieceOffset, size)
return st.writeRef(cid, sectorID, offset, size)
},
intermediate: st.intermediate,
}
pr, psize := padreader.New(refst, uint64(size))
return sectorID, st.Miner.SealPiece(ctx, psize, pr, sectorID, dealID)
return sectorID, st.Miner.SealPiece(ctx, size, r, sectorID, dealID)
}
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
func (st *SectorBlocks) List() (map[uint64][]api.SealedRef, error) {
res, err := st.keys.Query(query.Query{})
if err != nil {
return nil, err
@ -189,9 +120,9 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
return nil, err
}
out := map[cid.Cid][]api.SealedRef{}
out := map[uint64][]api.SealedRef{}
for _, ent := range ents {
refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key))
dealID, err := DsKeyToDealID(datastore.RawKey(ent.Key))
if err != nil {
return nil, err
}
@ -201,14 +132,14 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
return nil, err
}
out[refCid] = refs.Refs
out[dealID] = refs.Refs
}
return out, nil
}
func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors
ent, err := st.keys.Get(dshelp.CidToDsKey(k))
func (st *SectorBlocks) GetRefs(dealID uint64) ([]api.SealedRef, error) { // TODO: track local sectors
ent, err := st.keys.Get(DealIDToDsKey(dealID))
if err == datastore.ErrNotFound {
err = ErrNotFound
}
@ -224,42 +155,16 @@ func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: t
return refs.Refs, nil
}
func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) {
blk, err := st.intermediate.Get(k)
if err == blockstore.ErrNotFound {
refs, err := st.GetRefs(k)
if err != nil {
return 0, err
}
return uint64(refs[0].Size), nil
}
func (st *SectorBlocks) GetSize(dealID uint64) (uint64, error) {
refs, err := st.GetRefs(dealID)
if err != nil {
return 0, err
}
nd, err := ipld.Decode(blk)
if err != nil {
return 0, err
}
fsn, err := unixfs.ExtractFSNode(nd)
if err != nil {
return 0, err
}
return fsn.FileSize(), nil
return uint64(refs[0].Size), nil
}
func (st *SectorBlocks) Has(k cid.Cid) (bool, error) {
func (st *SectorBlocks) Has(dealID uint64) (bool, error) {
// TODO: ensure sector is still there
return st.keys.Has(dshelp.CidToDsKey(k))
}
func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore {
return &SectorBlockStore{
intermediate: st.intermediate,
sectorBlocks: st,
approveUnseal: approveUnseal,
}
return st.keys.Has(DealIDToDsKey(dealID))
}

View File

@ -1,126 +0,0 @@
package sectorblocks
import (
"context"
"io/ioutil"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealing"
)
var log = logging.Logger("sectorblocks")
type SectorBlockStore struct {
intermediate blockstore.Blockstore
sectorBlocks *SectorBlocks
approveUnseal func() error
}
func (s *SectorBlockStore) DeleteBlock(cid.Cid) error {
panic("not supported")
}
func (s *SectorBlockStore) GetSize(cid.Cid) (int, error) {
panic("not supported")
}
func (s *SectorBlockStore) Put(blocks.Block) error {
panic("not supported")
}
func (s *SectorBlockStore) PutMany([]blocks.Block) error {
panic("not supported")
}
func (s *SectorBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
panic("not supported")
}
func (s *SectorBlockStore) HashOnRead(enabled bool) {
panic("not supported")
}
func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) {
has, err := s.intermediate.Has(c)
if err != nil {
return false, err
}
if has {
return true, nil
}
return s.sectorBlocks.Has(c)
}
func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
val, err := s.intermediate.Get(c)
if err == nil {
return val, nil
}
if err != blockstore.ErrNotFound {
return nil, err
}
refs, err := s.sectorBlocks.GetRefs(c)
if err != nil {
return nil, err
}
if len(refs) == 0 {
return nil, blockstore.ErrNotFound
}
// TODO: better strategy (e.g. look for already unsealed)
var best api.SealedRef
var bestSi sealing.SectorInfo
for _, r := range refs {
si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID)
if err != nil {
return nil, xerrors.Errorf("getting sector info: %w", err)
}
if si.State == api.Proving {
best = r
bestSi = si
break
}
}
if bestSi.State == api.UndefinedSectorState {
return nil, xerrors.New("no sealed sector found")
}
log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size)
r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector(
best.SectorID,
best.Offset,
best.Size,
bestSi.Ticket.TicketBytes,
bestSi.CommD,
)
if err != nil {
return nil, xerrors.Errorf("unsealing block: %w", err)
}
defer r.Close()
data, err := ioutil.ReadAll(r)
if err != nil {
return nil, xerrors.Errorf("reading block data: %w", err)
}
if uint64(len(data)) != best.Size {
return nil, xerrors.Errorf("got wrong amount of data: %d != !d", len(data), best.Size)
}
b, err := blocks.NewBlockWithCid(data, c)
if err != nil {
return nil, xerrors.Errorf("sbs get (%d[%d:%d]): %w", best.SectorID, best.Offset, best.Offset+best.Size, err)
}
return b, nil
}
var _ blockstore.Blockstore = &SectorBlockStore{}