diff --git a/build/paramfetch.go b/build/paramfetch.go index 5b1f4068d..55dfd62bc 100644 --- a/build/paramfetch.go +++ b/build/paramfetch.go @@ -93,6 +93,7 @@ func (ft *fetch) maybeFetchAsync(name string, info paramFile) { func (ft *fetch) checkFile(path string, info paramFile) error { if os.Getenv("TRUST_PARAMS") == "1" { log.Warn("Assuming parameter files are ok. DO NOT USE IN PRODUCTION") + return nil } f, err := os.Open(path) diff --git a/build/params.go b/build/params.go index 22e1ece3c..cd2fc738c 100644 --- a/build/params.go +++ b/build/params.go @@ -37,7 +37,7 @@ const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours // Consensus / Network // Seconds -const BlockDelay = 10 +const BlockDelay = 2 // Seconds const AllowableClockDrift = BlockDelay * 2 diff --git a/chain/deals/client_utils.go b/chain/deals/client_utils.go index 135f696a7..7cdd77192 100644 --- a/chain/deals/client_utils.go +++ b/chain/deals/client_utils.go @@ -80,7 +80,7 @@ func (c *Client) readStorageDealResp(deal ClientDeal) (*Response, error) { // TODO: verify signature if resp.Response.Proposal != deal.ProposalCid { - return nil, xerrors.New("miner responded to a wrong proposal") + return nil, xerrors.Errorf("miner responded to a wrong proposal: %s != %s", resp.Response.Proposal, deal.ProposalCid) } return &resp.Response, nil diff --git a/chain/deals/provider.go b/chain/deals/provider.go index ecabc7d13..46726298c 100644 --- a/chain/deals/provider.go +++ b/chain/deals/provider.go @@ -88,7 +88,7 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks secb: secb, pricePerByteBlock: types.NewInt(3), // TODO: allow setting - minPieceSize: 1, + minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up)) conns: map[cid.Cid]inet.Stream{}, diff --git a/gen/main.go b/gen/main.go index 296b7bbf4..4c0c2167a 100644 --- a/gen/main.go +++ b/gen/main.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/lotus/paych" "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/sector" ) func main() { @@ -158,4 +159,12 @@ func main() { fmt.Println(err) os.Exit(1) } + + err = gen.WriteTupleEncodersToFile("./storage/sector/cbor_gen.go", "sector", + sector.DealMapping{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } } diff --git a/lib/padreader/padreader_test.go b/lib/padreader/padreader_test.go new file mode 100644 index 000000000..bf8464806 --- /dev/null +++ b/lib/padreader/padreader_test.go @@ -0,0 +1,12 @@ +package padreader + +import ( + "gotest.tools/assert" + "testing" +) + +func TestComputePaddedSize(t *testing.T) { + assert.Equal(t, uint64(1040384), PaddedSize(1000000)) + assert.Equal(t, uint64(1016), PaddedSize(548)) + assert.Equal(t, uint64(4064), PaddedSize(2048)) +} diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 58f37cf56..7f09fa81b 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -40,6 +40,7 @@ const CommLen = sectorbuilder.CommitmentBytesLen type SectorBuilder struct { handle unsafe.Pointer + ssize uint64 Miner address.Address @@ -71,7 +72,9 @@ func New(cfg *Config) (*SectorBuilder, error) { } return &SectorBuilder{ - handle: sbp, + handle: sbp, + ssize: cfg.SectorSize, + Miner: cfg.Miner, rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers), }, nil @@ -172,6 +175,10 @@ func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults) } +func (sb *SectorBuilder) SectorSize() uint64 { + return sb.ssize +} + var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) { @@ -237,7 +244,7 @@ func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { } err := w.Close() - if werr == nil { + if werr == nil && err != nil { werr = err log.Warnf("toReadableFile: close error: %+v", err) return diff --git a/storage/garbage.go b/storage/garbage.go index 24af346ec..8e3cf1a10 100644 --- a/storage/garbage.go +++ b/storage/garbage.go @@ -16,46 +16,13 @@ import ( "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -func (m *Miner) StoreGarbageData(_ context.Context) error { - ctx := context.TODO() - ssize, err := m.SectorSize(ctx) - if err != nil { - return xerrors.Errorf("failed to get miner sector size: %w", err) - } - go func() { - size := sectorbuilder.UserBytesForSectorSize(ssize) - - /*// Add market funds - smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{ - To: actors.StorageMarketAddress, - From: m.worker, - Value: types.NewInt(size), - GasPrice: types.NewInt(0), - GasLimit: types.NewInt(1000000), - Method: actors.SMAMethods.AddBalance, - }) - if err != nil { - log.Error(err) - return - } - - r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) - if err != nil { - log.Error(err) - return - } - - if r.Receipt.ExitCode != 0 { - log.Error(xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode)) - return - }*/ - // Publish a deal - - // TODO: Maybe cache +// TODO: expected sector ID +func (m *Miner) storeGarbage(ctx context.Context, sizes ...uint64) ([]uint64, error) { + deals := make([]actors.StorageDeal, len(sizes)) + for i, size := range sizes { commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size) if err != nil { - log.Error(err) - return + return nil, err } sdp := actors.StorageDealProposal{ @@ -72,66 +39,86 @@ func (m *Miner) StoreGarbageData(_ context.Context) error { } if err := api.SignWith(ctx, m.api.WalletSign, m.worker, &sdp); err != nil { - log.Error(xerrors.Errorf("signing storage deal failed: ", err)) - return + return nil, xerrors.Errorf("signing storage deal failed: ", err) } storageDeal := actors.StorageDeal{ Proposal: sdp, } if err := api.SignWith(ctx, m.api.WalletSign, m.worker, &storageDeal); err != nil { - log.Error(xerrors.Errorf("signing storage deal failed: ", err)) - return + return nil, xerrors.Errorf("signing storage deal failed: ", err) } - params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{ - Deals: []actors.StorageDeal{storageDeal}, - }) - if err != nil { - log.Error(xerrors.Errorf("serializing PublishStorageDeals params failed: ", err)) - } + deals[i] = storageDeal + } - // TODO: We may want this to happen after fetching data - smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{ - To: actors.StorageMarketAddress, - From: m.worker, - Value: types.NewInt(0), - GasPrice: types.NewInt(0), - GasLimit: types.NewInt(1000000), - Method: actors.SMAMethods.PublishStorageDeals, - Params: params, - }) - if err != nil { - log.Error(err) - return - } - r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) - if err != nil { - log.Error(err) - return - } - if r.Receipt.ExitCode != 0 { - log.Error(xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)) - } - var resp actors.PublishStorageDealResponse - if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { - log.Error(err) - return - } - if len(resp.DealIDs) != 1 { - log.Error("got unexpected number of DealIDs from") - return - } + params, aerr := actors.SerializeParams(&actors.PublishStorageDealsParams{ + Deals: deals, + }) + if aerr != nil { + return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", aerr) + } + // TODO: We may want this to happen after fetching data + smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: m.worker, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.PublishStorageDeals, + Params: params, + }) + if err != nil { + return nil, err + } + r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return nil, err + } + if r.Receipt.ExitCode != 0 { + log.Error(xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)) + } + var resp actors.PublishStorageDealResponse + if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { + return nil, err + } + if len(resp.DealIDs) != len(sizes) { + return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") + } + + sectorIDs := make([]uint64, len(sizes)) + + for i, size := range sizes { name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) - sectorId, err := m.secst.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), resp.DealIDs[0]) + sectorID, err := m.secst.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), resp.DealIDs[i]) if err != nil { - log.Error(err) + return nil, err + } + + sectorIDs[i] = sectorID + } + + return sectorIDs, nil +} + +func (m *Miner) StoreGarbageData(_ context.Context) error { + ctx := context.TODO() + ssize, err := m.SectorSize(ctx) + if err != nil { + return xerrors.Errorf("failed to get miner sector size: %w", err) + } + go func() { + size := sectorbuilder.UserBytesForSectorSize(ssize) + + sids, err := m.storeGarbage(ctx, size) + if err != nil { + log.Errorf("%+v", err) return } - if err := m.SealSector(context.TODO(), sectorId); err != nil { - log.Error(err) + if err := m.SealSector(context.TODO(), sids[0]); err != nil { + log.Errorf("%+v", err) return } }() diff --git a/storage/sealing.go b/storage/sealing.go index 625a35c1b..637eb8da2 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -71,7 +71,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) { go func() { select { case m.sectorUpdated <- sectorUpdate{ - newState: api.Unsealed, + newState: api.Packing, id: sector.SectorID, }: case <-m.stop: @@ -102,6 +102,8 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { } switch update.newState { + case api.Packing: + m.handle(ctx, sector, m.finishPacking, api.Unsealed) case api.Unsealed: m.handle(ctx, sector, m.sealPreCommit, api.PreCommitting) case api.PreCommitting: diff --git a/storage/sector/cbor_gen.go b/storage/sector/cbor_gen.go new file mode 100644 index 000000000..11a2365f1 --- /dev/null +++ b/storage/sector/cbor_gen.go @@ -0,0 +1,119 @@ +package sector + +import ( + "fmt" + "io" + + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +/* This file was generated by github.com/whyrusleeping/cbor-gen */ + +var _ = xerrors.Errorf + +func (t *DealMapping) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{131}); err != nil { + return err + } + + // t.t.DealIDs ([]uint64) (slice) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.DealIDs)))); err != nil { + return err + } + for _, v := range t.DealIDs { + if err := cbg.CborWriteHeader(w, cbg.MajUnsignedInt, v); err != nil { + return err + } + } + + // t.t.Allocated (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Allocated))); err != nil { + return err + } + + // t.t.Committed (bool) (bool) + if err := cbg.WriteBool(w, t.Committed); err != nil { + return err + } + return nil +} + +func (t *DealMapping) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.DealIDs ([]uint64) (slice) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("t.DealIDs: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.DealIDs = make([]uint64, extra) + } + for i := 0; i < int(extra); i++ { + + maj, val, err := cbg.CborReadHeader(br) + if err != nil { + return xerrors.Errorf("failed to read uint64 for t.DealIDs slice: %w", err) + } + + if maj != cbg.MajUnsignedInt { + return xerrors.Errorf("value read for array t.DealIDs was not a uint, instead got %d", maj) + } + + t.DealIDs[i] = val + } + + // t.t.Allocated (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Allocated = uint64(extra) + // t.t.Committed (bool) (bool) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajOther { + return fmt.Errorf("booleans must be major type 7") + } + switch extra { + case 20: + t.Committed = false + case 21: + t.Committed = true + default: + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) + } + return nil +} diff --git a/storage/sector/store.go b/storage/sector/store.go index 52c45deef..8b7f59d5c 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -1,33 +1,32 @@ package sector import ( + "bytes" "context" "fmt" "io" + "math/bits" "sync" "github.com/filecoin-project/go-sectorbuilder/sealing_state" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" - cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/cborrpc" "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/node/modules/dtypes" ) -func init() { - cbor.RegisterCborType(dealMapping{}) -} - var log = logging.Logger("sectorstore") var sectorDealsPrefix = datastore.NewKey("/sectordeals") -type dealMapping struct { +type DealMapping struct { DealIDs []uint64 + Allocated uint64 Committed bool } @@ -70,10 +69,10 @@ func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64 k := datastore.NewKey(fmt.Sprint(sectorID)) e, err := s.deals.Get(k) - var deals dealMapping + var deals DealMapping switch err { case nil: - if err := cbor.DecodeInto(e, &deals); err != nil { + if err := cborrpc.ReadCborRPC(bytes.NewReader(e), &deals); err != nil { return 0, err } if deals.Committed { @@ -82,7 +81,9 @@ func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64 fallthrough case datastore.ErrNotFound: deals.DealIDs = append(deals.DealIDs, dealIDs...) - d, err := cbor.DumpObject(&deals) + deals.Allocated += size + + d, err := cborrpc.Dump(&deals) if err != nil { return 0, err } @@ -96,7 +97,40 @@ func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64 return sectorID, nil } -func (s *Store) DealsForCommit(sectorID uint64) ([]uint64, error) { +func (s *Store) PieceSizesToFill(sectorID uint64) ([]uint64, error) { + s.dealsLk.Lock() + defer s.dealsLk.Unlock() + + k := datastore.NewKey(fmt.Sprint(sectorID)) + e, err := s.deals.Get(k) + if err != nil { + return nil, err + } + var info DealMapping + if err := cborrpc.ReadCborRPC(bytes.NewReader(e), &info); err != nil { + return nil, err + } + if info.Allocated > s.sb.SectorSize() { + return nil, xerrors.Errorf("more data allocated in sector than should be able to fit: %d > %d", info.Allocated, s.sb.SectorSize()) + } + + return fillersFromRem(sectorbuilder.UserBytesForSectorSize(s.sb.SectorSize()) - info.Allocated) +} + +func fillersFromRem(toFill uint64) ([]uint64, error) { + toFill += toFill / 127 // convert to in-sector bytes for easier math + + out := make([]uint64, bits.OnesCount64(toFill)) + for i := range out { + next := bits.TrailingZeros64(toFill) + psize := uint64(1) << next + toFill ^= psize + out[i] = sectorbuilder.UserBytesForSectorSize(psize) + } + return out, nil +} + +func (s *Store) DealsForCommit(sectorID uint64, commit bool) ([]uint64, error) { s.dealsLk.Lock() defer s.dealsLk.Unlock() @@ -105,16 +139,20 @@ func (s *Store) DealsForCommit(sectorID uint64) ([]uint64, error) { switch err { case nil: - var deals dealMapping - if err := cbor.DecodeInto(e, &deals); err != nil { + var deals DealMapping + if err := cborrpc.ReadCborRPC(bytes.NewReader(e), &deals); err != nil { return nil, err } + if !commit { + return nil, nil + } + if deals.Committed { log.Errorf("getting deal IDs for sector %d: sector already marked as committed", sectorID) } deals.Committed = true - d, err := cbor.DumpObject(&deals) + d, err := cborrpc.Dump(&deals) if err != nil { return nil, err } diff --git a/storage/sector/store_test.go b/storage/sector/store_test.go index a2e1d00ac..613f2836b 100644 --- a/storage/sector/store_test.go +++ b/storage/sector/store_test.go @@ -1,12 +1,46 @@ package sector import ( - "gotest.tools/assert" "testing" + + "github.com/stretchr/testify/assert" + + "github.com/filecoin-project/lotus/lib/sectorbuilder" ) -func TestComputePaddedSize(t *testing.T) { - assert.Equal(t, uint64(1040384), computePaddedSize(1000000)) - assert.Equal(t, uint64(1016), computePaddedSize(548)) - assert.Equal(t, uint64(4064), computePaddedSize(2048)) +func testFill(t *testing.T, n uint64, exp []uint64) { + f, err := fillersFromRem(n) + assert.NoError(t, err) + assert.Equal(t, exp, f) + + var sum uint64 + for _, u := range f { + sum += u + } + assert.Equal(t, n, sum) +} + +func TestFillersFromRem(t *testing.T) { + for i := 8; i < 32; i++ { + // single + ub := sectorbuilder.UserBytesForSectorSize(uint64(1) << i) + testFill(t, ub, []uint64{ub}) + + // 2 + ub = sectorbuilder.UserBytesForSectorSize(uint64(5) << i) + ub1 := sectorbuilder.UserBytesForSectorSize(uint64(1) << i) + ub3 := sectorbuilder.UserBytesForSectorSize(uint64(4) << i) + testFill(t, ub, []uint64{ub1, ub3}) + + // 4 + ub = sectorbuilder.UserBytesForSectorSize(uint64(15) << i) + ub2 := sectorbuilder.UserBytesForSectorSize(uint64(2) << i) + ub4 := sectorbuilder.UserBytesForSectorSize(uint64(8) << i) + testFill(t, ub, []uint64{ub1, ub2, ub3, ub4}) + + // different 2 + ub = sectorbuilder.UserBytesForSectorSize(uint64(9) << i) + testFill(t, ub, []uint64{ub1, ub4}) + } + } diff --git a/storage/sector_states.go b/storage/sector_states.go index 1122ea9a6..b57a5e315 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -33,6 +33,32 @@ func (m *Miner) handle(ctx context.Context, sector SectorInfo, cb providerHandle }() } +func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { + log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID) + + fillerSizes, err := m.secst.PieceSizesToFill(sector.SectorID) + if err != nil { + return nil, err + } + + if len(fillerSizes) > 0 { + log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID) + } + + ids, err := m.storeGarbage(ctx, fillerSizes...) + if err != nil { + return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) + } + + for _, id := range ids { + if id != sector.SectorID { + panic("todo: pass SectorID into storeGarbage") + } + } + + return nil, nil +} + func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) { log.Infow("performing sector replication...", "sector", sector.SectorID) sinfo, err := m.secst.SealPreCommit(ctx, sector.SectorID) @@ -135,7 +161,7 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector return nil, xerrors.Errorf("computing seal proof failed: %w", err) } - deals, err := m.secst.DealsForCommit(sector.SectorID) + deals, err := m.secst.DealsForCommit(sector.SectorID, true) if err != nil { return nil, err }