diff --git a/chain/deals/client.go b/chain/deals/client.go index b99422e11..a64a6abcc 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -2,6 +2,7 @@ package deals import ( "context" + "github.com/filecoin-project/lotus/lib/statestore" "github.com/filecoin-project/lotus/node/impl/full" @@ -226,7 +227,6 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro s, err := c.h.NewStream(ctx, p.MinerID, DealProtocolID) if err != nil { - s.Reset() return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err) } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 1b2a53a80..4142233fd 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -8,13 +8,12 @@ import ( "os/signal" "syscall" - "github.com/filecoin-project/lotus/build" - "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/auth" "github.com/filecoin-project/lotus/lib/jsonrpc" diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index 5ce934f04..28bbc4ebf 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -12,7 +12,7 @@ import ( const sectorSize = 1024 func TestSealAndVerify(t *testing.T) { - t.Skip("this is slow") + //t.Skip("this is slow") //os.Setenv("BELLMAN_NO_GPU", "1") build.SectorSizes = []uint64{sectorSize} diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 17ab26731..7128cbb22 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -58,7 +58,7 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.A // TODO: make this a param self, err := a.WalletDefaultAddress(ctx) if err != nil { - return nil, err + return nil, xerrors.Errorf("failed to get default address: %w", err) } // get miner peerID @@ -70,11 +70,15 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.A r, err := a.StateCall(ctx, msg, nil) if err != nil { - return nil, err + return nil, xerrors.Errorf("failed getting peer ID: %w", err) } + if r.ExitCode != 0 { + return nil, xerrors.Errorf("call to get peer ID for miner failed: exit code %d", r.ExitCode) + } + pid, err := peer.IDFromBytes(r.Return) if err != nil { - return nil, err + return nil, xerrors.Errorf("parsing peer ID wrong: %w", err) } proposal := deals.ClientDealProposal{ @@ -88,7 +92,12 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.A } c, err := a.DealClient.Start(ctx, proposal) - return &c, err + // TODO: send updated voucher with PaymentVerifySector for cheaper validation (validate the sector the miner sent us first!) + if err != nil { + return nil, xerrors.Errorf("failed to start deal: %w", err) + } + + return &c, nil } func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { @@ -177,7 +186,11 @@ func (a *API) ClientImport(ctx context.Context, path string) (cid.Cid, error) { return cid.Undef, err } - return nd.Cid(), bufferedDS.Commit() + if err := bufferedDS.Commit(); err != nil { + return cid.Undef, err + } + + return nd.Cid(), nil } func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (cid.Cid, error) { diff --git a/node/modules/testing/genesis.go b/node/modules/testing/genesis.go index b53e33319..6c81f68ba 100644 --- a/node/modules/testing/genesis.go +++ b/node/modules/testing/genesis.go @@ -25,7 +25,7 @@ import ( var glog = logging.Logger("genesis") -func MakeGenesisMem(out io.Writer) func(bs dtypes.ChainBlockstore, w *wallet.Wallet) modules.Genesis { +func MakeGenesisMem(out io.Writer, minerPid peer.ID) func(bs dtypes.ChainBlockstore, w *wallet.Wallet) modules.Genesis { return func(bs dtypes.ChainBlockstore, w *wallet.Wallet) modules.Genesis { return func() (*types.BlockHeader, error) { glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network") @@ -38,7 +38,7 @@ func MakeGenesisMem(out io.Writer) func(bs dtypes.ChainBlockstore, w *wallet.Wal gmc := &gen.GenMinerCfg{ Owners: []address.Address{w}, Workers: []address.Address{w}, - PeerIDs: []peer.ID{"peerID 1"}, + PeerIDs: []peer.ID{minerPid}, } alloc := map[address.Address]types.BigInt{ w: types.FromFil(10000), diff --git a/node/node_test.go b/node/node_test.go index b035a7926..74833227b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -31,15 +31,12 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) -func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, tnd test.TestNode) test.TestStorageNode { +func testStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode) test.TestStorageNode { r := repo.NewMemory(nil) lr, err := r.Lock(repo.RepoStorageMiner) require.NoError(t, err) - pk, _, err := crypto.GenerateEd25519Key(rand.Reader) - require.NoError(t, err) - ks, err := lr.KeyStore() require.NoError(t, err) @@ -115,12 +112,18 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te fulls := make([]test.TestNode, nFull) storers := make([]test.TestStorageNode, len(storage)) + pk, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + + minerPid, err := peer.IDFromPrivateKey(pk) + require.NoError(t, err) + var genbuf bytes.Buffer for i := 0; i < nFull; i++ { var genesis node.Option if i == 0 { - genesis = node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genbuf)) + genesis = node.Override(new(modules.Genesis), modtest.MakeGenesisMem(&genbuf, minerPid)) } else { genesis = node.Override(new(modules.Genesis), modules.LoadGenesis(genbuf.Bytes())) } @@ -171,7 +174,7 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te genMiner, err := address.NewFromString("t0101") require.NoError(t, err) - storers[i] = testStorageNode(ctx, t, wa, genMiner, f) + storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f) } if err := mn.LinkAll(); err != nil { @@ -221,3 +224,7 @@ func rpcBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test func TestAPIRPC(t *testing.T) { test.TestApis(t, rpcBuilder) } + +func TestAPIDealFlow(t *testing.T) { + test.TestDealFlow(t, builder) +} diff --git a/storage/miner.go b/storage/miner.go index ae93e78f5..72a6a0a8b 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,9 +2,10 @@ package storage import ( "context" + "sync" + "github.com/filecoin-project/lotus/lib/statestore" "github.com/ipfs/go-datastore/namespace" - "sync" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" diff --git a/storage/post.go b/storage/post.go index 3fe45d24c..ff21c48f7 100644 --- a/storage/post.go +++ b/storage/post.go @@ -2,9 +2,10 @@ package storage import ( "context" + "time" + "github.com/ipfs/go-cid" "go.opencensus.io/trace" - "time" "golang.org/x/xerrors" diff --git a/storage/sector/store.go b/storage/sector/store.go index 8b7f59d5c..453bb1c64 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -7,6 +7,7 @@ import ( "io" "math/bits" "sync" + "testing/iotest" "github.com/filecoin-project/go-sectorbuilder/sealing_state" "github.com/ipfs/go-datastore" @@ -59,9 +60,24 @@ func (s *Store) SectorStatus(sid uint64) (*sectorbuilder.SectorSealingStatus, er } func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) { - sectorID, err = s.sb.AddPiece(ref, size, r) + padSize := computePaddedSize(size) + + buf := make([]byte, padSize) + r = iotest.NewReadLogger("UNIX FILE", r) + n, err := io.ReadFull(r, buf) if err != nil { - return 0, err + return 0, xerrors.Errorf("failed a bad thing: %w", err) + } + if uint64(n) != size { + panic("bad bad") + } + + bufr := bytes.NewReader(buf) + //r = io.MultiReader(r, io.LimitReader(nullReader{}, int64(padSize-size))) + + sectorID, err = s.sb.AddPiece(ref, padSize, bufr) + if err != nil { + return 0, xerrors.Errorf("sector store AddPiece call failed: %w", err) } s.dealsLk.Lock() diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index 1d1289def..d8666b41c 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "errors" - "io" "sync" "github.com/ipfs/go-cid" @@ -20,7 +19,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/cborrpc" - "github.com/filecoin-project/lotus/lib/padreader" "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/sector" @@ -132,9 +130,6 @@ func (r *refStorer) Read(p []byte) (n int, err error) { for { data, offset, nd, err := r.blockReader.ReadBlock(context.TODO()) if err != nil { - if err == io.EOF { - return 0, io.EOF - } return 0, xerrors.Errorf("reading block: %w", err) } @@ -173,9 +168,7 @@ func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, dealID uint6 intermediate: st.intermediate, } - pr, psize := padreader.New(r, uint64(size)) - - return st.Store.AddPiece(refst.pieceRef, psize, pr, dealID) + return st.Store.AddPiece(refst.pieceRef, uint64(size), refst, dealID) } func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {