d7076778e2
This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
983 lines
30 KiB
Go
983 lines
30 KiB
Go
package impl
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/filecoin-project/dagstore"
|
|
"github.com/filecoin-project/dagstore/shard"
|
|
"github.com/filecoin-project/go-jsonrpc/auth"
|
|
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
|
"github.com/filecoin-project/lotus/chain/gen"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/ipfs/go-cid"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
datatransfer "github.com/filecoin-project/go-data-transfer"
|
|
"github.com/filecoin-project/go-fil-markets/piecestore"
|
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
|
|
|
sto "github.com/filecoin-project/specs-storage/storage"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
apitypes "github.com/filecoin-project/lotus/api/types"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/markets/storageadapter"
|
|
"github.com/filecoin-project/lotus/miner"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/storage"
|
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
|
)
|
|
|
|
type StorageMinerAPI struct {
|
|
fx.In
|
|
|
|
api.Common
|
|
api.Net
|
|
|
|
EnabledSubsystems api.MinerSubsystems
|
|
|
|
Full api.FullNode
|
|
LocalStore *stores.Local
|
|
RemoteStore *stores.Remote
|
|
|
|
// Markets
|
|
PieceStore dtypes.ProviderPieceStore `optional:"true"`
|
|
StorageProvider storagemarket.StorageProvider `optional:"true"`
|
|
RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"`
|
|
SectorAccessor retrievalmarket.SectorAccessor `optional:"true"`
|
|
DataTransfer dtypes.ProviderDataTransfer `optional:"true"`
|
|
DealPublisher *storageadapter.DealPublisher `optional:"true"`
|
|
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
|
|
Host host.Host `optional:"true"`
|
|
DAGStore *dagstore.DAGStore `optional:"true"`
|
|
|
|
// Miner / storage
|
|
Miner *storage.Miner `optional:"true"`
|
|
BlockMiner *miner.Miner `optional:"true"`
|
|
StorageMgr *sectorstorage.Manager `optional:"true"`
|
|
IStorageMgr sectorstorage.SectorManager `optional:"true"`
|
|
stores.SectorIndex
|
|
storiface.WorkerReturn `optional:"true"`
|
|
AddrSel *storage.AddressSelector
|
|
|
|
Epp gen.WinningPoStProver `optional:"true"`
|
|
DS dtypes.MetadataDS
|
|
|
|
ConsiderOnlineStorageDealsConfigFunc dtypes.ConsiderOnlineStorageDealsConfigFunc `optional:"true"`
|
|
SetConsiderOnlineStorageDealsConfigFunc dtypes.SetConsiderOnlineStorageDealsConfigFunc `optional:"true"`
|
|
ConsiderOnlineRetrievalDealsConfigFunc dtypes.ConsiderOnlineRetrievalDealsConfigFunc `optional:"true"`
|
|
SetConsiderOnlineRetrievalDealsConfigFunc dtypes.SetConsiderOnlineRetrievalDealsConfigFunc `optional:"true"`
|
|
StorageDealPieceCidBlocklistConfigFunc dtypes.StorageDealPieceCidBlocklistConfigFunc `optional:"true"`
|
|
SetStorageDealPieceCidBlocklistConfigFunc dtypes.SetStorageDealPieceCidBlocklistConfigFunc `optional:"true"`
|
|
ConsiderOfflineStorageDealsConfigFunc dtypes.ConsiderOfflineStorageDealsConfigFunc `optional:"true"`
|
|
SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc `optional:"true"`
|
|
ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc `optional:"true"`
|
|
SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc `optional:"true"`
|
|
ConsiderVerifiedStorageDealsConfigFunc dtypes.ConsiderVerifiedStorageDealsConfigFunc `optional:"true"`
|
|
SetConsiderVerifiedStorageDealsConfigFunc dtypes.SetConsiderVerifiedStorageDealsConfigFunc `optional:"true"`
|
|
ConsiderUnverifiedStorageDealsConfigFunc dtypes.ConsiderUnverifiedStorageDealsConfigFunc `optional:"true"`
|
|
SetConsiderUnverifiedStorageDealsConfigFunc dtypes.SetConsiderUnverifiedStorageDealsConfigFunc `optional:"true"`
|
|
SetSealingConfigFunc dtypes.SetSealingConfigFunc `optional:"true"`
|
|
GetSealingConfigFunc dtypes.GetSealingConfigFunc `optional:"true"`
|
|
GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc `optional:"true"`
|
|
SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"`
|
|
}
|
|
|
|
var _ api.StorageMiner = &StorageMinerAPI{}
|
|
|
|
func (sm *StorageMinerAPI) ServeRemote(perm bool) func(w http.ResponseWriter, r *http.Request) {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if perm == true {
|
|
if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
|
|
w.WriteHeader(401)
|
|
_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"})
|
|
return
|
|
}
|
|
}
|
|
|
|
sm.StorageMgr.ServeHTTP(w, r)
|
|
}
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) {
|
|
return sm.StorageMgr.WorkerStats(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) {
|
|
return sm.StorageMgr.WorkerJobs(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) {
|
|
return sm.Miner.Address(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MiningBase(ctx context.Context) (*types.TipSet, error) {
|
|
mb, err := sm.BlockMiner.GetBestMiningCandidate(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return mb.TipSet, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Address) (abi.SectorSize, error) {
|
|
mi, err := sm.Full.StateMinerInfo(ctx, addr, types.EmptyTSK)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return mi.SectorSize, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, error) {
|
|
sr, err := sm.Miner.PledgeSector(ctx)
|
|
if err != nil {
|
|
return abi.SectorID{}, err
|
|
}
|
|
|
|
// wait for the sector to enter the Packing state
|
|
// TODO: instead of polling implement some pubsub-type thing in storagefsm
|
|
for {
|
|
info, err := sm.Miner.SectorsStatus(ctx, sr.ID.Number, false)
|
|
if err != nil {
|
|
return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err)
|
|
}
|
|
|
|
if info.State != api.SectorState(sealing.UndefinedSectorState) {
|
|
return sr.ID, nil
|
|
}
|
|
|
|
select {
|
|
case <-time.After(10 * time.Millisecond):
|
|
case <-ctx.Done():
|
|
return abi.SectorID{}, ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
|
|
sInfo, err := sm.Miner.SectorsStatus(ctx, sid, false)
|
|
if err != nil {
|
|
return api.SectorInfo{}, err
|
|
}
|
|
|
|
if !showOnChainInfo {
|
|
return sInfo, nil
|
|
}
|
|
|
|
onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sid, types.EmptyTSK)
|
|
if err != nil {
|
|
return sInfo, err
|
|
}
|
|
if onChainInfo == nil {
|
|
return sInfo, nil
|
|
}
|
|
sInfo.SealProof = onChainInfo.SealProof
|
|
sInfo.Activation = onChainInfo.Activation
|
|
sInfo.Expiration = onChainInfo.Expiration
|
|
sInfo.DealWeight = onChainInfo.DealWeight
|
|
sInfo.VerifiedDealWeight = onChainInfo.VerifiedDealWeight
|
|
sInfo.InitialPledge = onChainInfo.InitialPledge
|
|
|
|
ex, err := sm.Full.StateSectorExpiration(ctx, sm.Miner.Address(), sid, types.EmptyTSK)
|
|
if err != nil {
|
|
return sInfo, nil
|
|
}
|
|
sInfo.OnTime = ex.OnTime
|
|
sInfo.Early = ex.Early
|
|
|
|
return sInfo, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r sto.Data, d api.PieceDealInfo) (api.SectorOffset, error) {
|
|
return sm.Miner.SectorAddPieceToAny(ctx, size, r, d)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorsUnsealPiece(ctx context.Context, sector sto.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error {
|
|
return sm.StorageMgr.SectorsUnsealPiece(ctx, sector, offset, size, randomness, commd)
|
|
}
|
|
|
|
// List all staged sectors
|
|
func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, error) {
|
|
sectors, err := sm.Miner.ListSectors()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
out := make([]abi.SectorNumber, 0, len(sectors))
|
|
for _, sector := range sectors {
|
|
if sector.State == sealing.UndefinedSectorState {
|
|
continue // sector ID not set yet
|
|
}
|
|
|
|
out = append(out, sector.SectorNumber)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorsListInStates(ctx context.Context, states []api.SectorState) ([]abi.SectorNumber, error) {
|
|
filterStates := make(map[sealing.SectorState]struct{})
|
|
for _, state := range states {
|
|
st := sealing.SectorState(state)
|
|
if _, ok := sealing.ExistSectorStateList[st]; !ok {
|
|
continue
|
|
}
|
|
filterStates[st] = struct{}{}
|
|
}
|
|
|
|
var sns []abi.SectorNumber
|
|
if len(filterStates) == 0 {
|
|
return sns, nil
|
|
}
|
|
|
|
sectors, err := sm.Miner.ListSectors()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i := range sectors {
|
|
if _, ok := filterStates[sectors[i].State]; ok {
|
|
sns = append(sns, sectors[i].SectorNumber)
|
|
}
|
|
}
|
|
return sns, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorsSummary(ctx context.Context) (map[api.SectorState]int, error) {
|
|
sectors, err := sm.Miner.ListSectors()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
out := make(map[api.SectorState]int)
|
|
for i := range sectors {
|
|
state := api.SectorState(sectors[i].State)
|
|
out[state]++
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
|
|
l, err := sm.LocalStore.Local(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
out := map[stores.ID]string{}
|
|
for _, st := range l {
|
|
out[st.ID] = st.LocalPath
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.SealedRef, error) {
|
|
// json can't handle cids as map keys
|
|
out := map[string][]api.SealedRef{}
|
|
|
|
refs, err := sm.SectorBlocks.List()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for k, v := range refs {
|
|
out[strconv.FormatUint(k, 10)] = v
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) {
|
|
return sm.RemoteStore.FsStat(ctx, id)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.SectorNumber) error {
|
|
return sm.Miner.StartPackingSector(number)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
|
|
cfg, err := sm.GetSealingConfigFunc()
|
|
if err != nil {
|
|
return xerrors.Errorf("get config: %w", err)
|
|
}
|
|
|
|
cfg.WaitDealsDelay = delay
|
|
|
|
return sm.SetSealingConfigFunc(cfg)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
|
|
cfg, err := sm.GetSealingConfigFunc()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return cfg.WaitDealsDelay, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error {
|
|
return sm.SetExpectedSealDurationFunc(delay)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorGetExpectedSealDuration(ctx context.Context) (time.Duration, error) {
|
|
return sm.GetExpectedSealDurationFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
|
|
return sm.Miner.ForceSectorState(ctx, id, sealing.SectorState(state))
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber) error {
|
|
return sm.Miner.RemoveSector(ctx, id)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNumber) error {
|
|
return sm.Miner.TerminateSector(ctx, id)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) {
|
|
return sm.Miner.TerminateFlush(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
|
return sm.Miner.TerminatePending(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
|
return sm.Miner.SectorPreCommitFlush(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) {
|
|
return sm.Miner.SectorPreCommitPending(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error {
|
|
return sm.Miner.MarkForUpgrade(id)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
|
|
return sm.Miner.CommitFlush(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) {
|
|
return sm.Miner.CommitPending(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
|
|
w, err := connectRemoteWorker(ctx, sm, url)
|
|
if err != nil {
|
|
return xerrors.Errorf("connecting remote storage failed: %w", err)
|
|
}
|
|
|
|
log.Infof("Connected to a remote worker at %s", url)
|
|
|
|
return sm.StorageMgr.AddWorker(ctx, w)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) {
|
|
return sm.StorageMgr.SchedDiag(ctx, doSched)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.CallID) error {
|
|
return sm.StorageMgr.Abort(ctx, call)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
|
|
fi, err := os.Open(path)
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to open file: %w", err)
|
|
}
|
|
defer fi.Close() //nolint:errcheck
|
|
|
|
return sm.StorageProvider.ImportDataForDeal(ctx, propCid, fi)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) listDeals(ctx context.Context) ([]api.MarketDeal, error) {
|
|
ts, err := sm.Full.ChainHead(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tsk := ts.Key()
|
|
allDeals, err := sm.Full.StateMarketDeals(ctx, tsk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var out []api.MarketDeal
|
|
|
|
for _, deal := range allDeals {
|
|
if deal.Proposal.Provider == sm.Miner.Address() {
|
|
out = append(out, deal)
|
|
}
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketListDeals(ctx context.Context) ([]api.MarketDeal, error) {
|
|
return sm.listDeals(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) {
|
|
var out []retrievalmarket.ProviderDealState
|
|
deals := sm.RetrievalProvider.ListDeals()
|
|
|
|
for _, deal := range deals {
|
|
if deal.ChannelID != nil {
|
|
if deal.ChannelID.Initiator == "" || deal.ChannelID.Responder == "" {
|
|
deal.ChannelID = nil // don't try to push unparsable peer IDs over jsonrpc
|
|
}
|
|
}
|
|
out = append(out, deal)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) {
|
|
results := make(chan storagemarket.MinerDeal)
|
|
unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
|
select {
|
|
case results <- deal:
|
|
case <-ctx.Done():
|
|
}
|
|
})
|
|
go func() {
|
|
<-ctx.Done()
|
|
unsub()
|
|
close(results)
|
|
}()
|
|
return results, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) {
|
|
return sm.StorageProvider.ListLocalDeals()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error {
|
|
options := []storagemarket.StorageAskOption{
|
|
storagemarket.MinPieceSize(minPieceSize),
|
|
storagemarket.MaxPieceSize(maxPieceSize),
|
|
}
|
|
|
|
return sm.StorageProvider.SetAsk(price, verifiedPrice, duration, options...)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error) {
|
|
return sm.StorageProvider.GetAsk(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketSetRetrievalAsk(ctx context.Context, rask *retrievalmarket.Ask) error {
|
|
sm.RetrievalProvider.SetAsk(rask)
|
|
return nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) {
|
|
return sm.RetrievalProvider.GetAsk(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
|
|
inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
|
|
for _, channelState := range inProgressChannels {
|
|
apiChannels = append(apiChannels, api.NewDataTransferChannel(sm.Host.ID(), channelState))
|
|
}
|
|
|
|
return apiChannels, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
|
|
selfPeer := sm.Host.ID()
|
|
if isInitiator {
|
|
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
|
|
}
|
|
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
|
|
selfPeer := sm.Host.ID()
|
|
if isInitiator {
|
|
return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
|
|
}
|
|
return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
|
|
channels := make(chan api.DataTransferChannel)
|
|
|
|
unsub := sm.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
|
|
channel := api.NewDataTransferChannel(sm.Host.ID(), channelState)
|
|
select {
|
|
case <-ctx.Done():
|
|
case channels <- channel:
|
|
}
|
|
})
|
|
|
|
go func() {
|
|
defer unsub()
|
|
<-ctx.Done()
|
|
}()
|
|
|
|
return channels, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
|
|
return sm.DealPublisher.PendingDeals(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) MarketPublishPendingDeals(ctx context.Context) error {
|
|
sm.DealPublisher.ForcePublishPendingDeals()
|
|
return nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.DagstoreShardInfo, error) {
|
|
if sm.DAGStore == nil {
|
|
return nil, fmt.Errorf("dagstore not available on this node")
|
|
}
|
|
|
|
info := sm.DAGStore.AllShardsInfo()
|
|
ret := make([]api.DagstoreShardInfo, 0, len(info))
|
|
for k, i := range info {
|
|
ret = append(ret, api.DagstoreShardInfo{
|
|
Key: k.String(),
|
|
State: i.ShardState.String(),
|
|
Error: func() string {
|
|
if i.Error == nil {
|
|
return ""
|
|
}
|
|
return i.Error.Error()
|
|
}(),
|
|
})
|
|
}
|
|
|
|
// order by key.
|
|
sort.SliceStable(ret, func(i, j int) bool {
|
|
return ret[i].Key < ret[j].Key
|
|
})
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key string) error {
|
|
if sm.DAGStore == nil {
|
|
return fmt.Errorf("dagstore not available on this node")
|
|
}
|
|
|
|
k := shard.KeyFromString(key)
|
|
|
|
info, err := sm.DAGStore.GetShardInfo(k)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get shard info: %w", err)
|
|
}
|
|
if st := info.ShardState; st != dagstore.ShardStateNew {
|
|
return fmt.Errorf("cannot initialize shard; expected state ShardStateNew, was: %s", st.String())
|
|
}
|
|
|
|
ch := make(chan dagstore.ShardResult, 1)
|
|
if err = sm.DAGStore.AcquireShard(ctx, k, ch, dagstore.AcquireOpts{}); err != nil {
|
|
return fmt.Errorf("failed to acquire shard: %w", err)
|
|
}
|
|
|
|
var res dagstore.ShardResult
|
|
select {
|
|
case res = <-ch:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
if err := res.Error; err != nil {
|
|
return fmt.Errorf("failed to acquire shard: %w", err)
|
|
}
|
|
|
|
if res.Accessor != nil {
|
|
err = res.Accessor.Close()
|
|
if err != nil {
|
|
log.Warnw("failed to close shard accessor; continuing", "shard_key", k, "error", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreInitializeAllEvent, error) {
|
|
if sm.DAGStore == nil {
|
|
return nil, fmt.Errorf("dagstore not available on this node")
|
|
}
|
|
|
|
if sm.SectorAccessor == nil {
|
|
return nil, fmt.Errorf("sector accessor not available on this node")
|
|
}
|
|
|
|
// prepare the thottler tokens.
|
|
var throttle chan struct{}
|
|
if c := params.MaxConcurrency; c > 0 {
|
|
throttle = make(chan struct{}, c)
|
|
for i := 0; i < c; i++ {
|
|
throttle <- struct{}{}
|
|
}
|
|
}
|
|
|
|
// are we initializing only unsealed pieces?
|
|
onlyUnsealed := !params.IncludeSealed
|
|
|
|
info := sm.DAGStore.AllShardsInfo()
|
|
var toInitialize []string
|
|
for k, i := range info {
|
|
if i.ShardState != dagstore.ShardStateNew {
|
|
continue
|
|
}
|
|
|
|
// if we're initializing only unsealed pieces, check if there's an
|
|
// unsealed deal for this piece available.
|
|
if onlyUnsealed {
|
|
pieceCid, err := cid.Decode(k.String())
|
|
if err != nil {
|
|
log.Warnw("DagstoreInitializeAll: failed to decode shard key as piece CID; skipping", "shard_key", k.String(), "error", err)
|
|
continue
|
|
}
|
|
|
|
pi, err := sm.PieceStore.GetPieceInfo(pieceCid)
|
|
if err != nil {
|
|
log.Warnw("DagstoreInitializeAll: failed to get piece info; skipping", "piece_cid", pieceCid, "error", err)
|
|
continue
|
|
}
|
|
|
|
var isUnsealed bool
|
|
for _, d := range pi.Deals {
|
|
isUnsealed, err = sm.SectorAccessor.IsUnsealed(ctx, d.SectorID, d.Offset.Unpadded(), d.Length.Unpadded())
|
|
if err != nil {
|
|
log.Warnw("DagstoreInitializeAll: failed to get unsealed status; skipping deal", "deal_id", d.DealID, "error", err)
|
|
continue
|
|
}
|
|
if isUnsealed {
|
|
break
|
|
}
|
|
}
|
|
|
|
if !isUnsealed {
|
|
log.Infow("DagstoreInitializeAll: skipping piece because it's sealed", "piece_cid", pieceCid, "error", err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// yes, we're initializing this shard.
|
|
toInitialize = append(toInitialize, k.String())
|
|
}
|
|
|
|
total := len(toInitialize)
|
|
if total == 0 {
|
|
out := make(chan api.DagstoreInitializeAllEvent)
|
|
close(out)
|
|
return out, nil
|
|
}
|
|
|
|
// response channel must be closed when we're done, or the context is cancelled.
|
|
// this buffering is necessary to prevent inflight children goroutines from
|
|
// publishing to a closed channel (res) when the context is cancelled.
|
|
out := make(chan api.DagstoreInitializeAllEvent, 32) // internal buffer.
|
|
res := make(chan api.DagstoreInitializeAllEvent, 32) // returned to caller.
|
|
|
|
// pump events back to caller.
|
|
// two events per shard.
|
|
go func() {
|
|
defer close(res)
|
|
|
|
for i := 0; i < total*2; i++ {
|
|
select {
|
|
case res <- <-out:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for i, k := range toInitialize {
|
|
select {
|
|
case <-throttle:
|
|
// acquired a throttle token, proceed.
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
go func(k string, i int) {
|
|
r := api.DagstoreInitializeAllEvent{
|
|
Key: k,
|
|
Event: "start",
|
|
Total: total,
|
|
Current: i + 1, // start with 1
|
|
}
|
|
select {
|
|
case out <- r:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
err := sm.DagstoreInitializeShard(ctx, k)
|
|
throttle <- struct{}{}
|
|
|
|
r.Event = "end"
|
|
if err == nil {
|
|
r.Success = true
|
|
} else {
|
|
r.Success = false
|
|
r.Error = err.Error()
|
|
}
|
|
|
|
select {
|
|
case out <- r:
|
|
case <-ctx.Done():
|
|
}
|
|
}(k, i)
|
|
}
|
|
}()
|
|
|
|
return res, nil
|
|
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string) error {
|
|
if sm.DAGStore == nil {
|
|
return fmt.Errorf("dagstore not available on this node")
|
|
}
|
|
|
|
k := shard.KeyFromString(key)
|
|
|
|
info, err := sm.DAGStore.GetShardInfo(k)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get shard info: %w", err)
|
|
}
|
|
if st := info.ShardState; st != dagstore.ShardStateErrored {
|
|
return fmt.Errorf("cannot recover shard; expected state ShardStateErrored, was: %s", st.String())
|
|
}
|
|
|
|
ch := make(chan dagstore.ShardResult, 1)
|
|
if err = sm.DAGStore.RecoverShard(ctx, k, ch, dagstore.RecoverOpts{}); err != nil {
|
|
return fmt.Errorf("failed to recover shard: %w", err)
|
|
}
|
|
|
|
var res dagstore.ShardResult
|
|
select {
|
|
case res = <-ch:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
return res.Error
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreShardResult, error) {
|
|
if sm.DAGStore == nil {
|
|
return nil, fmt.Errorf("dagstore not available on this node")
|
|
}
|
|
|
|
res, err := sm.DAGStore.GC(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to gc: %w", err)
|
|
}
|
|
|
|
ret := make([]api.DagstoreShardResult, 0, len(res.Shards))
|
|
for k, err := range res.Shards {
|
|
r := api.DagstoreShardResult{Key: k.String()}
|
|
if err == nil {
|
|
r.Success = true
|
|
} else {
|
|
r.Success = false
|
|
r.Error = err.Error()
|
|
}
|
|
ret = append(ret, r)
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) {
|
|
return sm.listDeals(ctx)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) RetrievalDealsList(ctx context.Context) (map[retrievalmarket.ProviderDealIdentifier]retrievalmarket.ProviderDealState, error) {
|
|
return sm.RetrievalProvider.ListDeals(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsConsiderOnlineStorageDeals(ctx context.Context) (bool, error) {
|
|
return sm.ConsiderOnlineStorageDealsConfigFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetConsiderOnlineStorageDeals(ctx context.Context, b bool) error {
|
|
return sm.SetConsiderOnlineStorageDealsConfigFunc(b)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsConsiderOnlineRetrievalDeals(ctx context.Context) (bool, error) {
|
|
return sm.ConsiderOnlineRetrievalDealsConfigFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetConsiderOnlineRetrievalDeals(ctx context.Context, b bool) error {
|
|
return sm.SetConsiderOnlineRetrievalDealsConfigFunc(b)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsConsiderOfflineStorageDeals(ctx context.Context) (bool, error) {
|
|
return sm.ConsiderOfflineStorageDealsConfigFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetConsiderOfflineStorageDeals(ctx context.Context, b bool) error {
|
|
return sm.SetConsiderOfflineStorageDealsConfigFunc(b)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsConsiderOfflineRetrievalDeals(ctx context.Context) (bool, error) {
|
|
return sm.ConsiderOfflineRetrievalDealsConfigFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetConsiderOfflineRetrievalDeals(ctx context.Context, b bool) error {
|
|
return sm.SetConsiderOfflineRetrievalDealsConfigFunc(b)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsConsiderVerifiedStorageDeals(ctx context.Context) (bool, error) {
|
|
return sm.ConsiderVerifiedStorageDealsConfigFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetConsiderVerifiedStorageDeals(ctx context.Context, b bool) error {
|
|
return sm.SetConsiderVerifiedStorageDealsConfigFunc(b)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsConsiderUnverifiedStorageDeals(ctx context.Context) (bool, error) {
|
|
return sm.ConsiderUnverifiedStorageDealsConfigFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetConsiderUnverifiedStorageDeals(ctx context.Context, b bool) error {
|
|
return sm.SetConsiderUnverifiedStorageDealsConfigFunc(b)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsGetExpectedSealDurationFunc(ctx context.Context) (time.Duration, error) {
|
|
return sm.GetExpectedSealDurationFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetExpectedSealDurationFunc(ctx context.Context, d time.Duration) error {
|
|
return sm.SetExpectedSealDurationFunc(d)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fname string) error {
|
|
fi, err := os.Open(fname)
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to open given file: %w", err)
|
|
}
|
|
defer fi.Close() //nolint:errcheck
|
|
|
|
return sm.StorageProvider.ImportDataForDeal(ctx, deal, fi)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsPieceCidBlocklist(ctx context.Context) ([]cid.Cid, error) {
|
|
return sm.StorageDealPieceCidBlocklistConfigFunc()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) DealsSetPieceCidBlocklist(ctx context.Context, cids []cid.Cid) error {
|
|
return sm.SetStorageDealPieceCidBlocklistConfigFunc(cids)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) error {
|
|
if sm.StorageMgr == nil {
|
|
return xerrors.Errorf("no storage manager")
|
|
}
|
|
|
|
return sm.StorageMgr.AddLocalStorage(ctx, path)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) PiecesListPieces(ctx context.Context) ([]cid.Cid, error) {
|
|
return sm.PieceStore.ListPieceInfoKeys()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) {
|
|
return sm.PieceStore.ListCidInfoKeys()
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) {
|
|
pi, err := sm.PieceStore.GetPieceInfo(pieceCid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &pi, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) PiecesGetCIDInfo(ctx context.Context, payloadCid cid.Cid) (*piecestore.CIDInfo, error) {
|
|
ci, err := sm.PieceStore.GetCIDInfo(payloadCid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &ci, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) CreateBackup(ctx context.Context, fpath string) error {
|
|
return backup(sm.DS, fpath)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []sto.SectorRef, expensive bool) (map[abi.SectorNumber]string, error) {
|
|
var rg storiface.RGetter
|
|
if expensive {
|
|
rg = func(ctx context.Context, id abi.SectorID) (cid.Cid, error) {
|
|
si, err := sm.Miner.SectorsStatus(ctx, id.Number, false)
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
if si.CommR == nil {
|
|
return cid.Undef, xerrors.Errorf("commr is nil")
|
|
}
|
|
|
|
return *si.CommR, nil
|
|
}
|
|
}
|
|
|
|
bad, err := sm.StorageMgr.CheckProvable(ctx, pp, sectors, rg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var out = make(map[abi.SectorNumber]string)
|
|
for sid, err := range bad {
|
|
out[sid.Number] = err
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) ActorAddressConfig(ctx context.Context) (api.AddressConfig, error) {
|
|
return sm.AddrSel.AddressConfig, nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error) {
|
|
return build.OpenRPCDiscoverJSON_Miner(), nil
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) ComputeProof(ctx context.Context, ssi []builtin.SectorInfo, rand abi.PoStRandomness) ([]builtin.PoStProof, error) {
|
|
return sm.Epp.ComputeProof(ctx, ssi, rand)
|
|
}
|
|
|
|
func (sm *StorageMinerAPI) RuntimeSubsystems(context.Context) (res api.MinerSubsystems, err error) {
|
|
return sm.EnabledSubsystems, nil
|
|
}
|