Merge pull request #6812 from filecoin-project/fix/ensure-mount-started
Ensure piece store starts before calling lotus accessor methods
This commit is contained in:
commit
fb50d68c32
300
localnet.json
Normal file
300
localnet.json
Normal file
@ -0,0 +1,300 @@
|
|||||||
|
{
|
||||||
|
"NetworkVersion": 13,
|
||||||
|
"Accounts": [
|
||||||
|
{
|
||||||
|
"Type": "account",
|
||||||
|
"Balance": "50000000000000000000000000",
|
||||||
|
"Meta": {
|
||||||
|
"Owner": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"Miners": [
|
||||||
|
{
|
||||||
|
"ID": "t01000",
|
||||||
|
"Owner": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Worker": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"PeerId": "12D3KooWDBStySruBY6D7ALYa5FRbeB3uUXfGuvjo7qV1nBYHuhK",
|
||||||
|
"MarketBalance": "0",
|
||||||
|
"PowerBalance": "0",
|
||||||
|
"SectorSize": 2048,
|
||||||
|
"Sectors": [
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcbon4d4uxotw2rnjwqkeye4espvvcmkg72jyn7yg67vqbxojjnmqj"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqerikhcfk2tl7bxehydqpmmo2xlrdtq6hp6ya7fs2jakim4lkvklq"
|
||||||
|
},
|
||||||
|
"SectorID": 0,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqerikhcfk2tl7bxehydqpmmo2xlrdtq6hp6ya7fs2jakim4lkvklq"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "0",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abca2xijpxm52hc3hm75wuswvlorayaijfph4jyqmavchu4yvwaffb3"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqbqgjroshqe6ssodwd55s6oqxjggeg5alsdnu5bvqxhqy6twrymby"
|
||||||
|
},
|
||||||
|
"SectorID": 1,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqbqgjroshqe6ssodwd55s6oqxjggeg5alsdnu5bvqxhqy6twrymby"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "1",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcan2ok3zovd42rq47aynqafae3plo26m2ldctqotn3myddhvpyizc"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqnzz3cakbkxehphgcym6fs44rqzb3tzyu2au3xeynjwq4c2sdq6pi"
|
||||||
|
},
|
||||||
|
"SectorID": 2,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqnzz3cakbkxehphgcym6fs44rqzb3tzyu2au3xeynjwq4c2sdq6pi"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "2",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcbqzsrqvanvr3dzjkkhhwlplszyslkcysdss6ggsjgwltkxegifrd"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqhfmb5jxhmibyrdnapeoryup7qfklbflyoq7rteh5qgtyfpubsgni"
|
||||||
|
},
|
||||||
|
"SectorID": 3,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqhfmb5jxhmibyrdnapeoryup7qfklbflyoq7rteh5qgtyfpubsgni"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "3",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcb3gs2lfgoarjdfgzrtgphmj2uofumyyfar5hb2p4lzrpnvwkt2b3"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqno55wyjxrday6nu7ojzgc4ixrlcoa3x2rbhevppgiudvb4p4vslq"
|
||||||
|
},
|
||||||
|
"SectorID": 4,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqno55wyjxrday6nu7ojzgc4ixrlcoa3x2rbhevppgiudvb4p4vslq"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "4",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcatmlwursqwqblc2icodqawsjwddcaggozaviy56b3gcyscd6v2ig"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqh7djlk3bny7xhpt3g7sk5prkak2rvefo27pak2j4huozlpwudkey"
|
||||||
|
},
|
||||||
|
"SectorID": 5,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqh7djlk3bny7xhpt3g7sk5prkak2rvefo27pak2j4huozlpwudkey"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "5",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abca4eif6xdb33st4tsj6fcxv3jvjxg47sohcuc67ev7pvunwofe7in"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqpwiow5zlecqy6wan7n2x46gtbhrephavubbf562pytqxwjznbmda"
|
||||||
|
},
|
||||||
|
"SectorID": 6,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqpwiow5zlecqy6wan7n2x46gtbhrephavubbf562pytqxwjznbmda"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "6",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcbb4rjyibkmwthuk6xqenlhqulhzjna4o3fjleq5fmvif3cpj2ocy"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqfkvvtg2ndmlzi5v4n2xve3if7rbet3namyirah7rv6uhehgsqqgy"
|
||||||
|
},
|
||||||
|
"SectorID": 7,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqfkvvtg2ndmlzi5v4n2xve3if7rbet3namyirah7rv6uhehgsqqgy"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "7",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcadgsgsbhw65ly6wh3jyl2rwwbl7jdfhsw5thrzuhskdevagnu4bb"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqbxmpt7zgkv2s7h6httk6r3odxzorqptxqfab3llkqgwtwamx62di"
|
||||||
|
},
|
||||||
|
"SectorID": 8,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqbxmpt7zgkv2s7h6httk6r3odxzorqptxqfab3llkqgwtwamx62di"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "8",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"CommR": {
|
||||||
|
"/": "bagboea4b5abcbaim35ed7ff5myro4xkrsbcg2f5nbibsb7pmupxhzo7vl4soiwk4"
|
||||||
|
},
|
||||||
|
"CommD": {
|
||||||
|
"/": "baga6ea4seaqhskbmqw2lllkhmfmfwnmi643tnrjbn62j4z35ofxtfppijor2yky"
|
||||||
|
},
|
||||||
|
"SectorID": 9,
|
||||||
|
"Deal": {
|
||||||
|
"PieceCID": {
|
||||||
|
"/": "baga6ea4seaqhskbmqw2lllkhmfmfwnmi643tnrjbn62j4z35ofxtfppijor2yky"
|
||||||
|
},
|
||||||
|
"PieceSize": 2048,
|
||||||
|
"VerifiedDeal": false,
|
||||||
|
"Client": "t3sszb5eomn3a4y6xt4fo33pqwvo4zpapbkh3ngcfvdzkpcqwt22ojexuncmdtmbuumvmub2xdewfdnfqvocua",
|
||||||
|
"Provider": "t01000",
|
||||||
|
"Label": "9",
|
||||||
|
"StartEpoch": 0,
|
||||||
|
"EndEpoch": 9001,
|
||||||
|
"StoragePricePerEpoch": "0",
|
||||||
|
"ProviderCollateral": "0",
|
||||||
|
"ClientCollateral": "0"
|
||||||
|
},
|
||||||
|
"ProofType": 5
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"NetworkName": "localnet-8fa6bd93-60aa-48d4-b129-f4da5f1e43e7",
|
||||||
|
"VerifregRootKey": {
|
||||||
|
"Type": "multisig",
|
||||||
|
"Balance": "0",
|
||||||
|
"Meta": {
|
||||||
|
"Signers": [
|
||||||
|
"t1ceb34gnsc6qk5dt6n7xg6ycwzasjhbxm3iylkiy"
|
||||||
|
],
|
||||||
|
"Threshold": 1,
|
||||||
|
"VestingDuration": 0,
|
||||||
|
"VestingStart": 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"RemainderAccount": {
|
||||||
|
"Type": "multisig",
|
||||||
|
"Balance": "0",
|
||||||
|
"Meta": {
|
||||||
|
"Signers": [
|
||||||
|
"t1ceb34gnsc6qk5dt6n7xg6ycwzasjhbxm3iylkiy"
|
||||||
|
],
|
||||||
|
"Threshold": 1,
|
||||||
|
"VestingDuration": 0,
|
||||||
|
"VestingStart": 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-fil-markets/shared"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -12,22 +13,25 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type LotusAccessor interface {
|
type LotusAccessor interface {
|
||||||
Start(ctx context.Context) error
|
|
||||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
||||||
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
|
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
|
||||||
|
Start(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type lotusAccessor struct {
|
type lotusAccessor struct {
|
||||||
pieceStore piecestore.PieceStore
|
pieceStore piecestore.PieceStore
|
||||||
rm retrievalmarket.RetrievalProviderNode
|
rm retrievalmarket.RetrievalProviderNode
|
||||||
|
|
||||||
|
readyMgr *shared.ReadyManager
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ LotusAccessor = (*lotusAccessor)(nil)
|
var _ LotusAccessor = (*lotusAccessor)(nil)
|
||||||
|
|
||||||
func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor {
|
func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor {
|
||||||
return &lotusAccessor{
|
return &lotusAccessor{
|
||||||
pieceStore: store,
|
pieceStore: store,
|
||||||
rm: rm,
|
rm: rm,
|
||||||
|
readyMgr: shared.NewReadyManager(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,12 +47,17 @@ func (m *lotusAccessor) Start(ctx context.Context) error {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err())
|
err := xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err())
|
||||||
case err := <-ready:
|
if ferr := m.readyMgr.FireReady(err); ferr != nil {
|
||||||
// Piece store has started up, check if there was an error
|
log.Warnw("failed to publish ready event", "err", ferr)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
|
|
||||||
|
case err := <-ready:
|
||||||
|
if ferr := m.readyMgr.FireReady(err); ferr != nil {
|
||||||
|
log.Warnw("failed to publish ready event", "err", ferr)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Piece store has started up successfully
|
// Piece store has started up successfully
|
||||||
@ -56,6 +65,11 @@ func (m *lotusAccessor) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||||
|
err := m.readyMgr.AwaitReady()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||||
@ -100,7 +114,12 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
|
|||||||
return nil, lastErr
|
return nil, lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *lotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
|
func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||||
|
err := m.readyMgr.AwaitReady()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||||
|
@ -65,7 +65,8 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
|||||||
rpn := &mockRPN{
|
rpn := &mockRPN{
|
||||||
sectors: mockData,
|
sectors: mockData,
|
||||||
}
|
}
|
||||||
api := NewLotusMountAPI(ps, rpn)
|
api := NewLotusAccessor(ps, rpn)
|
||||||
|
require.NoError(t, api.Start(ctx))
|
||||||
|
|
||||||
// Add deals to piece store
|
// Add deals to piece store
|
||||||
for _, sectorID := range tc.deals {
|
for _, sectorID := range tc.deals {
|
||||||
@ -94,12 +95,14 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
|
func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
cid1, err := cid.Parse("bafkqaaa")
|
cid1, err := cid.Parse("bafkqaaa")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
ps := getPieceStore(t)
|
ps := getPieceStore(t)
|
||||||
rpn := &mockRPN{}
|
rpn := &mockRPN{}
|
||||||
api := NewLotusMountAPI(ps, rpn)
|
api := NewLotusAccessor(ps, rpn)
|
||||||
|
require.NoError(t, api.Start(ctx))
|
||||||
|
|
||||||
// Add a deal with data Length 10
|
// Add a deal with data Length 10
|
||||||
dealInfo := piecestore.DealInfo{
|
dealInfo := piecestore.DealInfo{
|
||||||
@ -109,7 +112,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Check that the data length is correct
|
// Check that the data length is correct
|
||||||
len, err := api.GetUnpaddedCARSize(cid1)
|
len, err := api.GetUnpaddedCARSize(ctx, cid1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 10, len)
|
require.EqualValues(t, 10, len)
|
||||||
}
|
}
|
||||||
@ -120,14 +123,6 @@ func getPieceStore(t *testing.T) piecestore.PieceStore {
|
|||||||
|
|
||||||
err = ps.Start(context.Background())
|
err = ps.Start(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
ready := make(chan error)
|
|
||||||
ps.OnReady(func(err error) {
|
|
||||||
ready <- err
|
|
||||||
})
|
|
||||||
err = <-ready
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
return ps
|
return ps
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,6 +31,10 @@ func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor {
|
|||||||
return mock
|
return mock
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mr *MockLotusAccessor) Start(_ context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||||
func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder {
|
func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder {
|
||||||
return m.recorder
|
return m.recorder
|
||||||
@ -52,30 +56,16 @@ func (mr *MockLotusAccessorMockRecorder) FetchUnsealedPiece(ctx, pieceCid interf
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetUnpaddedCARSize mocks base method.
|
// GetUnpaddedCARSize mocks base method.
|
||||||
func (m *MockLotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
|
func (m *MockLotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", pieceCid)
|
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", ctx, pieceCid)
|
||||||
ret0, _ := ret[0].(uint64)
|
ret0, _ := ret[0].(uint64)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize.
|
// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize.
|
||||||
func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(pieceCid interface{}) *gomock.Call {
|
func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(ctx, pieceCid interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), pieceCid)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), ctx, pieceCid)
|
||||||
}
|
|
||||||
|
|
||||||
// Start mocks base method.
|
|
||||||
func (m *MockLotusAccessor) Start(ctx context.Context) error {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
ret := m.ctrl.Call(m, "Start", ctx)
|
|
||||||
ret0, _ := ret[0].(error)
|
|
||||||
return ret0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start indicates an expected call of Start.
|
|
||||||
func (mr *MockLotusAccessorMockRecorder) Start(ctx interface{}) *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLotusAccessor)(nil).Start), ctx)
|
|
||||||
}
|
}
|
||||||
|
@ -75,8 +75,8 @@ func (l *LotusMount) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) {
|
func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) {
|
||||||
size, err := l.Api.GetUnpaddedCARSize(l.PieceCid)
|
size, err := l.Api.GetUnpaddedCARSize(ctx, l.PieceCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err)
|
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err)
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ func TestLotusMount(t *testing.T) {
|
|||||||
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl)
|
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl)
|
||||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(cid).Return(uint64(100), nil).Times(1)
|
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||||
|
|
||||||
mnt, err := NewLotusMount(cid, mockLotusMountAPI)
|
mnt, err := NewLotusMount(cid, mockLotusMountAPI)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -93,6 +93,7 @@ func TestLotusMountDeserialize(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestLotusMountRegistration(t *testing.T) {
|
func TestLotusMountRegistration(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
bgen := blocksutil.NewBlockGenerator()
|
bgen := blocksutil.NewBlockGenerator()
|
||||||
cid := bgen.Next().Cid()
|
cid := bgen.Next().Cid()
|
||||||
|
|
||||||
@ -113,7 +114,7 @@ func TestLotusMountRegistration(t *testing.T) {
|
|||||||
mnt, err := registry.Instantiate(u)
|
mnt, err := registry.Instantiate(u)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(cid).Return(uint64(100), nil).Times(1)
|
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||||
stat, err := mnt.Stat(context.Background())
|
stat, err := mnt.Stat(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.EqualValues(t, 100, stat.Size)
|
require.EqualValues(t, 100, stat.Size)
|
||||||
|
@ -96,20 +96,13 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *Wrapper) Start(ctx context.Context) error {
|
func (ds *Wrapper) Start(ctx context.Context) {
|
||||||
ds.ctx, ds.cancel = context.WithCancel(ctx)
|
ds.ctx, ds.cancel = context.WithCancel(ctx)
|
||||||
|
|
||||||
err := ds.mountApi.Start(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("failed to start mount API: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ds.backgroundWg.Add(1)
|
ds.backgroundWg.Add(1)
|
||||||
|
|
||||||
// Run a go-routine to handle failures, traces and GC
|
// Run a go-routine to handle failures, traces and GC
|
||||||
go ds.background()
|
go ds.background()
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ds *Wrapper) background() {
|
func (ds *Wrapper) background() {
|
||||||
|
@ -189,7 +189,7 @@ func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
|
|||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m mockLotusMount) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
|
func (m mockLotusMount) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,6 +147,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
|
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
|
||||||
|
|
||||||
// DAG Store
|
// DAG Store
|
||||||
|
Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor),
|
||||||
Override(new(*dagstore.Wrapper), modules.DagStoreWrapper),
|
Override(new(*dagstore.Wrapper), modules.DagStoreWrapper),
|
||||||
|
|
||||||
// Markets (retrieval)
|
// Markets (retrieval)
|
||||||
|
@ -576,12 +576,28 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewLotusAccessor(lc fx.Lifecycle,
|
||||||
|
pieceStore dtypes.ProviderPieceStore,
|
||||||
|
rpn retrievalmarket.RetrievalProviderNode,
|
||||||
|
) (dagstore.LotusAccessor, error) {
|
||||||
|
mountApi := dagstore.NewLotusAccessor(pieceStore, rpn)
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(ctx context.Context) error {
|
||||||
|
return mountApi.Start(ctx)
|
||||||
|
},
|
||||||
|
OnStop: func(context.Context) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
return mountApi, nil
|
||||||
|
}
|
||||||
|
|
||||||
func DagStoreWrapper(
|
func DagStoreWrapper(
|
||||||
lc fx.Lifecycle,
|
lc fx.Lifecycle,
|
||||||
ds dtypes.MetadataDS,
|
ds dtypes.MetadataDS,
|
||||||
r repo.LockedRepo,
|
r repo.LockedRepo,
|
||||||
pieceStore dtypes.ProviderPieceStore,
|
lotusAccessor dagstore.LotusAccessor,
|
||||||
rpn retrievalmarket.RetrievalProviderNode,
|
|
||||||
) (*dagstore.Wrapper, error) {
|
) (*dagstore.Wrapper, error) {
|
||||||
dagStoreDir := filepath.Join(r.Path(), dagStore)
|
dagStoreDir := filepath.Join(r.Path(), dagStore)
|
||||||
dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider"))
|
dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider"))
|
||||||
@ -591,15 +607,16 @@ func DagStoreWrapper(
|
|||||||
Datastore: dagStoreDS,
|
Datastore: dagStoreDS,
|
||||||
GCInterval: 5 * time.Minute,
|
GCInterval: 5 * time.Minute,
|
||||||
}
|
}
|
||||||
mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn)
|
|
||||||
dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi)
|
dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
|
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
return dsw.Start(ctx)
|
dsw.Start(ctx)
|
||||||
|
return nil
|
||||||
},
|
},
|
||||||
OnStop: func(context.Context) error {
|
OnStop: func(context.Context) error {
|
||||||
return dsw.Close()
|
return dsw.Close()
|
||||||
|
Loading…
Reference in New Issue
Block a user