Merge remote-tracking branch 'origin/master' into next

This commit is contained in:
Łukasz Magiera 2020-06-26 13:08:37 +02:00
commit d27ff60ae2
46 changed files with 759 additions and 164 deletions

View File

@ -314,7 +314,7 @@ workflows:
ci:
jobs:
- lint-changes:
args: "--new-from-rev origin/next"
args: "--new-from-rev origin/master"
- mod-tidy-check
- gofmt
- test:

View File

@ -36,6 +36,7 @@ type StorageMiner interface {
SectorsRefs(context.Context) (map[string][]SealedRef, error)
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
SectorRemove(context.Context, abi.SectorNumber) error
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
StorageLocal(ctx context.Context) (map[stores.ID]string, error)
@ -56,6 +57,7 @@ type StorageMiner interface {
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
DealsSetAcceptingStorageDeals(context.Context, bool) error
DealsSetAcceptingRetrievalDeals(context.Context, bool) error
DealsPieceCidBlocklist(context.Context) ([]cid.Cid, error)
DealsSetPieceCidBlocklist(context.Context, []cid.Cid) error

View File

@ -206,6 +206,7 @@ type StorageMinerStruct struct {
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"`
@ -223,11 +224,12 @@ type StorageMinerStruct struct {
StorageLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) error `perm:"admin"`
StorageTryLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) (bool, error) `perm:"admin"`
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
DealsSetAcceptingStorageDeals func(context.Context, bool) error `perm:"admin"`
DealsPieceCidBlocklist func(context.Context) ([]cid.Cid, error) `perm:"admin"`
DealsSetPieceCidBlocklist func(context.Context, []cid.Cid) error `perm:"read"`
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
DealsSetAcceptingStorageDeals func(context.Context, bool) error `perm:"admin"`
DealsSetAcceptingRetrievalDeals func(context.Context, bool) error `perm:"admin"`
DealsPieceCidBlocklist func(context.Context) ([]cid.Cid, error) `perm:"admin"`
DealsSetPieceCidBlocklist func(context.Context, []cid.Cid) error `perm:"read"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
}
@ -243,12 +245,14 @@ type WorkerStruct struct {
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"`
SealCommit2 func(context.Context, abi.SectorID, storage.Commit1Out) (storage.Proof, error) `perm:"admin"`
FinalizeSector func(context.Context, abi.SectorID) error `perm:"admin"`
MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"`
SealCommit2 func(context.Context, abi.SectorID, storage.Commit1Out) (storage.Proof, error) `perm:"admin"`
FinalizeSector func(context.Context, abi.SectorID, []storage.Range) error `perm:"admin"`
ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error `perm:"admin"`
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error `perm:"admin"`
ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error `perm:"admin"`
@ -786,6 +790,10 @@ func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNum
return c.Internal.SectorsUpdate(ctx, id, state)
}
func (c *StorageMinerStruct) SectorRemove(ctx context.Context, number abi.SectorNumber) error {
return c.Internal.SectorRemove(ctx, number)
}
func (c *StorageMinerStruct) WorkerConnect(ctx context.Context, url string) error {
return c.Internal.WorkerConnect(ctx, url)
}
@ -874,6 +882,10 @@ func (c *StorageMinerStruct) DealsSetAcceptingStorageDeals(ctx context.Context,
return c.Internal.DealsSetAcceptingStorageDeals(ctx, b)
}
func (c *StorageMinerStruct) DealsSetAcceptingRetrievalDeals(ctx context.Context, b bool) error {
return c.Internal.DealsSetAcceptingRetrievalDeals(ctx, b)
}
func (c *StorageMinerStruct) DealsPieceCidBlocklist(ctx context.Context) ([]cid.Cid, error) {
return c.Internal.DealsPieceCidBlocklist(ctx)
}
@ -920,8 +932,16 @@ func (w *WorkerStruct) SealCommit2(ctx context.Context, sector abi.SectorID, c1o
return w.Internal.SealCommit2(ctx, sector, c1o)
}
func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
return w.Internal.FinalizeSector(ctx, sector)
func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
return w.Internal.FinalizeSector(ctx, sector, keepUnsealed)
}
func (w *WorkerStruct) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
return w.Internal.ReleaseUnsealed(ctx, sector, safeToFree)
}
func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
return w.Internal.Remove(ctx, sector)
}
func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID) error {

View File

@ -8,6 +8,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"
@ -52,11 +53,11 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
}
time.Sleep(time.Second)
mine := true
mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for mine {
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool) {}); err != nil {
t.Error(err)
@ -66,7 +67,7 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
makeDeal(t, ctx, 6, client, miner, carExport)
mine = false
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}
@ -89,12 +90,12 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
}
time.Sleep(time.Second)
mine := true
mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for mine {
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool) {}); err != nil {
t.Error(err)
@ -105,7 +106,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
makeDeal(t, ctx, 6, client, miner, false)
makeDeal(t, ctx, 7, client, miner, false)
mine = false
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}

View File

@ -126,6 +126,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
minedTwo := make(chan struct{})
go func() {
doneMinedTwo := false
defer close(done)
prevExpect := 0
@ -179,9 +180,9 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
time.Sleep(blocktime)
}
if prevExpect == 2 && expect == 2 && minedTwo != nil {
if prevExpect == 2 && expect == 2 && !doneMinedTwo {
close(minedTwo)
minedTwo = nil
doneMinedTwo = true
}
prevExpect = expect

View File

@ -121,4 +121,11 @@ const VerifSigCacheSize = 32000
const BlockMessageLimit = 512
const BlockGasLimit = 100_000_000_000
var DrandChain = `{"public_key":"922a2e93828ff83345bae533f5172669a26c02dc76d6bf59c80892e12ab1455c229211886f35bb56af6d5bea981024df","period":25,"genesis_time":1590445175,"hash":"138a324aa6540f93d0dad002aa89454b1bec2b6e948682cde6bd4db40f4b7c9b"}`
var DrandConfig = dtypes.DrandConfig{
Servers: []string{
"https://pl-eu.testnet.drand.sh",
"https://pl-us.testnet.drand.sh",
"https://pl-sin.testnet.drand.sh",
},
ChainInfoJSON: `{"public_key":"922a2e93828ff83345bae533f5172669a26c02dc76d6bf59c80892e12ab1455c229211886f35bb56af6d5bea981024df","period":25,"genesis_time":1590445175,"hash":"138a324aa6540f93d0dad002aa89454b1bec2b6e948682cde6bd4db40f4b7c9b"}`,
}

View File

@ -53,7 +53,7 @@ func (ve Version) EqMajorMinor(v2 Version) bool {
}
// APIVersion is a semver version of the rpc api exposed
var APIVersion Version = newVer(0, 3, 0)
var APIVersion Version = newVer(0, 4, 0)
//nolint:varcheck,deadcode
const (

View File

@ -17,6 +17,10 @@ type Response struct {
Err error
}
// RandomBeacon represents a system that provides randomness to Lotus.
// Other components interrogate the RandomBeacon to acquire randomness that's
// valid for a specific chain epoch. Also to verify beacon entries that have
// been posted on chain.
type RandomBeacon interface {
Entry(context.Context, uint64) <-chan Response
VerifyEntry(types.BeaconEntry, types.BeaconEntry) error

View File

@ -19,31 +19,15 @@ import (
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
var log = logging.Logger("drand")
var drandServers = []string{
"https://pl-eu.testnet.drand.sh",
"https://pl-us.testnet.drand.sh",
"https://pl-sin.testnet.drand.sh",
}
var drandChain *dchain.Info
func init() {
var err error
drandChain, err = dchain.InfoFromJSON(bytes.NewReader([]byte(build.DrandChain)))
if err != nil {
panic("could not unmarshal chain info: " + err.Error())
}
}
type drandPeer struct {
addr string
tls bool
@ -57,6 +41,13 @@ func (dp *drandPeer) IsTLS() bool {
return dp.tls
}
// DrandBeacon connects Lotus with a drand network in order to provide
// randomness to the system in a way that's aligned with Filecoin rounds/epochs.
//
// We connect to drand peers via their public HTTP endpoints. The peers are
// enumerated in the drandServers variable.
//
// The root trust for the Drand chain is configured from build.DrandChain.
type DrandBeacon struct {
client dclient.Client
@ -73,16 +64,21 @@ type DrandBeacon struct {
localCache map[uint64]types.BeaconEntry
}
func NewDrandBeacon(genesisTs, interval uint64, ps *pubsub.PubSub) (*DrandBeacon, error) {
func NewDrandBeacon(genesisTs, interval uint64, ps *pubsub.PubSub, config dtypes.DrandConfig) (*DrandBeacon, error) {
if genesisTs == 0 {
panic("what are you doing this cant be zero")
}
drandChain, err := dchain.InfoFromJSON(bytes.NewReader([]byte(config.ChainInfoJSON)))
if err != nil {
return nil, xerrors.Errorf("unable to unmarshal drand chain info: %w", err)
}
dlogger := dlog.NewKitLoggerFrom(kzap.NewZapSugarLogger(
log.SugaredLogger.Desugar(), zapcore.InfoLevel))
var clients []dclient.Client
for _, url := range drandServers {
for _, url := range config.Servers {
hc, err := hclient.NewWithInfo(url, drandChain, nil)
if err != nil {
return nil, xerrors.Errorf("could not create http drand client: %w", err)

View File

@ -7,10 +7,13 @@ import (
dchain "github.com/drand/drand/chain"
hclient "github.com/drand/drand/client/http"
"github.com/stretchr/testify/assert"
"github.com/filecoin-project/lotus/build"
)
func TestPrintGroupInfo(t *testing.T) {
c, err := hclient.New(drandServers[0], nil, nil)
server := build.DrandConfig.Servers[0]
c, err := hclient.New(server, nil, nil)
assert.NoError(t, err)
cg := c.(interface {
FetchChainInfo(groupHash []byte) (*dchain.Info, error)

View File

@ -10,6 +10,7 @@ import (
"golang.org/x/xerrors"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -27,6 +28,24 @@ const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
const BlockSyncMaxRequestLength = 800
// BlockSyncService is the component that services BlockSync requests from
// peers.
//
// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync
// is an RPC-oriented protocol, with a single operation to request blocks.
//
// A request contains a start anchor block (referred to with a CID), and a
// amount of blocks requested beyond the anchor (including the anchor itself).
//
// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports
// two options at the moment:
//
// - include block contents
// - include block messages
//
// The response will include a status code, an optional message, and the
// response payload in case of success. The payload is a slice of serialized
// tipsets.
type BlockSyncService struct {
cs *store.ChainStore
}

View File

@ -64,6 +64,11 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse
}
}
// GetBlocks fetches count blocks from the network, from the provided tipset
// *backwards*, returning as many tipsets as count.
//
// {hint/usage}: This is used by the Syncer during normal chain syncing and when
// resolving forks.
func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
defer span.End()
@ -80,7 +85,9 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
Options: BSOptBlocks,
}
// this peerset is sorted by latency and failure counting.
peers := bs.getPeers()
// randomize the first few peers so we don't always pick the same peer
shufflePrefix(peers)
@ -356,6 +363,7 @@ func (bs *BlockSync) RemovePeer(p peer.ID) {
bs.syncPeers.removePeer(p)
}
// getPeers returns a preference-sorted set of peers to query.
func (bs *BlockSync) getPeers() []peer.ID {
return bs.syncPeers.prefSortedPeers()
}

View File

@ -84,6 +84,9 @@ type calledEvents struct {
}
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev {
e.handleReverts(ts)
e.at = ts.Height()
@ -134,7 +137,6 @@ func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
e.messagesForTs(pts, func(msg *types.Message) {
// TODO: provide receipts
for tid, matchFns := range e.matchers {
var matched bool
for _, matchFn := range matchFns {

View File

@ -26,12 +26,15 @@ type heightEvents struct {
}
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
defer span.End()
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
e.lk.Lock()
defer e.lk.Unlock()
for _, ts := range rev {
// TODO: log error if h below gcconfidence
// revert height-based triggers
@ -40,7 +43,10 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
for _, tid := range e.htHeights[h] {
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
err := e.heightTriggers[tid].revert(ctx, ts)
rev := e.heightTriggers[tid].revert
e.lk.Unlock()
err := rev(ctx, ts)
e.lk.Lock()
e.heightTriggers[tid].called = false
span.End()
@ -98,8 +104,10 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", false))
err = hnd.handle(ctx, incTs, h)
handle := hnd.handle
e.lk.Unlock()
err = handle(ctx, incTs, h)
e.lk.Lock()
span.End()
if err != nil {

View File

@ -32,8 +32,11 @@ func (fts *FullTipSet) Cids() []cid.Cid {
return cids
}
// TipSet returns a narrower view of this FullTipSet elliding the block
// messages.
func (fts *FullTipSet) TipSet() *types.TipSet {
if fts.tipset != nil {
// FIXME: fts.tipset is actually never set. Should it memoize?
return fts.tipset
}

View File

@ -34,7 +34,7 @@ type lbEntry struct {
target types.TipSetKey
}
func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) {
func (ci *ChainIndex) GetTipsetByHeight(_ context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) {
if from.Height()-to <= ci.skipLength {
return ci.walkBack(from, to)
}

View File

@ -52,6 +52,15 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
// ReorgNotifee represents a callback that gets called upon reorgs.
type ReorgNotifee func(rev, app []*types.TipSet) error
// ChainStore is the main point of access to chain data.
//
// Raw chain data is stored in the Blockstore, with relevant markers (genesis,
// latest head tipset references) being tracked in the Datastore (key-value
// store).
//
// To alleviate disk access, the ChainStore has two ARC caches:
// 1. a tipset cache
// 2. a block => messages references cache.
type ChainStore struct {
bs bstore.Blockstore
ds dstore.Datastore
@ -266,6 +275,9 @@ func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
return nil
}
// MaybeTakeHeavierTipSet evaluates the incoming tipset and locks it in our
// internal state as our new head, if and only if it is heavier than the current
// head.
func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
@ -331,6 +343,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
return out
}
// takeHeaviestTipSet actually sets the incoming tipset as our head both in
// memory and in the ChainStore. It also sends a notification to deliver to
// ReorgNotifees.
func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error {
_, span := trace.StartSpan(ctx, "takeHeaviestTipSet")
defer span.End()
@ -368,6 +383,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error {
return cs.takeHeaviestTipSet(context.TODO(), ts)
}
// Contains returns whether our BlockStore has all blocks in the supplied TipSet.
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
for _, c := range ts.Cids() {
has, err := cs.bs.Has(c)
@ -382,6 +398,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
return true, nil
}
// GetBlock fetches a BlockHeader with the supplied CID. It returns
// blockstore.ErrNotFound if the block was not found in the BlockStore.
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
sb, err := cs.bs.Get(c)
if err != nil {
@ -474,6 +492,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti
return leftChain, rightChain, nil
}
// GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head).
func (cs *ChainStore) GetHeaviestTipSet() *types.TipSet {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()

View File

@ -53,6 +53,29 @@ var log = logging.Logger("chain")
var LocalIncoming = "incoming"
// Syncer is in charge of running the chain synchronization logic. As such, it
// is tasked with these functions, amongst others:
//
// * Fast-forwards the chain as it learns of new TipSets from the network via
// the SyncManager.
// * Applies the fork choice rule to select the correct side when confronted
// with a fork in the network.
// * Requests block headers and messages from other peers when not available
// in our BlockStore.
// * Tracks blocks marked as bad in a cache.
// * Keeps the BlockStore and ChainStore consistent with our view of the world,
// the latter of which in turn informs other components when a reorg has been
// committed.
//
// The Syncer does not run workers itself. It's mainly concerned with
// ensuring a consistent state of chain consensus. The reactive and network-
// interfacing processes are part of other components, such as the SyncManager
// (which owns the sync scheduler and sync workers), BlockSync, the HELLO
// protocol, and the gossipsub block propagation layer.
//
// {hint/concept} The fork-choice rule as it currently stands is: "pick the
// chain with the heaviest weight, so long as it hasnt deviated one finality
// threshold from our head (900 epochs, parameter determined by spec-actors)".
type Syncer struct {
// The interface for accessing and putting tipsets into local storage
store *store.ChainStore
@ -85,6 +108,7 @@ type Syncer struct {
verifier ffiwrapper.Verifier
}
// NewSyncer creates a new Syncer object.
func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) {
gen, err := sm.ChainStore().GetGenesis()
if err != nil {
@ -182,6 +206,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
return true
}
// IncomingBlocks spawns a goroutine that subscribes to the local eventbus to
// receive new block headers as they arrive from the network, and sends them to
// the returned channel.
//
// These blocks have not necessarily been incorporated to our view of the chain.
func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
sub := syncer.incoming.Sub(LocalIncoming)
out := make(chan *types.BlockHeader, 10)
@ -209,11 +238,15 @@ func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHe
return out, nil
}
// ValidateMsgMeta performs structural and content hash validation of the
// messages within this block. If validation passes, it stores the messages in
// the underlying IPLD block store.
func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
if msgc := len(fblk.BlsMessages) + len(fblk.SecpkMessages); msgc > build.BlockMessageLimit {
return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc)
}
// Collect the CIDs of both types of messages separately: BLS and Secpk.
var bcids, scids []cbg.CBORMarshaler
for _, m := range fblk.BlsMessages {
c := cbg.CborCid(m.Cid())
@ -231,11 +264,14 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
blockstore := syncer.store.Blockstore()
bs := cbor.NewCborStore(blockstore)
// Compute the root CID of the combined message trie.
smroot, err := computeMsgMeta(bs, bcids, scids)
if err != nil {
return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
}
// Check that the message trie root matches with what's in the block.
if fblk.Header.Messages != smroot {
return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
}
@ -345,6 +381,8 @@ func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types
return fts, nil
}
// computeMsgMeta computes the root CID of the combined arrays of message CIDs
// of both types (BLS and Secpk).
func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.Cid, error) {
ctx := context.TODO()
bmroot, err := amt.FromArray(ctx, bs, bmsgCids)
@ -368,14 +406,24 @@ func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) (
return mrcid, nil
}
// FetchTipSet tries to load the provided tipset from the store, and falls back
// to the network (BlockSync) by querying the supplied peer if not found
// locally.
//
// {hint/usage} This is used from the HELLO protocol, to fetch the greeting
// peer's heaviest tipset if we don't have it.
func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) {
if fts, err := syncer.tryLoadFullTipSet(tsk); err == nil {
return fts, nil
}
// fall back to the network.
return syncer.Bsync.GetFullTipSet(ctx, p, tsk)
}
// tryLoadFullTipSet queries the tipset in the ChainStore, and returns a full
// representation of it containing FullBlocks. If ALL blocks are not found
// locally, it errors entirely with blockstore.ErrNotFound.
func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, error) {
ts, err := syncer.store.LoadTipSet(tsk)
if err != nil {
@ -400,6 +448,12 @@ func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet,
return fts, nil
}
// Sync tries to advance our view of the chain to `maybeHead`. It does nothing
// if our current head is heavier than the requested tipset, or if we're already
// at the requested head, or if the head is the genesis.
//
// Most of the heavy-lifting logic happens in syncer#collectChain. Refer to the
// godocs on that method for a more detailed view.
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End()
@ -466,16 +520,27 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet)
return nil
}
var futures []async.ErrorFuture
for _, b := range fts.Blocks {
if err := syncer.ValidateBlock(ctx, b); err != nil {
if isPermanent(err) {
syncer.bad.Add(b.Cid(), err.Error())
}
return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
}
b := b // rebind to a scoped variable
if err := syncer.sm.ChainStore().AddToTipSetTracker(b.Header); err != nil {
return xerrors.Errorf("failed to add validated header to tipset tracker: %w", err)
futures = append(futures, async.Err(func() error {
if err := syncer.ValidateBlock(ctx, b); err != nil {
if isPermanent(err) {
syncer.bad.Add(b.Cid(), err.Error())
}
return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
}
if err := syncer.sm.ChainStore().AddToTipSetTracker(b.Header); err != nil {
return xerrors.Errorf("failed to add validated header to tipset tracker: %w", err)
}
return nil
}))
}
for _, f := range futures {
if err := f.AwaitContext(ctx); err != nil {
return err
}
}
return nil
@ -997,6 +1062,39 @@ func extractSyncState(ctx context.Context) *SyncerState {
return nil
}
// collectHeaders collects the headers from the blocks between any two tipsets.
//
// `from` is the heaviest/projected/target tipset we have learned about, and
// `to` is usually an anchor tipset we already have in our view of the chain
// (which could be the genesis).
//
// collectHeaders checks if portions of the chain are in our ChainStore; falling
// down to the network to retrieve the missing parts. If during the process, any
// portion we receive is in our denylist (bad list), we short-circuit.
//
// {hint/naming}: `from` and `to` is in inverse order. `from` is the highest,
// and `to` is the lowest. This method traverses the chain backwards.
//
// {hint/usage}: This is used by collectChain, which is in turn called from the
// main Sync method (Syncer#Sync), so it's a pretty central method.
//
// {hint/logic}: The logic of this method is as follows:
//
// 1. Check that the from tipset is not linked to a parent block known to be
// bad.
// 2. Check the consistency of beacon entries in the from tipset. We check
// total equality of the BeaconEntries in each block.
// 3. Travers the chain backwards, for each tipset:
// 3a. Load it from the chainstore; if found, it move on to its parent.
// 3b. Query our peers via BlockSync in batches, requesting up to a
// maximum of 500 tipsets every time.
//
// Once we've concluded, if we find a mismatching tipset at the height where the
// anchor tipset should be, we are facing a fork, and we invoke Syncer#syncFork
// to resolve it. Refer to the godocs there.
//
// All throughout the process, we keep checking if the received blocks are in
// the deny list, and short-circuit the process if so.
func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "collectHeaders")
defer span.End()
@ -1013,6 +1111,8 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
}
}
// Check if the parents of the from block are in the denylist.
// i.e. if a fork of the chain has been requested that we know to be bad.
for _, pcid := range from.Parents().Cids() {
if reason, ok := syncer.bad.Has(pcid); ok {
markBad("linked to %s", pcid)
@ -1083,8 +1183,8 @@ loop:
}
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
// requested, and that they are correctly linked to one another. It does
// not validate any state transitions.
window := 500
if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window {
window = gap
@ -1125,7 +1225,6 @@ loop:
at = blks[len(blks)-1].Parents()
}
// We have now ascertained that this is *not* a 'fast forward'
if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) {
last := blockSet[len(blockSet)-1]
if last.Parents() == to.Parents() {
@ -1133,6 +1232,8 @@ loop:
return blockSet, nil
}
// We have now ascertained that this is *not* a 'fast forward'
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
fork, err := syncer.syncFork(ctx, last, to)
if err != nil {
@ -1154,6 +1255,12 @@ loop:
var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
// syncFork tries to obtain the chain fragment that links a fork into a common
// ancestor in our view of the chain.
//
// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the
// denylist. Else, we find the common ancestor, and add the missing chain
// fragment until the fork point to the returned []TipSet.
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold))
if err != nil {
@ -1305,6 +1412,25 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error {
return nil
}
// collectChain tries to advance our view of the chain to the purported head.
//
// It goes through various stages:
//
// 1. StageHeaders: we proceed in the sync process by requesting block headers
// from our peers, moving back from their heads, until we reach a tipset
// that we have in common (such a common tipset must exist, thought it may
// simply be the genesis block).
//
// If the common tipset is our head, we treat the sync as a "fast-forward",
// else we must drop part of our chain to connect to the peer's head
// (referred to as "forking").
//
// 2. StagePersistHeaders: now that we've collected the missing headers,
// augmented by those on the other side of a fork, we persist them to the
// BlockStore.
//
// 3. StageMessages: having acquired the headers and found a common tipset,
// we then move forward, requesting the full blocks, including the messages.
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "collectChain")
defer span.End()
@ -1354,9 +1480,8 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []byte, evrf []byte) error {
if build.InsecurePoStValidation {
return nil
} else {
return gen.VerifyVRF(ctx, worker, rand, evrf)
}
return gen.VerifyVRF(ctx, worker, rand, evrf)
}
func (syncer *Syncer) State() []SyncerState {
@ -1367,6 +1492,7 @@ func (syncer *Syncer) State() []SyncerState {
return out
}
// MarkBad manually adds a block to the "bad blocks" cache.
func (syncer *Syncer) MarkBad(blk cid.Cid) {
syncer.bad.Add(blk, "manually marked bad")
}
@ -1374,7 +1500,7 @@ func (syncer *Syncer) MarkBad(blk cid.Cid) {
func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
return syncer.bad.Has(blk)
}
func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
func (syncer *Syncer) getLatestBeaconEntry(_ context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
cur := ts
for i := 0; i < 20; i++ {
cbe := cur.Blocks()[0].BeaconEntries

View File

@ -21,16 +21,16 @@ type ExecutionTrace struct {
type GasTrace struct {
Name string
Location []Loc
TotalGas int64
ComputeGas int64
StorageGas int64
TotalVirtualGas int64
VirtualComputeGas int64
VirtualStorageGas int64
Location []Loc `json:"loc"`
TotalGas int64 `json:"tg"`
ComputeGas int64 `json:"cg"`
StorageGas int64 `json:"sg"`
TotalVirtualGas int64 `json:"vtg"`
VirtualComputeGas int64 `json:"vcg"`
VirtualStorageGas int64 `json:"vsg"`
TimeTaken time.Duration
Extra interface{} `json:",omitempty"`
TimeTaken time.Duration `json:"tt"`
Extra interface{} `json:"ex,omitempty"`
Callers []uintptr `json:"-"`
}

View File

@ -408,7 +408,7 @@ func (rt *Runtime) internalSend(from, to address.Address, method abi.MethodNum,
if subrt != nil {
rt.numActorsCreated = subrt.numActorsCreated
}
rt.executionTrace.Subcalls = append(rt.executionTrace.Subcalls, subrt.executionTrace) //&er)
rt.executionTrace.Subcalls = append(rt.executionTrace.Subcalls, subrt.executionTrace)
return ret, errSend
}

View File

@ -149,6 +149,7 @@ var importBenchCmd = &cli.Command{
if err != nil {
return err
}
stripCallers(trace)
lastTse = &TipSetExec{
TipSet: cur.Key(),
@ -168,6 +169,21 @@ var importBenchCmd = &cli.Command{
},
}
func walkExecutionTrace(et *types.ExecutionTrace) {
for _, gc := range et.GasCharges {
gc.Callers = nil
}
for _, sub := range et.Subcalls {
walkExecutionTrace(&sub) //nolint:scopelint,gosec
}
}
func stripCallers(trace []*api.InvocResult) {
for _, t := range trace {
walkExecutionTrace(&t.ExecutionTrace)
}
}
type Invocation struct {
TipSet types.TipSetKey
Invoc *api.InvocResult

View File

@ -118,7 +118,7 @@ create unique index if not exists block_cid_uindex
create materialized view if not exists state_heights
as select distinct height, parentstateroot from blocks;
create unique index if not exists state_heights_uindex
create index if not exists state_heights_index
on state_heights (height);
create index if not exists state_heights_height_index

View File

@ -53,6 +53,7 @@ type minerKey struct {
addr address.Address
act types.Actor
stateroot cid.Cid
tsKey types.TipSetKey
}
type minerInfo struct {
@ -66,10 +67,11 @@ type minerInfo struct {
type actorInfo struct {
stateroot cid.Cid
tsKey types.TipSetKey
state string
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet, maxBatch int) {
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
var alk sync.Mutex
log.Infof("Getting synced block list")
@ -81,7 +83,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
allToSync := map[cid.Cid]*types.BlockHeader{}
toVisit := list.New()
for _, header := range ts.Blocks() {
for _, header := range headTs.Blocks() {
toVisit.PushBack(header)
}
@ -116,7 +118,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
for len(allToSync) > 0 {
actors := map[address.Address]map[types.Actor]actorInfo{}
addresses := map[address.Address]address.Address{}
addressToID := map[address.Address]address.Address{}
minH := abi.ChainEpoch(math.MaxInt64)
for _, header := range allToSync {
@ -129,7 +131,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
for c, header := range allToSync {
if header.Height < minH+abi.ChainEpoch(maxBatch) {
toSync[c] = header
addresses[header.Miner] = address.Undef
addressToID[header.Miner] = address.Undef
}
}
for c := range toSync {
@ -146,20 +148,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
}
if len(bh.Parents) == 0 { // genesis case
ts, _ := types.NewTipSet([]*types.BlockHeader{bh})
aadrs, err := api.StateListActors(ctx, ts.Key())
genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh})
aadrs, err := api.StateListActors(ctx, genesisTs.Key())
if err != nil {
log.Error(err)
return
}
parmap.Par(50, aadrs, func(addr address.Address) {
act, err := api.StateGetActor(ctx, addr, ts.Key())
act, err := api.StateGetActor(ctx, addr, genesisTs.Key())
if err != nil {
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, act, ts.Key())
ast, err := api.StateReadState(ctx, act, genesisTs.Key())
if err != nil {
log.Error(err)
return
@ -177,9 +179,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
}
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
tsKey: genesisTs.Key(),
state: string(state),
}
addresses[addr] = address.Undef
addressToID[addr] = address.Undef
alk.Unlock()
})
@ -206,11 +209,13 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, &act, pts.Key())
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
@ -225,8 +230,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
tsKey: pts.Key(),
}
addresses[addr] = address.Undef
addressToID[addr] = address.Undef
alk.Unlock()
}
})
@ -238,18 +244,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Resolving addresses")
for _, message := range msgs {
addresses[message.To] = address.Undef
addresses[message.From] = address.Undef
addressToID[message.To] = address.Undef
addressToID[message.From] = address.Undef
}
parmap.Par(50, parmap.KMapArr(addresses), func(addr address.Address) {
parmap.Par(50, parmap.KMapArr(addressToID), func(addr address.Address) {
// FIXME: cannot use EmptyTSK here since actorID's can change during reorgs, need to use the corresponding tipset.
// TODO: figure out a way to get the corresponding tipset...
raddr, err := api.StateLookupID(ctx, addr, types.EmptyTSK)
if err != nil {
log.Warn(err)
return
}
alk.Lock()
addresses[addr] = raddr
addressToID[addr] = raddr
alk.Unlock()
})
@ -267,6 +275,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
addr: addr,
act: actor,
stateroot: c.stateroot,
tsKey: c.tsKey,
}] = &minerInfo{}
}
}
@ -274,14 +283,17 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) {
k, info := it()
pow, err := api.StateMinerPower(ctx, k.addr, types.EmptyTSK)
// TODO: get the storage power actors state and and pull the miner power from there, currently this hits the
// storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset
// and reading each miner power from the result.
pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey)
if err != nil {
log.Error(err)
// Not sure why this would fail, but its probably worth continuing
}
info.power = pow.MinerPower.QualityAdjPower
sszs, err := api.StateMinerSectorCount(ctx, k.addr, types.EmptyTSK)
sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey)
if err != nil {
log.Error(err)
return
@ -316,7 +328,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Info("Storing address mapping")
if err := st.storeAddressMap(addresses); err != nil {
if err := st.storeAddressMap(addressToID); err != nil {
log.Error(err)
return
}
@ -361,7 +373,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Get deals")
// TODO: incremental, gather expired
deals, err := api.StateMarketDeals(ctx, ts.Key())
deals, err := api.StateMarketDeals(ctx, headTs.Key())
if err != nil {
log.Error(err)
return

View File

@ -88,7 +88,7 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect
return nil, nil, xerrors.Errorf("commit: %w", err)
}
if err := sb.FinalizeSector(context.TODO(), sid); err != nil {
if err := sb.FinalizeSector(context.TODO(), sid, nil); err != nil {
return nil, nil, xerrors.Errorf("trim cache: %w", err)
}

View File

@ -0,0 +1,94 @@
package main
import (
"fmt"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
)
var actorCmd = &cli.Command{
Name: "actor",
Usage: "manipulate the miner actor",
Subcommands: []*cli.Command{
actorSetAddrsCmd,
},
}
var actorSetAddrsCmd = &cli.Command{
Name: "set-addrs",
Usage: "set addresses that your miner can be publically dialed on",
Flags: []cli.Flag{
&cli.Int64Flag{
Name: "gas-limit",
Usage: "set gas limit",
Value: 100000,
},
},
Action: func(cctx *cli.Context) error {
nodeAPI, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
api, acloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer acloser()
ctx := lcli.ReqContext(cctx)
var addrs []abi.Multiaddrs
for _, a := range cctx.Args().Slice() {
maddr, err := ma.NewMultiaddr(a)
if err != nil {
return fmt.Errorf("failed to parse %q as a multiaddr: %w", a, err)
}
addrs = append(addrs, maddr.Bytes())
}
maddr, err := nodeAPI.ActorAddress(ctx)
if err != nil {
return err
}
minfo, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return err
}
params, err := actors.SerializeParams(&miner.ChangeMultiaddrsParams{NewMultiaddrs: addrs})
if err != nil {
return err
}
gasLimit := cctx.Int64("gas-limit")
smsg, err := api.MpoolPushMessage(ctx, &types.Message{
To: maddr,
From: minfo.Worker,
Value: types.NewInt(0),
GasPrice: types.NewInt(1),
GasLimit: gasLimit,
Method: 18,
Params: params,
})
if err != nil {
return err
}
fmt.Printf("Requested multiaddrs change in message %s\n", smsg.Cid())
return nil
},
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"sort"
"time"
@ -13,6 +12,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
sealing "github.com/filecoin-project/storage-fsm"
"github.com/filecoin-project/lotus/api"

View File

@ -22,7 +22,9 @@ func main() {
lotuslog.SetupLogLevels()
local := []*cli.Command{
dealsCmd,
actorCmd,
storageDealsCmd,
retrievalDealsCmd,
infoCmd,
initCmd,
rewardsCmd,

View File

@ -217,9 +217,9 @@ var getAskCmd = &cli.Command{
},
}
var dealsCmd = &cli.Command{
Name: "deals",
Usage: "interact with your deals",
var storageDealsCmd = &cli.Command{
Name: "storage-deals",
Usage: "Manage storage deals and related configuration",
Subcommands: []*cli.Command{
dealsImportDataCmd,
dealsListCmd,

View File

@ -26,6 +26,75 @@ var provingCmd = &cli.Command{
Subcommands: []*cli.Command{
provingInfoCmd,
provingDeadlinesCmd,
provingFaultsCmd,
},
}
var provingFaultsCmd = &cli.Command{
Name: "faults",
Usage: "View the currently known proving faulty sectors information",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
api, acloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer acloser()
ctx := lcli.ReqContext(cctx)
maddr, err := nodeApi.ActorAddress(ctx)
if err != nil {
return xerrors.Errorf("getting actor address: %w", err)
}
var mas miner.State
{
mact, err := api.StateGetActor(ctx, maddr, types.EmptyTSK)
if err != nil {
return err
}
rmas, err := api.ChainReadObj(ctx, mact.Head)
if err != nil {
return err
}
if err := mas.UnmarshalCBOR(bytes.NewReader(rmas)); err != nil {
return err
}
}
faults, err := mas.Faults.All(100000000000)
if err != nil {
return err
}
if len(faults) == 0 {
fmt.Println("no faulty sectors")
}
head, err := api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
deadlines, err := api.StateMinerDeadlines(ctx, maddr, head.Key())
if err != nil {
return xerrors.Errorf("getting miner deadlines: %w", err)
}
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintln(tw, "deadline\tsectors")
for deadline, sectors := range deadlines.Due {
intersectSectors, _ := bitfield.IntersectBitField(sectors, mas.Faults)
if intersectSectors != nil {
allSectors, _ := intersectSectors.All(100000000000)
for _, num := range allSectors {
_, _ = fmt.Fprintf(tw, "%d\t%d\n", deadline, num)
}
}
}
return tw.Flush()
},
}

View File

@ -0,0 +1,45 @@
package main
import (
lcli "github.com/filecoin-project/lotus/cli"
"github.com/urfave/cli/v2"
)
var retrievalDealsCmd = &cli.Command{
Name: "retrieval-deals",
Usage: "Manage retrieval deals and related configuration",
Subcommands: []*cli.Command{
enableRetrievalCmd,
disableRetrievalCmd,
},
}
var enableRetrievalCmd = &cli.Command{
Name: "enable",
Usage: "Configure the miner to consider retrieval deal proposals",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
return api.DealsSetAcceptingRetrievalDeals(lcli.DaemonContext(cctx), true)
},
}
var disableRetrievalCmd = &cli.Command{
Name: "disable",
Usage: "Configure the miner to reject all retrieval deal proposals",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
return api.DealsSetAcceptingRetrievalDeals(lcli.DaemonContext(cctx), false)
},
}

View File

@ -27,6 +27,7 @@ var sectorsCmd = &cli.Command{
sectorsRefsCmd,
sectorsUpdateCmd,
sectorsPledgeCmd,
sectorsRemoveCmd,
},
}
@ -46,8 +47,9 @@ var sectorsPledgeCmd = &cli.Command{
}
var sectorsStatusCmd = &cli.Command{
Name: "status",
Usage: "Get the seal status of a sector by its ID",
Name: "status",
Usage: "Get the seal status of a sector by its number",
ArgsUsage: "<sectorNum>",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "log",
@ -63,7 +65,7 @@ var sectorsStatusCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
if !cctx.Args().Present() {
return fmt.Errorf("must specify sector ID to get status of")
return fmt.Errorf("must specify sector number to get status of")
}
id, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
@ -208,6 +210,39 @@ var sectorsRefsCmd = &cli.Command{
},
}
var sectorsRemoveCmd = &cli.Command{
Name: "remove",
Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector)",
ArgsUsage: "<sectorNum>",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "really-do-it",
Usage: "pass this flag if you know what you are doing",
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Bool("really-do-it") {
return xerrors.Errorf("this is a command for advanced users, only use it if you are sure of what you are doing")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.Args().Len() != 1 {
return xerrors.Errorf("must pass sector number")
}
id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
if err != nil {
return xerrors.Errorf("could not parse sector number: %w", err)
}
return nodeApi.SectorRemove(ctx, abi.SectorNumber(id))
},
}
var sectorsUpdateCmd = &cli.Command{
Name: "update-state",
Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery",
@ -228,12 +263,12 @@ var sectorsUpdateCmd = &cli.Command{
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.Args().Len() < 2 {
return xerrors.Errorf("must pass sector ID and new state")
return xerrors.Errorf("must pass sector number and new state")
}
id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
if err != nil {
return xerrors.Errorf("could not parse sector ID: %w", err)
return xerrors.Errorf("could not parse sector number: %w", err)
}
return nodeApi.SectorsUpdate(ctx, abi.SectorNumber(id), api.SectorState(cctx.Args().Get(1)))

8
go.mod
View File

@ -29,10 +29,10 @@ require (
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200605171344-fcac609550ca
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/sector-storage v0.0.0-20200618073200-d9de9b7cb4b4
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246
github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121
github.com/filecoin-project/specs-storage v0.1.0
github.com/filecoin-project/storage-fsm v0.0.0-20200617183754-4380106d3e94
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
github.com/filecoin-project/storage-fsm v0.0.0-20200625160832-379a4655b044
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-kit/kit v0.10.0
github.com/go-ole/go-ole v1.2.4 // indirect
@ -44,7 +44,7 @@ require (
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/ipfs/go-bitswap v0.2.8
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-cidutil v0.0.2
github.com/ipfs/go-datastore v0.4.4

12
go.sum
View File

@ -252,8 +252,8 @@ github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZO
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM=
github.com/filecoin-project/sector-storage v0.0.0-20200618073200-d9de9b7cb4b4 h1:lQC8Fbyn31/H4QxYAYwVV3PYZ9vS61EmjktZc5CaiYs=
github.com/filecoin-project/sector-storage v0.0.0-20200618073200-d9de9b7cb4b4/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM=
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 h1:NfYQRmVRe0LzlNbK5Ket3vbBOwFD5TvtcNtfo/Sd8mg=
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY=
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
@ -261,8 +261,10 @@ github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121 h1
github.com/filecoin-project/specs-actors v0.6.2-0.20200617175406-de392ca14121/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sgOVdJbvFjOnD5w94=
github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/storage-fsm v0.0.0-20200617183754-4380106d3e94 h1:zPKiZPMgkFF0Lq13hsk8lcWlxeVAs6vvJaa3uHn9v70=
github.com/filecoin-project/storage-fsm v0.0.0-20200617183754-4380106d3e94/go.mod h1:q1YCutTSMq/yGYvDPHReT37bPfDLHltnwJutzR9kOY0=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/storage-fsm v0.0.0-20200625160832-379a4655b044 h1:i4oMhv1kx/MAUxRN4EM5tag5fI1uagrwQwINgKrzUt4=
github.com/filecoin-project/storage-fsm v0.0.0-20200625160832-379a4655b044/go.mod h1:JD7fmV1BYADDcy4EYQnqFH/rUzXsh0Je0jXarCjZqSk=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
@ -463,6 +465,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
github.com/ipfs/go-blockservice v0.1.3 h1:9XgsPMwwWJSC9uVr2pMDsW2qFTBSkxpGMhmna8mIjPM=
github.com/ipfs/go-blockservice v0.1.3/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU=
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834 h1:hFJoI1D2a3MqiNkSb4nKwrdkhCngUxUTFNwVwovZX2s=
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=

View File

@ -215,6 +215,9 @@ type MiningBase struct {
}
func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error) {
m.lk.Lock()
defer m.lk.Unlock()
bts, err := m.api.ChainHead(ctx)
if err != nil {
return nil, err
@ -252,6 +255,12 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti
return mpower.MinerPower.QualityAdjPower.GreaterThanEqual(power.ConsensusMinerMinPower), nil
}
// mineOne mines a single block, and does so synchronously, if and only if we
// have won the current round.
//
// {hint/landmark}: This method coordinates all the steps involved in mining a
// block, including the condition of whether mine or not at all depending on
// whether we win the round or not.
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
start := time.Now()

View File

@ -218,6 +218,7 @@ func Online() Option {
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap),
Override(new(dtypes.DrandConfig), modules.BuiltinDrandConfig),
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
@ -312,6 +313,8 @@ func Online() Option {
Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
Override(new(*miner.Miner), modules.SetupBlockProducer),
Override(new(dtypes.AcceptingRetrievalDealsConfigFunc), modules.NewAcceptingRetrievalDealsConfigFunc),
Override(new(dtypes.SetAcceptingRetrievalDealsConfigFunc), modules.NewSetAcceptingRetrievalDealsConfigFunc),
Override(new(dtypes.AcceptingStorageDealsConfigFunc), modules.NewAcceptingStorageDealsConfigFunc),
Override(new(dtypes.SetAcceptingStorageDealsConfigFunc), modules.NewSetAcceptingStorageDealsConfigFunc),
Override(new(dtypes.StorageDealPieceCidBlocklistConfigFunc), modules.NewStorageDealPieceCidBlocklistConfigFunc),

View File

@ -34,8 +34,9 @@ type StorageMiner struct {
}
type DealmakingConfig struct {
AcceptingStorageDeals bool
PieceCidBlocklist []cid.Cid
AcceptingStorageDeals bool
AcceptingRetrievalDeals bool
PieceCidBlocklist []cid.Cid
}
// API contains configs for API endpoint
@ -123,8 +124,9 @@ func DefaultStorageMiner() *StorageMiner {
},
Dealmaking: DealmakingConfig{
AcceptingStorageDeals: true,
PieceCidBlocklist: []cid.Cid{},
AcceptingStorageDeals: true,
AcceptingRetrievalDeals: true,
PieceCidBlocklist: []cid.Cid{},
},
}
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"

View File

@ -31,7 +31,7 @@ import (
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -59,8 +59,8 @@ type API struct {
paych.PaychAPI
SMDealClient storagemarket.StorageClient
RetDiscovery retrievalmarket.PeerResolver
Retrieval retrievalmarket.RetrievalClient
RetDiscovery rm.PeerResolver
Retrieval rm.RetrievalClient
Chain *store.ChainStore
LocalDAG dtypes.ClientDAG
@ -202,7 +202,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
out := make([]api.QueryOffer, len(peers))
for k, p := range peers {
out[k] = a.makeRetrievalQuery(ctx, p, root, retrievalmarket.QueryParams{})
out[k] = a.makeRetrievalQuery(ctx, p, root, rm.QueryParams{})
}
return out, nil
@ -213,25 +213,25 @@ func (a *API) ClientMinerQueryOffer(ctx context.Context, payload cid.Cid, miner
if err != nil {
return api.QueryOffer{}, err
}
rp := retrievalmarket.RetrievalPeer{
rp := rm.RetrievalPeer{
Address: miner,
ID: mi.PeerId,
}
return a.makeRetrievalQuery(ctx, rp, payload, retrievalmarket.QueryParams{}), nil
return a.makeRetrievalQuery(ctx, rp, payload, rm.QueryParams{}), nil
}
func (a *API) makeRetrievalQuery(ctx context.Context, rp retrievalmarket.RetrievalPeer, payload cid.Cid, qp retrievalmarket.QueryParams) api.QueryOffer {
func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, qp rm.QueryParams) api.QueryOffer {
queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp)
if err != nil {
return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeerID: rp.ID}
}
var errStr string
switch queryResponse.Status {
case retrievalmarket.QueryResponseAvailable:
case rm.QueryResponseAvailable:
errStr = ""
case retrievalmarket.QueryResponseUnavailable:
case rm.QueryResponseUnavailable:
errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message)
case retrievalmarket.QueryResponseError:
case rm.QueryResponseError:
errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message)
}
@ -345,13 +345,35 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
retrievalResult := make(chan error, 1)
unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) {
switch state.Status {
case retrievalmarket.DealStatusFailed, retrievalmarket.DealStatusErrored:
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
case retrievalmarket.DealStatusCompleted:
case rm.DealStatusCompleted:
retrievalResult <- nil
case rm.DealStatusRejected:
retrievalResult <- xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored,
rm.DealStatusFailed:
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
case
rm.DealStatusAccepted,
rm.DealStatusAwaitingAcceptance,
rm.DealStatusBlocksComplete,
rm.DealStatusFinalizing,
rm.DealStatusFundsNeeded,
rm.DealStatusFundsNeededLastPayment,
rm.DealStatusNew,
rm.DealStatusOngoing,
rm.DealStatusPaymentChannelAddingFunds,
rm.DealStatusPaymentChannelAllocatingLane,
rm.DealStatusPaymentChannelCreating,
rm.DealStatusPaymentChannelReady,
rm.DealStatusVerified:
return
default:
retrievalResult <- xerrors.Errorf("Unhandled Retrieval Status: %+v", state.Status)
}
}
})
@ -361,7 +383,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
_, err := a.Retrieval.Retrieve(
ctx,
order.Root,
retrievalmarket.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease),
rm.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease),
order.Total,
order.MinerPeerID,
order.Client,

View File

@ -425,7 +425,7 @@ func (a *StateAPI) StateMarketParticipants(ctx context.Context, tsk types.TipSet
if err != nil {
return nil, err
}
locked, err := hamt.LoadNode(ctx, cst, state.EscrowTable, hamt.UseTreeBitWidth(5))
locked, err := hamt.LoadNode(ctx, cst, state.LockedTable, hamt.UseTreeBitWidth(5))
if err != nil {
return nil, err
}
@ -489,13 +489,11 @@ func (a *StateAPI) StateMarketDeals(ctx context.Context, tsk types.TipSetKey) (m
var s market.DealState
if err := sa.Get(ctx, i, &s); err != nil {
if err != nil {
if _, ok := err.(*amt.ErrNotFound); !ok {
return xerrors.Errorf("failed to get state for deal in proposals array: %w", err)
}
s.SectorStartEpoch = -1
if _, ok := err.(*amt.ErrNotFound); !ok {
return xerrors.Errorf("failed to get state for deal in proposals array: %w", err)
}
s.SectorStartEpoch = -1
}
out[strconv.FormatInt(int64(i), 10)] = api.MarketDeal{
Proposal: d,

View File

@ -44,6 +44,7 @@ type StorageMinerAPI struct {
*stores.Index
SetAcceptingStorageDealsConfigFunc dtypes.SetAcceptingStorageDealsConfigFunc
SetAcceptingRetrievalDealsConfigFunc dtypes.SetAcceptingRetrievalDealsConfigFunc
StorageDealPieceCidBlocklistConfigFunc dtypes.StorageDealPieceCidBlocklistConfigFunc
SetStorageDealPieceCidBlocklistConfigFunc dtypes.SetStorageDealPieceCidBlocklistConfigFunc
}
@ -174,6 +175,10 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe
return sm.Miner.ForceSectorState(ctx, id, sealing.SectorState(state))
}
func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber) error {
return sm.Miner.RemoveSector(ctx, id)
}
func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
w, err := connectRemoteWorker(ctx, sm, url)
if err != nil {
@ -224,6 +229,10 @@ func (sm *StorageMinerAPI) DealsSetAcceptingStorageDeals(ctx context.Context, b
return sm.SetAcceptingStorageDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsSetAcceptingRetrievalDeals(ctx context.Context, b bool) error {
return sm.SetAcceptingRetrievalDealsConfigFunc(b)
}
func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fname string) error {
fi, err := os.Open(fname)
if err != nil {

View File

@ -0,0 +1,6 @@
package dtypes
type DrandConfig struct {
Servers []string
ChainInfoJSON string
}

View File

@ -10,14 +10,22 @@ import (
type MinerAddress address.Address
type MinerID abi.ActorID
// AcceptingStorageDealsFunc is a function which reads from miner config to
// determine if the user has disabled storage deals (or not).
// AcceptingStorageDealsConfigFunc is a function which reads from miner config
// to determine if the user has disabled storage deals (or not).
type AcceptingStorageDealsConfigFunc func() (bool, error)
// SetAcceptingStorageDealsFunc is a function which is used to disable or enable
// storage deal acceptance.
// SetAcceptingStorageDealsConfigFunc is a function which is used to disable or
// enable storage deal acceptance.
type SetAcceptingStorageDealsConfigFunc func(bool) error
// AcceptingRetrievalDealsConfigFunc is a function which reads from miner config
// to determine if the user has disabled retrieval acceptance (or not).
type AcceptingRetrievalDealsConfigFunc func() (bool, error)
// SetAcceptingRetrievalDealsConfigFunc is a function which is used to disable
// or enable retrieval deal acceptance.
type SetAcceptingRetrievalDealsConfigFunc func(bool) error
// StorageDealPieceCidBlocklistConfigFunc is a function which reads from miner config
// to obtain a list of CIDs for which the storage miner will not accept storage
// proposals.

View File

@ -44,13 +44,14 @@ type GossipIn struct {
Db dtypes.DrandBootstrap
Cfg *config.Pubsub
Sk *dtypes.ScoreKeeper
Dr dtypes.DrandConfig
}
func getDrandTopic() (string, error) {
func getDrandTopic(chainInfoJSON string) (string, error) {
var drandInfo = struct {
Hash string `json:"hash"`
}{}
err := json.Unmarshal([]byte(build.DrandChain), &drandInfo)
err := json.Unmarshal([]byte(chainInfoJSON), &drandInfo)
if err != nil {
return "", xerrors.Errorf("could not unmarshal drand chain info: %w", err)
}
@ -68,7 +69,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
}
isBootstrapNode := in.Cfg.Bootstrapper
drandTopic, err := getDrandTopic()
drandTopic, err := getDrandTopic(in.Dr.ChainInfoJSON)
if err != nil {
return nil, err
}

View File

@ -108,8 +108,13 @@ func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
type RandomBeaconParams struct {
fx.In
PubSub *pubsub.PubSub `optional:"true"`
Cs *store.ChainStore
PubSub *pubsub.PubSub `optional:"true"`
Cs *store.ChainStore
DrandConfig dtypes.DrandConfig
}
func BuiltinDrandConfig() dtypes.DrandConfig {
return build.DrandConfig
}
func RandomBeacon(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.RandomBeacon, error) {
@ -119,5 +124,5 @@ func RandomBeacon(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Random
}
//return beacon.NewMockBeacon(build.BlockDelay * time.Second)
return drand.NewDrandBeacon(gen.Timestamp, build.BlockDelay, p.PubSub)
return drand.NewDrandBeacon(gen.Timestamp, build.BlockDelay, p.PubSub, p.DrandConfig)
}

View File

@ -357,14 +357,31 @@ func StorageProvider(minerAddress dtypes.MinerAddress, ffiConfig *ffiwrapper.Con
}
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) {
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore, isAcceptingFunc dtypes.AcceptingRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
address, err := minerAddrFromDS(ds)
maddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
}
network := rmnet.NewFromLibp2pHost(h)
return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")))
netwk := rmnet.NewFromLibp2pHost(h)
opt := retrievalimpl.DealDeciderOpt(func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
b, err := isAcceptingFunc()
if err != nil {
return false, "miner error", err
}
if !b {
log.Warn("retrieval deal acceptance disabled; rejecting retrieval deal proposal from client")
return false, "miner is not accepting retrieval deals", nil
}
return true, "", nil
})
return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, ibs, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt)
}
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth) (*sectorstorage.Manager, error) {
@ -399,6 +416,24 @@ func StorageAuth(ctx helpers.MetricsCtx, ca lapi.Common) (sectorstorage.StorageA
return sectorstorage.StorageAuth(headers), nil
}
func NewAcceptingRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.AcceptingRetrievalDealsConfigFunc, error) {
return func() (out bool, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) {
out = cfg.Dealmaking.AcceptingRetrievalDeals
})
return
}, nil
}
func NewSetAcceptingRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.SetAcceptingRetrievalDealsConfigFunc, error) {
return func(b bool) (err error) {
err = mutateCfg(r, func(cfg *config.StorageMiner) {
cfg.Dealmaking.AcceptingRetrievalDeals = b
})
return
}, nil
}
func NewAcceptingStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.AcceptingStorageDealsConfigFunc, error) {
return func() (out bool, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) {

View File

@ -39,3 +39,7 @@ func (m *Miner) PledgeSector() error {
func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error {
return m.sealing.ForceSectorState(ctx, id, state)
}
func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error {
return m.sealing.Remove(ctx, id)
}

View File

@ -383,17 +383,14 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
return nil, xerrors.Errorf("get need prove sectors: %w", err)
}
var skipped *abi.BitField
{
good, err := s.checkSectors(ctx, nps)
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
}
good, err := s.checkSectors(ctx, nps)
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
}
skipped, err = bitfield.SubtractBitField(nps, good)
if err != nil {
return nil, xerrors.Errorf("nps - good: %w", err)
}
skipped, err := bitfield.SubtractBitField(nps, good)
if err != nil {
return nil, xerrors.Errorf("nps - good: %w", err)
}
skipCount, err := skipped.Count()
@ -401,7 +398,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}
ssi, err := s.sortedSectorInfo(ctx, nps, ts)
ssi, err := s.sortedSectorInfo(ctx, good, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}