From 71289b58ff4c27189f945234af078d9ac1c00cf8 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 24 Jan 2020 12:19:52 -0800 Subject: [PATCH] feat(markets): update to support car files Upgrades lotus version of go-fil-markets to its head, which supports car files and handles tracking of pieces, reducing the size of sectorBlocks --- go.mod | 6 +- go.sum | 23 +++- markets/retrievaladapter/provider.go | 28 ++--- markets/storageadapter/client.go | 1 + markets/storageadapter/provider.go | 151 +++++++++++++++++++----- node/builder.go | 1 + node/impl/client/client.go | 7 +- node/impl/storminer.go | 3 +- node/modules/client.go | 17 ++- node/modules/dtypes/storage.go | 4 +- node/modules/storageminer.go | 35 ++++-- storage/sectorblocks/blocks.go | 165 ++++++--------------------- storage/sectorblocks/blockstore.go | 126 -------------------- 13 files changed, 238 insertions(+), 329 deletions(-) delete mode 100644 storage/sectorblocks/blockstore.go diff --git a/go.mod b/go.mod index d261af44c..2876dc82f 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce - github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 + github.com/filecoin-project/go-fil-markets v0.0.0-20200124174839-9211de075b61 github.com/filecoin-project/go-paramfetch v0.0.1 github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 github.com/filecoin-project/go-statestore v0.1.0 @@ -32,7 +32,7 @@ require ( github.com/ipfs/go-bitswap v0.1.8 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c - github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291 + github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 github.com/ipfs/go-cid v0.0.4 github.com/ipfs/go-datastore v0.3.1 github.com/ipfs/go-ds-badger2 v0.0.0-20200123200730-d75eb2678a5d @@ -83,7 +83,7 @@ require ( github.com/multiformats/go-varint v0.0.2 github.com/opentracing/opentracing-go v1.1.0 github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a - github.com/prometheus/common v0.2.0 + github.com/prometheus/common v0.4.0 github.com/stretchr/testify v1.4.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8 diff --git a/go.sum b/go.sum index 3b9b9a133..bda87f07a 100644 --- a/go.sum +++ b/go.sum @@ -111,8 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY= github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww= -github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 h1:g3oodvSz+Ou+ObwcVBB2wyt8SHdWpwzMiNJ19U1zZNA= -github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc= +github.com/filecoin-project/go-fil-markets v0.0.0-20200124174839-9211de075b61 h1:51j9Csz3r/AeVUtUlPMHfUCOAG4D+uodkzT2ODHgJOk= +github.com/filecoin-project/go-fil-markets v0.0.0-20200124174839-9211de075b61/go.mod h1:hbYlEmbOg9QwhZ71B724oAgXQ0wnoWHj8S+33q9lrm8= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= @@ -177,6 +177,8 @@ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmv github.com/gxed/pubsub v0.0.0-20180201040156-26ebdf44f824/go.mod h1:OiEWyHgK+CWrmOlVquHaIK1vhpUJydC9m0Je6mhaiNE= github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c h1:+MSf4NEnLCYZoAgK6fqwc7NH88nM8haFSxKGUGIG3vA= github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k= +github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 h1:vQqOW42RRM5LoM/1K5dK940VipLqpH8lEVGrMz+mNjU= +github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= @@ -211,9 +213,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= -github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk= -github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291 h1:Yy0dcFWw8oDV/WJ4S/rkMQRWnJ3tGr9EbgDDv2JhVQw= -github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291/go.mod h1:AG6sBpd2PWMccpAG7XLFBBQ/4rfBEtzUNeO2GSMesYk= +github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 h1:l8gnU1VBhftugMKzfh+n7nuDhOw3X1iqfrA33GVBMMY= +github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -305,7 +306,6 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= -github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w= github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785 h1:fASnkvtR+SmB2y453RxmDD3Uvd4LonVUgFGk9JoDaZs= github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w= github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5 h1:lSip43rAdyGA+yRQuy6ju0ucZkWpYc1F2CTQtZTVW/4= @@ -410,10 +410,13 @@ github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFT github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-host v0.0.1/go.mod h1:qWd+H1yuU0m5CwzAkvbSjqKairayEHdR5MMl7Cwa7Go= +github.com/libp2p/go-libp2p-host v0.0.3 h1:BB/1Z+4X0rjKP5lbQTmjEjLbDVbrcmLOlA6QDsN5/j4= github.com/libp2p/go-libp2p-host v0.0.3/go.mod h1:Y/qPyA6C8j2coYyos1dfRm0I8+nvd4TGrDGt4tA7JR8= github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= +github.com/libp2p/go-libp2p-interface-connmgr v0.0.5 h1:KG/KNYL2tYzXAfMvQN5K1aAGTYSYUMJ1prgYa2/JI1E= github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= +github.com/libp2p/go-libp2p-interface-pnet v0.0.1 h1:7GnzRrBTJHEsofi1ahFdPN9Si6skwXQE9UqR2S+Pkh8= github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k= github.com/libp2p/go-libp2p-kad-dht v0.1.1 h1:IH6NQuoUv5w5e1O8Jc3KyVDtr0rNd0G9aaADpLI1xVo= github.com/libp2p/go-libp2p-kad-dht v0.1.1/go.mod h1:1kj2Rk5pX3/0RwqMm9AMNCT7DzcMHYhgDN5VTi+cY0M= @@ -422,6 +425,7 @@ github.com/libp2p/go-libp2p-kbucket v0.2.0/go.mod h1:JNymBToym3QXKBMKGy3m29+xprg github.com/libp2p/go-libp2p-loggables v0.0.1/go.mod h1:lDipDlBNYbpyqyPX/KcoO+eq0sJYEVR2JgOexcivchg= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= +github.com/libp2p/go-libp2p-metrics v0.0.1 h1:yumdPC/P2VzINdmcKZd0pciSUCpou+s0lwYCjBbzQZU= github.com/libp2p/go-libp2p-metrics v0.0.1/go.mod h1:jQJ95SXXA/K1VZi13h52WZMa9ja78zjyy5rspMsC/08= github.com/libp2p/go-libp2p-mplex v0.1.1/go.mod h1:KUQWpGkCzfV7UIpi8SKsAVxyBgz1c9R5EvxgnwLsb/I= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= @@ -431,6 +435,7 @@ github.com/libp2p/go-libp2p-nat v0.0.4/go.mod h1:N9Js/zVtAXqaeT99cXgTV9e75KpnWCv github.com/libp2p/go-libp2p-nat v0.0.5 h1:/mH8pXFVKleflDL1YwqMg27W9GD8kjEx7NY0P6eGc98= github.com/libp2p/go-libp2p-nat v0.0.5/go.mod h1:1qubaE5bTZMJE+E/uu2URroMbzdubFz1ChgiN79yKPE= github.com/libp2p/go-libp2p-net v0.0.1/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c= +github.com/libp2p/go-libp2p-net v0.0.2 h1:qP06u4TYXfl7uW/hzqPhlVVTSA2nw1B/bHBJaUnbh6M= github.com/libp2p/go-libp2p-net v0.0.2/go.mod h1:Yt3zgmlsHOgUWSXmt5V/Jpz9upuJBE8EgNU9DrCcR8c= github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFxecf9Gt03cKxm2f/Q= github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLKcKF72EAMQ= @@ -447,6 +452,7 @@ github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1c github.com/libp2p/go-libp2p-peerstore v0.1.4 h1:d23fvq5oYMJ/lkkbO4oTwBp/JP+I/1m5gZJobNXCE/k= github.com/libp2p/go-libp2p-peerstore v0.1.4/go.mod h1:+4BDbDiiKf4PzpANZDAT+knVdLxvqh7hXOujessqdzs= github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s= +github.com/libp2p/go-libp2p-protocol v0.1.0 h1:HdqhEyhg0ToCaxgMhnOmUO8snQtt/kQlcjVk3UoJU3c= github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk= github.com/libp2p/go-libp2p-pubsub v0.2.3 h1:qJRnRnM7Z4xnHb4i6EBb3DKQXRPgtFWlKP4AmfJudLQ= github.com/libp2p/go-libp2p-pubsub v0.2.3/go.mod h1:Jscj3fk23R5mCrOwb625xjVs5ZEyTZcx/OlTwMDqU+g= @@ -515,6 +521,7 @@ github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQza github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4= github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs= github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14= +github.com/libp2p/go-stream-muxer v0.1.0 h1:3ToDXUzx8pDC6RfuOzGsUYP5roMDthbUKRdMRRhqAqY= github.com/libp2p/go-stream-muxer v0.1.0/go.mod h1:8JAVsjeRBCWwPoZeH0W1imLOcriqXJyFvB0mR4A04sQ= github.com/libp2p/go-stream-muxer-multistream v0.1.1/go.mod h1:zmGdfkQ1AzOECIAcccoL8L//laqawOsO03zX8Sa+eGw= github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg= @@ -554,6 +561,7 @@ github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+tw github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= +github.com/miekg/dns v1.1.12 h1:WMhc1ik4LNkTg8U9l3hI1LvxKmIL+f1+WV/SZtCbDDA= github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= @@ -647,6 +655,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1: github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM= +github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -728,6 +738,7 @@ github.com/whyrusleeping/go-smux-yamux v2.0.8+incompatible/go.mod h1:6qHUzBXUbB9 github.com/whyrusleeping/mafmt v1.2.8 h1:TCghSl5kkwEE0j+sU/gudyhVMRlpBin8fMBBHg59EbA= github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA= github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= +github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 h1:Y1/FEOpaCpD21WxrmfeIYCFPuVPRCY2XZTWzTNHGw30= github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index 82c765123..c3fcf47a4 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -2,40 +2,36 @@ package retrievaladapter import ( "context" - - "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" + "io" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievaltoken "github.com/filecoin-project/go-fil-markets/shared/tokenamount" retrievaltypes "github.com/filecoin-project/go-fil-markets/shared/types" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/markets/utils" - "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storage" ) type retrievalProviderNode struct { - sectorBlocks *sectorblocks.SectorBlocks - full api.FullNode + miner *storage.Miner + sb sectorbuilder.Interface + full api.FullNode } // NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the // Lotus Node -func NewRetrievalProviderNode(sectorBlocks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProviderNode { - return &retrievalProviderNode{sectorBlocks, full} +func NewRetrievalProviderNode(miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode) retrievalmarket.RetrievalProviderNode { + return &retrievalProviderNode{miner, sb, full} } -func (rpn *retrievalProviderNode) GetPieceSize(pieceCid []byte) (uint64, error) { - asCid, err := cid.Cast(pieceCid) +func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uint64, offset uint64, length uint64) (io.ReadCloser, error) { + si, err := rpn.miner.GetSectorInfo(sectorID) if err != nil { - return 0, err + return nil, err } - return rpn.sectorBlocks.GetSize(asCid) -} - -func (rpn *retrievalProviderNode) SealedBlockstore(approveUnseal func() error) blockstore.Blockstore { - return rpn.sectorBlocks.SealedBlockstore(approveUnseal) + return rpn.sb.ReadPieceFromSealedSector(sectorID, offset, length, si.Ticket.TicketBytes, si.CommD) } func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievaltypes.SignedVoucher, proof []byte, expectedAmount retrievaltoken.TokenAmount) (retrievaltoken.TokenAmount, error) { diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 519e12e57..51e563765 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -221,6 +221,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb storagemarket.DealSectorCommittedCallback) error { checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts) + if err != nil { // TODO: This may be fine for some errors return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err) diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index abe7f66eb..608148a0a 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -5,10 +5,10 @@ package storageadapter import ( "bytes" "context" + "io" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" - unixfile "github.com/ipfs/go-unixfs/file" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -16,11 +16,13 @@ import ( sharedtypes "github.com/filecoin-project/go-fil-markets/shared/types" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/padreader" "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/storage/sealing" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -33,6 +35,7 @@ type ProviderNodeAdapter struct { dag dtypes.StagingDAG secb *sectorblocks.SectorBlocks + ev *events.Events } func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { @@ -40,6 +43,7 @@ func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBloc FullNode: full, dag: dag, secb: secb, + ev: events.NewEvents(context.TODO(), full), } } @@ -94,35 +98,9 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark return storagemarket.DealID(resp.DealIDs[0]), smsg.Cid(), nil } -func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, piecePath string) (uint64, error) { - root, err := n.dag.Get(ctx, deal.Ref) - if err != nil { - return 0, xerrors.Errorf("failed to get file root for deal: %s", err) - } +func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize uint64, pieceData io.Reader) (uint64, error) { - // TODO: abstract this away into ReadSizeCloser + implement different modes - node, err := unixfile.NewUnixfsFile(ctx, n.dag, root) - if err != nil { - return 0, xerrors.Errorf("cannot open unixfs file: %s", err) - } - - uf, ok := node.(sectorblocks.UnixfsReader) - if !ok { - // we probably got directory, unsupported for now - return 0, xerrors.Errorf("unsupported unixfs file type") - } - - // TODO: uf.Size() is user input, not trusted - // This won't be useful / here after we migrate to putting CARs into sectors - size, err := uf.Size() - if err != nil { - return 0, xerrors.Errorf("getting unixfs file size: %w", err) - } - if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize { - return 0, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size") - } - - sectorID, err := n.secb.AddUnixfsPiece(ctx, uf, deal.DealID) + sectorID, err := n.secb.AddPiece(ctx, pieceSize, pieceData, deal.DealID) if err != nil { return 0, xerrors.Errorf("AddPiece failed: %s", err) } @@ -206,4 +184,117 @@ func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Addre return utils.ToSharedBalance(bal), nil } +func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context, dealID uint64) (sectorID uint64, offset uint64, length uint64, err error) { + + refs, err := n.secb.GetRefs(dealID) + if err != nil { + return 0, 0, 0, err + } + if len(refs) == 0 { + return 0, 0, 0, xerrors.New("no sector information for deal ID") + } + + // TODO: better strategy (e.g. look for already unsealed) + var best api.SealedRef + var bestSi sealing.SectorInfo + for _, r := range refs { + si, err := n.secb.Miner.GetSectorInfo(r.SectorID) + if err != nil { + return 0, 0, 0, xerrors.Errorf("getting sector info: %w", err) + } + if si.State == api.Proving { + best = r + bestSi = si + break + } + } + if bestSi.State == api.UndefinedSectorState { + return 0, 0, 0, xerrors.New("no sealed sector found") + } + return best.SectorID, best.Offset, best.Size, nil +} + +func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID uint64, cb storagemarket.DealSectorCommittedCallback) error { + checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { + sd, err := n.StateMarketStorageDeal(ctx, dealID, ts) + + if err != nil { + // TODO: This may be fine for some errors + return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err) + } + + if sd.ActivationEpoch > 0 { + cb(nil) + return true, false, nil + } + + return false, true, nil + } + + called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { + defer func() { + if err != nil { + cb(xerrors.Errorf("handling applied event: %w", err)) + } + }() + + if msg == nil { + log.Error("timed out waiting for deal activation... what now?") + return false, nil + } + + sd, err := n.StateMarketStorageDeal(ctx, dealID, ts) + if err != nil { + return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + } + + if sd.ActivationEpoch == 0 { + return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height()) + } + + log.Infof("Storage deal %d activated at epoch %d", dealID, sd.ActivationEpoch) + + cb(nil) + + return false, nil + } + + revert := func(ctx context.Context, ts *types.TipSet) error { + log.Warn("deal activation reverted; TODO: actually handle this!") + // TODO: Just go back to DealSealing? + return nil + } + + matchEvent := func(msg *types.Message) (bool, error) { + if msg.To != provider { + return false, nil + } + + if msg.Method != actors.MAMethods.ProveCommitSector { + return false, nil + } + + var params actors.SectorProveCommitInfo + if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { + return false, err + } + + var found bool + for _, did := range params.DealIDs { + if did == dealID { + found = true + break + } + } + + return found, nil + } + + if err := n.ev.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil { + return xerrors.Errorf("failed to set up called handler") + } + + return nil +} + var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{} diff --git a/node/builder.go b/node/builder.go index 5ec6e4183..48d3037cb 100644 --- a/node/builder.go +++ b/node/builder.go @@ -256,6 +256,7 @@ func Online() Option { Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(*deals.ProviderRequestValidator), modules.NewProviderRequestValidator), + Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter), Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 5623f4ae1..7968a8bf0 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -1,7 +1,6 @@ package client import ( - "bytes" "context" "errors" "io" @@ -155,7 +154,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe out := make([]api.QueryOffer, len(peers)) for k, p := range peers { - queryResponse, err := a.Retrieval.Query(ctx, p, root.Bytes(), retrievalmarket.QueryParams{}) + queryResponse, err := a.Retrieval.Query(ctx, p, root, retrievalmarket.QueryParams{}) if err != nil { out[k] = api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} } else { @@ -279,7 +278,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path retrievalResult := make(chan error, 1) unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) { - if bytes.Equal(state.PieceCID, order.Root.Bytes()) { + if state.PayloadCID.Equals(order.Root) { switch event { case retrievalmarket.ClientEventError: retrievalResult <- xerrors.New("Retrieval Error") @@ -291,7 +290,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path a.Retrieval.Retrieve( ctx, - order.Root.Bytes(), + order.Root, retrievalmarket.NewParamsV0(types.BigDiv(order.Total, types.NewInt(order.Size)).Int, 0, 0), utils.ToSharedTokenAmount(order.Total), order.MinerPeerID, diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b600dfb7f..7089874f3 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -7,6 +7,7 @@ import ( "mime" "net/http" "os" + "strconv" "github.com/filecoin-project/lotus/api/apistruct" @@ -210,7 +211,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed } for k, v := range refs { - out[k.String()] = v + out[strconv.FormatUint(k, 10)] = v } return out, nil diff --git a/node/modules/client.go b/node/modules/client.go index b113a5968..0897de6ea 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -5,10 +5,12 @@ import ( "path/filepath" "reflect" - "github.com/filecoin-project/go-data-transfer/impl/graphsync" + graphsyncimpl "github.com/filecoin-project/go-data-transfer/impl/graphsync" + piecefilestore "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery" retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" + rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket" deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" @@ -109,12 +111,17 @@ func NewClientRequestValidator(deals dtypes.ClientDealStore) *storageimpl.Client return storageimpl.NewClientRequestValidator(deals) } -func StorageClient(h host.Host, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) storagemarket.StorageClient { - return storageimpl.NewClient(h, dag, dataTransfer, discovery, deals, scn) +func StorageClient(h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) { + store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path())) + if err != nil { + return nil, err + } + return storageimpl.NewClient(h, ibs, store, dataTransfer, discovery, deals, scn), nil } // RetrievalClient creates a new retrieval client attached to the client blockstore -func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient { +func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI, resolver retrievalmarket.PeerResolver) retrievalmarket.RetrievalClient { adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi) - return retrievalimpl.NewClient(h, bs, adapter) + network := rmnet.NewFromLibp2pHost(h) + return retrievalimpl.NewClient(network, bs, adapter, resolver) } diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index d80f86d24..24d658ebf 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -1,6 +1,8 @@ package dtypes import ( + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-fil-markets/piecestore" bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/ipfs/go-filestore" @@ -9,7 +11,6 @@ import ( exchange "github.com/ipfs/go-ipfs-exchange-interface" ipld "github.com/ipfs/go-ipld-format" - "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-statestore" ) @@ -34,6 +35,7 @@ type ClientDealStore *statestore.StateStore type ClientDataTransfer datatransfer.Manager type ProviderDealStore *statestore.StateStore +type ProviderPieceStore piecestore.PieceStore // ProviderDataTransfer is a data transfer manager for the provider type ProviderDataTransfer datatransfer.Manager diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 533946696..ffcb40dde 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -7,8 +7,11 @@ import ( "github.com/filecoin-project/go-address" dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" + piecefilestore "github.com/filecoin-project/go-fil-markets/filestore" + "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" + rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket" deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" @@ -42,7 +45,6 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sealing" - "github.com/filecoin-project/lotus/storage/sectorblocks" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -124,7 +126,11 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) { lc.Append(fx.Hook{ OnStart: func(context.Context) error { - m.Start(host) + m.Start() + return nil + }, + OnStop: func(context.Context) error { + m.Stop() return nil }, }) @@ -165,6 +171,12 @@ func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore { return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))) } +// NewProviderPieceStore creates a statestore for storing metadata about pieces +// shared by the storage and retrieval providers +func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore { + return piecestore.NewPieceStore(ds) +} + // StagingBlockstore creates a blockstore for staging blocks for a miner // in a storage deal, prior to sealing func StagingBlockstore(r repo.LockedRepo) (dtypes.StagingBlockstore, error) { @@ -269,12 +281,21 @@ func NewProviderRequestValidator(deals dtypes.ProviderDealStore) *storageimpl.Pr return storageimpl.NewProviderRequestValidator(deals) } -func StorageProvider(ds dtypes.MetadataDS, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) { - return storageimpl.NewProvider(ds, dag, dataTransfer, spn) +func StorageProvider(ds dtypes.MetadataDS, ibs dtypes.StagingBlockstore, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) { + store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path())) + if err != nil { + return nil, err + } + return storageimpl.NewProvider(ds, ibs, store, pieceStore, dataTransfer, spn) } // RetrievalProvider creates a new retrieval provider attached to the provider blockstore -func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider { - adapter := retrievaladapter.NewRetrievalProviderNode(sblks, full) - return retrievalimpl.NewProvider(adapter) +func RetrievalProvider(h host.Host, miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) { + adapter := retrievaladapter.NewRetrievalProviderNode(miner, sb, full) + address, err := minerAddrFromDS(ds) + if err != nil { + return nil, err + } + network := rmnet.NewFromLibp2pHost(h) + return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs), nil } diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index ed3a718df..3a34143ce 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -3,23 +3,19 @@ package sectorblocks import ( "bytes" "context" + "encoding/binary" "errors" "io" "sync" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" - blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" - files "github.com/ipfs/go-ipfs-files" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-unixfs" "golang.org/x/xerrors" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/padreader" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -33,16 +29,28 @@ const ( ) var dsPrefix = datastore.NewKey("/sealedblocks") -var imBlocksPrefix = datastore.NewKey("/intermediate") var ErrNotFound = errors.New("not found") +func DealIDToDsKey(dealID uint64) datastore.Key { + buf := make([]byte, binary.MaxVarintLen64) + binary.PutUvarint(buf, dealID) + return dshelp.NewKeyFromBinary(buf) +} + +func DsKeyToDealID(key datastore.Key) (uint64, error) { + buf, err := dshelp.BinaryFromDsKey(key) + if err != nil { + return 0, err + } + dealID, _ := binary.Uvarint(buf) + return dealID, nil +} + type SectorBlocks struct { *storage.Miner sb sectorbuilder.Interface - intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore - keys datastore.Batching keyLk sync.Mutex } @@ -51,36 +59,17 @@ func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb sectorbuilde sbc := &SectorBlocks{ Miner: miner, sb: sb, - - intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)), - - keys: namespace.Wrap(ds, dsPrefix), + keys: namespace.Wrap(ds, dsPrefix), } return sbc } -type UnixfsReader interface { - files.File - - // ReadBlock reads data from a single unixfs block. Data is nil - // for intermediate nodes - ReadBlock(context.Context) (data []byte, offset uint64, nd ipld.Node, err error) -} - -type refStorer struct { - blockReader UnixfsReader - writeRef func(cid cid.Cid, offset uint64, size uint64) error - intermediate blockstore.Blockstore - - remaining []byte -} - -func (st *SectorBlocks) writeRef(cid cid.Cid, sectorID uint64, offset uint64, size uint64) error { +func (st *SectorBlocks) writeRef(dealID uint64, sectorID uint64, offset uint64, size uint64) error { st.keyLk.Lock() // TODO: make this multithreaded defer st.keyLk.Unlock() - v, err := st.keys.Get(dshelp.CidToDsKey(cid)) + v, err := st.keys.Get(DealIDToDsKey(dealID)) if err == datastore.ErrNotFound { err = nil } @@ -105,80 +94,22 @@ func (st *SectorBlocks) writeRef(cid cid.Cid, sectorID uint64, offset uint64, si if err != nil { return xerrors.Errorf("serializing refs: %w", err) } - return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow + return st.keys.Put(DealIDToDsKey(dealID), newRef) // TODO: batch somehow } -func (r *refStorer) Read(p []byte) (n int, err error) { - offset := 0 - if len(r.remaining) > 0 { - offset += len(r.remaining) - read := copy(p, r.remaining) - if read == len(r.remaining) { - r.remaining = nil - } else { - r.remaining = r.remaining[read:] - } - return read, nil - } - - for { - data, offset, nd, err := r.blockReader.ReadBlock(context.TODO()) - if err != nil { - if err == io.EOF { - return 0, io.EOF - } - return 0, xerrors.Errorf("reading block: %w", err) - } - - if len(data) == 0 { - // TODO: batch - // TODO: GC - if err := r.intermediate.Put(nd); err != nil { - return 0, xerrors.Errorf("storing intermediate node: %w", err) - } - continue - } - - if err := r.writeRef(nd.Cid(), offset, uint64(len(data))); err != nil { - return 0, xerrors.Errorf("writing ref: %w", err) - } - - read := copy(p, data) - if read < len(data) { - r.remaining = data[read:] - } - // TODO: read multiple - return read, nil - } -} - -func (st *SectorBlocks) AddUnixfsPiece(ctx context.Context, r UnixfsReader, dealID uint64) (sectorID uint64, err error) { - size, err := r.Size() - if err != nil { - return 0, err - } +func (st *SectorBlocks) AddPiece(ctx context.Context, size uint64, r io.Reader, dealID uint64) (sectorID uint64, err error) { sectorID, pieceOffset, err := st.Miner.AllocatePiece(padreader.PaddedSize(uint64(size))) if err != nil { return 0, err } - refst := &refStorer{ - blockReader: r, - writeRef: func(cid cid.Cid, offset uint64, size uint64) error { - offset += pieceOffset + st.writeRef(dealID, sectorID, pieceOffset, size) - return st.writeRef(cid, sectorID, offset, size) - }, - intermediate: st.intermediate, - } - - pr, psize := padreader.New(refst, uint64(size)) - - return sectorID, st.Miner.SealPiece(ctx, psize, pr, sectorID, dealID) + return sectorID, st.Miner.SealPiece(ctx, size, r, sectorID, dealID) } -func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { +func (st *SectorBlocks) List() (map[uint64][]api.SealedRef, error) { res, err := st.keys.Query(query.Query{}) if err != nil { return nil, err @@ -189,9 +120,9 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { return nil, err } - out := map[cid.Cid][]api.SealedRef{} + out := map[uint64][]api.SealedRef{} for _, ent := range ents { - refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key)) + dealID, err := DsKeyToDealID(datastore.RawKey(ent.Key)) if err != nil { return nil, err } @@ -201,14 +132,14 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { return nil, err } - out[refCid] = refs.Refs + out[dealID] = refs.Refs } return out, nil } -func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors - ent, err := st.keys.Get(dshelp.CidToDsKey(k)) +func (st *SectorBlocks) GetRefs(dealID uint64) ([]api.SealedRef, error) { // TODO: track local sectors + ent, err := st.keys.Get(DealIDToDsKey(dealID)) if err == datastore.ErrNotFound { err = ErrNotFound } @@ -224,42 +155,16 @@ func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: t return refs.Refs, nil } -func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) { - blk, err := st.intermediate.Get(k) - if err == blockstore.ErrNotFound { - refs, err := st.GetRefs(k) - if err != nil { - return 0, err - } - - return uint64(refs[0].Size), nil - } +func (st *SectorBlocks) GetSize(dealID uint64) (uint64, error) { + refs, err := st.GetRefs(dealID) if err != nil { return 0, err } - nd, err := ipld.Decode(blk) - if err != nil { - return 0, err - } - - fsn, err := unixfs.ExtractFSNode(nd) - if err != nil { - return 0, err - } - - return fsn.FileSize(), nil + return uint64(refs[0].Size), nil } -func (st *SectorBlocks) Has(k cid.Cid) (bool, error) { +func (st *SectorBlocks) Has(dealID uint64) (bool, error) { // TODO: ensure sector is still there - return st.keys.Has(dshelp.CidToDsKey(k)) -} - -func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore { - return &SectorBlockStore{ - intermediate: st.intermediate, - sectorBlocks: st, - approveUnseal: approveUnseal, - } + return st.keys.Has(DealIDToDsKey(dealID)) } diff --git a/storage/sectorblocks/blockstore.go b/storage/sectorblocks/blockstore.go deleted file mode 100644 index 36394c1c8..000000000 --- a/storage/sectorblocks/blockstore.go +++ /dev/null @@ -1,126 +0,0 @@ -package sectorblocks - -import ( - "context" - "io/ioutil" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - blockstore "github.com/ipfs/go-ipfs-blockstore" - logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/storage/sealing" -) - -var log = logging.Logger("sectorblocks") - -type SectorBlockStore struct { - intermediate blockstore.Blockstore - sectorBlocks *SectorBlocks - - approveUnseal func() error -} - -func (s *SectorBlockStore) DeleteBlock(cid.Cid) error { - panic("not supported") -} -func (s *SectorBlockStore) GetSize(cid.Cid) (int, error) { - panic("not supported") -} - -func (s *SectorBlockStore) Put(blocks.Block) error { - panic("not supported") -} - -func (s *SectorBlockStore) PutMany([]blocks.Block) error { - panic("not supported") -} - -func (s *SectorBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - panic("not supported") -} - -func (s *SectorBlockStore) HashOnRead(enabled bool) { - panic("not supported") -} - -func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) { - has, err := s.intermediate.Has(c) - if err != nil { - return false, err - } - if has { - return true, nil - } - - return s.sectorBlocks.Has(c) -} - -func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) { - val, err := s.intermediate.Get(c) - if err == nil { - return val, nil - } - if err != blockstore.ErrNotFound { - return nil, err - } - - refs, err := s.sectorBlocks.GetRefs(c) - if err != nil { - return nil, err - } - if len(refs) == 0 { - return nil, blockstore.ErrNotFound - } - - // TODO: better strategy (e.g. look for already unsealed) - var best api.SealedRef - var bestSi sealing.SectorInfo - for _, r := range refs { - si, err := s.sectorBlocks.Miner.GetSectorInfo(r.SectorID) - if err != nil { - return nil, xerrors.Errorf("getting sector info: %w", err) - } - if si.State == api.Proving { - best = r - bestSi = si - break - } - } - if bestSi.State == api.UndefinedSectorState { - return nil, xerrors.New("no sealed sector found") - } - - log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size) - - r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector( - best.SectorID, - best.Offset, - best.Size, - bestSi.Ticket.TicketBytes, - bestSi.CommD, - ) - if err != nil { - return nil, xerrors.Errorf("unsealing block: %w", err) - } - defer r.Close() - - data, err := ioutil.ReadAll(r) - if err != nil { - return nil, xerrors.Errorf("reading block data: %w", err) - } - if uint64(len(data)) != best.Size { - return nil, xerrors.Errorf("got wrong amount of data: %d != !d", len(data), best.Size) - } - - b, err := blocks.NewBlockWithCid(data, c) - if err != nil { - return nil, xerrors.Errorf("sbs get (%d[%d:%d]): %w", best.SectorID, best.Offset, best.Offset+best.Size, err) - } - - return b, nil -} - -var _ blockstore.Blockstore = &SectorBlockStore{}