diff --git a/.circleci/config.yml b/.circleci/config.yml index b502c8986..0c4f29c87 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -314,7 +314,7 @@ workflows: ci: jobs: - lint-changes: - args: "--new-from-rev origin/next" + args: "--new-from-rev origin/master" - mod-tidy-check - gofmt - test: diff --git a/api/api_storage.go b/api/api_storage.go index 29ae5ea2e..6d8fe384e 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -36,6 +36,7 @@ type StorageMiner interface { SectorsRefs(context.Context) (map[string][]SealedRef, error) SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error + SectorRemove(context.Context, abi.SectorNumber) error StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) StorageLocal(ctx context.Context) (map[stores.ID]string, error) @@ -56,6 +57,7 @@ type StorageMiner interface { DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) DealsSetAcceptingStorageDeals(context.Context, bool) error + DealsSetAcceptingRetrievalDeals(context.Context, bool) error DealsPieceCidBlocklist(context.Context) ([]cid.Cid, error) DealsSetPieceCidBlocklist(context.Context, []cid.Cid) error diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index fb11cbda2..0c0acd05e 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -206,6 +206,7 @@ type StorageMinerStruct struct { SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"` SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"` + SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"` @@ -223,11 +224,12 @@ type StorageMinerStruct struct { StorageLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) error `perm:"admin"` StorageTryLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) (bool, error) `perm:"admin"` - DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` - DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` - DealsSetAcceptingStorageDeals func(context.Context, bool) error `perm:"admin"` - DealsPieceCidBlocklist func(context.Context) ([]cid.Cid, error) `perm:"admin"` - DealsSetPieceCidBlocklist func(context.Context, []cid.Cid) error `perm:"read"` + DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` + DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` + DealsSetAcceptingStorageDeals func(context.Context, bool) error `perm:"admin"` + DealsSetAcceptingRetrievalDeals func(context.Context, bool) error `perm:"admin"` + DealsPieceCidBlocklist func(context.Context) ([]cid.Cid, error) `perm:"admin"` + DealsSetPieceCidBlocklist func(context.Context, []cid.Cid) error `perm:"read"` StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` } @@ -243,12 +245,14 @@ type WorkerStruct struct { Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"` - SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` - SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"` - SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"` - SealCommit2 func(context.Context, abi.SectorID, storage.Commit1Out) (storage.Proof, error) `perm:"admin"` - FinalizeSector func(context.Context, abi.SectorID) error `perm:"admin"` - MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` + SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` + SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"` + SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"` + SealCommit2 func(context.Context, abi.SectorID, storage.Commit1Out) (storage.Proof, error) `perm:"admin"` + FinalizeSector func(context.Context, abi.SectorID, []storage.Range) error `perm:"admin"` + ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error `perm:"admin"` + Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` + MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error `perm:"admin"` ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error `perm:"admin"` @@ -786,6 +790,10 @@ func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNum return c.Internal.SectorsUpdate(ctx, id, state) } +func (c *StorageMinerStruct) SectorRemove(ctx context.Context, number abi.SectorNumber) error { + return c.Internal.SectorRemove(ctx, number) +} + func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) error { return c.Internal.WorkerConnect(ctx, url) } @@ -874,6 +882,10 @@ func (c *StorageMinerStruct) DealsSetAcceptingStorageDeals(ctx context.Context, return c.Internal.DealsSetAcceptingStorageDeals(ctx, b) } +func (c *StorageMinerStruct) DealsSetAcceptingRetrievalDeals(ctx context.Context, b bool) error { + return c.Internal.DealsSetAcceptingRetrievalDeals(ctx, b) +} + func (c *StorageMinerStruct) DealsPieceCidBlocklist(ctx context.Context) ([]cid.Cid, error) { return c.Internal.DealsPieceCidBlocklist(ctx) } @@ -920,8 +932,16 @@ func (w *WorkerStruct) SealCommit2(ctx context.Context, sector abi.SectorID, c1o return w.Internal.SealCommit2(ctx, sector, c1o) } -func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector abi.SectorID) error { - return w.Internal.FinalizeSector(ctx, sector) +func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { + return w.Internal.FinalizeSector(ctx, sector, keepUnsealed) +} + +func (w *WorkerStruct) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + return w.Internal.ReleaseUnsealed(ctx, sector, safeToFree) +} + +func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error { + return w.Internal.Remove(ctx, sector) } func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID) error { diff --git a/api/test/deals.go b/api/test/deals.go index 0150a1315..22152d7ab 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -8,6 +8,7 @@ import ( "math/rand" "os" "path/filepath" + "sync/atomic" "testing" "time" @@ -52,11 +53,11 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport } time.Sleep(time.Second) - mine := true + mine := int64(1) done := make(chan struct{}) go func() { defer close(done) - for mine { + for atomic.LoadInt64(&mine) == 1 { time.Sleep(blocktime) if err := sn[0].MineOne(ctx, func(bool) {}); err != nil { t.Error(err) @@ -66,7 +67,7 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport makeDeal(t, ctx, 6, client, miner, carExport) - mine = false + atomic.AddInt64(&mine, -1) fmt.Println("shutting down mining") <-done } @@ -89,12 +90,12 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { } time.Sleep(time.Second) - mine := true + mine := int64(1) done := make(chan struct{}) go func() { defer close(done) - for mine { + for atomic.LoadInt64(&mine) == 1 { time.Sleep(blocktime) if err := sn[0].MineOne(ctx, func(bool) {}); err != nil { t.Error(err) @@ -105,7 +106,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) { makeDeal(t, ctx, 6, client, miner, false) makeDeal(t, ctx, 7, client, miner, false) - mine = false + atomic.AddInt64(&mine, -1) fmt.Println("shutting down mining") <-done } diff --git a/api/test/mining.go b/api/test/mining.go index 0402b81f8..d9b16fd1f 100644 --- a/api/test/mining.go +++ b/api/test/mining.go @@ -126,6 +126,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo minedTwo := make(chan struct{}) go func() { + doneMinedTwo := false defer close(done) prevExpect := 0 @@ -179,9 +180,9 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo time.Sleep(blocktime) } - if prevExpect == 2 && expect == 2 && minedTwo != nil { + if prevExpect == 2 && expect == 2 && !doneMinedTwo { close(minedTwo) - minedTwo = nil + doneMinedTwo = true } prevExpect = expect diff --git a/build/params_shared.go b/build/params_shared.go index 4da70aaeb..5ab07bd3f 100644 --- a/build/params_shared.go +++ b/build/params_shared.go @@ -121,4 +121,11 @@ const VerifSigCacheSize = 32000 const BlockMessageLimit = 512 const BlockGasLimit = 100_000_000_000 -var DrandChain = `{"public_key":"922a2e93828ff83345bae533f5172669a26c02dc76d6bf59c80892e12ab1455c229211886f35bb56af6d5bea981024df","period":25,"genesis_time":1590445175,"hash":"138a324aa6540f93d0dad002aa89454b1bec2b6e948682cde6bd4db40f4b7c9b"}` +var DrandConfig = dtypes.DrandConfig{ + Servers: []string{ + "https://pl-eu.testnet.drand.sh", + "https://pl-us.testnet.drand.sh", + "https://pl-sin.testnet.drand.sh", + }, + ChainInfoJSON: `{"public_key":"922a2e93828ff83345bae533f5172669a26c02dc76d6bf59c80892e12ab1455c229211886f35bb56af6d5bea981024df","period":25,"genesis_time":1590445175,"hash":"138a324aa6540f93d0dad002aa89454b1bec2b6e948682cde6bd4db40f4b7c9b"}`, +} diff --git a/build/version.go b/build/version.go index d5766ce6e..03d5c0792 100644 --- a/build/version.go +++ b/build/version.go @@ -53,7 +53,7 @@ func (ve Version) EqMajorMinor(v2 Version) bool { } // APIVersion is a semver version of the rpc api exposed -var APIVersion Version = newVer(0, 3, 0) +var APIVersion Version = newVer(0, 4, 0) //nolint:varcheck,deadcode const ( diff --git a/chain/beacon/beacon.go b/chain/beacon/beacon.go index 34405f3c8..2be2e7f1c 100644 --- a/chain/beacon/beacon.go +++ b/chain/beacon/beacon.go @@ -17,6 +17,10 @@ type Response struct { Err error } +// RandomBeacon represents a system that provides randomness to Lotus. +// Other components interrogate the RandomBeacon to acquire randomness that's +// valid for a specific chain epoch. Also to verify beacon entries that have +// been posted on chain. type RandomBeacon interface { Entry(context.Context, uint64) <-chan Response VerifyEntry(types.BeaconEntry, types.BeaconEntry) error diff --git a/chain/beacon/drand/drand.go b/chain/beacon/drand/drand.go index 21bd2501a..00ff05f81 100644 --- a/chain/beacon/drand/drand.go +++ b/chain/beacon/drand/drand.go @@ -19,31 +19,15 @@ import ( logging "github.com/ipfs/go-log" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) var log = logging.Logger("drand") -var drandServers = []string{ - "https://pl-eu.testnet.drand.sh", - "https://pl-us.testnet.drand.sh", - "https://pl-sin.testnet.drand.sh", -} - -var drandChain *dchain.Info - -func init() { - - var err error - drandChain, err = dchain.InfoFromJSON(bytes.NewReader([]byte(build.DrandChain))) - if err != nil { - panic("could not unmarshal chain info: " + err.Error()) - } -} - type drandPeer struct { addr string tls bool @@ -57,6 +41,13 @@ func (dp *drandPeer) IsTLS() bool { return dp.tls } +// DrandBeacon connects Lotus with a drand network in order to provide +// randomness to the system in a way that's aligned with Filecoin rounds/epochs. +// +// We connect to drand peers via their public HTTP endpoints. The peers are +// enumerated in the drandServers variable. +// +// The root trust for the Drand chain is configured from build.DrandChain. type DrandBeacon struct { client dclient.Client @@ -73,16 +64,21 @@ type DrandBeacon struct { localCache map[uint64]types.BeaconEntry } -func NewDrandBeacon(genesisTs, interval uint64, ps *pubsub.PubSub) (*DrandBeacon, error) { +func NewDrandBeacon(genesisTs, interval uint64, ps *pubsub.PubSub, config dtypes.DrandConfig) (*DrandBeacon, error) { if genesisTs == 0 { panic("what are you doing this cant be zero") } + drandChain, err := dchain.InfoFromJSON(bytes.NewReader([]byte(config.ChainInfoJSON))) + if err != nil { + return nil, xerrors.Errorf("unable to unmarshal drand chain info: %w", err) + } + dlogger := dlog.NewKitLoggerFrom(kzap.NewZapSugarLogger( log.SugaredLogger.Desugar(), zapcore.InfoLevel)) var clients []dclient.Client - for _, url := range drandServers { + for _, url := range config.Servers { hc, err := hclient.NewWithInfo(url, drandChain, nil) if err != nil { return nil, xerrors.Errorf("could not create http drand client: %w", err) diff --git a/chain/beacon/drand/drand_test.go b/chain/beacon/drand/drand_test.go index f35f3e4cc..d7d9c4d18 100644 --- a/chain/beacon/drand/drand_test.go +++ b/chain/beacon/drand/drand_test.go @@ -7,10 +7,13 @@ import ( dchain "github.com/drand/drand/chain" hclient "github.com/drand/drand/client/http" "github.com/stretchr/testify/assert" + + "github.com/filecoin-project/lotus/build" ) func TestPrintGroupInfo(t *testing.T) { - c, err := hclient.New(drandServers[0], nil, nil) + server := build.DrandConfig.Servers[0] + c, err := hclient.New(server, nil, nil) assert.NoError(t, err) cg := c.(interface { FetchChainInfo(groupHash []byte) (*dchain.Info, error) diff --git a/chain/blocksync/blocksync.go b/chain/blocksync/blocksync.go index daca9ce20..a9251c419 100644 --- a/chain/blocksync/blocksync.go +++ b/chain/blocksync/blocksync.go @@ -10,6 +10,7 @@ import ( "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -27,6 +28,24 @@ const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" const BlockSyncMaxRequestLength = 800 +// BlockSyncService is the component that services BlockSync requests from +// peers. +// +// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync +// is an RPC-oriented protocol, with a single operation to request blocks. +// +// A request contains a start anchor block (referred to with a CID), and a +// amount of blocks requested beyond the anchor (including the anchor itself). +// +// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports +// two options at the moment: +// +// - include block contents +// - include block messages +// +// The response will include a status code, an optional message, and the +// response payload in case of success. The payload is a slice of serialized +// tipsets. type BlockSyncService struct { cs *store.ChainStore } diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index 129e8d332..daa4b6335 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -64,6 +64,11 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse } } +// GetBlocks fetches count blocks from the network, from the provided tipset +// *backwards*, returning as many tipsets as count. +// +// {hint/usage}: This is used by the Syncer during normal chain syncing and when +// resolving forks. func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") defer span.End() @@ -80,7 +85,9 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i Options: BSOptBlocks, } + // this peerset is sorted by latency and failure counting. peers := bs.getPeers() + // randomize the first few peers so we don't always pick the same peer shufflePrefix(peers) @@ -356,6 +363,7 @@ func (bs *BlockSync) RemovePeer(p peer.ID) { bs.syncPeers.removePeer(p) } +// getPeers returns a preference-sorted set of peers to query. func (bs *BlockSync) getPeers() []peer.ID { return bs.syncPeers.prefSortedPeers() } diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 04e7be715..0bae99404 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -84,6 +84,9 @@ type calledEvents struct { } func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { + e.lk.Lock() + defer e.lk.Unlock() + for _, ts := range rev { e.handleReverts(ts) e.at = ts.Height() @@ -134,7 +137,6 @@ func (e *calledEvents) checkNewCalls(ts *types.TipSet) { e.messagesForTs(pts, func(msg *types.Message) { // TODO: provide receipts - for tid, matchFns := range e.matchers { var matched bool for _, matchFn := range matchFns { diff --git a/chain/events/events_height.go b/chain/events/events_height.go index cbf756c20..fc94d6262 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -26,12 +26,15 @@ type heightEvents struct { } func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { + ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange") defer span.End() span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height()))) span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev)))) span.AddAttributes(trace.Int64Attribute("applies", int64(len(app)))) + e.lk.Lock() + defer e.lk.Unlock() for _, ts := range rev { // TODO: log error if h below gcconfidence // revert height-based triggers @@ -40,7 +43,10 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { for _, tid := range e.htHeights[h] { ctx, span := trace.StartSpan(ctx, "events.HeightRevert") - err := e.heightTriggers[tid].revert(ctx, ts) + rev := e.heightTriggers[tid].revert + e.lk.Unlock() + err := rev(ctx, ts) + e.lk.Lock() e.heightTriggers[tid].called = false span.End() @@ -98,8 +104,10 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { ctx, span := trace.StartSpan(ctx, "events.HeightApply") span.AddAttributes(trace.BoolAttribute("immediate", false)) - - err = hnd.handle(ctx, incTs, h) + handle := hnd.handle + e.lk.Unlock() + err = handle(ctx, incTs, h) + e.lk.Lock() span.End() if err != nil { diff --git a/chain/store/fts.go b/chain/store/fts.go index f9ec4459e..0324938d7 100644 --- a/chain/store/fts.go +++ b/chain/store/fts.go @@ -32,8 +32,11 @@ func (fts *FullTipSet) Cids() []cid.Cid { return cids } +// TipSet returns a narrower view of this FullTipSet elliding the block +// messages. func (fts *FullTipSet) TipSet() *types.TipSet { if fts.tipset != nil { + // FIXME: fts.tipset is actually never set. Should it memoize? return fts.tipset } diff --git a/chain/store/index.go b/chain/store/index.go index bb363ec18..7edbf251f 100644 --- a/chain/store/index.go +++ b/chain/store/index.go @@ -34,7 +34,7 @@ type lbEntry struct { target types.TipSetKey } -func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) { +func (ci *ChainIndex) GetTipsetByHeight(_ context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) { if from.Height()-to <= ci.skipLength { return ci.walkBack(from, to) } diff --git a/chain/store/store.go b/chain/store/store.go index 0edccb95c..4dabb96f7 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -52,6 +52,15 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation") // ReorgNotifee represents a callback that gets called upon reorgs. type ReorgNotifee func(rev, app []*types.TipSet) error +// ChainStore is the main point of access to chain data. +// +// Raw chain data is stored in the Blockstore, with relevant markers (genesis, +// latest head tipset references) being tracked in the Datastore (key-value +// store). +// +// To alleviate disk access, the ChainStore has two ARC caches: +// 1. a tipset cache +// 2. a block => messages references cache. type ChainStore struct { bs bstore.Blockstore ds dstore.Datastore @@ -266,6 +275,9 @@ func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error { return nil } +// MaybeTakeHeavierTipSet evaluates the incoming tipset and locks it in our +// internal state as our new head, if and only if it is heavier than the current +// head. func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error { cs.heaviestLk.Lock() defer cs.heaviestLk.Unlock() @@ -331,6 +343,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo return out } +// takeHeaviestTipSet actually sets the incoming tipset as our head both in +// memory and in the ChainStore. It also sends a notification to deliver to +// ReorgNotifees. func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error { _, span := trace.StartSpan(ctx, "takeHeaviestTipSet") defer span.End() @@ -368,6 +383,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error { return cs.takeHeaviestTipSet(context.TODO(), ts) } +// Contains returns whether our BlockStore has all blocks in the supplied TipSet. func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { for _, c := range ts.Cids() { has, err := cs.bs.Has(c) @@ -382,6 +398,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { return true, nil } +// GetBlock fetches a BlockHeader with the supplied CID. It returns +// blockstore.ErrNotFound if the block was not found in the BlockStore. func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) { sb, err := cs.bs.Get(c) if err != nil { @@ -474,6 +492,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti return leftChain, rightChain, nil } +// GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head). func (cs *ChainStore) GetHeaviestTipSet() *types.TipSet { cs.heaviestLk.Lock() defer cs.heaviestLk.Unlock() diff --git a/chain/sync.go b/chain/sync.go index b190dc449..defe34a7b 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -53,6 +53,29 @@ var log = logging.Logger("chain") var LocalIncoming = "incoming" +// Syncer is in charge of running the chain synchronization logic. As such, it +// is tasked with these functions, amongst others: +// +// * Fast-forwards the chain as it learns of new TipSets from the network via +// the SyncManager. +// * Applies the fork choice rule to select the correct side when confronted +// with a fork in the network. +// * Requests block headers and messages from other peers when not available +// in our BlockStore. +// * Tracks blocks marked as bad in a cache. +// * Keeps the BlockStore and ChainStore consistent with our view of the world, +// the latter of which in turn informs other components when a reorg has been +// committed. +// +// The Syncer does not run workers itself. It's mainly concerned with +// ensuring a consistent state of chain consensus. The reactive and network- +// interfacing processes are part of other components, such as the SyncManager +// (which owns the sync scheduler and sync workers), BlockSync, the HELLO +// protocol, and the gossipsub block propagation layer. +// +// {hint/concept} The fork-choice rule as it currently stands is: "pick the +// chain with the heaviest weight, so long as it hasn’t deviated one finality +// threshold from our head (900 epochs, parameter determined by spec-actors)". type Syncer struct { // The interface for accessing and putting tipsets into local storage store *store.ChainStore @@ -85,6 +108,7 @@ type Syncer struct { verifier ffiwrapper.Verifier } +// NewSyncer creates a new Syncer object. func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) { gen, err := sm.ChainStore().GetGenesis() if err != nil { @@ -182,6 +206,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { return true } +// IncomingBlocks spawns a goroutine that subscribes to the local eventbus to +// receive new block headers as they arrive from the network, and sends them to +// the returned channel. +// +// These blocks have not necessarily been incorporated to our view of the chain. func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { sub := syncer.incoming.Sub(LocalIncoming) out := make(chan *types.BlockHeader, 10) @@ -209,11 +238,15 @@ func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHe return out, nil } +// ValidateMsgMeta performs structural and content hash validation of the +// messages within this block. If validation passes, it stores the messages in +// the underlying IPLD block store. func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { if msgc := len(fblk.BlsMessages) + len(fblk.SecpkMessages); msgc > build.BlockMessageLimit { return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc) } + // Collect the CIDs of both types of messages separately: BLS and Secpk. var bcids, scids []cbg.CBORMarshaler for _, m := range fblk.BlsMessages { c := cbg.CborCid(m.Cid()) @@ -231,11 +264,14 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { blockstore := syncer.store.Blockstore() bs := cbor.NewCborStore(blockstore) + + // Compute the root CID of the combined message trie. smroot, err := computeMsgMeta(bs, bcids, scids) if err != nil { return xerrors.Errorf("validating msgmeta, compute failed: %w", err) } + // Check that the message trie root matches with what's in the block. if fblk.Header.Messages != smroot { return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot) } @@ -345,6 +381,8 @@ func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types return fts, nil } +// computeMsgMeta computes the root CID of the combined arrays of message CIDs +// of both types (BLS and Secpk). func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.Cid, error) { ctx := context.TODO() bmroot, err := amt.FromArray(ctx, bs, bmsgCids) @@ -368,14 +406,24 @@ func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) ( return mrcid, nil } +// FetchTipSet tries to load the provided tipset from the store, and falls back +// to the network (BlockSync) by querying the supplied peer if not found +// locally. +// +// {hint/usage} This is used from the HELLO protocol, to fetch the greeting +// peer's heaviest tipset if we don't have it. func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) { if fts, err := syncer.tryLoadFullTipSet(tsk); err == nil { return fts, nil } + // fall back to the network. return syncer.Bsync.GetFullTipSet(ctx, p, tsk) } +// tryLoadFullTipSet queries the tipset in the ChainStore, and returns a full +// representation of it containing FullBlocks. If ALL blocks are not found +// locally, it errors entirely with blockstore.ErrNotFound. func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, error) { ts, err := syncer.store.LoadTipSet(tsk) if err != nil { @@ -400,6 +448,12 @@ func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, return fts, nil } +// Sync tries to advance our view of the chain to `maybeHead`. It does nothing +// if our current head is heavier than the requested tipset, or if we're already +// at the requested head, or if the head is the genesis. +// +// Most of the heavy-lifting logic happens in syncer#collectChain. Refer to the +// godocs on that method for a more detailed view. func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "chain.Sync") defer span.End() @@ -466,16 +520,27 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) return nil } + var futures []async.ErrorFuture for _, b := range fts.Blocks { - if err := syncer.ValidateBlock(ctx, b); err != nil { - if isPermanent(err) { - syncer.bad.Add(b.Cid(), err.Error()) - } - return xerrors.Errorf("validating block %s: %w", b.Cid(), err) - } + b := b // rebind to a scoped variable - if err := syncer.sm.ChainStore().AddToTipSetTracker(b.Header); err != nil { - return xerrors.Errorf("failed to add validated header to tipset tracker: %w", err) + futures = append(futures, async.Err(func() error { + if err := syncer.ValidateBlock(ctx, b); err != nil { + if isPermanent(err) { + syncer.bad.Add(b.Cid(), err.Error()) + } + return xerrors.Errorf("validating block %s: %w", b.Cid(), err) + } + + if err := syncer.sm.ChainStore().AddToTipSetTracker(b.Header); err != nil { + return xerrors.Errorf("failed to add validated header to tipset tracker: %w", err) + } + return nil + })) + } + for _, f := range futures { + if err := f.AwaitContext(ctx); err != nil { + return err } } return nil @@ -997,6 +1062,39 @@ func extractSyncState(ctx context.Context) *SyncerState { return nil } +// collectHeaders collects the headers from the blocks between any two tipsets. +// +// `from` is the heaviest/projected/target tipset we have learned about, and +// `to` is usually an anchor tipset we already have in our view of the chain +// (which could be the genesis). +// +// collectHeaders checks if portions of the chain are in our ChainStore; falling +// down to the network to retrieve the missing parts. If during the process, any +// portion we receive is in our denylist (bad list), we short-circuit. +// +// {hint/naming}: `from` and `to` is in inverse order. `from` is the highest, +// and `to` is the lowest. This method traverses the chain backwards. +// +// {hint/usage}: This is used by collectChain, which is in turn called from the +// main Sync method (Syncer#Sync), so it's a pretty central method. +// +// {hint/logic}: The logic of this method is as follows: +// +// 1. Check that the from tipset is not linked to a parent block known to be +// bad. +// 2. Check the consistency of beacon entries in the from tipset. We check +// total equality of the BeaconEntries in each block. +// 3. Travers the chain backwards, for each tipset: +// 3a. Load it from the chainstore; if found, it move on to its parent. +// 3b. Query our peers via BlockSync in batches, requesting up to a +// maximum of 500 tipsets every time. +// +// Once we've concluded, if we find a mismatching tipset at the height where the +// anchor tipset should be, we are facing a fork, and we invoke Syncer#syncFork +// to resolve it. Refer to the godocs there. +// +// All throughout the process, we keep checking if the received blocks are in +// the deny list, and short-circuit the process if so. func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "collectHeaders") defer span.End() @@ -1013,6 +1111,8 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to } } + // Check if the parents of the from block are in the denylist. + // i.e. if a fork of the chain has been requested that we know to be bad. for _, pcid := range from.Parents().Cids() { if reason, ok := syncer.bad.Has(pcid); ok { markBad("linked to %s", pcid) @@ -1083,8 +1183,8 @@ loop: } // NB: GetBlocks validates that the blocks are in-fact the ones we - // requested, and that they are correctly linked to eachother. It does - // not validate any state transitions + // requested, and that they are correctly linked to one another. It does + // not validate any state transitions. window := 500 if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window { window = gap @@ -1125,7 +1225,6 @@ loop: at = blks[len(blks)-1].Parents() } - // We have now ascertained that this is *not* a 'fast forward' if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) { last := blockSet[len(blockSet)-1] if last.Parents() == to.Parents() { @@ -1133,6 +1232,8 @@ loop: return blockSet, nil } + // We have now ascertained that this is *not* a 'fast forward' + log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) fork, err := syncer.syncFork(ctx, last, to) if err != nil { @@ -1154,6 +1255,12 @@ loop: var ErrForkTooLong = fmt.Errorf("fork longer than threshold") +// syncFork tries to obtain the chain fragment that links a fork into a common +// ancestor in our view of the chain. +// +// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the +// denylist. Else, we find the common ancestor, and add the missing chain +// fragment until the fork point to the returned []TipSet. func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold)) if err != nil { @@ -1305,6 +1412,25 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error { return nil } +// collectChain tries to advance our view of the chain to the purported head. +// +// It goes through various stages: +// +// 1. StageHeaders: we proceed in the sync process by requesting block headers +// from our peers, moving back from their heads, until we reach a tipset +// that we have in common (such a common tipset must exist, thought it may +// simply be the genesis block). +// +// If the common tipset is our head, we treat the sync as a "fast-forward", +// else we must drop part of our chain to connect to the peer's head +// (referred to as "forking"). +// +// 2. StagePersistHeaders: now that we've collected the missing headers, +// augmented by those on the other side of a fork, we persist them to the +// BlockStore. +// +// 3. StageMessages: having acquired the headers and found a common tipset, +// we then move forward, requesting the full blocks, including the messages. func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "collectChain") defer span.End() @@ -1354,9 +1480,8 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []byte, evrf []byte) error { if build.InsecurePoStValidation { return nil - } else { - return gen.VerifyVRF(ctx, worker, rand, evrf) } + return gen.VerifyVRF(ctx, worker, rand, evrf) } func (syncer *Syncer) State() []SyncerState { @@ -1367,6 +1492,7 @@ func (syncer *Syncer) State() []SyncerState { return out } +// MarkBad manually adds a block to the "bad blocks" cache. func (syncer *Syncer) MarkBad(blk cid.Cid) { syncer.bad.Add(blk, "manually marked bad") } @@ -1374,7 +1500,7 @@ func (syncer *Syncer) MarkBad(blk cid.Cid) { func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) { return syncer.bad.Has(blk) } -func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) { +func (syncer *Syncer) getLatestBeaconEntry(_ context.Context, ts *types.TipSet) (*types.BeaconEntry, error) { cur := ts for i := 0; i < 20; i++ { cbe := cur.Blocks()[0].BeaconEntries diff --git a/chain/types/execresult.go b/chain/types/execresult.go index 443147f9e..6fc93fac6 100644 --- a/chain/types/execresult.go +++ b/chain/types/execresult.go @@ -21,16 +21,16 @@ type ExecutionTrace struct { type GasTrace struct { Name string - Location []Loc - TotalGas int64 - ComputeGas int64 - StorageGas int64 - TotalVirtualGas int64 - VirtualComputeGas int64 - VirtualStorageGas int64 + Location []Loc `json:"loc"` + TotalGas int64 `json:"tg"` + ComputeGas int64 `json:"cg"` + StorageGas int64 `json:"sg"` + TotalVirtualGas int64 `json:"vtg"` + VirtualComputeGas int64 `json:"vcg"` + VirtualStorageGas int64 `json:"vsg"` - TimeTaken time.Duration - Extra interface{} `json:",omitempty"` + TimeTaken time.Duration `json:"tt"` + Extra interface{} `json:"ex,omitempty"` Callers []uintptr `json:"-"` } diff --git a/chain/vm/runtime.go b/chain/vm/runtime.go index d6d49c214..595664de1 100644 --- a/chain/vm/runtime.go +++ b/chain/vm/runtime.go @@ -408,7 +408,7 @@ func (rt *Runtime) internalSend(from, to address.Address, method abi.MethodNum, if subrt != nil { rt.numActorsCreated = subrt.numActorsCreated } - rt.executionTrace.Subcalls = append(rt.executionTrace.Subcalls, subrt.executionTrace) //&er) + rt.executionTrace.Subcalls = append(rt.executionTrace.Subcalls, subrt.executionTrace) return ret, errSend } diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index f7538daec..ebc62aa5d 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -149,6 +149,7 @@ var importBenchCmd = &cli.Command{ if err != nil { return err } + stripCallers(trace) lastTse = &TipSetExec{ TipSet: cur.Key(), @@ -168,6 +169,21 @@ var importBenchCmd = &cli.Command{ }, } +func walkExecutionTrace(et *types.ExecutionTrace) { + for _, gc := range et.GasCharges { + gc.Callers = nil + } + for _, sub := range et.Subcalls { + walkExecutionTrace(&sub) //nolint:scopelint,gosec + } +} + +func stripCallers(trace []*api.InvocResult) { + for _, t := range trace { + walkExecutionTrace(&t.ExecutionTrace) + } +} + type Invocation struct { TipSet types.TipSetKey Invoc *api.InvocResult diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 020ca1d7f..f7f80a9c6 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -118,7 +118,7 @@ create unique index if not exists block_cid_uindex create materialized view if not exists state_heights as select distinct height, parentstateroot from blocks; -create unique index if not exists state_heights_uindex +create index if not exists state_heights_index on state_heights (height); create index if not exists state_heights_height_index diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index d42a72b9b..88afb647e 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -53,6 +53,7 @@ type minerKey struct { addr address.Address act types.Actor stateroot cid.Cid + tsKey types.TipSetKey } type minerInfo struct { @@ -66,10 +67,11 @@ type minerInfo struct { type actorInfo struct { stateroot cid.Cid + tsKey types.TipSetKey state string } -func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet, maxBatch int) { +func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) { var alk sync.Mutex log.Infof("Getting synced block list") @@ -81,7 +83,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS allToSync := map[cid.Cid]*types.BlockHeader{} toVisit := list.New() - for _, header := range ts.Blocks() { + for _, header := range headTs.Blocks() { toVisit.PushBack(header) } @@ -116,7 +118,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS for len(allToSync) > 0 { actors := map[address.Address]map[types.Actor]actorInfo{} - addresses := map[address.Address]address.Address{} + addressToID := map[address.Address]address.Address{} minH := abi.ChainEpoch(math.MaxInt64) for _, header := range allToSync { @@ -129,7 +131,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS for c, header := range allToSync { if header.Height < minH+abi.ChainEpoch(maxBatch) { toSync[c] = header - addresses[header.Miner] = address.Undef + addressToID[header.Miner] = address.Undef } } for c := range toSync { @@ -146,20 +148,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } if len(bh.Parents) == 0 { // genesis case - ts, _ := types.NewTipSet([]*types.BlockHeader{bh}) - aadrs, err := api.StateListActors(ctx, ts.Key()) + genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh}) + aadrs, err := api.StateListActors(ctx, genesisTs.Key()) if err != nil { log.Error(err) return } parmap.Par(50, aadrs, func(addr address.Address) { - act, err := api.StateGetActor(ctx, addr, ts.Key()) + act, err := api.StateGetActor(ctx, addr, genesisTs.Key()) if err != nil { log.Error(err) return } - ast, err := api.StateReadState(ctx, act, ts.Key()) + ast, err := api.StateReadState(ctx, act, genesisTs.Key()) if err != nil { log.Error(err) return @@ -177,9 +179,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } actors[addr][*act] = actorInfo{ stateroot: bh.ParentStateRoot, + tsKey: genesisTs.Key(), state: string(state), } - addresses[addr] = address.Undef + addressToID[addr] = address.Undef alk.Unlock() }) @@ -206,11 +209,13 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Error(err) return } + ast, err := api.StateReadState(ctx, &act, pts.Key()) if err != nil { log.Error(err) return } + state, err := json.Marshal(ast.State) if err != nil { log.Error(err) @@ -225,8 +230,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS actors[addr][act] = actorInfo{ stateroot: bh.ParentStateRoot, state: string(state), + tsKey: pts.Key(), } - addresses[addr] = address.Undef + addressToID[addr] = address.Undef alk.Unlock() } }) @@ -238,18 +244,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Resolving addresses") for _, message := range msgs { - addresses[message.To] = address.Undef - addresses[message.From] = address.Undef + addressToID[message.To] = address.Undef + addressToID[message.From] = address.Undef } - parmap.Par(50, parmap.KMapArr(addresses), func(addr address.Address) { + parmap.Par(50, parmap.KMapArr(addressToID), func(addr address.Address) { + // FIXME: cannot use EmptyTSK here since actorID's can change during reorgs, need to use the corresponding tipset. + // TODO: figure out a way to get the corresponding tipset... raddr, err := api.StateLookupID(ctx, addr, types.EmptyTSK) if err != nil { log.Warn(err) return } alk.Lock() - addresses[addr] = raddr + addressToID[addr] = raddr alk.Unlock() }) @@ -267,6 +275,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS addr: addr, act: actor, stateroot: c.stateroot, + tsKey: c.tsKey, }] = &minerInfo{} } } @@ -274,14 +283,17 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) { k, info := it() - pow, err := api.StateMinerPower(ctx, k.addr, types.EmptyTSK) + // TODO: get the storage power actors state and and pull the miner power from there, currently this hits the + // storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset + // and reading each miner power from the result. + pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey) if err != nil { log.Error(err) // Not sure why this would fail, but its probably worth continuing } info.power = pow.MinerPower.QualityAdjPower - sszs, err := api.StateMinerSectorCount(ctx, k.addr, types.EmptyTSK) + sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey) if err != nil { log.Error(err) return @@ -316,7 +328,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Info("Storing address mapping") - if err := st.storeAddressMap(addresses); err != nil { + if err := st.storeAddressMap(addressToID); err != nil { log.Error(err) return } @@ -361,7 +373,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Get deals") // TODO: incremental, gather expired - deals, err := api.StateMarketDeals(ctx, ts.Key()) + deals, err := api.StateMarketDeals(ctx, headTs.Key()) if err != nil { log.Error(err) return diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index be366d4db..08ae91200 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -88,7 +88,7 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect return nil, nil, xerrors.Errorf("commit: %w", err) } - if err := sb.FinalizeSector(context.TODO(), sid); err != nil { + if err := sb.FinalizeSector(context.TODO(), sid, nil); err != nil { return nil, nil, xerrors.Errorf("trim cache: %w", err) } diff --git a/cmd/lotus-storage-miner/actor.go b/cmd/lotus-storage-miner/actor.go new file mode 100644 index 000000000..44eb7a0e4 --- /dev/null +++ b/cmd/lotus-storage-miner/actor.go @@ -0,0 +1,94 @@ +package main + +import ( + "fmt" + + ma "github.com/multiformats/go-multiaddr" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + lcli "github.com/filecoin-project/lotus/cli" +) + +var actorCmd = &cli.Command{ + Name: "actor", + Usage: "manipulate the miner actor", + Subcommands: []*cli.Command{ + actorSetAddrsCmd, + }, +} + +var actorSetAddrsCmd = &cli.Command{ + Name: "set-addrs", + Usage: "set addresses that your miner can be publically dialed on", + Flags: []cli.Flag{ + &cli.Int64Flag{ + Name: "gas-limit", + Usage: "set gas limit", + Value: 100000, + }, + }, + Action: func(cctx *cli.Context) error { + nodeAPI, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + api, acloser, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer acloser() + + ctx := lcli.ReqContext(cctx) + + var addrs []abi.Multiaddrs + for _, a := range cctx.Args().Slice() { + maddr, err := ma.NewMultiaddr(a) + if err != nil { + return fmt.Errorf("failed to parse %q as a multiaddr: %w", a, err) + } + + addrs = append(addrs, maddr.Bytes()) + } + + maddr, err := nodeAPI.ActorAddress(ctx) + if err != nil { + return err + } + + minfo, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return err + } + + params, err := actors.SerializeParams(&miner.ChangeMultiaddrsParams{NewMultiaddrs: addrs}) + if err != nil { + return err + } + + gasLimit := cctx.Int64("gas-limit") + + smsg, err := api.MpoolPushMessage(ctx, &types.Message{ + To: maddr, + From: minfo.Worker, + Value: types.NewInt(0), + GasPrice: types.NewInt(1), + GasLimit: gasLimit, + Method: 18, + Params: params, + }) + if err != nil { + return err + } + + fmt.Printf("Requested multiaddrs change in message %s\n", smsg.Cid()) + return nil + + }, +} diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 01077bc83..17e06e214 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/filecoin-project/specs-actors/actors/builtin/power" "sort" "time" @@ -13,6 +12,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/builtin/power" sealing "github.com/filecoin-project/storage-fsm" "github.com/filecoin-project/lotus/api" diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index dc6de7029..62efe9370 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -22,7 +22,9 @@ func main() { lotuslog.SetupLogLevels() local := []*cli.Command{ - dealsCmd, + actorCmd, + storageDealsCmd, + retrievalDealsCmd, infoCmd, initCmd, rewardsCmd, diff --git a/cmd/lotus-storage-miner/market.go b/cmd/lotus-storage-miner/market.go index c668456f0..e658be1cf 100644 --- a/cmd/lotus-storage-miner/market.go +++ b/cmd/lotus-storage-miner/market.go @@ -217,9 +217,9 @@ var getAskCmd = &cli.Command{ }, } -var dealsCmd = &cli.Command{ - Name: "deals", - Usage: "interact with your deals", +var storageDealsCmd = &cli.Command{ + Name: "storage-deals", + Usage: "Manage storage deals and related configuration", Subcommands: []*cli.Command{ dealsImportDataCmd, dealsListCmd, diff --git a/cmd/lotus-storage-miner/proving.go b/cmd/lotus-storage-miner/proving.go index 8d3965fa8..60cf2ab99 100644 --- a/cmd/lotus-storage-miner/proving.go +++ b/cmd/lotus-storage-miner/proving.go @@ -26,6 +26,75 @@ var provingCmd = &cli.Command{ Subcommands: []*cli.Command{ provingInfoCmd, provingDeadlinesCmd, + provingFaultsCmd, + }, +} + +var provingFaultsCmd = &cli.Command{ + Name: "faults", + Usage: "View the currently known proving faulty sectors information", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + api, acloser, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer acloser() + + ctx := lcli.ReqContext(cctx) + + maddr, err := nodeApi.ActorAddress(ctx) + if err != nil { + return xerrors.Errorf("getting actor address: %w", err) + } + + var mas miner.State + { + mact, err := api.StateGetActor(ctx, maddr, types.EmptyTSK) + if err != nil { + return err + } + rmas, err := api.ChainReadObj(ctx, mact.Head) + if err != nil { + return err + } + if err := mas.UnmarshalCBOR(bytes.NewReader(rmas)); err != nil { + return err + } + } + faults, err := mas.Faults.All(100000000000) + if err != nil { + return err + } + if len(faults) == 0 { + fmt.Println("no faulty sectors") + } + head, err := api.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("getting chain head: %w", err) + } + deadlines, err := api.StateMinerDeadlines(ctx, maddr, head.Key()) + if err != nil { + return xerrors.Errorf("getting miner deadlines: %w", err) + } + tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) + _, _ = fmt.Fprintln(tw, "deadline\tsectors") + for deadline, sectors := range deadlines.Due { + intersectSectors, _ := bitfield.IntersectBitField(sectors, mas.Faults) + if intersectSectors != nil { + allSectors, _ := intersectSectors.All(100000000000) + for _, num := range allSectors { + _, _ = fmt.Fprintf(tw, "%d\t%d\n", deadline, num) + } + } + + } + return tw.Flush() }, } diff --git a/cmd/lotus-storage-miner/retrieval-deals.go b/cmd/lotus-storage-miner/retrieval-deals.go new file mode 100644 index 000000000..ee503fb2b --- /dev/null +++ b/cmd/lotus-storage-miner/retrieval-deals.go @@ -0,0 +1,45 @@ +package main + +import ( + lcli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" +) + +var retrievalDealsCmd = &cli.Command{ + Name: "retrieval-deals", + Usage: "Manage retrieval deals and related configuration", + Subcommands: []*cli.Command{ + enableRetrievalCmd, + disableRetrievalCmd, + }, +} + +var enableRetrievalCmd = &cli.Command{ + Name: "enable", + Usage: "Configure the miner to consider retrieval deal proposals", + Flags: []cli.Flag{}, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + return api.DealsSetAcceptingRetrievalDeals(lcli.DaemonContext(cctx), true) + }, +} + +var disableRetrievalCmd = &cli.Command{ + Name: "disable", + Usage: "Configure the miner to reject all retrieval deal proposals", + Flags: []cli.Flag{}, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + return api.DealsSetAcceptingRetrievalDeals(lcli.DaemonContext(cctx), false) + }, +} diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 4a3109f37..0dfce2f38 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -27,6 +27,7 @@ var sectorsCmd = &cli.Command{ sectorsRefsCmd, sectorsUpdateCmd, sectorsPledgeCmd, + sectorsRemoveCmd, }, } @@ -46,8 +47,9 @@ var sectorsPledgeCmd = &cli.Command{ } var sectorsStatusCmd = &cli.Command{ - Name: "status", - Usage: "Get the seal status of a sector by its ID", + Name: "status", + Usage: "Get the seal status of a sector by its number", + ArgsUsage: "", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "log", @@ -63,7 +65,7 @@ var sectorsStatusCmd = &cli.Command{ ctx := lcli.ReqContext(cctx) if !cctx.Args().Present() { - return fmt.Errorf("must specify sector ID to get status of") + return fmt.Errorf("must specify sector number to get status of") } id, err := strconv.ParseUint(cctx.Args().First(), 10, 64) @@ -208,6 +210,39 @@ var sectorsRefsCmd = &cli.Command{ }, } +var sectorsRemoveCmd = &cli.Command{ + Name: "remove", + Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector)", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "pass this flag if you know what you are doing", + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Bool("really-do-it") { + return xerrors.Errorf("this is a command for advanced users, only use it if you are sure of what you are doing") + } + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + if cctx.Args().Len() != 1 { + return xerrors.Errorf("must pass sector number") + } + + id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64) + if err != nil { + return xerrors.Errorf("could not parse sector number: %w", err) + } + + return nodeApi.SectorRemove(ctx, abi.SectorNumber(id)) + }, +} + var sectorsUpdateCmd = &cli.Command{ Name: "update-state", Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery", @@ -228,12 +263,12 @@ var sectorsUpdateCmd = &cli.Command{ defer closer() ctx := lcli.ReqContext(cctx) if cctx.Args().Len() < 2 { - return xerrors.Errorf("must pass sector ID and new state") + return xerrors.Errorf("must pass sector number and new state") } id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64) if err != nil { - return xerrors.Errorf("could not parse sector ID: %w", err) + return xerrors.Errorf("could not parse sector number: %w", err) } return nodeApi.SectorsUpdate(ctx, abi.SectorNumber(id), api.SectorState(cctx.Args().Get(1))) diff --git a/go.mod b/go.mod index 146e01b2a..70931a19c 100644 --- a/go.mod +++ b/go.mod @@ -29,10 +29,10 @@ require ( github.com/filecoin-project/go-paramfetch v0.0.2-0.20200605171344-fcac609550ca github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b - github.com/filecoin-project/sector-storage v0.0.0-20200618073200-d9de9b7cb4b4 + github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121 - github.com/filecoin-project/specs-storage v0.1.0 - github.com/filecoin-project/storage-fsm v0.0.0-20200617183754-4380106d3e94 + github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea + github.com/filecoin-project/storage-fsm v0.0.0-20200625160832-379a4655b044 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-kit/kit v0.10.0 github.com/go-ole/go-ole v1.2.4 // indirect @@ -44,7 +44,7 @@ require ( github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d github.com/ipfs/go-bitswap v0.2.8 github.com/ipfs/go-block-format v0.0.2 - github.com/ipfs/go-blockservice v0.1.3 + github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834 github.com/ipfs/go-cid v0.0.6 github.com/ipfs/go-cidutil v0.0.2 github.com/ipfs/go-datastore v0.4.4 diff --git a/go.sum b/go.sum index c8199601a..5d2e5d845 100644 --- a/go.sum +++ b/go.sum @@ -252,8 +252,8 @@ github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZO github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM= -github.com/filecoin-project/sector-storage v0.0.0-20200618073200-d9de9b7cb4b4 h1:lQC8Fbyn31/H4QxYAYwVV3PYZ9vS61EmjktZc5CaiYs= -github.com/filecoin-project/sector-storage v0.0.0-20200618073200-d9de9b7cb4b4/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM= +github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 h1:NfYQRmVRe0LzlNbK5Ket3vbBOwFD5TvtcNtfo/Sd8mg= +github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY= github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA= github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y= github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= @@ -261,8 +261,10 @@ github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121 h1 github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sgOVdJbvFjOnD5w94= github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k= -github.com/filecoin-project/storage-fsm v0.0.0-20200617183754-4380106d3e94 h1:zPKiZPMgkFF0Lq13hsk8lcWlxeVAs6vvJaa3uHn9v70= -github.com/filecoin-project/storage-fsm v0.0.0-20200617183754-4380106d3e94/go.mod h1:q1YCutTSMq/yGYvDPHReT37bPfDLHltnwJutzR9kOY0= +github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY= +github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k= +github.com/filecoin-project/storage-fsm v0.0.0-20200625160832-379a4655b044 h1:i4oMhv1kx/MAUxRN4EM5tag5fI1uagrwQwINgKrzUt4= +github.com/filecoin-project/storage-fsm v0.0.0-20200625160832-379a4655b044/go.mod h1:JD7fmV1BYADDcy4EYQnqFH/rUzXsh0Je0jXarCjZqSk= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= @@ -463,6 +465,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.3 h1:9XgsPMwwWJSC9uVr2pMDsW2qFTBSkxpGMhmna8mIjPM= github.com/ipfs/go-blockservice v0.1.3/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU= +github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834 h1:hFJoI1D2a3MqiNkSb4nKwrdkhCngUxUTFNwVwovZX2s= +github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/miner/miner.go b/miner/miner.go index bdeed8ac5..1f5f8dad5 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -215,6 +215,9 @@ type MiningBase struct { } func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) { + m.lk.Lock() + defer m.lk.Unlock() + bts, err := m.api.ChainHead(ctx) if err != nil { return nil, err @@ -252,6 +255,12 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti return mpower.MinerPower.QualityAdjPower.GreaterThanEqual(power.ConsensusMinerMinPower), nil } +// mineOne mines a single block, and does so synchronously, if and only if we +// have won the current round. +// +// {hint/landmark}: This method coordinates all the steps involved in mining a +// block, including the condition of whether mine or not at all depending on +// whether we win the round or not. func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) { log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids())) start := time.Now() diff --git a/node/builder.go b/node/builder.go index e84c4422c..6b987dc28 100644 --- a/node/builder.go +++ b/node/builder.go @@ -218,6 +218,7 @@ func Online() Option { Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap), Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap), + Override(new(dtypes.DrandConfig), modules.BuiltinDrandConfig), Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), @@ -312,6 +313,8 @@ func Online() Option { Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver), Override(new(*miner.Miner), modules.SetupBlockProducer), + Override(new(dtypes.AcceptingRetrievalDealsConfigFunc), modules.NewAcceptingRetrievalDealsConfigFunc), + Override(new(dtypes.SetAcceptingRetrievalDealsConfigFunc), modules.NewSetAcceptingRetrievalDealsConfigFunc), Override(new(dtypes.AcceptingStorageDealsConfigFunc), modules.NewAcceptingStorageDealsConfigFunc), Override(new(dtypes.SetAcceptingStorageDealsConfigFunc), modules.NewSetAcceptingStorageDealsConfigFunc), Override(new(dtypes.StorageDealPieceCidBlocklistConfigFunc), modules.NewStorageDealPieceCidBlocklistConfigFunc), diff --git a/node/config/def.go b/node/config/def.go index 76a6a89ea..a86f87d24 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -34,8 +34,9 @@ type StorageMiner struct { } type DealmakingConfig struct { - AcceptingStorageDeals bool - PieceCidBlocklist []cid.Cid + AcceptingStorageDeals bool + AcceptingRetrievalDeals bool + PieceCidBlocklist []cid.Cid } // API contains configs for API endpoint @@ -123,8 +124,9 @@ func DefaultStorageMiner() *StorageMiner { }, Dealmaking: DealmakingConfig{ - AcceptingStorageDeals: true, - PieceCidBlocklist: []cid.Cid{}, + AcceptingStorageDeals: true, + AcceptingRetrievalDeals: true, + PieceCidBlocklist: []cid.Cid{}, }, } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" diff --git a/node/impl/client/client.go b/node/impl/client/client.go index df6fb862c..6e7f50f26 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -31,7 +31,7 @@ import ( "go.uber.org/fx" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" + rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" @@ -59,8 +59,8 @@ type API struct { paych.PaychAPI SMDealClient storagemarket.StorageClient - RetDiscovery retrievalmarket.PeerResolver - Retrieval retrievalmarket.RetrievalClient + RetDiscovery rm.PeerResolver + Retrieval rm.RetrievalClient Chain *store.ChainStore LocalDAG dtypes.ClientDAG @@ -202,7 +202,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe out := make([]api.QueryOffer, len(peers)) for k, p := range peers { - out[k] = a.makeRetrievalQuery(ctx, p, root, retrievalmarket.QueryParams{}) + out[k] = a.makeRetrievalQuery(ctx, p, root, rm.QueryParams{}) } return out, nil @@ -213,25 +213,25 @@ func (a *API) ClientMinerQueryOffer(ctx context.Context, payload cid.Cid, miner if err != nil { return api.QueryOffer{}, err } - rp := retrievalmarket.RetrievalPeer{ + rp := rm.RetrievalPeer{ Address: miner, ID: mi.PeerId, } - return a.makeRetrievalQuery(ctx, rp, payload, retrievalmarket.QueryParams{}), nil + return a.makeRetrievalQuery(ctx, rp, payload, rm.QueryParams{}), nil } -func (a *API) makeRetrievalQuery(ctx context.Context, rp retrievalmarket.RetrievalPeer, payload cid.Cid, qp retrievalmarket.QueryParams) api.QueryOffer { +func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, qp rm.QueryParams) api.QueryOffer { queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp) if err != nil { return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeerID: rp.ID} } var errStr string switch queryResponse.Status { - case retrievalmarket.QueryResponseAvailable: + case rm.QueryResponseAvailable: errStr = "" - case retrievalmarket.QueryResponseUnavailable: + case rm.QueryResponseUnavailable: errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message) - case retrievalmarket.QueryResponseError: + case rm.QueryResponseError: errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message) } @@ -345,13 +345,35 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref retrievalResult := make(chan error, 1) - unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) { + unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { if state.PayloadCID.Equals(order.Root) { switch state.Status { - case retrievalmarket.DealStatusFailed, retrievalmarket.DealStatusErrored: - retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message) - case retrievalmarket.DealStatusCompleted: + case rm.DealStatusCompleted: retrievalResult <- nil + case rm.DealStatusRejected: + retrievalResult <- xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) + case + rm.DealStatusDealNotFound, + rm.DealStatusErrored, + rm.DealStatusFailed: + retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message) + case + rm.DealStatusAccepted, + rm.DealStatusAwaitingAcceptance, + rm.DealStatusBlocksComplete, + rm.DealStatusFinalizing, + rm.DealStatusFundsNeeded, + rm.DealStatusFundsNeededLastPayment, + rm.DealStatusNew, + rm.DealStatusOngoing, + rm.DealStatusPaymentChannelAddingFunds, + rm.DealStatusPaymentChannelAllocatingLane, + rm.DealStatusPaymentChannelCreating, + rm.DealStatusPaymentChannelReady, + rm.DealStatusVerified: + return + default: + retrievalResult <- xerrors.Errorf("Unhandled Retrieval Status: %+v", state.Status) } } }) @@ -361,7 +383,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref _, err := a.Retrieval.Retrieve( ctx, order.Root, - retrievalmarket.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease), + rm.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease), order.Total, order.MinerPeerID, order.Client, diff --git a/node/impl/full/state.go b/node/impl/full/state.go index e39527201..048ae7858 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -425,7 +425,7 @@ func (a *StateAPI) StateMarketParticipants(ctx context.Context, tsk types.TipSet if err != nil { return nil, err } - locked, err := hamt.LoadNode(ctx, cst, state.EscrowTable, hamt.UseTreeBitWidth(5)) + locked, err := hamt.LoadNode(ctx, cst, state.LockedTable, hamt.UseTreeBitWidth(5)) if err != nil { return nil, err } @@ -489,13 +489,11 @@ func (a *StateAPI) StateMarketDeals(ctx context.Context, tsk types.TipSetKey) (m var s market.DealState if err := sa.Get(ctx, i, &s); err != nil { - if err != nil { - if _, ok := err.(*amt.ErrNotFound); !ok { - return xerrors.Errorf("failed to get state for deal in proposals array: %w", err) - } - - s.SectorStartEpoch = -1 + if _, ok := err.(*amt.ErrNotFound); !ok { + return xerrors.Errorf("failed to get state for deal in proposals array: %w", err) } + + s.SectorStartEpoch = -1 } out[strconv.FormatInt(int64(i), 10)] = api.MarketDeal{ Proposal: d, diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 360a9442c..b993d1b46 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -44,6 +44,7 @@ type StorageMinerAPI struct { *stores.Index SetAcceptingStorageDealsConfigFunc dtypes.SetAcceptingStorageDealsConfigFunc + SetAcceptingRetrievalDealsConfigFunc dtypes.SetAcceptingRetrievalDealsConfigFunc StorageDealPieceCidBlocklistConfigFunc dtypes.StorageDealPieceCidBlocklistConfigFunc SetStorageDealPieceCidBlocklistConfigFunc dtypes.SetStorageDealPieceCidBlocklistConfigFunc } @@ -174,6 +175,10 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe return sm.Miner.ForceSectorState(ctx, id, sealing.SectorState(state)) } +func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber) error { + return sm.Miner.RemoveSector(ctx, id) +} + func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error { w, err := connectRemoteWorker(ctx, sm, url) if err != nil { @@ -224,6 +229,10 @@ func (sm *StorageMinerAPI) DealsSetAcceptingStorageDeals(ctx context.Context, b return sm.SetAcceptingStorageDealsConfigFunc(b) } +func (sm *StorageMinerAPI) DealsSetAcceptingRetrievalDeals(ctx context.Context, b bool) error { + return sm.SetAcceptingRetrievalDealsConfigFunc(b) +} + func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fname string) error { fi, err := os.Open(fname) if err != nil { diff --git a/node/modules/dtypes/beacon.go b/node/modules/dtypes/beacon.go new file mode 100644 index 000000000..f3ebc4fac --- /dev/null +++ b/node/modules/dtypes/beacon.go @@ -0,0 +1,6 @@ +package dtypes + +type DrandConfig struct { + Servers []string + ChainInfoJSON string +} diff --git a/node/modules/dtypes/miner.go b/node/modules/dtypes/miner.go index 584642a3b..9ea8c3440 100644 --- a/node/modules/dtypes/miner.go +++ b/node/modules/dtypes/miner.go @@ -10,14 +10,22 @@ import ( type MinerAddress address.Address type MinerID abi.ActorID -// AcceptingStorageDealsFunc is a function which reads from miner config to -// determine if the user has disabled storage deals (or not). +// AcceptingStorageDealsConfigFunc is a function which reads from miner config +// to determine if the user has disabled storage deals (or not). type AcceptingStorageDealsConfigFunc func() (bool, error) -// SetAcceptingStorageDealsFunc is a function which is used to disable or enable -// storage deal acceptance. +// SetAcceptingStorageDealsConfigFunc is a function which is used to disable or +// enable storage deal acceptance. type SetAcceptingStorageDealsConfigFunc func(bool) error +// AcceptingRetrievalDealsConfigFunc is a function which reads from miner config +// to determine if the user has disabled retrieval acceptance (or not). +type AcceptingRetrievalDealsConfigFunc func() (bool, error) + +// SetAcceptingRetrievalDealsConfigFunc is a function which is used to disable +// or enable retrieval deal acceptance. +type SetAcceptingRetrievalDealsConfigFunc func(bool) error + // StorageDealPieceCidBlocklistConfigFunc is a function which reads from miner config // to obtain a list of CIDs for which the storage miner will not accept storage // proposals. diff --git a/node/modules/lp2p/pubsub.go b/node/modules/lp2p/pubsub.go index bf9d88d42..ac23f56a8 100644 --- a/node/modules/lp2p/pubsub.go +++ b/node/modules/lp2p/pubsub.go @@ -44,13 +44,14 @@ type GossipIn struct { Db dtypes.DrandBootstrap Cfg *config.Pubsub Sk *dtypes.ScoreKeeper + Dr dtypes.DrandConfig } -func getDrandTopic() (string, error) { +func getDrandTopic(chainInfoJSON string) (string, error) { var drandInfo = struct { Hash string `json:"hash"` }{} - err := json.Unmarshal([]byte(build.DrandChain), &drandInfo) + err := json.Unmarshal([]byte(chainInfoJSON), &drandInfo) if err != nil { return "", xerrors.Errorf("could not unmarshal drand chain info: %w", err) } @@ -68,7 +69,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) { } isBootstrapNode := in.Cfg.Bootstrapper - drandTopic, err := getDrandTopic() + drandTopic, err := getDrandTopic(in.Dr.ChainInfoJSON) if err != nil { return nil, err } diff --git a/node/modules/services.go b/node/modules/services.go index 27244ad3a..2cba3a0be 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -108,8 +108,13 @@ func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver { type RandomBeaconParams struct { fx.In - PubSub *pubsub.PubSub `optional:"true"` - Cs *store.ChainStore + PubSub *pubsub.PubSub `optional:"true"` + Cs *store.ChainStore + DrandConfig dtypes.DrandConfig +} + +func BuiltinDrandConfig() dtypes.DrandConfig { + return build.DrandConfig } func RandomBeacon(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.RandomBeacon, error) { @@ -119,5 +124,5 @@ func RandomBeacon(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Random } //return beacon.NewMockBeacon(build.BlockDelay * time.Second) - return drand.NewDrandBeacon(gen.Timestamp, build.BlockDelay, p.PubSub) + return drand.NewDrandBeacon(gen.Timestamp, build.BlockDelay, p.PubSub, p.DrandConfig) } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index c6cadec34..3ccc5daa7 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -357,14 +357,31 @@ func StorageProvider(minerAddress dtypes.MinerAddress, ffiConfig *ffiwrapper.Con } // RetrievalProvider creates a new retrieval provider attached to the provider blockstore -func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) { +func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore, isAcceptingFunc dtypes.AcceptingRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) { adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full) - address, err := minerAddrFromDS(ds) + + maddr, err := minerAddrFromDS(ds) if err != nil { return nil, err } - network := rmnet.NewFromLibp2pHost(h) - return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider"))) + + netwk := rmnet.NewFromLibp2pHost(h) + + opt := retrievalimpl.DealDeciderOpt(func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) { + b, err := isAcceptingFunc() + if err != nil { + return false, "miner error", err + } + + if !b { + log.Warn("retrieval deal acceptance disabled; rejecting retrieval deal proposal from client") + return false, "miner is not accepting retrieval deals", nil + } + + return true, "", nil + }) + + return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, ibs, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) } func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth) (*sectorstorage.Manager, error) { @@ -399,6 +416,24 @@ func StorageAuth(ctx helpers.MetricsCtx, ca lapi.Common) (sectorstorage.StorageA return sectorstorage.StorageAuth(headers), nil } +func NewAcceptingRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.AcceptingRetrievalDealsConfigFunc, error) { + return func() (out bool, err error) { + err = readCfg(r, func(cfg *config.StorageMiner) { + out = cfg.Dealmaking.AcceptingRetrievalDeals + }) + return + }, nil +} + +func NewSetAcceptingRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.SetAcceptingRetrievalDealsConfigFunc, error) { + return func(b bool) (err error) { + err = mutateCfg(r, func(cfg *config.StorageMiner) { + cfg.Dealmaking.AcceptingRetrievalDeals = b + }) + return + }, nil +} + func NewAcceptingStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.AcceptingStorageDealsConfigFunc, error) { return func() (out bool, err error) { err = readCfg(r, func(cfg *config.StorageMiner) { diff --git a/storage/sealing.go b/storage/sealing.go index c8b485379..f5716e85d 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -39,3 +39,7 @@ func (m *Miner) PledgeSector() error { func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error { return m.sealing.ForceSectorState(ctx, id, state) } + +func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error { + return m.sealing.Remove(ctx, id) +} diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 5c1170551..fcace70dd 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -383,17 +383,14 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo return nil, xerrors.Errorf("get need prove sectors: %w", err) } - var skipped *abi.BitField - { - good, err := s.checkSectors(ctx, nps) - if err != nil { - return nil, xerrors.Errorf("checking sectors to skip: %w", err) - } + good, err := s.checkSectors(ctx, nps) + if err != nil { + return nil, xerrors.Errorf("checking sectors to skip: %w", err) + } - skipped, err = bitfield.SubtractBitField(nps, good) - if err != nil { - return nil, xerrors.Errorf("nps - good: %w", err) - } + skipped, err := bitfield.SubtractBitField(nps, good) + if err != nil { + return nil, xerrors.Errorf("nps - good: %w", err) } skipCount, err := skipped.Count() @@ -401,7 +398,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo return nil, xerrors.Errorf("getting skipped sector count: %w", err) } - ssi, err := s.sortedSectorInfo(ctx, nps, ts) + ssi, err := s.sortedSectorInfo(ctx, good, ts) if err != nil { return nil, xerrors.Errorf("getting sorted sector info: %w", err) }