lotus/node/impl/storminer.go

1433 lines
45 KiB
Go
Raw Normal View History

package impl
2019-07-24 00:58:31 +00:00
import (
2019-07-27 01:54:03 +00:00
"context"
"encoding/json"
"errors"
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
"fmt"
"net/http"
"os"
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
"sort"
"strconv"
"time"
2020-02-27 21:45:31 +00:00
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
gsimpl "github.com/ipfs/go-graphsync/impl"
"github.com/ipfs/go-graphsync/peerstate"
2022-08-25 18:20:41 +00:00
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx"
"golang.org/x/xerrors"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-address"
2022-08-18 14:37:22 +00:00
"github.com/filecoin-project/go-bitfield"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
gst "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
"github.com/filecoin-project/go-fil-markets/piecestore"
2021-07-28 18:51:45 +00:00
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
2022-05-12 14:16:53 +00:00
filmktsstore "github.com/filecoin-project/go-fil-markets/stores"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/go-jsonrpc/auth"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2022-07-29 14:39:49 +00:00
"github.com/filecoin-project/go-state-types/big"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
2022-09-06 15:49:29 +00:00
minertypes "github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/go-state-types/network"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/build"
2022-07-29 14:39:49 +00:00
"github.com/filecoin-project/lotus/chain/actors"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/types"
2023-07-12 16:02:23 +00:00
"github.com/filecoin-project/lotus/lib/sturdy/clusterdb"
2022-06-14 15:00:51 +00:00
mktsdagstore "github.com/filecoin-project/lotus/markets/dagstore"
2021-02-05 17:58:55 +00:00
"github.com/filecoin-project/lotus/markets/storageadapter"
2019-11-25 04:45:13 +00:00
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storage/wdpost"
2019-07-24 00:58:31 +00:00
)
type StorageMinerAPI struct {
fx.In
api.Common
api.Net
2019-07-27 00:45:27 +00:00
2021-07-28 18:51:45 +00:00
EnabledSubsystems api.MinerSubsystems
2021-05-20 11:01:14 +00:00
Full api.FullNode
LocalStore *paths.Local
RemoteStore *paths.Remote
2021-05-20 11:01:14 +00:00
// Markets
PieceStore dtypes.ProviderPieceStore `optional:"true"`
StorageProvider storagemarket.StorageProvider `optional:"true"`
RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"`
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
SectorAccessor retrievalmarket.SectorAccessor `optional:"true"`
2021-05-20 11:01:14 +00:00
DataTransfer dtypes.ProviderDataTransfer `optional:"true"`
StagingGraphsync dtypes.StagingGraphsync `optional:"true"`
Transport dtypes.ProviderTransport `optional:"true"`
2021-05-20 11:01:14 +00:00
DealPublisher *storageadapter.DealPublisher `optional:"true"`
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
2021-07-02 10:24:07 +00:00
Host host.Host `optional:"true"`
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
DAGStore *dagstore.DAGStore `optional:"true"`
2022-05-12 14:16:53 +00:00
DAGStoreWrapper *mktsdagstore.Wrapper `optional:"true"`
2021-05-20 11:01:14 +00:00
// Miner / storage
Miner *sealing.Sealing `optional:"true"`
BlockMiner *miner.Miner `optional:"true"`
StorageMgr *sealer.Manager `optional:"true"`
IStorageMgr sealer.SectorManager `optional:"true"`
paths.SectorIndex
2021-05-20 11:01:14 +00:00
storiface.WorkerReturn `optional:"true"`
AddrSel *ctladdr.AddressSelector
2021-05-20 11:01:14 +00:00
WdPoSt *wdpost.WindowPoStScheduler `optional:"true"`
2021-05-20 11:01:14 +00:00
Epp gen.WinningPoStProver `optional:"true"`
2021-03-26 05:32:03 +00:00
DS dtypes.MetadataDS
2020-10-01 11:58:26 +00:00
// StorageService is populated when we're not the main storage node (e.g. we're a markets node)
StorageService modules.MinerStorageService `optional:"true"`
2021-05-20 11:01:14 +00:00
ConsiderOnlineStorageDealsConfigFunc dtypes.ConsiderOnlineStorageDealsConfigFunc `optional:"true"`
SetConsiderOnlineStorageDealsConfigFunc dtypes.SetConsiderOnlineStorageDealsConfigFunc `optional:"true"`
ConsiderOnlineRetrievalDealsConfigFunc dtypes.ConsiderOnlineRetrievalDealsConfigFunc `optional:"true"`
SetConsiderOnlineRetrievalDealsConfigFunc dtypes.SetConsiderOnlineRetrievalDealsConfigFunc `optional:"true"`
StorageDealPieceCidBlocklistConfigFunc dtypes.StorageDealPieceCidBlocklistConfigFunc `optional:"true"`
SetStorageDealPieceCidBlocklistConfigFunc dtypes.SetStorageDealPieceCidBlocklistConfigFunc `optional:"true"`
ConsiderOfflineStorageDealsConfigFunc dtypes.ConsiderOfflineStorageDealsConfigFunc `optional:"true"`
SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc `optional:"true"`
ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc `optional:"true"`
SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc `optional:"true"`
ConsiderVerifiedStorageDealsConfigFunc dtypes.ConsiderVerifiedStorageDealsConfigFunc `optional:"true"`
SetConsiderVerifiedStorageDealsConfigFunc dtypes.SetConsiderVerifiedStorageDealsConfigFunc `optional:"true"`
ConsiderUnverifiedStorageDealsConfigFunc dtypes.ConsiderUnverifiedStorageDealsConfigFunc `optional:"true"`
SetConsiderUnverifiedStorageDealsConfigFunc dtypes.SetConsiderUnverifiedStorageDealsConfigFunc `optional:"true"`
SetSealingConfigFunc dtypes.SetSealingConfigFunc `optional:"true"`
GetSealingConfigFunc dtypes.GetSealingConfigFunc `optional:"true"`
GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc `optional:"true"`
SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"`
2023-07-12 16:02:23 +00:00
ClusterDB *clusterdb.DB
2019-07-24 00:58:31 +00:00
}
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
var _ api.StorageMiner = &StorageMinerAPI{}
func (sm *StorageMinerAPI) StorageAuthVerify(ctx context.Context, token string) ([]auth.Permission, error) {
if sm.StorageService != nil {
return sm.StorageService.AuthVerify(ctx, token)
}
return sm.AuthVerify(ctx, token)
}
func (sm *StorageMinerAPI) ServeRemote(perm bool) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if perm == true {
if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
w.WriteHeader(401)
_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"})
return
}
}
2019-07-24 00:58:31 +00:00
sm.StorageMgr.ServeHTTP(w, r)
}
}
2020-03-05 19:21:06 +00:00
func (sm *StorageMinerAPI) WorkerStats(ctx context.Context) (map[uuid.UUID]storiface.WorkerStats, error) {
return sm.StorageMgr.WorkerStats(ctx), nil
2020-03-23 14:56:22 +00:00
}
2019-11-08 18:15:13 +00:00
func (sm *StorageMinerAPI) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) {
2020-07-21 18:07:49 +00:00
return sm.StorageMgr.WorkerJobs(), nil
}
func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) {
2020-03-17 20:19:52 +00:00
return sm.Miner.Address(), nil
2019-08-10 01:54:45 +00:00
}
2020-04-23 21:12:42 +00:00
func (sm *StorageMinerAPI) MiningBase(ctx context.Context) (*types.TipSet, error) {
mb, err := sm.BlockMiner.GetBestMiningCandidate(ctx)
if err != nil {
return nil, err
}
return mb.TipSet, nil
}
func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Address) (abi.SectorSize, error) {
mi, err := sm.Full.StateMinerInfo(ctx, addr, types.EmptyTSK)
if err != nil {
return 0, err
}
return mi.SectorSize, nil
}
2021-02-16 18:16:35 +00:00
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, error) {
sr, err := sm.Miner.PledgeSector(ctx)
if err != nil {
2021-02-16 18:16:35 +00:00
return abi.SectorID{}, err
}
return sm.waitSectorStarted(ctx, sr.ID)
}
func (sm *StorageMinerAPI) waitSectorStarted(ctx context.Context, si abi.SectorID) (abi.SectorID, error) {
// wait for the sector to enter the Packing state
// TODO: instead of polling implement some pubsub-type thing in storagefsm
for {
info, err := sm.Miner.SectorsStatus(ctx, si.Number, false)
if err != nil {
2021-02-16 18:16:35 +00:00
return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err)
}
2021-05-20 11:01:14 +00:00
if info.State != api.SectorState(sealing.UndefinedSectorState) {
return si, nil
}
select {
case <-time.After(10 * time.Millisecond):
case <-ctx.Done():
2021-02-16 18:16:35 +00:00
return abi.SectorID{}, ctx.Err()
}
}
2019-07-27 01:54:03 +00:00
}
func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
2021-05-20 11:01:14 +00:00
sInfo, err := sm.Miner.SectorsStatus(ctx, sid, false)
2019-11-08 18:15:13 +00:00
if err != nil {
return api.SectorInfo{}, err
}
if !showOnChainInfo {
return sInfo, nil
}
onChainInfo, err := sm.Full.StateSectorGetInfo(ctx, sm.Miner.Address(), sid, types.EmptyTSK)
if err != nil {
return sInfo, err
}
if onChainInfo == nil {
return sInfo, nil
}
sInfo.SealProof = onChainInfo.SealProof
sInfo.Activation = onChainInfo.Activation
sInfo.Expiration = onChainInfo.Expiration
sInfo.DealWeight = onChainInfo.DealWeight
sInfo.VerifiedDealWeight = onChainInfo.VerifiedDealWeight
sInfo.InitialPledge = onChainInfo.InitialPledge
ex, err := sm.Full.StateSectorExpiration(ctx, sm.Miner.Address(), sid, types.EmptyTSK)
if err != nil {
return sInfo, nil
}
sInfo.OnTime = ex.OnTime
sInfo.Early = ex.Early
return sInfo, nil
}
func (sm *StorageMinerAPI) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error) {
so, err := sm.Miner.SectorAddPieceToAny(ctx, size, r, d)
if err != nil {
// jsonrpc doesn't support returning values with errors, make sure we never do that
return api.SectorOffset{}, err
}
return so, nil
}
func (sm *StorageMinerAPI) SectorsUnsealPiece(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error {
return sm.StorageMgr.SectorsUnsealPiece(ctx, sector, offset, size, randomness, commd)
}
func (sm *StorageMinerAPI) SectorUnseal(ctx context.Context, sectorNum abi.SectorNumber) error {
status, err := sm.Miner.SectorsStatus(ctx, sectorNum, false)
if err != nil {
return err
}
minerAddr, err := sm.ActorAddress(ctx)
if err != nil {
return err
}
minerID, err := address.IDFromAddress(minerAddr)
if err != nil {
return err
}
sector := storiface.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(minerID),
Number: sectorNum,
},
ProofType: status.SealProof,
}
return sm.StorageMgr.SectorsUnsealPiece(ctx, sector, storiface.UnpaddedByteIndex(0), abi.UnpaddedPieceSize(0), status.Ticket.Value, status.CommD)
}
// List all staged sectors
func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, error) {
2019-11-08 18:15:13 +00:00
sectors, err := sm.Miner.ListSectors()
if err != nil {
return nil, err
}
out := make([]abi.SectorNumber, 0, len(sectors))
for _, sector := range sectors {
if sector.State == sealing.UndefinedSectorState {
continue // sector ID not set yet
}
out = append(out, sector.SectorNumber)
2019-11-08 18:15:13 +00:00
}
return out, nil
}
2020-12-06 00:51:48 +00:00
func (sm *StorageMinerAPI) SectorsListInStates(ctx context.Context, states []api.SectorState) ([]abi.SectorNumber, error) {
filterStates := make(map[sealing.SectorState]struct{})
for _, state := range states {
st := sealing.SectorState(state)
if _, ok := sealing.ExistSectorStateList[st]; !ok {
continue
}
filterStates[st] = struct{}{}
}
var sns []abi.SectorNumber
if len(filterStates) == 0 {
return sns, nil
}
sectors, err := sm.Miner.ListSectors()
if err != nil {
return nil, err
}
for i := range sectors {
if _, ok := filterStates[sectors[i].State]; ok {
sns = append(sns, sectors[i].SectorNumber)
}
}
return sns, nil
}
func (sm *StorageMinerAPI) SectorsSummary(ctx context.Context) (map[api.SectorState]int, error) {
sectors, err := sm.Miner.ListSectors()
if err != nil {
return nil, err
}
out := make(map[api.SectorState]int)
for i := range sectors {
state := api.SectorState(sectors[i].State)
2020-12-10 18:36:02 +00:00
out[state]++
2020-12-06 00:51:48 +00:00
}
return out, nil
}
2022-01-18 10:57:04 +00:00
func (sm *StorageMinerAPI) StorageLocal(ctx context.Context) (map[storiface.ID]string, error) {
2021-05-20 11:01:14 +00:00
l, err := sm.LocalStore.Local(ctx)
if err != nil {
return nil, err
}
2022-01-18 10:57:04 +00:00
out := map[storiface.ID]string{}
2021-05-20 11:01:14 +00:00
for _, st := range l {
out[st.ID] = st.LocalPath
}
return out, nil
2020-03-19 19:51:33 +00:00
}
2021-12-11 21:03:00 +00:00
func (sm *StorageMinerAPI) SectorsRefs(ctx context.Context) (map[string][]api.SealedRef, error) {
2019-08-26 10:04:57 +00:00
// json can't handle cids as map keys
out := map[string][]api.SealedRef{}
2021-12-11 21:03:00 +00:00
refs, err := sm.SectorBlocks.List(ctx)
2019-08-26 10:04:57 +00:00
if err != nil {
return nil, err
}
for k, v := range refs {
out[strconv.FormatUint(k, 10)] = v
2019-08-26 10:04:57 +00:00
}
return out, nil
}
2022-01-18 10:57:04 +00:00
func (sm *StorageMinerAPI) StorageStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) {
2021-06-09 11:05:54 +00:00
return sm.RemoteStore.FsStat(ctx, id)
}
2020-06-26 15:28:05 +00:00
func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.SectorNumber) error {
return sm.Miner.StartPackingSector(number)
}
func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
2020-08-18 14:20:31 +00:00
cfg, err := sm.GetSealingConfigFunc()
if err != nil {
return xerrors.Errorf("get config: %w", err)
}
cfg.WaitDealsDelay = delay
return sm.SetSealingConfigFunc(cfg)
}
func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
2020-08-18 14:20:31 +00:00
cfg, err := sm.GetSealingConfigFunc()
if err != nil {
return 0, err
}
return cfg.WaitDealsDelay, nil
}
func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error {
return sm.SetExpectedSealDurationFunc(delay)
}
func (sm *StorageMinerAPI) SectorGetExpectedSealDuration(ctx context.Context) (time.Duration, error) {
return sm.GetExpectedSealDurationFunc()
}
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
return sm.Miner.ForceSectorState(ctx, id, sealing.SectorState(state))
}
2020-03-05 19:21:06 +00:00
2020-06-22 17:35:14 +00:00
func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber) error {
return sm.Miner.RemoveSector(ctx, id)
}
2021-01-12 23:42:01 +00:00
func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNumber) error {
return sm.Miner.TerminateSector(ctx, id)
}
2021-01-13 22:32:04 +00:00
func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) {
return sm.Miner.TerminateFlush(ctx)
}
2021-01-14 11:37:23 +00:00
func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return sm.Miner.TerminatePending(ctx)
}
func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
2021-05-18 14:51:06 +00:00
return sm.Miner.SectorPreCommitFlush(ctx)
}
func (sm *StorageMinerAPI) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) {
return sm.Miner.SectorPreCommitPending(ctx)
}
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber, snap bool) error {
if !snap {
return fmt.Errorf("non-snap upgrades are not supported")
}
return sm.Miner.MarkForUpgrade(ctx, id)
2020-07-01 14:49:17 +00:00
}
func (sm *StorageMinerAPI) SectorAbortUpgrade(ctx context.Context, number abi.SectorNumber) error {
return sm.Miner.SectorAbortUpgrade(number)
}
2021-06-01 09:56:19 +00:00
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
2021-03-10 15:16:44 +00:00
return sm.Miner.CommitFlush(ctx)
}
func (sm *StorageMinerAPI) SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) {
return sm.Miner.CommitPending(ctx)
}
func (sm *StorageMinerAPI) SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error {
return sm.Miner.SectorMatchPendingPiecesToOpenSectors(ctx)
}
2022-08-18 14:37:22 +00:00
func (sm *StorageMinerAPI) SectorNumAssignerMeta(ctx context.Context) (api.NumAssignerMeta, error) {
return sm.Miner.NumAssignerMeta(ctx)
}
func (sm *StorageMinerAPI) SectorNumReservations(ctx context.Context) (map[string]bitfield.BitField, error) {
return sm.Miner.NumReservations(ctx)
}
func (sm *StorageMinerAPI) SectorNumReserve(ctx context.Context, name string, field bitfield.BitField, force bool) error {
return sm.Miner.NumReserve(ctx, name, field, force)
}
2022-08-24 15:25:37 +00:00
func (sm *StorageMinerAPI) SectorNumReserveCount(ctx context.Context, name string, count uint64) (bitfield.BitField, error) {
return sm.Miner.NumReserveCount(ctx, name, count)
}
2022-08-18 14:37:22 +00:00
func (sm *StorageMinerAPI) SectorNumFree(ctx context.Context, name string) error {
return sm.Miner.NumFree(ctx, name)
}
2022-08-24 19:27:27 +00:00
func (sm *StorageMinerAPI) SectorReceive(ctx context.Context, meta api.RemoteSectorMeta) error {
if err := sm.Miner.Receive(ctx, meta); err != nil {
return err
}
_, err := sm.waitSectorStarted(ctx, meta.Sector)
return err
2022-08-24 19:27:27 +00:00
}
2022-04-20 21:34:28 +00:00
func (sm *StorageMinerAPI) ComputeWindowPoSt(ctx context.Context, dlIdx uint64, tsk types.TipSetKey) ([]minertypes.SubmitWindowedPoStParams, error) {
var ts *types.TipSet
var err error
if tsk == types.EmptyTSK {
ts, err = sm.Full.ChainHead(ctx)
} else {
ts, err = sm.Full.ChainGetTipSet(ctx, tsk)
}
if err != nil {
return nil, err
}
return sm.WdPoSt.ComputePoSt(ctx, dlIdx, ts)
}
func (sm *StorageMinerAPI) ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) {
2022-04-26 19:37:48 +00:00
return sm.StorageMgr.DataCid(ctx, pieceSize, pieceData)
}
2020-03-11 01:57:52 +00:00
func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
w, err := connectRemoteWorker(ctx, sm, url)
2020-03-11 01:57:52 +00:00
if err != nil {
2020-03-18 23:23:28 +00:00
return xerrors.Errorf("connecting remote storage failed: %w", err)
2020-03-11 01:57:52 +00:00
}
2020-03-18 23:23:28 +00:00
log.Infof("Connected to a remote worker at %s", url)
2020-03-20 22:30:17 +00:00
return sm.StorageMgr.AddWorker(ctx, w)
2020-03-11 01:57:52 +00:00
}
func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) {
return sm.StorageMgr.SchedDiag(ctx, doSched)
}
func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.CallID) error {
return sm.StorageMgr.Abort(ctx, call)
}
2022-08-02 09:47:30 +00:00
func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error {
return sm.StorageMgr.RemoveSchedRequest(ctx, schedId)
2022-07-20 04:50:05 +00:00
}
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path)
if err != nil {
return xerrors.Errorf("failed to open file: %w", err)
}
defer fi.Close() //nolint:errcheck
return sm.StorageProvider.ImportDataForDeal(ctx, propCid, fi)
}
func (sm *StorageMinerAPI) listDeals(ctx context.Context) ([]*api.MarketDeal, error) {
ts, err := sm.Full.ChainHead(ctx)
if err != nil {
return nil, err
}
tsk := ts.Key()
allDeals, err := sm.Full.StateMarketDeals(ctx, tsk)
if err != nil {
return nil, err
}
var out []*api.MarketDeal
for _, deal := range allDeals {
if deal.Proposal.Provider == sm.Miner.Address() {
out = append(out, deal)
}
}
return out, nil
}
func (sm *StorageMinerAPI) MarketListDeals(ctx context.Context) ([]*api.MarketDeal, error) {
2020-09-17 08:17:14 +00:00
return sm.listDeals(ctx)
}
func (sm *StorageMinerAPI) MarketListRetrievalDeals(ctx context.Context) ([]struct{}, error) {
return []struct{}{}, nil
2020-08-04 23:40:29 +00:00
}
func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) {
2020-08-06 20:16:55 +00:00
results := make(chan storagemarket.MinerDeal)
unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
select {
case results <- deal:
case <-ctx.Done():
2020-08-06 20:16:55 +00:00
}
})
go func() {
<-ctx.Done()
unsub()
close(results)
}()
return results, nil
}
func (sm *StorageMinerAPI) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) {
return sm.StorageProvider.ListLocalDeals()
}
2020-07-31 19:14:48 +00:00
func (sm *StorageMinerAPI) MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error {
options := []storagemarket.StorageAskOption{
storagemarket.MinPieceSize(minPieceSize),
storagemarket.MaxPieceSize(maxPieceSize),
}
2020-07-31 19:14:48 +00:00
return sm.StorageProvider.SetAsk(price, verifiedPrice, duration, options...)
}
2020-06-17 00:18:54 +00:00
func (sm *StorageMinerAPI) MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error) {
return sm.StorageProvider.GetAsk(), nil
}
2020-07-28 21:35:23 +00:00
func (sm *StorageMinerAPI) MarketSetRetrievalAsk(ctx context.Context, rask *retrievalmarket.Ask) error {
sm.RetrievalProvider.SetAsk(rask)
return nil
}
func (sm *StorageMinerAPI) MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) {
return sm.RetrievalProvider.GetAsk(), nil
}
func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}
apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, api.NewDataTransferChannel(sm.Host.ID(), channelState))
}
return apiChannels, nil
}
2020-10-22 20:40:26 +00:00
func (sm *StorageMinerAPI) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
selfPeer := sm.Host.ID()
if isInitiator {
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
}
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}
func (sm *StorageMinerAPI) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
selfPeer := sm.Host.ID()
if isInitiator {
return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
}
return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}
func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel)
unsub := sm.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := api.NewDataTransferChannel(sm.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
}
})
go func() {
defer unsub()
<-ctx.Done()
}()
return channels, nil
}
func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mpid peer.ID) (*api.TransferDiagnostics, error) {
gsTransport, ok := sm.Transport.(*gst.Transport)
if !ok {
return nil, errors.New("api only works for graphsync as transport")
}
graphsyncConcrete, ok := sm.StagingGraphsync.(*gsimpl.GraphSync)
if !ok {
return nil, errors.New("api only works for non-mock graphsync implementation")
}
inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}
allReceivingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState)
allSendingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState)
for channelID, channel := range inProgressChannels {
if channel.OtherPeer() != mpid {
continue
}
if channel.Status() == datatransfer.Completed {
continue
}
if channel.Status() == datatransfer.Failed || channel.Status() == datatransfer.Cancelled {
continue
}
if channel.SelfPeer() == channel.Sender() {
allSendingChannels[channelID] = channel
} else {
allReceivingChannels[channelID] = channel
}
}
// gather information about active transport channels
transportChannels := gsTransport.ChannelsForPeer(mpid)
// gather information about graphsync state for peer
gsPeerState := graphsyncConcrete.PeerState(mpid)
sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState, allSendingChannels)
receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState, allReceivingChannels)
return &api.TransferDiagnostics{
SendingTransfers: sendingTransfers,
ReceivingTransfers: receivingTransfers,
}, nil
}
// generate transfers matches graphsync state and data transfer state for a given peer
// to produce detailed output on what's happening with a transfer
func (sm *StorageMinerAPI) generateTransfers(ctx context.Context,
transportChannels map[datatransfer.ChannelID]gst.ChannelGraphsyncRequests,
gsPeerState peerstate.PeerState,
allChannels map[datatransfer.ChannelID]datatransfer.ChannelState) []*api.GraphSyncDataTransfer {
tc := &transferConverter{
matchedChannelIds: make(map[datatransfer.ChannelID]struct{}),
matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer),
gsDiagnostics: gsPeerState.Diagnostics(),
requestStates: gsPeerState.RequestStates,
allChannels: allChannels,
}
// iterate through all operating data transfer transport channels
for channelID, channelRequests := range transportChannels {
originalState, err := sm.DataTransfer.ChannelState(ctx, channelID)
var baseDiagnostics []string
var channelState *api.DataTransferChannel
if err != nil {
baseDiagnostics = append(baseDiagnostics, fmt.Sprintf("Unable to lookup channel state: %s", err))
} else {
cs := api.NewDataTransferChannel(sm.Host.ID(), originalState)
channelState = &cs
}
// add the current request for this channel
tc.convertTransfer(channelID, true, channelState, baseDiagnostics, channelRequests.Current, true)
for _, requestID := range channelRequests.Previous {
// add any previous requests that were cancelled for a restart
tc.convertTransfer(channelID, true, channelState, baseDiagnostics, requestID, false)
}
}
// collect any graphsync data for channels we don't have any data transfer data for
tc.collectRemainingTransfers()
return tc.transfers
}
type transferConverter struct {
matchedChannelIds map[datatransfer.ChannelID]struct{}
matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer
transfers []*api.GraphSyncDataTransfer
gsDiagnostics map[graphsync.RequestID][]string
requestStates graphsync.RequestStates
allChannels map[datatransfer.ChannelID]datatransfer.ChannelState
}
// convert transfer assembles transfer and diagnostic data for a given graphsync/data-transfer request
func (tc *transferConverter) convertTransfer(channelID datatransfer.ChannelID, hasChannelID bool, channelState *api.DataTransferChannel, baseDiagnostics []string,
requestID graphsync.RequestID, isCurrentChannelRequest bool) {
diagnostics := baseDiagnostics
state, hasState := tc.requestStates[requestID]
stateString := state.String()
if !hasState {
stateString = "no graphsync state found"
}
var channelIDPtr *datatransfer.ChannelID
2021-12-11 01:50:12 +00:00
if !hasChannelID {
diagnostics = append(diagnostics, fmt.Sprintf("No data transfer channel id for GraphSync request ID %s", requestID))
} else {
channelIDPtr = &channelID
if isCurrentChannelRequest && !hasState {
diagnostics = append(diagnostics, fmt.Sprintf("No current request state for data transfer channel id %s", channelID))
} else if !isCurrentChannelRequest && hasState {
diagnostics = append(diagnostics, fmt.Sprintf("Graphsync request %s is a previous request on data transfer channel id %s that was restarted, but it is still running", requestID, channelID))
}
}
diagnostics = append(diagnostics, tc.gsDiagnostics[requestID]...)
transfer := &api.GraphSyncDataTransfer{
RequestID: &requestID,
RequestState: stateString,
IsCurrentChannelRequest: isCurrentChannelRequest,
ChannelID: channelIDPtr,
ChannelState: channelState,
Diagnostics: diagnostics,
}
tc.transfers = append(tc.transfers, transfer)
tc.matchedRequests[requestID] = transfer
if hasChannelID {
tc.matchedChannelIds[channelID] = struct{}{}
}
}
func (tc *transferConverter) collectRemainingTransfers() {
for requestID := range tc.requestStates {
if _, ok := tc.matchedRequests[requestID]; !ok {
tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false)
}
}
for requestID := range tc.gsDiagnostics {
if _, ok := tc.matchedRequests[requestID]; !ok {
tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false)
}
}
for channelID, channelState := range tc.allChannels {
if _, ok := tc.matchedChannelIds[channelID]; !ok {
channelID := channelID
cs := api.NewDataTransferChannel(channelState.SelfPeer(), channelState)
transfer := &api.GraphSyncDataTransfer{
RequestID: nil,
RequestState: "graphsync state unknown",
IsCurrentChannelRequest: false,
ChannelID: &channelID,
ChannelState: &cs,
Diagnostics: []string{"data transfer with no open transport channel, cannot determine linked graphsync request"},
}
tc.transfers = append(tc.transfers, transfer)
}
}
}
2021-02-05 17:58:55 +00:00
func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
return sm.DealPublisher.PendingDeals(), nil
}
func (sm *StorageMinerAPI) MarketRetryPublishDeal(ctx context.Context, propcid cid.Cid) error {
return sm.StorageProvider.RetryDealPublishing(propcid)
}
2021-02-05 17:58:55 +00:00
func (sm *StorageMinerAPI) MarketPublishPendingDeals(ctx context.Context) error {
sm.DealPublisher.ForcePublishPendingDeals()
return nil
}
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.DagstoreShardInfo, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
info := sm.DAGStore.AllShardsInfo()
ret := make([]api.DagstoreShardInfo, 0, len(info))
for k, i := range info {
ret = append(ret, api.DagstoreShardInfo{
Key: k.String(),
State: i.ShardState.String(),
Error: func() string {
if i.Error == nil {
return ""
}
return i.Error.Error()
}(),
})
}
// order by key.
sort.SliceStable(ret, func(i, j int) bool {
return ret[i].Key < ret[j].Key
})
return ret, nil
}
2022-05-12 14:16:53 +00:00
func (sm *StorageMinerAPI) DagstoreRegisterShard(ctx context.Context, key string) error {
if sm.DAGStore == nil {
return fmt.Errorf("dagstore not available on this node")
}
// First check if the shard has already been registered
k := shard.KeyFromString(key)
_, err := sm.DAGStore.GetShardInfo(k)
if err == nil {
// Shard already registered, nothing further to do
return nil
}
// If the shard is not registered we would expect ErrShardUnknown
if !errors.Is(err, dagstore.ErrShardUnknown) {
return fmt.Errorf("getting shard info from DAG store: %w", err)
}
pieceCid, err := cid.Parse(key)
if err != nil {
return fmt.Errorf("parsing shard key as piece cid: %w", err)
}
if err = filmktsstore.RegisterShardSync(ctx, sm.DAGStoreWrapper, pieceCid, "", true); err != nil {
return fmt.Errorf("failed to register shard: %w", err)
}
return nil
}
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key string) error {
if sm.DAGStore == nil {
return fmt.Errorf("dagstore not available on this node")
}
k := shard.KeyFromString(key)
info, err := sm.DAGStore.GetShardInfo(k)
if err != nil {
return fmt.Errorf("failed to get shard info: %w", err)
}
if st := info.ShardState; st != dagstore.ShardStateNew {
return fmt.Errorf("cannot initialize shard; expected state ShardStateNew, was: %s", st.String())
}
ch := make(chan dagstore.ShardResult, 1)
if err = sm.DAGStore.AcquireShard(ctx, k, ch, dagstore.AcquireOpts{}); err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
}
var res dagstore.ShardResult
select {
case res = <-ch:
case <-ctx.Done():
return ctx.Err()
}
if err := res.Error; err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
}
if res.Accessor != nil {
err = res.Accessor.Close()
if err != nil {
log.Warnw("failed to close shard accessor; continuing", "shard_key", k, "error", err)
}
}
return nil
}
func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreInitializeAllEvent, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
if sm.SectorAccessor == nil {
return nil, fmt.Errorf("sector accessor not available on this node")
}
// prepare the thottler tokens.
var throttle chan struct{}
if c := params.MaxConcurrency; c > 0 {
throttle = make(chan struct{}, c)
for i := 0; i < c; i++ {
throttle <- struct{}{}
}
}
// are we initializing only unsealed pieces?
onlyUnsealed := !params.IncludeSealed
info := sm.DAGStore.AllShardsInfo()
var toInitialize []string
for k, i := range info {
if i.ShardState != dagstore.ShardStateNew {
continue
}
// if we're initializing only unsealed pieces, check if there's an
// unsealed deal for this piece available.
if onlyUnsealed {
pieceCid, err := cid.Decode(k.String())
if err != nil {
log.Warnw("DagstoreInitializeAll: failed to decode shard key as piece CID; skipping", "shard_key", k.String(), "error", err)
continue
}
pi, err := sm.PieceStore.GetPieceInfo(pieceCid)
if err != nil {
log.Warnw("DagstoreInitializeAll: failed to get piece info; skipping", "piece_cid", pieceCid, "error", err)
continue
}
var isUnsealed bool
for _, d := range pi.Deals {
isUnsealed, err = sm.SectorAccessor.IsUnsealed(ctx, d.SectorID, d.Offset.Unpadded(), d.Length.Unpadded())
if err != nil {
log.Warnw("DagstoreInitializeAll: failed to get unsealed status; skipping deal", "deal_id", d.DealID, "error", err)
continue
}
if isUnsealed {
break
}
}
if !isUnsealed {
log.Infow("DagstoreInitializeAll: skipping piece because it's sealed", "piece_cid", pieceCid, "error", err)
continue
}
}
// yes, we're initializing this shard.
toInitialize = append(toInitialize, k.String())
}
total := len(toInitialize)
if total == 0 {
out := make(chan api.DagstoreInitializeAllEvent)
close(out)
return out, nil
}
// response channel must be closed when we're done, or the context is cancelled.
// this buffering is necessary to prevent inflight children goroutines from
// publishing to a closed channel (res) when the context is cancelled.
out := make(chan api.DagstoreInitializeAllEvent, 32) // internal buffer.
res := make(chan api.DagstoreInitializeAllEvent, 32) // returned to caller.
// pump events back to caller.
// two events per shard.
go func() {
defer close(res)
for i := 0; i < total*2; i++ {
select {
case res <- <-out:
case <-ctx.Done():
return
}
}
}()
go func() {
for i, k := range toInitialize {
2021-08-24 05:12:00 +00:00
if throttle != nil {
select {
case <-throttle:
// acquired a throttle token, proceed.
case <-ctx.Done():
return
}
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
}
go func(k string, i int) {
r := api.DagstoreInitializeAllEvent{
Key: k,
Event: "start",
Total: total,
Current: i + 1, // start with 1
}
select {
case out <- r:
case <-ctx.Done():
return
}
err := sm.DagstoreInitializeShard(ctx, k)
2021-08-25 10:49:02 +00:00
if throttle != nil {
throttle <- struct{}{}
}
integrate DAG store and CARv2 in deal-making (#6671) This commit removes badger from the deal-making processes, and moves to a new architecture with the dagstore as the cental component on the miner-side, and CARv2s on the client-side. Every deal that has been handed off to the sealing subsystem becomes a shard in the dagstore. Shards are mounted via the LotusMount, which teaches the dagstore how to load the related piece when serving retrievals. When the miner starts the Lotus for the first time with this patch, we will perform a one-time migration of all active deals into the dagstore. This is a lightweight process, and it consists simply of registering the shards in the dagstore. Shards are backed by the unsealed copy of the piece. This is currently a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so when it's time to acquire a shard to serve a retrieval, the unsealed CARv1 is joined with its index (safeguarded by the dagstore), to form a read-only blockstore, thus taking the place of the monolithic badger. Data transfers have been adjusted to interface directly with CARv2 files. On inbound transfers (client retrievals, miner storage deals), we stream the received data into a CARv2 ReadWrite blockstore. On outbound transfers (client storage deals, miner retrievals), we serve the data off a CARv2 ReadOnly blockstore. Client-side imports are managed by the refactored *imports.Manager component (when not using IPFS integration). Just like it before, we use the go-filestore library to avoid duplicating the data from the original file in the resulting UnixFS DAG (concretely the leaves). However, the target of those imports are what we call "ref-CARv2s": CARv2 files placed under the `$LOTUS_PATH/imports` directory, containing the intermediate nodes in full, and the leaves as positional references to the original file on disk. Client-side retrievals are placed into CARv2 files in the location: `$LOTUS_PATH/retrievals`. A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore` subcommands have been introduced on the miner-side to inspect and manage the dagstore. Despite moving to a CARv2-backed system, the IPFS integration has been respected, and it continues to be possible to make storage deals with data held in an IPFS node, and to perform retrievals directly into an IPFS node. NOTE: because the "staging" and "client" Badger blockstores are no longer used, existing imports on the client will be rendered useless. On startup, Lotus will enumerate all imports and print WARN statements on the log for each import that needs to be reimported. These log lines contain these messages: - import lacks carv2 path; import will not work; please reimport - import has missing/broken carv2; please reimport At the end, we will print a "sanity check completed" message indicating the count of imports found, and how many were deemed broken. Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Raúl Kripalani <raul@protocol.ai> Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
r.Event = "end"
if err == nil {
r.Success = true
} else {
r.Success = false
r.Error = err.Error()
}
select {
case out <- r:
case <-ctx.Done():
}
}(k, i)
}
}()
return res, nil
}
func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string) error {
if sm.DAGStore == nil {
return fmt.Errorf("dagstore not available on this node")
}
k := shard.KeyFromString(key)
info, err := sm.DAGStore.GetShardInfo(k)
if err != nil {
return fmt.Errorf("failed to get shard info: %w", err)
}
if st := info.ShardState; st != dagstore.ShardStateErrored {
return fmt.Errorf("cannot recover shard; expected state ShardStateErrored, was: %s", st.String())
}
ch := make(chan dagstore.ShardResult, 1)
if err = sm.DAGStore.RecoverShard(ctx, k, ch, dagstore.RecoverOpts{}); err != nil {
return fmt.Errorf("failed to recover shard: %w", err)
}
var res dagstore.ShardResult
select {
case res = <-ch:
case <-ctx.Done():
return ctx.Err()
}
return res.Error
}
func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreShardResult, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
res, err := sm.DAGStore.GC(ctx)
if err != nil {
return nil, fmt.Errorf("failed to gc: %w", err)
}
ret := make([]api.DagstoreShardResult, 0, len(res.Shards))
for k, err := range res.Shards {
r := api.DagstoreShardResult{Key: k.String()}
if err == nil {
r.Success = true
} else {
r.Success = false
r.Error = err.Error()
}
ret = append(ret, r)
}
return ret, nil
}
func (sm *StorageMinerAPI) IndexerAnnounceDeal(ctx context.Context, proposalCid cid.Cid) error {
return sm.StorageProvider.AnnounceDealToIndexer(ctx, proposalCid)
}
2022-01-12 14:06:48 +00:00
func (sm *StorageMinerAPI) IndexerAnnounceAllDeals(ctx context.Context) error {
return sm.StorageProvider.AnnounceAllDealsToIndexer(ctx)
}
func (sm *StorageMinerAPI) DagstoreLookupPieces(ctx context.Context, cid cid.Cid) ([]api.DagstoreShardInfo, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
2022-02-18 10:00:01 +00:00
keys, err := sm.DAGStore.TopLevelIndex.GetShardsForMultihash(ctx, cid.Hash())
if err != nil {
return nil, err
}
var ret []api.DagstoreShardInfo
for _, k := range keys {
shard, err := sm.DAGStore.GetShardInfo(k)
if err != nil {
return nil, err
}
ret = append(ret, api.DagstoreShardInfo{
Key: k.String(),
State: shard.ShardState.String(),
Error: func() string {
if shard.Error == nil {
return ""
}
return shard.Error.Error()
}(),
})
}
// order by key.
sort.SliceStable(ret, func(i, j int) bool {
return ret[i].Key < ret[j].Key
})
return ret, nil
}
func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]*api.MarketDeal, error) {
return sm.listDeals(ctx)
}
2020-08-04 23:40:29 +00:00
func (sm *StorageMinerAPI) RetrievalDealsList(ctx context.Context) (map[retrievalmarket.ProviderDealIdentifier]retrievalmarket.ProviderDealState, error) {
return sm.RetrievalProvider.ListDeals(), nil
}
func (sm *StorageMinerAPI) DealsConsiderOnlineStorageDeals(ctx context.Context) (bool, error) {
return sm.ConsiderOnlineStorageDealsConfigFunc()
}
func (sm *StorageMinerAPI) DealsSetConsiderOnlineStorageDeals(ctx context.Context, b bool) error {
return sm.SetConsiderOnlineStorageDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsConsiderOnlineRetrievalDeals(ctx context.Context) (bool, error) {
return sm.ConsiderOnlineRetrievalDealsConfigFunc()
}
func (sm *StorageMinerAPI) DealsSetConsiderOnlineRetrievalDeals(ctx context.Context, b bool) error {
return sm.SetConsiderOnlineRetrievalDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsConsiderOfflineStorageDeals(ctx context.Context) (bool, error) {
return sm.ConsiderOfflineStorageDealsConfigFunc()
}
func (sm *StorageMinerAPI) DealsSetConsiderOfflineStorageDeals(ctx context.Context, b bool) error {
return sm.SetConsiderOfflineStorageDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsConsiderOfflineRetrievalDeals(ctx context.Context) (bool, error) {
return sm.ConsiderOfflineRetrievalDealsConfigFunc()
}
func (sm *StorageMinerAPI) DealsSetConsiderOfflineRetrievalDeals(ctx context.Context, b bool) error {
return sm.SetConsiderOfflineRetrievalDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsConsiderVerifiedStorageDeals(ctx context.Context) (bool, error) {
return sm.ConsiderVerifiedStorageDealsConfigFunc()
}
func (sm *StorageMinerAPI) DealsSetConsiderVerifiedStorageDeals(ctx context.Context, b bool) error {
return sm.SetConsiderVerifiedStorageDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsConsiderUnverifiedStorageDeals(ctx context.Context) (bool, error) {
return sm.ConsiderUnverifiedStorageDealsConfigFunc()
}
func (sm *StorageMinerAPI) DealsSetConsiderUnverifiedStorageDeals(ctx context.Context, b bool) error {
return sm.SetConsiderUnverifiedStorageDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsGetExpectedSealDurationFunc(ctx context.Context) (time.Duration, error) {
return sm.GetExpectedSealDurationFunc()
}
func (sm *StorageMinerAPI) DealsSetExpectedSealDurationFunc(ctx context.Context, d time.Duration) error {
return sm.SetExpectedSealDurationFunc(d)
}
func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fname string) error {
fi, err := os.Open(fname)
if err != nil {
return xerrors.Errorf("failed to open given file: %w", err)
}
defer fi.Close() //nolint:errcheck
return sm.StorageProvider.ImportDataForDeal(ctx, deal, fi)
}
2020-06-18 22:42:24 +00:00
func (sm *StorageMinerAPI) DealsPieceCidBlocklist(ctx context.Context) ([]cid.Cid, error) {
return sm.StorageDealPieceCidBlocklistConfigFunc()
}
2020-06-18 22:42:24 +00:00
func (sm *StorageMinerAPI) DealsSetPieceCidBlocklist(ctx context.Context, cids []cid.Cid) error {
return sm.SetStorageDealPieceCidBlocklistConfigFunc(cids)
}
func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) error {
if sm.StorageMgr == nil {
return xerrors.Errorf("no storage manager")
}
2020-03-19 15:10:19 +00:00
return sm.StorageMgr.AddLocalStorage(ctx, path)
}
func (sm *StorageMinerAPI) StorageDetachLocal(ctx context.Context, path string) error {
if sm.StorageMgr == nil {
return xerrors.Errorf("no storage manager")
}
return sm.StorageMgr.DetachLocalStorage(ctx, path)
}
func (sm *StorageMinerAPI) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error {
if sm.StorageMgr == nil {
return xerrors.Errorf("no storage manager")
}
2022-07-15 13:49:23 +00:00
return sm.StorageMgr.RedeclareLocalStorage(ctx, id, dropMissing)
}
func (sm *StorageMinerAPI) PiecesListPieces(ctx context.Context) ([]cid.Cid, error) {
2020-07-28 21:35:23 +00:00
return sm.PieceStore.ListPieceInfoKeys()
}
func (sm *StorageMinerAPI) PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) {
2020-07-28 21:35:23 +00:00
return sm.PieceStore.ListCidInfoKeys()
}
func (sm *StorageMinerAPI) PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) {
pi, err := sm.PieceStore.GetPieceInfo(pieceCid)
if err != nil {
return nil, err
}
return &pi, nil
}
func (sm *StorageMinerAPI) PiecesGetCIDInfo(ctx context.Context, payloadCid cid.Cid) (*piecestore.CIDInfo, error) {
ci, err := sm.PieceStore.GetCIDInfo(payloadCid)
if err != nil {
return nil, err
}
return &ci, nil
}
2020-10-01 11:58:26 +00:00
func (sm *StorageMinerAPI) CreateBackup(ctx context.Context, fpath string) error {
2021-12-11 21:03:00 +00:00
return backup(ctx, sm.DS, fpath)
2020-10-01 11:58:26 +00:00
}
2022-11-14 20:29:50 +00:00
func (sm *StorageMinerAPI) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef) (map[abi.SectorNumber]string, error) {
rg := func(ctx context.Context, id abi.SectorID) (cid.Cid, bool, error) {
si, err := sm.Miner.SectorsStatus(ctx, id.Number, false)
if err != nil {
return cid.Undef, false, err
}
if si.CommR == nil {
return cid.Undef, false, xerrors.Errorf("commr is nil")
2020-12-01 23:32:01 +00:00
}
2022-11-14 20:29:50 +00:00
return *si.CommR, si.ReplicaUpdateMessage != nil, nil
2020-12-01 23:32:01 +00:00
}
2022-03-10 13:20:07 +00:00
bad, err := sm.StorageMgr.CheckProvable(ctx, pp, sectors, rg)
2020-11-26 07:02:43 +00:00
if err != nil {
return nil, err
}
var out = make(map[abi.SectorNumber]string)
for sid, err := range bad {
out[sid.Number] = err
}
return out, nil
}
2020-12-02 19:46:07 +00:00
func (sm *StorageMinerAPI) ActorAddressConfig(ctx context.Context) (api.AddressConfig, error) {
return sm.AddrSel.AddressConfig, nil
}
OpenRPC Support (#5843) * main: init implement rpc.Discover RPC method This implement the basic functionality for the method over HTTP RPC. Signed-off-by: meows <b5c6@protonmail.com> * main,go.mod,go.sum: init example with go-openrpc-reflect lib Signed-off-by: meows <b5c6@protonmail.com> Conflicts: go.mod go.sum * main: make variable name human-friendly Signed-off-by: meows <b5c6@protonmail.com> * main,go.mod,go.sum: init impl of go-openrp-reflect printing document Signed-off-by: meows <b5c6@protonmail.com> Conflicts: go.mod go.sum * go.mod,go.sum: use go-openrpc-reflect and open-rpc/meta-schema hackforks This is for development only. Versions need to be bumped when they're ready for use as canonical remotes. Signed-off-by: meows <b5c6@protonmail.com> * main,openrpc,main: refactor openrpc supporting code to own package This eliminates code duplication. Signed-off-by: meows <b5c6@protonmail.com> * main: add rpc.Discover to openrpc document Signed-off-by: meows <b5c6@protonmail.com> * openrpc: fix rpc.discover method name casing Also fixes casing stuff for the rest of Filecoin. methods. Signed-off-by: meows <b5c6@protonmail.com> * Revert "main: add rpc.Discover to openrpc document" This reverts commit 116898efb10f33e405ac74acb1aa6daefcd46a62. * main: fix document creation method name This fixes an issue caused with the latest reverting commit. Signed-off-by: meows <b5c6@protonmail.com> * main,docgen,openrpc: refactor to share api parsing, etc as docgen exported stuff Signed-off-by: meows <b5c6@protonmail.com> Makefile: fix docgen refactoring for makefile use of command Signed-off-by: meows <b5c6@protonmail.com> * openrpc: add schema.examples to app reflector There are quite of few of these already registered for the docgen command, so it makes sense to use those! Signed-off-by: meows <b5c6@protonmail.com> * openrpc: init method pairing examples Signed-off-by: meows <b5c6@protonmail.com> * go.mod,go.sum: bump go.mod to use latest meta-schema and openrpc-reflect versions Signed-off-by: meows <b5c6@protonmail.com> * openrpc: init SchemaType mapper function This function will handle the manual configurations for app-specific data types w/r/t their json schema representation. This is useful for cases where the reflect library is unable to provide a sufficient representation automatically. Provided in this commit is an initial implementation for the integerD type (assuming number are represented in the API as hexs), and a commonly used cid.Cid type. Signed-off-by: meows <b5c6@protonmail.com> * go.mod,go.sum: tame dependencies by bumping etclabscore/go-openrpc-reflect This removes a problematic dependency on github.com/ethereum/go-ethereum, which was imported as a dependency for a couple github.com/etclabscore/go-openrpc-reflect tests. etclabscore/go-openrpc-reflect v0.0.36 has removed this dependency, so this commit is the result of bumping that version and then running 'go mod tidy' This is in response to a review at https://github.com/filecoin-project/lotus/pull/4711#pullrequestreview-535686205 Date: 2020-11-21 06:52:48-06:00 Signed-off-by: meows <b5c6@protonmail.com> * main: add 'miner' arg to openrpc gen cmd This allows the command to EITHER generate the doc for Full or Miner APIs. See comment for usage. Date: 2020-11-21 07:48:05-06:00 Signed-off-by: meows <b5c6@protonmail.com> * docgen: add missing examples for Miner API Generating the Miner API OpenRPC doc (via 'go run ./api/openrpc/cmd miner') caused the example logic to panic because some types were missing. This commit adds those missing types, although I'm not an expert in the API so I can't suggest that the example values provided are ideal or well representative. Date: 2020-11-21 07:50:21-06:00 Signed-off-by: meows <b5c6@protonmail.com> * build/openrpc/full.json,build/openrpc/miner.json: add build/openrpc/[full/miner].json docs These will be used as static documents provided by the rpc.discover method. Date: 2020-11-21 07:51:39-06:00 Signed-off-by: meows <b5c6@protonmail.com> * build: init go-rice openrpc static assets Date: 2020-11-21 08:23:06-06:00 Signed-off-by: meows <b5c6@protonmail.com> * main: remove rpc.discover implementation from runtime plugin Instead of generating the doc on the fly, we're going to serve a static asset. Rel https://github.com/filecoin-project/lotus/pull/4711#pullrequestreview-535686205 This removes the runtime implementation from the RPC server construction. Date: 2020-11-21 08:41:20-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api,apistruct,common: add Discover(ctx) method to CommonAPI interface and structs Date: 2020-11-21 08:41:56-06:00 Signed-off-by: meows <b5c6@protonmail.com> * main: use rpc server method aliasing for rpc.discover This depends on a currently-forked change at filecoin-project/go-jsonrpc 8350f9463ee451b187d35c492e32f1b999e80210 which establishes this new method RPCServer.AliasMethod. This solves the problem that the OpenRPC spec says that the document should be served at the system extension-prefixed endpoing rpc.discover (not Filecoin.Discover). In fact, the document will be available at BOTH endpoints, but that duplicity is harmless. Date: 2020-11-21 09:18:26-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api,apistruct,build,common: rpc.discover: return json object instead of string Instead of casting the JSON asset from bytes to string, unmarshal it to a map[string]interface{} so the server will provide it as a JSON object. Date: 2020-11-21 09:27:11-06:00 Signed-off-by: meows <b5c6@protonmail.com> * Makefile: merge resolve: docsgen command path Date: 2020-11-22 07:19:36-06:00 Signed-off-by: meows <b5c6@protonmail.com> * apistruct,main,docgen,openrpc: merge resolve: fix func exporteds, signatures Date: 2020-11-22 07:31:03-06:00 Signed-off-by: meows <b5c6@protonmail.com> * go.mod,go.sum: 'get get' auto-bumps version Date: 2020-11-22 07:31:44-06:00 Signed-off-by: meows <b5c6@protonmail.com> * Makefile,docgen,main,build/openrpc: refactor openrpc documentation generation This creates Makefile command docsgen-openrpc-json, and refactors the docsgen command to generate both the markdown and openrpc json documents, redirecting the output of the openrpc json documentation to the build/openrpc/ directory, where those json files will be compiled as static assets via go-rice boxes. The api/openrpc/cmd now uses usage argumentation congruent to that of the docgen command (switching on API context). Date: 2020-11-22 08:01:18-06:00 Signed-off-by: meows <b5c6@protonmail.com> * main,docgen_openrpc: rename api/openrpc -> api/docgen-openrpc Renames the package as well. This is intended to parallel the existing docgen package and command namespacing. Date: 2020-11-22 10:34:46-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api,apistruct,docgen,build,build/openrpc: use typed Discover response Instead of using a map[string]interface{}, use a typed response for the Discover method implementation. This avoids having to set a docgen Example for the generic map[string]interface{} (as an openrpc document) which both pollutes the generic type and lacks useful information for the Discover method example. Date: 2020-11-22 08:31:16-06:00 Signed-off-by: meows <b5c6@protonmail.com> * apistruct,build,main,impl: implement Discover method for Worker and StorageMiner APIs Methods return static compiled assets respective to the APIs. Date: 2020-11-22 08:57:18-06:00 Signed-off-by: meows <b5c6@protonmail.com> * docgen_openrpc,build/openrpc: remove timestamping from openrpc doc info This should allow openrpc docs generated at different times to be equal. This is important because the CI (Circle) runs the docgen command and tests that the output and the source are unchanged (via git diff). Date: 2020-11-22 10:47:07-06:00 Signed-off-by: meows <b5c6@protonmail.com> * main,docgen_openrpc,main,build: fix lint issues Fixes goimports, staticcheck, golint issues. Date: 2020-11-22 11:06:46-06:00 Signed-off-by: meows <b5c6@protonmail.com> * docgenopenrpc: fix: don't use an underscore in package name (golint) Date: 2020-11-22 11:07:53-06:00 Signed-off-by: meows <b5c6@protonmail.com> * go.sum: fix: mod-tidy-check (run 'go mod tidy') Date: 2020-11-22 11:09:48-06:00 Signed-off-by: meows <b5c6@protonmail.com> * go.mod,go.sum: bump filecoin-project/go-jsonrpc dep to latest This version includes the necessary RPCServer.AliasMethod method. Date: 2020-11-23 12:16:15-06:00 Signed-off-by: meows <b5c6@protonmail.com> * Makefile,main,build,build/openrpc: init gzipped openrpc static docs Date: 2020-11-24 06:15:06-06:00 Signed-off-by: meows <b5c6@protonmail.com> * build: refactor gzip reading Date: 2020-11-24 06:18:34-06:00 Signed-off-by: meows <b5c6@protonmail.com> * build: add basic test for openrpc doc from static assets Date: 2020-11-24 06:30:23-06:00 Signed-off-by: meows <b5c6@protonmail.com> * build: handle reader Close error This keeps the errcheck linter happy. Date: 2020-11-24 06:33:14-06:00 Signed-off-by: meows <b5c6@protonmail.com> * go.sum: run 'go mod tidy' Date: 2020-11-24 06:36:07-06:00 Signed-off-by: meows <b5c6@protonmail.com> * go.mod,go.sum: go mod tidy Tidying up after resolving the merge conflicts with master at go.mod Date: 2020-11-24 06:40:45-06:00 Signed-off-by: meows <b5c6@protonmail.com> * go.mod,go.sum: bump filecoin-project/go-jsonrpc to latest This is a repeat of 76e6fd2, since the latest merge to master seems to have reverted this. Date: 2020-11-24 06:42:30-06:00 Signed-off-by: meows <b5c6@protonmail.com> * docgenopenrpc,build/openrpc: remove method example pairings, improve schema examples Removing method example pairings since they were redundant to schema examples and were not implemented well. Improved schema examples by using the ExampleValue method instead of the map lookup. Made a note in the comment here that this is not ideal, since we have to make a shortcut assumption /workaround by using 'unknown' as the method name and the typea as its own parent. Luckily these values aren't heavily used by the method logic. Date: 2020-11-27 12:57:36-06:00 Signed-off-by: meows <b5c6@protonmail.com> * docgenopenrpc: use generic number jsonschema for number types Previously used an integer schema assuming hex encoding. It appears, based on review some of the examples, that this may not be the case. Obvioussly this schema could be more descriptive, but just shooting for mostly likely to be not wrong at this point. Date: 2020-12-15 14:44:37-06:00 Signed-off-by: meows <b5c6@protonmail.com> * cmd/lotus,go.mod,go.sum: maybe fix straggling merge resolution conflicts Date: 2021-01-19 12:30:42-06:00 Signed-off-by: meows <b5c6@protonmail.com> * build/openrpc/full.json.gz,build/openrpc/miner.json.gz,build/openrpc/worker.json.gz: run 'make docsgen' Date: 2021-01-19 12:33:55-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api/apistruct,node/impl: (lint) gofmt Date: 2021-01-19 12:39:48-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api/docgen: maybe fix parse error: open ./api: no such file or directory Date: 2021-01-19 12:52:04-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api/docgen,build/openrpc: maybe fix no such file error and run 'make docsgen' Date: 2021-01-19 12:55:52-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api/docgen: return if AST comment/groupdoc parsing encounters any error This will returns empty comments/docs maps. This should fix issues like: https://app.circleci.com/pipelines/github/filecoin-project/lotus/12445/workflows/4ebadce9-a298-4ad1-939b-f19ef4c0a5bf/jobs/107218 where the environment makes file lookups hard or impossible. Date: 2021-01-19 13:04:58-06:00 Signed-off-by: meows <b5c6@protonmail.com> * api: Don't depend on build/ * make: support parallel docsgen * openrpc gen: Use simple build version * methodgen * goimports Co-authored-by: meows <b5c6@protonmail.com>
2021-03-19 18:22:46 +00:00
func (sm *StorageMinerAPI) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error) {
return build.OpenRPCDiscoverJSON_Miner(), nil
}
func (sm *StorageMinerAPI) ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, poStEpoch abi.ChainEpoch, nv network.Version) ([]builtin.PoStProof, error) {
return sm.Epp.ComputeProof(ctx, ssi, rand, poStEpoch, nv)
2021-03-26 05:32:03 +00:00
}
func (sm *StorageMinerAPI) RecoverFault(ctx context.Context, sectors []abi.SectorNumber) ([]cid.Cid, error) {
allsectors, err := sm.Miner.ListSectors()
if err != nil {
return nil, xerrors.Errorf("could not get a list of all sectors from the miner: %w", err)
}
var found bool
for _, v := range sectors {
found = false
for _, s := range allsectors {
if v == s.SectorNumber {
found = true
break
}
}
if !found {
return nil, xerrors.Errorf("sectors %d not found in the sector list for miner", v)
}
}
return sm.WdPoSt.ManualFaultRecovery(ctx, sm.Miner.Address(), sectors)
}
func (sm *StorageMinerAPI) RuntimeSubsystems(context.Context) (res api.MinerSubsystems, err error) {
2021-07-28 18:51:45 +00:00
return sm.EnabledSubsystems, nil
}
2022-07-29 14:39:49 +00:00
2022-08-02 07:34:54 +00:00
func (sm *StorageMinerAPI) ActorWithdrawBalance(ctx context.Context, amount abi.TokenAmount) (cid.Cid, error) {
return sm.withdrawBalance(ctx, amount, true)
}
func (sm *StorageMinerAPI) BeneficiaryWithdrawBalance(ctx context.Context, amount abi.TokenAmount) (cid.Cid, error) {
return sm.withdrawBalance(ctx, amount, false)
}
func (sm *StorageMinerAPI) withdrawBalance(ctx context.Context, amount abi.TokenAmount, fromOwner bool) (cid.Cid, error) {
2022-07-29 14:39:49 +00:00
available, err := sm.Full.StateMinerAvailableBalance(ctx, sm.Miner.Address(), types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("Error getting miner balance: %w", err)
}
if amount.GreaterThan(available) {
return cid.Undef, xerrors.Errorf("can't withdraw more funds than available; requested: %s; available: %s", types.FIL(amount), types.FIL(available))
}
if amount.Equals(big.Zero()) {
amount = available
}
params, err := actors.SerializeParams(&minertypes.WithdrawBalanceParams{
2022-07-30 11:46:50 +00:00
AmountRequested: amount,
2022-07-29 14:39:49 +00:00
})
if err != nil {
return cid.Undef, err
}
mi, err := sm.Full.StateMinerInfo(ctx, sm.Miner.Address(), types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("Error getting miner's owner address: %w", err)
}
var sender address.Address
if fromOwner {
sender = mi.Owner
} else {
sender = mi.Beneficiary
}
2022-07-29 14:39:49 +00:00
smsg, err := sm.Full.MpoolPushMessage(ctx, &types.Message{
To: sm.Miner.Address(),
From: sender,
2022-07-29 14:39:49 +00:00
Value: types.NewInt(0),
Method: builtintypes.MethodsMiner.WithdrawBalance,
Params: params,
}, nil)
if err != nil {
return cid.Undef, err
}
2022-08-01 18:11:24 +00:00
return smsg.Cid(), nil
2022-07-29 14:39:49 +00:00
}