lotus/markets/storageadapter/provider.go
Aarsh Shah d7076778e2
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the central
component on the miner-side, and CARv2s on the client-side.

Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.

When the miner starts Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.

Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.

Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.

Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
targets of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.

Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.

A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.

Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.

NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:

- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport

At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.

Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 23:34:32 +01:00

421 lines
14 KiB
Go

package storageadapter
// this file implements storagemarket.StorageProviderNode
import (
"context"
"io"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
// Package-level tunables and the logger used by the provider node adapter.
var (
	// addPieceRetryWait is the pause between AddPiece attempts in
	// OnDealComplete while too many sectors are sealing.
	addPieceRetryWait = 5 * time.Minute

	// addPieceRetryTimeout bounds how long OnDealComplete keeps retrying
	// AddPiece before giving up.
	addPieceRetryTimeout = 6 * time.Hour

	// defaultMaxProviderCollateralMultiplier is the collateral multiplier used
	// when no dealmaking config is supplied to NewProviderNodeAdapter.
	defaultMaxProviderCollateralMultiplier = uint64(2)

	log = logging.Logger("storageadapter")
)
// ProviderNodeAdapter implements storagemarket.StorageProviderNode on top of
// a full node API and the miner's sector-blocks subsystem.
type ProviderNodeAdapter struct {
	// FullNode is embedded so that chain/state/wallet RPC methods can be
	// called directly on the adapter.
	v1api.FullNode

	// secb hands deal pieces to sectors and looks up deal references.
	secb *sectorblocks.SectorBlocks
	// ev is used to subscribe to chain state changes.
	ev *events.Events

	// dealPublisher publishes deal proposals on behalf of PublishDeals.
	dealPublisher *DealPublisher

	// addBalanceSpec is the send spec passed along with AddFunds messages;
	// it is nil when no fee config was provided.
	addBalanceSpec *api.MessageSendSpec
	// maxDealCollateralMultiplier scales the minimum collateral bound to
	// obtain the maximum returned by DealProviderCollateralBounds.
	maxDealCollateralMultiplier uint64
	// dsMatcher builds the state matcher used by OnDealExpiredOrSlashed.
	dsMatcher *dealStateMatcher
	// scMgr tracks sector pre-commit / commit events for deals.
	scMgr *SectorCommittedManager
}
// NewProviderNodeAdapter returns a constructor that builds a
// ProviderNodeAdapter and exposes it as a storagemarket.StorageProviderNode.
//
// fc, when non-nil, supplies the max fee for market AddBalance messages.
// dc, when non-nil, supplies the provider collateral multiplier; otherwise
// defaultMaxProviderCollateralMultiplier is used.
func NewProviderNodeAdapter(fc *config.MinerFeeConfig, dc *config.DealmakingConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, secb *sectorblocks.SectorBlocks, full v1api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode {
		lifecycleCtx := helpers.LifecycleCtx(mctx, lc)
		chainEvents := events.NewEvents(lifecycleCtx, full)

		// Resolve the config-driven settings, falling back to defaults.
		var balanceSpec *api.MessageSendSpec
		if fc != nil {
			balanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)}
		}
		multiplier := defaultMaxProviderCollateralMultiplier
		if dc != nil {
			multiplier = dc.MaxProviderCollateralMultiplier
		}

		adapter := &ProviderNodeAdapter{
			FullNode:                    full,
			secb:                        secb,
			ev:                          chainEvents,
			dealPublisher:               dealPublisher,
			addBalanceSpec:              balanceSpec,
			maxDealCollateralMultiplier: multiplier,
			dsMatcher:                   newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))),
		}

		// The sector-committed manager needs the adapter itself, so it is
		// wired in after the adapter has been created.
		adapter.scMgr = NewSectorCommittedManager(chainEvents, adapter, &apiWrapper{api: full})
		return adapter
	}
}
// PublishDeals hands the deal's client proposal to the deal publisher and
// returns whatever CID and error the publisher reports.
func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) {
	proposal := deal.ClientDealProposal
	return n.dealPublisher.Publish(ctx, proposal)
}
// OnDealComplete hands the deal's piece data to the sector-blocks subsystem
// and reports where the piece was placed.
//
// The deal must carry a non-nil PublishCid. While AddPiece fails with
// sealing.ErrTooManySectorsSealing, the call is retried every
// addPieceRetryWait for up to addPieceRetryTimeout; any other error (or
// success) ends the retry loop immediately. If ctx is done while waiting for
// a retry, an error wrapping ctx.Err() is returned.
//
// On success the returned PackingResult holds the sector number, the offset
// within the sector and the padded piece size.
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) {
	if deal.PublishCid == nil {
		return nil, xerrors.Errorf("deal.PublishCid can't be nil")
	}

	sdInfo := api.PieceDealInfo{
		DealID:       deal.DealID,
		DealProposal: &deal.Proposal,
		PublishCid:   deal.PublishCid,
		DealSchedule: api.DealSchedule{
			StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch,
			EndEpoch:   deal.ClientDealProposal.Proposal.EndEpoch,
		},
		KeepUnsealed: deal.FastRetrieval,
	}

	// NOTE(review): the same pieceData reader is passed to every retry below;
	// whether a failed attempt may already have consumed part of it is not
	// visible from here — confirm against AddPiece.
	p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
	curTime := build.Clock.Now()
	for build.Clock.Since(curTime) < addPieceRetryTimeout {
		if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
			// Either success or a non-retryable failure: stop retrying.
			if err != nil {
				log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err)
			}
			break
		}
		select {
		case <-build.Clock.After(addPieceRetryWait):
			p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
		case <-ctx.Done():
			// Keep the cancellation cause in the chain so callers can
			// distinguish cancellation from deadline expiry.
			return nil, xerrors.Errorf("context expired while waiting to retry AddPiece: %w", ctx.Err())
		}
	}

	if err != nil {
		// Wrap with %w so the underlying error stays inspectable.
		return nil, xerrors.Errorf("AddPiece failed: %w", err)
	}
	log.Warnf("New Deal: deal %d", deal.DealID)

	return &storagemarket.PackingResult{
		SectorNumber: p,
		Offset:       offset,
		Size:         pieceSize.Padded(),
	}, nil
}
// VerifySignature resolves addr to its account key and checks sig over input
// against that key. It returns true with a nil error when verification
// succeeds, and false together with the error otherwise.
//
// The key lookup is done against types.EmptyTSK; encodedTs is not consulted.
func (n *ProviderNodeAdapter) VerifySignature(ctx context.Context, sig crypto.Signature, addr address.Address, input []byte, encodedTs shared.TipSetToken) (bool, error) {
	keyAddr, err := n.StateAccountKey(ctx, addr, types.EmptyTSK)
	if err != nil {
		return false, err
	}

	if verifyErr := sigs.Verify(&sig, keyAddr, input); verifyErr != nil {
		return false, verifyErr
	}
	return true, nil
}
// GetMinerWorkerAddress decodes tok into a tipset key and returns the worker
// address recorded in the miner info of maddr at that tipset. An undefined
// address is returned alongside any error.
func (n *ProviderNodeAdapter) GetMinerWorkerAddress(ctx context.Context, maddr address.Address, tok shared.TipSetToken) (address.Address, error) {
	tipsetKey, err := types.TipSetKeyFromBytes(tok)
	if err != nil {
		return address.Undef, err
	}

	minerInfo, err := n.StateMinerInfo(ctx, maddr, tipsetKey)
	if err != nil {
		return address.Undef, err
	}
	return minerInfo.Worker, nil
}
// GetProofType decodes tok into a tipset key, then looks up the miner info of
// maddr and the network version at that tipset, and derives the preferred
// seal proof type from the miner's window PoSt proof type.
func (n *ProviderNodeAdapter) GetProofType(ctx context.Context, maddr address.Address, tok shared.TipSetToken) (abi.RegisteredSealProof, error) {
	tipsetKey, err := types.TipSetKeyFromBytes(tok)
	if err != nil {
		return 0, err
	}

	minerInfo, err := n.StateMinerInfo(ctx, maddr, tipsetKey)
	if err != nil {
		return 0, err
	}

	networkVersion, err := n.StateNetworkVersion(ctx, tipsetKey)
	if err != nil {
		return 0, err
	}

	return miner.PreferredSealProofTypeFromWindowPoStType(networkVersion, minerInfo.WindowPoStProofType)
}
// SignBytes resolves signer to its account key (against types.EmptyTSK) and
// signs b with that key through the node's wallet. A nil signature is
// returned whenever an error occurs.
func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Address, b []byte) (*crypto.Signature, error) {
	keyAddr, err := n.StateAccountKey(ctx, signer, types.EmptyTSK)
	if err != nil {
		return nil, err
	}

	signature, err := n.WalletSign(ctx, keyAddr, b)
	if err != nil {
		return nil, err
	}
	return signature, nil
}
// ReserveFunds forwards to the node's MarketReserveFunds and returns its
// result unchanged.
func (n *ProviderNodeAdapter) ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
	msgCid, err := n.MarketReserveFunds(ctx, wallet, addr, amt)
	return msgCid, err
}
// ReleaseFunds forwards to the node's MarketReleaseFunds and returns its
// error unchanged.
func (n *ProviderNodeAdapter) ReleaseFunds(ctx context.Context, addr address.Address, amt abi.TokenAmount) error {
	err := n.MarketReleaseFunds(ctx, addr, amt)
	return err
}
// AddFunds pushes a message from addr to the market actor (market.Address)
// invoking its AddBalance method with amount as the message value, using the
// adapter's addBalanceSpec as the send spec. It returns the CID of the
// resulting signed message, or cid.Undef on error.
func (n *ProviderNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount abi.TokenAmount) (cid.Cid, error) {
	// (Provider Node API)
	addBalanceMsg := &types.Message{
		To:     market.Address,
		From:   addr,
		Value:  amount,
		Method: market.Methods.AddBalance,
	}

	signed, err := n.MpoolPushMessage(ctx, addBalanceMsg, n.addBalanceSpec)
	if err != nil {
		return cid.Undef, err
	}
	return signed.Cid(), nil
}
// GetBalance decodes encodedTs into a tipset key and returns the market
// balance of addr at that tipset, converted to the storagemarket.Balance
// representation. A zero Balance is returned alongside any error.
func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (storagemarket.Balance, error) {
	var none storagemarket.Balance

	tipsetKey, err := types.TipSetKeyFromBytes(encodedTs)
	if err != nil {
		return none, err
	}

	marketBal, err := n.StateMarketBalance(ctx, addr, tipsetKey)
	if err != nil {
		return none, err
	}
	return utils.ToSharedBalance(marketBal), nil
}
// LocatePieceForDealWithinSector returns the sector, offset and padded length
// of the piece for dealID, taken from the first of the deal's sealed refs
// whose sector is in the Proving state.
//
// It fails if the deal has no refs, if a sector status lookup fails, or if
// none of the referenced sectors is Proving. encodedTs is not consulted.
//
// TODO: why doesnt this method take in a sector ID?
func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context, dealID abi.DealID, encodedTs shared.TipSetToken) (sectorID abi.SectorNumber, offset abi.PaddedPieceSize, length abi.PaddedPieceSize, err error) {
	sealedRefs, err := n.secb.GetRefs(dealID)
	if err != nil {
		return 0, 0, 0, err
	}
	if len(sealedRefs) == 0 {
		return 0, 0, 0, xerrors.New("no sector information for deal ID")
	}

	// TODO: better strategy (e.g. look for already unsealed)
	var (
		chosen      api.SealedRef
		chosenState api.SectorState // stays at its zero value until a match is found
	)
	for _, ref := range sealedRefs {
		status, err := n.secb.SectorBuilder.SectorsStatus(ctx, ref.SectorID, false)
		if err != nil {
			return 0, 0, 0, xerrors.Errorf("getting sector info: %w", err)
		}
		if status.State != api.SectorState(sealing.Proving) {
			continue
		}
		// First proving sector wins.
		chosen, chosenState = ref, status.State
		break
	}

	if chosenState == api.SectorState(sealing.UndefinedSectorState) {
		return 0, 0, 0, xerrors.New("no sealed sector found")
	}
	return chosen.SectorID, chosen.Offset, chosen.Size.Padded(), nil
}
// DealProviderCollateralBounds returns the minimum and maximum collateral the
// provider will put up for a deal of the given size and verification status.
//
// The minimum is the chain-reported lower bound (queried against
// types.EmptyTSK); the maximum is that minimum multiplied by the adapter's
// maxDealCollateralMultiplier.
func (n *ProviderNodeAdapter) DealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, isVerified bool) (abi.TokenAmount, abi.TokenAmount, error) {
	chainBounds, err := n.StateDealProviderCollateralBounds(ctx, size, isVerified, types.EmptyTSK)
	if err != nil {
		return abi.TokenAmount{}, abi.TokenAmount{}, err
	}

	// The provider's ceiling is expressed as a multiple of the minimum bound.
	multiplier := types.NewInt(n.maxDealCollateralMultiplier)
	upper := types.BigMul(chainBounds.Min, multiplier)

	return chainBounds.Min, upper, nil
}
// OnDealSectorPreCommitted registers cb with the sector-committed manager so
// it is notified about the pre-commit of the sector containing the deal
// described by proposal and publishCid.
//
// publishCid must be non-nil; a nil pointer yields an error instead of a
// nil-pointer dereference. dealID is currently unused.
//
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (n *ProviderNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error {
	if publishCid == nil {
		return xerrors.New("publishCid can't be nil")
	}
	return n.scMgr.OnDealSectorPreCommitted(ctx, provider, market.DealProposal(proposal), *publishCid, cb)
}
// OnDealSectorCommitted registers cb with the sector-committed manager so it
// is notified about the commit of sectorNumber for the deal described by
// proposal and publishCid.
//
// publishCid must be non-nil; a nil pointer yields an error instead of a
// nil-pointer dereference. dealID is currently unused.
//
// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer)
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error {
	if publishCid == nil {
		return xerrors.New("publishCid can't be nil")
	}
	return n.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, market.DealProposal(proposal), *publishCid, cb)
}
// GetChainHead returns the current chain head as a serialized tipset key
// together with the head's height.
func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
	headTs, err := n.ChainHead(ctx)
	if err != nil {
		return nil, 0, err
	}

	token := headTs.Key().Bytes()
	epoch := headTs.Height()
	return token, epoch, nil
}
// WaitForMessage waits for mcid to land on chain with a confidence of
// 2*build.MessageConfidence and no lookback limit, then reports the outcome
// through cb.
//
// A failed wait is delivered to cb as its err argument (with zero values for
// the other arguments); otherwise cb receives the receipt's exit code and
// return bytes plus the CID of the message found. The value returned by cb
// is returned to the caller.
func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, cb func(code exitcode.ExitCode, bytes []byte, finalCid cid.Cid, err error) error) error {
	lookup, waitErr := n.StateWaitMsg(ctx, mcid, 2*build.MessageConfidence, api.LookbackNoLimit, true)
	if waitErr != nil {
		return cb(0, nil, cid.Undef, waitErr)
	}

	rcpt := lookup.Receipt
	return cb(rcpt.ExitCode, rcpt.Return, lookup.Message, nil)
}
// WaitForPublishDeals waits for the publish message publishCid to land on
// chain and then resolves the deal's current ID.
//
// It fails if the wait fails, if the message's exit code is not exitcode.Ok,
// if the chain head cannot be fetched, or if the current deal info cannot be
// resolved. On success the result carries the current deal ID and the CID of
// the message that was found on chain.
func (n *ProviderNodeAdapter) WaitForPublishDeals(ctx context.Context, publishCid cid.Cid, proposal market2.DealProposal) (*storagemarket.PublishDealsWaitResult, error) {
	// Wait for deal to be published (plus additional time for confidence)
	lookup, err := n.StateWaitMsg(ctx, publishCid, 2*build.MessageConfidence, api.LookbackNoLimit, true)
	if err != nil {
		return nil, xerrors.Errorf("WaitForPublishDeals errored: %w", err)
	}
	if code := lookup.Receipt.ExitCode; code != exitcode.Ok {
		return nil, xerrors.Errorf("WaitForPublishDeals exit code: %s", code)
	}

	// The deal ID may have changed since publish if there was a reorg, so
	// get the current deal ID
	headTs, err := n.ChainHead(ctx)
	if err != nil {
		return nil, xerrors.Errorf("WaitForPublishDeals failed to get chain head: %w", err)
	}

	headToken := headTs.Key().Bytes()
	current, err := n.scMgr.dealInfo.GetCurrentDealInfo(ctx, headToken, (*market.DealProposal)(&proposal), publishCid)
	if err != nil {
		return nil, xerrors.Errorf("WaitForPublishDeals getting deal info errored: %w", err)
	}

	result := &storagemarket.PublishDealsWaitResult{
		DealID:   current.DealID,
		FinalCid: lookup.Message,
	}
	return result, nil
}
// GetDataCap decodes encodedTs into a tipset key and returns the node's
// verified-client status for addr at that tipset, passing through whatever
// value and error the node reports.
func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (*abi.StoragePower, error) {
	tipsetKey, err := types.TipSetKeyFromBytes(encodedTs)
	if err != nil {
		return nil, err
	}
	return n.StateVerifiedClientStatus(ctx, addr, tipsetKey)
}
// OnDealExpiredOrSlashed looks up dealID at the current chain head and
// registers a state-change watcher that invokes onDealExpired once the deal's
// end epoch has been reached, or onDealSlashed if the deal is found to have
// been slashed first.
//
// An error is returned only if the chain head or the deal cannot be fetched,
// or if the watcher cannot be registered.
func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
	head, err := n.ChainHead(ctx)
	if err != nil {
		return xerrors.Errorf("client: failed to get chain head: %w", err)
	}

	sd, err := n.StateMarketStorageDeal(ctx, dealID, head.Key())
	if err != nil {
		return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
	}
	endEpoch := sd.Proposal.EndEpoch

	// Runs immediately: detects a deal that has already expired or been slashed.
	checkNow := func(ts *types.TipSet) (done bool, more bool, err error) {
		switch {
		case ts == nil:
			// Nothing to inspect yet; keep listening for events.
			return false, true, nil
		case endEpoch <= ts.Height():
			// The deal has already expired.
			onDealExpired(nil)
			return true, false, nil
		case sd.State.SectorStartEpoch < 0:
			// If there is no deal assume it's already been slashed.
			onDealSlashed(ts.Height(), nil)
			return true, false, nil
		default:
			// No events have occurred yet (done: false, more: true).
			return false, true, nil
		}
	}

	// Runs when the matcher reported a state change and the chain has
	// advanced to the confidence height, or when the wait timed out.
	onChange := func(oldTs *types.TipSet, newTs *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
		// Expiry takes precedence over everything else.
		if newTs == nil || endEpoch <= newTs.Height() {
			onDealExpired(nil)
			return false, nil
		}

		// A nil state change means the wait timed out.
		if states == nil {
			log.Error("timed out waiting for deal expiry")
			return false, nil
		}

		changed, isChangedDeals := states.(state.ChangedDeals)
		if !isChangedDeals {
			panic("Expected state.ChangedDeals")
		}

		// A change whose new state is nil is treated as a slash; every other
		// outcome (no change for this deal, or a non-nil new state) keeps
		// the watcher alive.
		if change, found := changed[dealID]; found && change.To == nil {
			onDealSlashed(newTs.Height(), nil)
			return false, nil
		}
		return true, nil
	}

	// Runs when a reorg reverted the state change.
	onRevert := func(ctx context.Context, ts *types.TipSet) error {
		// TODO: Is it ok to just ignore this?
		log.Warn("deal state reverted; TODO: actually handle this!")
		return nil
	}

	// Watch for state changes to the deal.
	dealMatcher := n.dsMatcher.matcher(ctx, dealID)

	// Wait until after the end epoch for the deal and then timeout.
	timeout := (endEpoch - head.Height()) + 1
	confidence := int(build.MessageConfidence) + 1

	if err := n.ev.StateChanged(checkNow, onChange, onRevert, confidence, timeout, dealMatcher); err != nil {
		return xerrors.Errorf("failed to set up state changed handler: %w", err)
	}
	return nil
}
// Compile-time check that *ProviderNodeAdapter satisfies
// storagemarket.StorageProviderNode.
var _ storagemarket.StorageProviderNode = (*ProviderNodeAdapter)(nil)