lotus/storage/sealing/garbage.go

151 lines
4.0 KiB
Go
Raw Normal View History

package sealing
import (
"bytes"
"context"
"io"
"math"
2020-01-30 06:41:30 +00:00
"math/bits"
"math/rand"
2020-02-08 02:18:32 +00:00
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/specs-actors/actors/abi"
2020-02-11 20:48:03 +00:00
"github.com/filecoin-project/specs-actors/actors/builtin"
2020-02-08 02:18:32 +00:00
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
)
2020-02-08 02:18:32 +00:00
func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize, parts uint64) io.Reader {
2020-01-30 06:41:30 +00:00
parts = 1 << bits.Len64(parts) // round down to nearest power of 2
2020-02-08 02:18:32 +00:00
if uint64(size)/parts < 127 {
parts = uint64(size) / 127
2020-01-31 19:07:20 +00:00
}
2020-01-30 06:41:30 +00:00
2020-02-08 02:18:32 +00:00
piece := abi.PaddedPieceSize(uint64(size.Padded()) / parts).Unpadded()
readers := make([]io.Reader, parts)
for i := range readers {
readers[i] = io.LimitReader(rand.New(rand.NewSource(42+int64(i))), int64(piece))
}
return io.MultiReader(readers...)
}
2020-02-08 02:18:32 +00:00
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) {
2019-11-07 19:54:24 +00:00
if len(sizes) == 0 {
return nil, nil
}
log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)
2020-02-08 02:18:32 +00:00
deals := make([]market.ClientDealProposal, len(sizes))
2019-11-06 23:09:48 +00:00
for i, size := range sizes {
commP, err := m.fastPledgeCommitment(size, uint64(1))
if err != nil {
2019-11-06 23:09:48 +00:00
return nil, err
}
2020-02-08 02:18:32 +00:00
sdp := market.DealProposal{
PieceCID: commcid.PieceCommitmentV1ToCID(commP[:]),
PieceSize: size.Padded(),
Client: m.worker,
Provider: m.maddr,
2020-02-08 02:18:37 +00:00
StartEpoch: math.MaxInt64,
2020-02-08 02:18:32 +00:00
EndEpoch: math.MaxInt64,
StoragePricePerEpoch: types.NewInt(0),
2020-02-08 02:18:37 +00:00
ProviderCollateral: types.NewInt(0),
}
2020-02-08 02:18:32 +00:00
deals[i] = market.ClientDealProposal{
2020-02-08 02:18:37 +00:00
Proposal: sdp,
2020-02-08 02:18:32 +00:00
}
2019-11-06 23:09:48 +00:00
}
log.Infof("Publishing deals for %d", sectorID)
2020-02-11 20:48:03 +00:00
params, aerr := actors.SerializeParams(&market.PublishStorageDealsParams{
2019-11-06 23:09:48 +00:00
Deals: deals,
})
if aerr != nil {
return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", aerr)
}
smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: m.worker,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
2020-02-11 20:48:03 +00:00
Method: builtin.MethodsMarket.PublishStorageDeals,
2019-11-06 23:09:48 +00:00
Params: params,
})
if err != nil {
return nil, err
}
r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) // TODO: more finality
2019-11-06 23:09:48 +00:00
if err != nil {
return nil, err
}
if r.Receipt.ExitCode != 0 {
log.Error(xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode))
}
var resp actors.PublishStorageDealResponse
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
return nil, err
}
2020-02-08 02:18:32 +00:00
if len(resp.IDs) != len(sizes) {
2019-11-06 23:09:48 +00:00
return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals")
}
2020-02-08 02:18:32 +00:00
log.Infof("Deals for sector %d: %+v", sectorID, resp.IDs)
2019-11-06 23:09:48 +00:00
out := make([]Piece, len(sizes))
2019-11-06 23:09:48 +00:00
for i, size := range sizes {
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(1)), existingPieceSizes)
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}
existingPieceSizes = append(existingPieceSizes, size)
2019-11-07 18:22:59 +00:00
out[i] = Piece{
2020-02-08 02:18:32 +00:00
DealID: resp.IDs[i],
Size: abi.UnpaddedPieceSize(ppi.Size),
2019-11-07 18:22:59 +00:00
CommP: ppi.CommP[:],
}
2019-11-06 23:09:48 +00:00
}
2019-11-07 18:22:59 +00:00
return out, nil
2019-11-06 23:09:48 +00:00
}
func (m *Sealing) PledgeSector() error {
2019-11-06 23:09:48 +00:00
go func() {
2019-11-08 23:06:07 +00:00
ctx := context.TODO() // we can't use the context from command which invokes
// this, as we run everything here async, and it's cancelled when the
// command exits
2020-02-08 02:18:32 +00:00
size := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded()
2019-11-06 23:09:48 +00:00
2020-02-11 01:10:50 +00:00
sid, err := m.sb.AcquireSectorNumber()
if err != nil {
2019-11-06 23:09:48 +00:00
log.Errorf("%+v", err)
return
}
2020-02-08 02:18:32 +00:00
pieces, err := m.pledgeSector(ctx, sid, []abi.UnpaddedPieceSize{}, abi.UnpaddedPieceSize(size))
2019-11-07 18:22:59 +00:00
if err != nil {
2019-11-06 23:09:48 +00:00
log.Errorf("%+v", err)
return
}
2019-12-01 17:58:31 +00:00
if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil {
2019-11-07 18:22:59 +00:00
log.Errorf("%+v", err)
return
}
}()
return nil
}