This commit is contained in:
Sami Mäkelä 2020-08-19 12:26:19 +03:00
commit e894f360a5
57 changed files with 1091 additions and 248 deletions

View File

@ -17,18 +17,27 @@ A brief description of the problem you encountered while proving (sealing) a sec
Including what commands you ran, and a description of your setup, is very helpful. Including what commands you ran, and a description of your setup, is very helpful.
**Sectors list**
The output of `./lotus-miner sectors list`.
**Sectors status** **Sectors status**
The output of `./lotus-miner sectors status --log <sectorId>` for the failed sector(s). The output of `lotus-miner sectors status --log <sectorId>` for the failed sector(s).
**Lotus miner logs** **Lotus miner logs**
Please go through the logs of your miner, and include screenshots of any error-like messages you find. Please go through the logs of your miner, and include screenshots of any error-like messages you find.
Alternatively please upload full log files and share a link here
**Lotus miner diagnostic info**
Please collect the following diagnostic information, and share a link here
* lotus-miner diagnostic info `lotus-miner info all > allinfo`
** Code modifications **
If you have modified parts of lotus, please describe which areas were modified,
and the scope of those modifications
**Version** **Version**
The output of `./lotus --version`. The output of `lotus --version`.

View File

@ -6,6 +6,14 @@
<h1 align="center">Project Lotus - 莲</h1> <h1 align="center">Project Lotus - 莲</h1>
<p align="center">
<a href="https://circleci.com/gh/filecoin-project/lotus"><img src="https://circleci.com/gh/filecoin-project/lotus.svg?style=svg"></a>
<a href="https://codecov.io/gh/filecoin-project/lotus"><img src="https://codecov.io/gh/filecoin-project/lotus/branch/master/graph/badge.svg"></a>
<a href="https://goreportcard.com/report/github.com/filecoin-project/lotus"><img src="https://goreportcard.com/badge/github.com/filecoin-project/lotus" /></a>
<a href=""><img src="https://img.shields.io/badge/golang-%3E%3D1.14.7-blue.svg" /></a>
<br>
</p>
Lotus is an implementation of the Filecoin Distributed Storage Network. For more details about Filecoin, check out the [Filecoin Spec](https://spec.filecoin.io). Lotus is an implementation of the Filecoin Distributed Storage Network. For more details about Filecoin, check out the [Filecoin Spec](https://spec.filecoin.io).
## Building & Documentation ## Building & Documentation

View File

@ -244,7 +244,10 @@ type FullNode interface {
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error)
// ClientRetrieve initiates the retrieval of a file, as specified in the order. // ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error
// ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel
// of status updates.
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error)
// ClientQueryAsk returns a signed StorageAsk from the specified miner. // ClientQueryAsk returns a signed StorageAsk from the specified miner.
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error)
// ClientCalcCommP calculates the CommP for a specified file // ClientCalcCommP calculates the CommP for a specified file

View File

@ -127,20 +127,21 @@ type FullNodeStruct struct {
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
WalletDelete func(context.Context, address.Address) error `perm:"write"` WalletDelete func(context.Context, address.Address) error `perm:"write"`
ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
@ -425,10 +426,14 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e
return c.Internal.ClientListDeals(ctx) return c.Internal.ClientListDeals(ctx)
} }
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
return c.Internal.ClientRetrieve(ctx, order, ref) return c.Internal.ClientRetrieve(ctx, order, ref)
} }
func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
return c.Internal.ClientRetrieveWithEvents(ctx, order, ref)
}
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) { func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {
return c.Internal.ClientQueryAsk(ctx, p, miner) return c.Internal.ClientQueryAsk(ctx, p, miner)
} }

View File

@ -398,7 +398,7 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu
Path: filepath.Join(rpath, "ret"), Path: filepath.Join(rpath, "ret"),
IsCAR: carExport, IsCAR: carExport,
} }
updates, err := client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) updates, err := client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref)
for update := range updates { for update := range updates {
if update.Err != "" { if update.Err != "" {
t.Fatalf("%v", err) t.Fatalf("%v", err)

View File

@ -50,7 +50,7 @@ type MinerInfo struct {
Worker address.Address // Must be an ID-address. Worker address.Address // Must be an ID-address.
NewWorker address.Address // Must be an ID-address. NewWorker address.Address // Must be an ID-address.
WorkerChangeEpoch abi.ChainEpoch WorkerChangeEpoch abi.ChainEpoch
PeerId peer.ID PeerId *peer.ID
Multiaddrs []abi.Multiaddrs Multiaddrs []abi.Multiaddrs
SealProofType abi.RegisteredSealProof SealProofType abi.RegisteredSealProof
SectorSize abi.SectorSize SectorSize abi.SectorSize
@ -58,12 +58,17 @@ type MinerInfo struct {
} }
func NewApiMinerInfo(info *miner.MinerInfo) MinerInfo { func NewApiMinerInfo(info *miner.MinerInfo) MinerInfo {
var pid *peer.ID
if peerID, err := peer.IDFromBytes(info.PeerId); err == nil {
pid = &peerID
}
mi := MinerInfo{ mi := MinerInfo{
Owner: info.Owner, Owner: info.Owner,
Worker: info.Worker, Worker: info.Worker,
NewWorker: address.Undef, NewWorker: address.Undef,
WorkerChangeEpoch: -1, WorkerChangeEpoch: -1,
PeerId: peer.ID(info.PeerId), PeerId: pid,
Multiaddrs: info.Multiaddrs, Multiaddrs: info.Multiaddrs,
SealProofType: info.SealProofType, SealProofType: info.SealProofType,
SectorSize: info.SectorSize, SectorSize: info.SectorSize,

View File

@ -89,11 +89,14 @@ const VerifSigCacheSize = 32000
// TODO: If this is gonna stay, it should move to specs-actors // TODO: If this is gonna stay, it should move to specs-actors
const BlockMessageLimit = 10000 const BlockMessageLimit = 10000
const BlockGasLimit = 10_000_000_000 const BlockGasLimit = 10_000_000_000
const BlockGasTarget = BlockGasLimit / 2 const BlockGasTarget = BlockGasLimit / 2
const BaseFeeMaxChangeDenom = 8 // 12.5% const BaseFeeMaxChangeDenom = 8 // 12.5%
const InitialBaseFee = 100e6 const InitialBaseFee = 100e6
const MinimumBaseFee = 100 const MinimumBaseFee = 100
const PackingEfficiencyNum = 4
const PackingEfficiencyDenom = 5
// Actor consts // Actor consts
// TODO: Pull from actors when its made not private // TODO: Pull from actors when its made not private

View File

@ -66,4 +66,7 @@ var (
// Actor consts // Actor consts
// TODO: Pull from actors when its made not private // TODO: Pull from actors when its made not private
MinDealDuration = abi.ChainEpoch(180 * builtin.EpochsInDay) MinDealDuration = abi.ChainEpoch(180 * builtin.EpochsInDay)
PackingEfficiencyNum int64 = 4
PackingEfficiencyDenom int64 = 5
) )

View File

@ -107,6 +107,17 @@ var DefaultVerifregRootkeyActor = genesis.Actor{
Meta: rootkeyMultisig.ActorMeta(), Meta: rootkeyMultisig.ActorMeta(),
} }
var remAccTestKey, _ = address.NewFromString("t1ceb34gnsc6qk5dt6n7xg6ycwzasjhbxm3iylkiy")
var remAccMeta = genesis.AccountMeta{
Owner: remAccTestKey,
}
var DefaultRemainderAccountActor = genesis.Actor{
Type: genesis.TAccount,
Balance: big.NewInt(0),
Meta: remAccMeta.ActorMeta(),
}
func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{ saminer.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{
abi.RegisteredSealProof_StackedDrg2KiBV1: {}, abi.RegisteredSealProof_StackedDrg2KiBV1: {},
@ -210,9 +221,10 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
*genm1, *genm1,
*genm2, *genm2,
}, },
VerifregRootKey: DefaultVerifregRootkeyActor, VerifregRootKey: DefaultVerifregRootkeyActor,
NetworkName: "", RemainderAccount: DefaultRemainderAccountActor,
Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()), NetworkName: "",
Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
} }
genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl) genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl)
@ -304,7 +316,8 @@ func (cg *ChainGen) GenesisCar() ([]byte, error) {
func CarWalkFunc(nd format.Node) (out []*format.Link, err error) { func CarWalkFunc(nd format.Node) (out []*format.Link, err error) {
for _, link := range nd.Links() { for _, link := range nd.Links() {
if link.Cid.Prefix().Codec == cid.FilCommitmentSealed || link.Cid.Prefix().Codec == cid.FilCommitmentUnsealed { pref := link.Cid.Prefix()
if pref.Codec == cid.FilCommitmentSealed || pref.Codec == cid.FilCommitmentUnsealed {
continue continue
} }
out = append(out, link) out = append(out, link)

View File

@ -0,0 +1,41 @@
package genesis
import (
"encoding/hex"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
)
const genesisMultihashString = "1220107d821c25dc0735200249df94a8bebc9c8e489744f86a4ca8919e81f19dcd72"
const genesisBlockHex = "a5684461746574696d6573323031372d30352d30352030313a32373a3531674e6574776f726b6846696c65636f696e65546f6b656e6846696c65636f696e6c546f6b656e416d6f756e7473a36b546f74616c537570706c796d322c3030302c3030302c303030664d696e6572736d312c3430302c3030302c3030306c50726f746f636f6c4c616273a36b446576656c6f706d656e746b3330302c3030302c3030306b46756e6472616973696e676b3230302c3030302c3030306a466f756e646174696f6e6b3130302c3030302c303030674d657373616765784854686973206973207468652047656e6573697320426c6f636b206f66207468652046696c65636f696e20446563656e7472616c697a65642053746f72616765204e6574776f726b2e"
var cidBuilder = cid.V1Builder{Codec: cid.DagCBOR, MhType: multihash.SHA2_256}
func expectedCid() cid.Cid {
mh, err := multihash.FromHexString(genesisMultihashString)
if err != nil {
panic(err)
}
return cid.NewCidV1(cidBuilder.Codec, mh)
}
func getGenesisBlock() (blocks.Block, error) {
genesisBlockData, err := hex.DecodeString(genesisBlockHex)
if err != nil {
return nil, err
}
genesisCid, err := cidBuilder.Sum(genesisBlockData)
if err != nil {
return nil, err
}
block, err := blocks.NewBlockWithCid(genesisBlockData, genesisCid)
if err != nil {
return nil, err
}
return block, nil
}

View File

@ -3,6 +3,7 @@ package genesis
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
@ -258,11 +259,47 @@ func MakeInitialStateTree(ctx context.Context, bs bstore.Blockstore, template ge
Balance: types.NewInt(0), Balance: types.NewInt(0),
Head: verifierState, Head: verifierState,
}) })
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("setting account from actmap: %w", err) return nil, nil, xerrors.Errorf("setting account from actmap: %w", err)
} }
totalFilAllocated := big.Zero()
// flush as ForEach works on the HAMT
if _, err := state.Flush(ctx); err != nil {
return nil, nil, err
}
err = state.ForEach(func(addr address.Address, act *types.Actor) error {
totalFilAllocated = big.Add(totalFilAllocated, act.Balance)
return nil
})
if err != nil {
return nil, nil, xerrors.Errorf("summing account balances in state tree: %w", err)
}
totalFil := big.Mul(big.NewInt(int64(build.FilBase)), big.NewInt(int64(build.FilecoinPrecision)))
remainingFil := big.Sub(totalFil, totalFilAllocated)
if remainingFil.Sign() < 0 {
return nil, nil, xerrors.Errorf("somehow overallocated filecoin (allocated = %s)", types.FIL(totalFilAllocated))
}
remAccKey, err := address.NewIDAddress(90)
if err != nil {
return nil, nil, err
}
if err := createAccount(ctx, bs, cst, state, remAccKey, template.RemainderAccount); err != nil {
return nil, nil, err
}
err = state.SetActor(remAccKey, &types.Actor{
Code: builtin.AccountActorCodeID,
Balance: remainingFil,
Head: emptyobject,
})
if err != nil {
return nil, nil, xerrors.Errorf("set burnt funds account actor: %w", err)
}
return state, keyIDs, nil return state, keyIDs, nil
} }
@ -284,6 +321,7 @@ func createAccount(ctx context.Context, bs bstore.Blockstore, cst cbor.IpldStore
if err != nil { if err != nil {
return xerrors.Errorf("setting account from actmap: %w", err) return xerrors.Errorf("setting account from actmap: %w", err)
} }
return nil
} else if info.Type == genesis.TMultisig { } else if info.Type == genesis.TMultisig {
var ainfo genesis.MultisigMeta var ainfo genesis.MultisigMeta
if err := json.Unmarshal(info.Meta, &ainfo); err != nil { if err := json.Unmarshal(info.Meta, &ainfo); err != nil {
@ -337,9 +375,10 @@ func createAccount(ctx context.Context, bs bstore.Blockstore, cst cbor.IpldStore
if err != nil { if err != nil {
return xerrors.Errorf("setting account from actmap: %w", err) return xerrors.Errorf("setting account from actmap: %w", err)
} }
return nil
} }
return nil return fmt.Errorf("failed to create account")
} }
func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot cid.Cid, template genesis.Template, keyIDs map[address.Address]address.Address) (cid.Cid, error) { func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot cid.Cid, template genesis.Template, keyIDs map[address.Address]address.Address) (cid.Cid, error) {
@ -458,10 +497,32 @@ func MakeGenesisBlock(ctx context.Context, bs bstore.Blockstore, sys vm.SyscallB
VRFProof: []byte("vrf proof0000000vrf proof0000000"), VRFProof: []byte("vrf proof0000000vrf proof0000000"),
} }
filecoinGenesisCid, err := cid.Decode("bafyreiaqpwbbyjo4a42saasj36kkrpv4tsherf2e7bvezkert2a7dhonoi")
if err != nil {
return nil, xerrors.Errorf("failed to decode filecoin genesis block CID: %w", err)
}
if !expectedCid().Equals(filecoinGenesisCid) {
return nil, xerrors.Errorf("expectedCid != filecoinGenesisCid")
}
gblk, err := getGenesisBlock()
if err != nil {
return nil, xerrors.Errorf("failed to construct filecoin genesis block: %w", err)
}
if !filecoinGenesisCid.Equals(gblk.Cid()) {
return nil, xerrors.Errorf("filecoinGenesisCid != gblk.Cid")
}
if err := bs.Put(gblk); err != nil {
return nil, xerrors.Errorf("failed writing filecoin genesis block to blockstore: %w", err)
}
b := &types.BlockHeader{ b := &types.BlockHeader{
Miner: builtin.SystemActorAddr, Miner: builtin.SystemActorAddr,
Ticket: genesisticket, Ticket: genesisticket,
Parents: []cid.Cid{}, Parents: []cid.Cid{filecoinGenesisCid},
Height: 0, Height: 0,
ParentWeight: types.NewInt(0), ParentWeight: types.NewInt(0),
ParentStateRoot: stateroot, ParentStateRoot: stateroot,

View File

@ -262,6 +262,8 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid
return cid.Undef, xerrors.Errorf("getting current total power: %w", err) return cid.Undef, xerrors.Errorf("getting current total power: %w", err)
} }
pcd := miner.PreCommitDepositForPower(epochReward.ThisEpochRewardSmoothed, tpow.QualityAdjPowerSmoothed, sectorWeight)
pledge := miner.InitialPledgeForPower( pledge := miner.InitialPledgeForPower(
sectorWeight, sectorWeight,
epochReward.ThisEpochBaselinePower, epochReward.ThisEpochBaselinePower,
@ -271,6 +273,8 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid
circSupply(ctx, vm, minerInfos[i].maddr), circSupply(ctx, vm, minerInfos[i].maddr),
) )
pledge = big.Add(pcd, pledge)
fmt.Println(types.FIL(pledge)) fmt.Println(types.FIL(pledge))
_, err = doExecValue(ctx, vm, minerInfos[i].maddr, m.Worker, pledge, builtin.MethodsMiner.PreCommitSector, mustEnc(params)) _, err = doExecValue(ctx, vm, minerInfos[i].maddr, m.Worker, pledge, builtin.MethodsMiner.PreCommitSector, mustEnc(params))
if err != nil { if err != nil {

View File

@ -26,6 +26,28 @@ func noWinnersProb() []float64 {
return noWinnersProbCache return noWinnersProbCache
} }
var noWinnersProbAssumingCache []float64
var noWinnersProbAssumingOnce sync.Once
func noWinnersProbAssumingMoreThanOne() []float64 {
noWinnersProbAssumingOnce.Do(func() {
cond := math.Log(-1 + math.Exp(5))
poissPdf := func(x float64) float64 {
const Mu = 5
lg, _ := math.Lgamma(x + 1)
result := math.Exp((math.Log(Mu) * x) - lg - cond)
return result
}
out := make([]float64, 0, MaxBlocks)
for i := 0; i < MaxBlocks; i++ {
out = append(out, poissPdf(float64(i+1)))
}
noWinnersProbAssumingCache = out
})
return noWinnersProbAssumingCache
}
func binomialCoefficient(n, k float64) float64 { func binomialCoefficient(n, k float64) float64 {
if k > n { if k > n {
return math.NaN() return math.NaN()
@ -40,7 +62,7 @@ func binomialCoefficient(n, k float64) float64 {
} }
func (mp *MessagePool) blockProbabilities(tq float64) []float64 { func (mp *MessagePool) blockProbabilities(tq float64) []float64 {
noWinners := noWinnersProb() // cache this noWinners := noWinnersProbAssumingMoreThanOne()
p := 1 - tq p := 1 - tq
binoPdf := func(x, trials float64) float64 { binoPdf := func(x, trials float64) float64 {
@ -72,7 +94,7 @@ func (mp *MessagePool) blockProbabilities(tq float64) []float64 {
for place := 0; place < MaxBlocks; place++ { for place := 0; place < MaxBlocks; place++ {
var pPlace float64 var pPlace float64
for otherWinners, pCase := range noWinners { for otherWinners, pCase := range noWinners {
pPlace += pCase * binoPdf(float64(place), float64(otherWinners+1)) pPlace += pCase * binoPdf(float64(place), float64(otherWinners))
} }
out = append(out, pPlace) out = append(out, pPlace)
} }

View File

@ -1,6 +1,11 @@
package messagepool package messagepool
import "testing" import (
"math"
"math/rand"
"testing"
"time"
)
func TestBlockProbability(t *testing.T) { func TestBlockProbability(t *testing.T) {
mp := &MessagePool{} mp := &MessagePool{}
@ -13,3 +18,26 @@ func TestBlockProbability(t *testing.T) {
} }
} }
} }
func TestWinnerProba(t *testing.T) {
rand.Seed(time.Now().UnixNano())
const N = 1000000
winnerProba := noWinnersProb()
sum := 0
for i := 0; i < N; i++ {
minersRand := rand.Float64()
j := 0
for ; j < MaxBlocks; j++ {
minersRand -= winnerProba[j]
if minersRand < 0 {
break
}
}
sum += j
}
if avg := float64(sum) / N; math.Abs(avg-5) > 0.01 {
t.Fatalf("avg too far off: %f", avg)
}
}

View File

@ -59,7 +59,7 @@ var (
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail") ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium") ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrTryAgain = errors.New("state inconsistency while signing message; please try again") ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
) )
const ( const (
@ -264,6 +264,11 @@ func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.Cha
} }
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err
}
// serialize push access to reduce lock contention // serialize push access to reduce lock contention
mp.addSema <- struct{}{} mp.addSema <- struct{}{}
defer func() { defer func() {
@ -271,7 +276,8 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
}() }()
mp.curTsLk.Lock() mp.curTsLk.Lock()
epoch := mp.curTs.Height() curTs := mp.curTs
epoch := curTs.Height()
mp.curTsLk.Unlock() mp.curTsLk.Unlock()
if err := mp.verifyMsgBeforePush(m, epoch); err != nil { if err := mp.verifyMsgBeforePush(m, epoch); err != nil {
return cid.Undef, err return cid.Undef, err
@ -282,9 +288,17 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
return cid.Undef, err return cid.Undef, err
} }
if err := mp.Add(m); err != nil { mp.curTsLk.Lock()
if mp.curTs != curTs {
mp.curTsLk.Unlock()
return cid.Undef, ErrTryAgain
}
if err := mp.addTs(m, mp.curTs); err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err return cid.Undef, err
} }
mp.curTsLk.Unlock()
mp.lk.Lock() mp.lk.Lock()
if err := mp.addLocal(m, msgb); err != nil { if err := mp.addLocal(m, msgb); err != nil {
@ -296,7 +310,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
} }
func (mp *MessagePool) Add(m *types.SignedMessage) error { func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
// big messages are bad, anti DOS // big messages are bad, anti DOS
if m.Size() > 32*1024 { if m.Size() > 32*1024 {
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
@ -315,6 +329,15 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
return err return err
} }
return nil
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
err := mp.checkMessage(m)
if err != nil {
return err
}
// serialize push access to reduce lock contention // serialize push access to reduce lock contention
mp.addSema <- struct{}{} mp.addSema <- struct{}{}
defer func() { defer func() {

View File

@ -834,7 +834,7 @@ func TestOptimalMessageSelection3(t *testing.T) {
nMessages := int(build.BlockGasLimit/gasLimit) + 1 nMessages := int(build.BlockGasLimit/gasLimit) + 1
for i := 0; i < nMessages; i++ { for i := 0; i < nMessages; i++ {
for j := 0; j < nActors; j++ { for j := 0; j < nActors; j++ {
premium := 500000 + 20000*(nActors-j) + (nMessages+2-i)/(3*nActors) + i%3 premium := 500000 + 10000*(nActors-j) + (nMessages+2-i)/(30*nActors) + i%3
m := makeTestMessage(wallets[j], actors[j], actors[j%nActors], uint64(i), gasLimit, uint64(premium)) m := makeTestMessage(wallets[j], actors[j], actors[j%nActors], uint64(i), gasLimit, uint64(premium))
mustAdd(t, mp, m) mustAdd(t, mp, m)
} }
@ -874,7 +874,7 @@ func TestOptimalMessageSelection3(t *testing.T) {
} }
} }
func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium func() uint64) (float64, float64) { func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium func() uint64) (float64, float64, float64) {
// in this test we use 300 actors and send 10 blocks of messages. // in this test we use 300 actors and send 10 blocks of messages.
// actors send with an randomly distributed premium dictated by the getPremium function. // actors send with an randomly distributed premium dictated by the getPremium function.
// a number of miners select with varying ticket quality and we compare the // a number of miners select with varying ticket quality and we compare the
@ -934,8 +934,11 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
t.Fatal(err) t.Fatal(err)
} }
capacityBoost := 0.0 totalGreedyCapacity := 0.0
rewardBoost := 0.0 totalGreedyReward := 0.0
totalOptimalCapacity := 0.0
totalOptimalReward := 0.0
totalBestTQReward := 0.0
const runs = 1 const runs = 1
for i := 0; i < runs; i++ { for i := 0; i < runs; i++ {
// 2. optimal selection // 2. optimal selection
@ -945,31 +948,38 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
for ; i < MaxBlocks && minersRand > 0; i++ { for ; i < MaxBlocks && minersRand > 0; i++ {
minersRand -= winerProba[i] minersRand -= winerProba[i]
} }
nMiners := i nMiners := i - 1
if nMiners == 0 { if nMiners < 1 {
nMiners = 1 nMiners = 1
} }
optMsgs := make(map[cid.Cid]*types.SignedMessage) optMsgs := make(map[cid.Cid]*types.SignedMessage)
bestTq := 0.0
var bestMsgs []*types.SignedMessage
for j := 0; j < nMiners; j++ { for j := 0; j < nMiners; j++ {
tq := rng.Float64() tq := rng.Float64()
msgs, err := mp.SelectMessages(ts, tq) msgs, err := mp.SelectMessages(ts, tq)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if tq > bestTq {
bestMsgs = msgs
}
for _, m := range msgs { for _, m := range msgs {
optMsgs[m.Cid()] = m optMsgs[m.Cid()] = m
} }
} }
totalGreedyCapacity += float64(len(greedyMsgs))
totalOptimalCapacity += float64(len(optMsgs))
boost := float64(len(optMsgs)) / float64(len(greedyMsgs)) boost := float64(len(optMsgs)) / float64(len(greedyMsgs))
capacityBoost += boost
t.Logf("nMiners: %d", nMiners) t.Logf("nMiners: %d", nMiners)
t.Logf("greedy capacity %d, optimal capacity %d (x %.1f )", len(greedyMsgs), t.Logf("greedy capacity %d, optimal capacity %d (x %.1f )", len(greedyMsgs),
len(optMsgs), boost) len(optMsgs), boost)
if len(greedyMsgs) > len(optMsgs) { if len(greedyMsgs) > len(optMsgs) {
t.Fatal("greedy capacity higher than optimal capacity; wtf") t.Errorf("greedy capacity higher than optimal capacity; wtf")
} }
greedyReward := big.NewInt(0) greedyReward := big.NewInt(0)
@ -982,25 +992,34 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu
optReward.Add(optReward, mp.getGasReward(m, baseFee, ts)) optReward.Add(optReward, mp.getGasReward(m, baseFee, ts))
} }
bestTqReward := big.NewInt(0)
for _, m := range bestMsgs {
bestTqReward.Add(bestTqReward, mp.getGasReward(m, baseFee, ts))
}
totalBestTQReward += float64(bestTqReward.Uint64())
nMinersBig := big.NewInt(int64(nMiners)) nMinersBig := big.NewInt(int64(nMiners))
greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64() greedyAvgReward, _ := new(big.Rat).SetFrac(greedyReward, nMinersBig).Float64()
totalGreedyReward += greedyAvgReward
optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64() optimalAvgReward, _ := new(big.Rat).SetFrac(optReward, nMinersBig).Float64()
totalOptimalReward += optimalAvgReward
boost = optimalAvgReward / greedyAvgReward boost = optimalAvgReward / greedyAvgReward
rewardBoost += boost
t.Logf("greedy reward: %.0f, optimal reward: %.0f (x %.1f )", greedyAvgReward, t.Logf("greedy reward: %.0f, optimal reward: %.0f (x %.1f )", greedyAvgReward,
optimalAvgReward, boost) optimalAvgReward, boost)
} }
capacityBoost /= runs capacityBoost := totalOptimalCapacity / totalGreedyCapacity
rewardBoost /= runs rewardBoost := totalOptimalReward / totalGreedyReward
t.Logf("Average capacity boost: %f", capacityBoost) t.Logf("Average capacity boost: %f", capacityBoost)
t.Logf("Average reward boost: %f", rewardBoost) t.Logf("Average reward boost: %f", rewardBoost)
t.Logf("Average best tq reward: %f", totalBestTQReward/runs/1e12)
logging.SetLogLevel("messagepool", "info") logging.SetLogLevel("messagepool", "info")
return capacityBoost, rewardBoost return capacityBoost, rewardBoost, totalBestTQReward / runs / 1e12
} }
func makeExpPremiumDistribution(rng *rand.Rand) func() uint64 { func makeExpPremiumDistribution(rng *rand.Rand) func() uint64 {
@ -1018,35 +1037,41 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 {
} }
func TestCompetitiveMessageSelectionExp(t *testing.T) { func TestCompetitiveMessageSelectionExp(t *testing.T) {
var capacityBoost, rewardBoost float64 var capacityBoost, rewardBoost, tqReward float64
seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45} seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45}
for _, seed := range seeds { for _, seed := range seeds {
t.Log("running competitive message selection with Exponential premium distribution and seed", seed) t.Log("running competitive message selection with Exponential premium distribution and seed", seed)
rng := rand.New(rand.NewSource(seed)) rng := rand.New(rand.NewSource(seed))
cb, rb := testCompetitiveMessageSelection(t, rng, makeExpPremiumDistribution(rng)) cb, rb, tqR := testCompetitiveMessageSelection(t, rng, makeExpPremiumDistribution(rng))
capacityBoost += cb capacityBoost += cb
rewardBoost += rb rewardBoost += rb
tqReward += tqR
} }
capacityBoost /= float64(len(seeds)) capacityBoost /= float64(len(seeds))
rewardBoost /= float64(len(seeds)) rewardBoost /= float64(len(seeds))
tqReward /= float64(len(seeds))
t.Logf("Average capacity boost across all seeds: %f", capacityBoost) t.Logf("Average capacity boost across all seeds: %f", capacityBoost)
t.Logf("Average reward boost across all seeds: %f", rewardBoost) t.Logf("Average reward boost across all seeds: %f", rewardBoost)
t.Logf("Average reward of best ticket across all seeds: %f", tqReward)
} }
func TestCompetitiveMessageSelectionZipf(t *testing.T) { func TestCompetitiveMessageSelectionZipf(t *testing.T) {
var capacityBoost, rewardBoost float64 var capacityBoost, rewardBoost, tqReward float64
seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45} seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45}
for _, seed := range seeds { for _, seed := range seeds {
t.Log("running competitive message selection with Zipf premium distribution and seed", seed) t.Log("running competitive message selection with Zipf premium distribution and seed", seed)
rng := rand.New(rand.NewSource(seed)) rng := rand.New(rand.NewSource(seed))
cb, rb := testCompetitiveMessageSelection(t, rng, makeZipfPremiumDistribution(rng)) cb, rb, tqR := testCompetitiveMessageSelection(t, rng, makeZipfPremiumDistribution(rng))
capacityBoost += cb capacityBoost += cb
rewardBoost += rb rewardBoost += rb
tqReward += tqR
} }
tqReward /= float64(len(seeds))
capacityBoost /= float64(len(seeds)) capacityBoost /= float64(len(seeds))
rewardBoost /= float64(len(seeds)) rewardBoost /= float64(len(seeds))
t.Logf("Average capacity boost across all seeds: %f", capacityBoost) t.Logf("Average capacity boost across all seeds: %f", capacityBoost)
t.Logf("Average reward boost across all seeds: %f", rewardBoost) t.Logf("Average reward boost across all seeds: %f", rewardBoost)
t.Logf("Average reward of best ticket across all seeds: %f", tqReward)
} }

View File

@ -3,9 +3,10 @@ package stmgr
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"sync" "sync"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/multisig" "github.com/filecoin-project/specs-actors/actors/builtin/multisig"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -323,7 +324,7 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, ts *types.TipSet
var parentEpoch abi.ChainEpoch var parentEpoch abi.ChainEpoch
pstate := blks[0].ParentStateRoot pstate := blks[0].ParentStateRoot
if len(blks[0].Parents) > 0 { if blks[0].Height > 0 {
parent, err := sm.cs.GetBlock(blks[0].Parents[0]) parent, err := sm.cs.GetBlock(blks[0].Parents[0])
if err != nil { if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting parent block: %w", err) return cid.Undef, cid.Undef, xerrors.Errorf("getting parent block: %w", err)
@ -787,7 +788,7 @@ type genesisActor struct {
initBal abi.TokenAmount initBal abi.TokenAmount
} }
// sets up information about the non-multisig actors in the genesis state // sets up information about the actors in the genesis state
func (sm *StateManager) setupGenesisActors(ctx context.Context) error { func (sm *StateManager) setupGenesisActors(ctx context.Context) error {
gi := genesisInfo{} gi := genesisInfo{}
@ -850,21 +851,24 @@ func (sm *StateManager) setupGenesisActors(ctx context.Context) error {
} }
} else if act.Code == builtin.AccountActorCodeID { } else if act.Code == builtin.AccountActorCodeID {
// should exclude burnt funds actor and "remainder account actor"
// should only ever be "faucet" accounts in testnets // should only ever be "faucet" accounts in testnets
kaddr, err := address.NewFromBytes([]byte(k)) kaddr, err := address.NewFromBytes([]byte(k))
if err != nil { if err != nil {
return xerrors.Errorf("decoding address: %w", err) return xerrors.Errorf("decoding address: %w", err)
} }
kid, err := sTree.LookupID(kaddr) if kaddr != builtin.BurntFundsActorAddr {
if err != nil { kid, err := sTree.LookupID(kaddr)
return xerrors.Errorf("resolving address: %w", err) if err != nil {
} return xerrors.Errorf("resolving address: %w", err)
}
gi.genesisActors = append(gi.genesisActors, genesisActor{ gi.genesisActors = append(gi.genesisActors, genesisActor{
addr: kid, addr: kid,
initBal: act.Balance, initBal: act.Balance,
}) })
}
} }
return nil return nil
}) })
@ -888,6 +892,83 @@ func (sm *StateManager) setupGenesisActors(ctx context.Context) error {
return nil return nil
} }
// sets up information about the actors in the genesis state
// For testnet we use a hardcoded set of multisig states, instead of what's actually in the genesis multisigs
// We also do not consider ANY account actors (including the faucet)
func (sm *StateManager) setupGenesisActorsTestnet(ctx context.Context) error {
gi := genesisInfo{}
gb, err := sm.cs.GetGenesis()
if err != nil {
return xerrors.Errorf("getting genesis block: %w", err)
}
gts, err := types.NewTipSet([]*types.BlockHeader{gb})
if err != nil {
return xerrors.Errorf("getting genesis tipset: %w", err)
}
st, _, err := sm.TipSetState(ctx, gts)
if err != nil {
return xerrors.Errorf("getting genesis tipset state: %w", err)
}
cst := cbor.NewCborStore(sm.cs.Blockstore())
sTree, err := state.LoadStateTree(cst, st)
if err != nil {
return xerrors.Errorf("loading state tree: %w", err)
}
gi.genesisMarketFunds, err = getFilMarketLocked(ctx, sTree)
if err != nil {
return xerrors.Errorf("setting up genesis market funds: %w", err)
}
gi.genesisPledge, err = getFilPowerLocked(ctx, sTree)
if err != nil {
return xerrors.Errorf("setting up genesis pledge: %w", err)
}
totalsByEpoch := make(map[abi.ChainEpoch]abi.TokenAmount)
// 6 months
sixMonths := abi.ChainEpoch(183 * builtin.EpochsInDay)
totalsByEpoch[sixMonths] = big.NewInt(49_929_341)
totalsByEpoch[sixMonths] = big.Add(totalsByEpoch[sixMonths], big.NewInt(32_787_700))
// 1 year
oneYear := abi.ChainEpoch(365 * builtin.EpochsInDay)
totalsByEpoch[oneYear] = big.NewInt(22_421_712)
// 2 years
twoYears := abi.ChainEpoch(2 * 365 * builtin.EpochsInDay)
totalsByEpoch[twoYears] = big.NewInt(7_223_364)
// 3 years
threeYears := abi.ChainEpoch(3 * 365 * builtin.EpochsInDay)
totalsByEpoch[threeYears] = big.NewInt(87_637_883)
// 6 years
sixYears := abi.ChainEpoch(6 * 365 * builtin.EpochsInDay)
totalsByEpoch[sixYears] = big.NewInt(100_000_000)
totalsByEpoch[sixYears] = big.Add(totalsByEpoch[sixYears], big.NewInt(300_000_000))
gi.genesisMsigs = make([]multisig.State, 0, len(totalsByEpoch))
for k, v := range totalsByEpoch {
ns := multisig.State{
InitialBalance: v,
UnlockDuration: k,
PendingTxns: cid.Undef,
}
gi.genesisMsigs = append(gi.genesisMsigs, ns)
}
sm.genInfo = &gi
return nil
}
// GetVestedFunds returns all funds that have "left" actors that are in the genesis state: // GetVestedFunds returns all funds that have "left" actors that are in the genesis state:
// - For Multisigs, it counts the actual amounts that have vested at the given epoch // - For Multisigs, it counts the actual amounts that have vested at the given epoch
// - For Accounts, it counts max(currentBalance - genesisBalance, 0). // - For Accounts, it counts max(currentBalance - genesisBalance, 0).
@ -898,7 +979,7 @@ func (sm *StateManager) GetFilVested(ctx context.Context, height abi.ChainEpoch,
vf = big.Add(vf, au) vf = big.Add(vf, au)
} }
// these should only ever be "faucet" accounts in testnets // there should not be any such accounts in testnet (and also none in mainnet?)
for _, v := range sm.genInfo.genesisActors { for _, v := range sm.genInfo.genesisActors {
act, err := st.GetActor(v.addr) act, err := st.GetActor(v.addr)
if err != nil { if err != nil {
@ -988,7 +1069,7 @@ func (sm *StateManager) GetCirculatingSupplyDetailed(ctx context.Context, height
sm.genesisMsigLk.Lock() sm.genesisMsigLk.Lock()
defer sm.genesisMsigLk.Unlock() defer sm.genesisMsigLk.Unlock()
if sm.genInfo == nil { if sm.genInfo == nil {
err := sm.setupGenesisActors(ctx) err := sm.setupGenesisActorsTestnet(ctx)
if err != nil { if err != nil {
return api.CirculatingSupply{}, xerrors.Errorf("failed to setup genesis information: %w", err) return api.CirculatingSupply{}, xerrors.Errorf("failed to setup genesis information: %w", err)
} }

View File

@ -7,11 +7,26 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
func computeNextBaseFee(baseFee types.BigInt, gasLimitUsed int64, noOfBlocks int) types.BigInt { func computeNextBaseFee(baseFee types.BigInt, gasLimitUsed int64, noOfBlocks int) types.BigInt {
delta := gasLimitUsed/int64(noOfBlocks) - build.BlockGasTarget // deta := 1/PackingEfficiency * gasLimitUsed/noOfBlocks - build.BlockGasTarget
// change := baseFee * deta / BlockGasTarget / BaseFeeMaxChangeDenom
// nextBaseFee = baseFee + change
// nextBaseFee = max(nextBaseFee, build.MinimumBaseFee)
delta := build.PackingEfficiencyDenom * gasLimitUsed / (int64(noOfBlocks) * build.PackingEfficiencyNum)
delta -= build.BlockGasTarget
// cap change at 12.5% (BaseFeeMaxChangeDenom) by capping delta
if delta > build.BlockGasTarget {
delta = build.BlockGasTarget
}
if delta < -build.BlockGasTarget {
delta = -build.BlockGasTarget
}
change := big.Mul(baseFee, big.NewInt(delta)) change := big.Mul(baseFee, big.NewInt(delta))
change = big.Div(change, big.NewInt(build.BlockGasTarget)) change = big.Div(change, big.NewInt(build.BlockGasTarget))
@ -26,17 +41,30 @@ func computeNextBaseFee(baseFee types.BigInt, gasLimitUsed int64, noOfBlocks int
func (cs *ChainStore) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) { func (cs *ChainStore) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) {
zero := abi.NewTokenAmount(0) zero := abi.NewTokenAmount(0)
// totalLimit is sum of GasLimits of unique messages in a tipset
totalLimit := int64(0) totalLimit := int64(0)
seen := make(map[cid.Cid]struct{})
for _, b := range ts.Blocks() { for _, b := range ts.Blocks() {
msg1, msg2, err := cs.MessagesForBlock(b) msg1, msg2, err := cs.MessagesForBlock(b)
if err != nil { if err != nil {
return zero, xerrors.Errorf("error getting messages for: %s: %w", b.Cid(), err) return zero, xerrors.Errorf("error getting messages for: %s: %w", b.Cid(), err)
} }
for _, m := range msg1 { for _, m := range msg1 {
totalLimit += m.GasLimit c := m.Cid()
if _, ok := seen[c]; !ok {
totalLimit += m.GasLimit
seen[c] = struct{}{}
}
} }
for _, m := range msg2 { for _, m := range msg2 {
totalLimit += m.Message.GasLimit c := m.Cid()
if _, ok := seen[c]; !ok {
totalLimit += m.Message.GasLimit
seen[c] = struct{}{}
}
} }
} }
parentBaseFee := ts.Blocks()[0].ParentBaseFee parentBaseFee := ts.Blocks()[0].ParentBaseFee

View File

@ -18,10 +18,10 @@ func TestBaseFee(t *testing.T) {
}{ }{
{100e6, 0, 1, 87.5e6}, {100e6, 0, 1, 87.5e6},
{100e6, 0, 5, 87.5e6}, {100e6, 0, 5, 87.5e6},
{100e6, build.BlockGasTarget, 1, 100e6}, {100e6, build.BlockGasTarget, 1, 103.125e6},
{100e6, build.BlockGasTarget * 2, 2, 100e6}, {100e6, build.BlockGasTarget * 2, 2, 103.125e6},
{100e6, build.BlockGasLimit * 2, 2, 112.5e6}, {100e6, build.BlockGasLimit * 2, 2, 112.5e6},
{100e6, build.BlockGasLimit * 1.5, 2, 106.25e6}, {100e6, build.BlockGasLimit * 1.5, 2, 110937500},
} }
for _, test := range tests { for _, test := range tests {

View File

@ -1177,15 +1177,20 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer)
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err) return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
} }
for _, p := range b.Parents {
blocksToWalk = append(blocksToWalk, p)
}
cids, err := recurseLinks(cs.bs, b.Messages, []cid.Cid{b.Messages}) cids, err := recurseLinks(cs.bs, b.Messages, []cid.Cid{b.Messages})
if err != nil { if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err) return xerrors.Errorf("recursing messages failed: %w", err)
} }
if b.Height > 0 {
for _, p := range b.Parents {
blocksToWalk = append(blocksToWalk, p)
}
} else {
// include the genesis block
cids = append(cids, b.Parents...)
}
out := cids out := cids
if b.Height == 0 { if b.Height == 0 {

View File

@ -550,7 +550,7 @@ func interactiveDeal(cctx *cli.Context) error {
continue continue
} }
a, err := api.ClientQueryAsk(ctx, mi.PeerId, maddr) a, err := api.ClientQueryAsk(ctx, *mi.PeerId, maddr)
if err != nil { if err != nil {
printErr(xerrors.Errorf("failed to query ask: %w", err)) printErr(xerrors.Errorf("failed to query ask: %w", err))
state = "miner" state = "miner"
@ -847,7 +847,7 @@ var clientRetrieveCmd = &cli.Command{
Path: cctx.Args().Get(1), Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"), IsCAR: cctx.Bool("car"),
} }
updates, err := fapi.ClientRetrieve(ctx, offer.Order(payer), ref) updates, err := fapi.ClientRetrieveWithEvents(ctx, offer.Order(payer), ref)
if err != nil { if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err) return xerrors.Errorf("error setting up retrieval: %w", err)
} }
@ -868,7 +868,7 @@ var clientRetrieveCmd = &cli.Command{
} }
if evt.Err != "" { if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %v", err) return xerrors.Errorf("retrieval failed: %s", evt.Err)
} }
case <-ctx.Done(): case <-ctx.Done():
return xerrors.Errorf("retrieval timed out") return xerrors.Errorf("retrieval timed out")
@ -926,11 +926,11 @@ var clientQueryAskCmd = &cli.Command{
return xerrors.Errorf("failed to get peerID for miner: %w", err) return xerrors.Errorf("failed to get peerID for miner: %w", err)
} }
if peer.ID(mi.PeerId) == peer.ID("SETME") { if peer.ID(*mi.PeerId) == peer.ID("SETME") {
return fmt.Errorf("the miner hasn't initialized yet") return fmt.Errorf("the miner hasn't initialized yet")
} }
pid = peer.ID(mi.PeerId) pid = peer.ID(*mi.PeerId)
} }
ask, err := api.ClientQueryAsk(ctx, pid, maddr) ask, err := api.ClientQueryAsk(ctx, pid, maddr)

View File

@ -13,7 +13,7 @@ import (
) )
var pprofCmd = &cli.Command{ var pprofCmd = &cli.Command{
Name: "pprof", Name: "pprof",
Hidden: true, Hidden: true,
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
PprofGoroutines, PprofGoroutines,
@ -42,7 +42,7 @@ var PprofGoroutines = &cli.Command{
return err return err
} }
addr = "http://" + addr + "/debug/pprof/goroutine?debug=2" addr = "http://" + addr + "/debug/pprof/goroutine?debug=2"
r, err := http.Get(addr) r, err := http.Get(addr)
if err != nil { if err != nil {
@ -56,4 +56,3 @@ var PprofGoroutines = &cli.Command{
return r.Body.Close() return r.Body.Close()
}, },
} }

View File

@ -16,7 +16,6 @@ import (
_init "github.com/filecoin-project/specs-actors/actors/builtin/init" _init "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
typegen "github.com/whyrusleeping/cbor-gen" typegen "github.com/whyrusleeping/cbor-gen"
) )
@ -232,20 +231,20 @@ func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
create temp table a (like actors excluding constraints) on commit drop; create temp table a_tmp (like actors excluding constraints) on commit drop;
`); err != nil { `); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `) stmt, err := tx.Prepare(`copy a_tmp (id, code, head, nonce, balance, stateroot) from stdin `)
if err != nil { if err != nil {
return err return err
} }
for code, actTips := range actors { for code, actTips := range actors {
actorName := code.String() actorName := code.String()
if s, err := multihash.Decode(code.Hash()); err != nil { if builtin.IsBuiltinActor(code) {
actorName = string(s.Digest) actorName = builtin.ActorNameByCode(code)
} }
for _, actorInfo := range actTips { for _, actorInfo := range actTips {
for _, a := range actorInfo { for _, a := range actorInfo {
@ -260,7 +259,7 @@ func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil { if _, err := tx.Exec(`insert into actors select * from a_tmp on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }
@ -278,20 +277,20 @@ func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
create temp table a (like actor_states excluding constraints) on commit drop; create temp table as_tmp (like actor_states excluding constraints) on commit drop;
`); err != nil { `); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy a (head, code, state) from stdin `) stmt, err := tx.Prepare(`copy as_tmp (head, code, state) from stdin `)
if err != nil { if err != nil {
return err return err
} }
for code, actTips := range actors { for code, actTips := range actors {
actorName := code.String() actorName := code.String()
if s, err := multihash.Decode(code.Hash()); err != nil { if builtin.IsBuiltinActor(code) {
actorName = string(s.Digest) actorName = builtin.ActorNameByCode(code)
} }
for _, actorInfo := range actTips { for _, actorInfo := range actTips {
for _, a := range actorInfo { for _, a := range actorInfo {
@ -306,7 +305,7 @@ func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil { if _, err := tx.Exec(`insert into actor_states select * from as_tmp on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }

View File

@ -879,11 +879,15 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine
return err return err
} }
} }
var pid string
if mi.PeerId != nil {
pid = mi.PeerId.String()
}
if _, err := stmt.Exec( if _, err := stmt.Exec(
m.common.addr.String(), m.common.addr.String(),
mi.Owner.String(), mi.Owner.String(),
mi.Worker.String(), mi.Worker.String(),
mi.PeerId.String(), pid,
mi.SectorSize.ShortString(), mi.SectorSize.ShortString(),
); err != nil { ); err != nil {
log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err) log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err)

View File

@ -336,16 +336,19 @@ where rnk <= $1
} }
var c string var c string
if err := rows.Scan(&c); err != nil { if err := rows.Scan(&c); err != nil {
return nil, xerrors.Errorf("Failed to scan unprocessed blocks: %w", err) log.Errorf("Failed to scan unprocessed blocks: %s", err.Error())
continue
} }
ci, err := cid.Parse(c) ci, err := cid.Parse(c)
if err != nil { if err != nil {
return nil, xerrors.Errorf("Failed to parse unprocessed blocks: %w", err) log.Errorf("Failed to parse unprocessed blocks: %s", err.Error())
continue
} }
bh, err := p.node.ChainGetBlock(ctx, ci) bh, err := p.node.ChainGetBlock(ctx, ci)
if err != nil { if err != nil {
// this is a pretty serious issue. // this is a pretty serious issue.
return nil, xerrors.Errorf("Failed to get block header %s: %w", ci.String(), err) log.Errorf("Failed to get block header %s: %s", ci.String(), err.Error())
continue
} }
out[ci] = bh out[ci] = bh
if bh.Height < minBlock { if bh.Height < minBlock {

View File

@ -239,7 +239,7 @@ func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since t
log.Debugw("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height) log.Debugw("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height)
} }
if len(bh.Parents) == 0 { if bh.Height == 0 {
continue continue
} }

View File

@ -10,6 +10,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -20,6 +21,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
@ -70,16 +72,24 @@ func main() {
EnvVars: []string{"LOTUS_PCR_PATH"}, EnvVars: []string{"LOTUS_PCR_PATH"},
Value: "~/.lotuspcr", // TODO: Consider XDG_DATA_HOME Value: "~/.lotuspcr", // TODO: Consider XDG_DATA_HOME
}, },
&cli.StringFlag{
Name: "log-level",
EnvVars: []string{"LOTUS_PCR_LOG_LEVEL"},
Hidden: true,
Value: "info",
},
},
Before: func(cctx *cli.Context) error {
return logging.SetLogLevel("main", cctx.String("log-level"))
}, },
Commands: local, Commands: local,
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Warn(err) log.Errorw("exit in error", "err", err)
os.Exit(1)
return return
} }
} }
var versionCmd = &cli.Command{ var versionCmd = &cli.Command{
@ -90,6 +100,7 @@ var versionCmd = &cli.Command{
return nil return nil
}, },
} }
var runCmd = &cli.Command{ var runCmd = &cli.Command{
Name: "run", Name: "run",
Usage: "Start message reimpursement", Usage: "Start message reimpursement",
@ -97,19 +108,36 @@ var runCmd = &cli.Command{
&cli.StringFlag{ &cli.StringFlag{
Name: "from", Name: "from",
EnvVars: []string{"LOTUS_PCR_FROM"}, EnvVars: []string{"LOTUS_PCR_FROM"},
Usage: "wallet address to send refund from",
}, },
&cli.BoolFlag{ &cli.BoolFlag{
Name: "no-sync", Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"}, EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "percent-extra", Name: "percent-extra",
Value: 3, EnvVars: []string{"LOTUS_PCR_PERCENT_EXTRA"},
Usage: "extra funds to send above the refund",
Value: 3,
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "head-delay", Name: "max-message-queue",
Usage: "the number of tipsets to delay message processing to smooth chain reorgs", EnvVars: []string{"LOTUS_PCR_MAX_MESSAGE_QUEUE"},
Value: int(build.MessageConfidence), Usage: "set the maximum number of messages that can be queue in the mpool",
Value: 3000,
},
&cli.BoolFlag{
Name: "dry-run",
EnvVars: []string{"LOTUS_PCR_DRY_RUN"},
Usage: "do not send any messages",
Value: false,
},
&cli.IntFlag{
Name: "head-delay",
EnvVars: []string{"LOTUS_PCR_HEAD_DELAY"},
Usage: "the number of tipsets to delay message processing to smooth chain reorgs",
Value: int(build.MessageConfidence),
}, },
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
@ -150,15 +178,52 @@ var runCmd = &cli.Command{
} }
percentExtra := cctx.Int("percent-extra") percentExtra := cctx.Int("percent-extra")
maxMessageQueue := cctx.Int("max-message-queue")
dryRun := cctx.Bool("dry-run")
rf := &refunder{
api: api,
wallet: from,
percentExtra: percentExtra,
dryRun: dryRun,
}
for tipset := range tipsetsCh { for tipset := range tipsetsCh {
if err := ProcessTipset(ctx, api, tipset, from, percentExtra); err != nil { refunds, err := rf.ProcessTipset(ctx, tipset)
if err != nil {
return err
}
if err := rf.Refund(ctx, tipset, refunds); err != nil {
return err return err
} }
if err := r.SetHeight(tipset.Height()); err != nil { if err := r.SetHeight(tipset.Height()); err != nil {
return err return err
} }
for {
msgs, err := api.MpoolPending(ctx, types.EmptyTSK)
if err != nil {
log.Warnw("failed to fetch pending messages", "err", err)
time.Sleep(time.Duration(int64(time.Second) * int64(build.BlockDelaySecs)))
continue
}
count := 0
for _, msg := range msgs {
if msg.Message.From == from {
count = count + 1
}
}
if count < maxMessageQueue {
break
}
log.Warnw("messages in mpool over max message queue", "message_count", count, "max_message_queue", maxMessageQueue)
time.Sleep(time.Duration(int64(time.Second) * int64(build.BlockDelaySecs)))
}
} }
return nil return nil
@ -167,6 +232,7 @@ var runCmd = &cli.Command{
type MinersRefund struct { type MinersRefund struct {
refunds map[address.Address]types.BigInt refunds map[address.Address]types.BigInt
count int
} }
func NewMinersRefund() *MinersRefund { func NewMinersRefund() *MinersRefund {
@ -180,11 +246,13 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) {
m.refunds[addr] = types.NewInt(0) m.refunds[addr] = types.NewInt(0)
} }
m.count = m.count + 1
m.refunds[addr] = types.BigAdd(m.refunds[addr], value) m.refunds[addr] = types.BigAdd(m.refunds[addr], value)
} }
func (m *MinersRefund) Count() int { func (m *MinersRefund) Count() int {
return len(m.refunds) return m.count
} }
func (m *MinersRefund) Miners() []address.Address { func (m *MinersRefund) Miners() []address.Address {
@ -200,50 +268,58 @@ func (m *MinersRefund) GetRefund(addr address.Address) types.BigInt {
return m.refunds[addr] return m.refunds[addr]
} }
type processTipSetApi interface { type refunderNodeApi interface {
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error)
StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error)
} }
func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipSet, wallet address.Address, percentExtra int) error { type refunder struct {
log.Infow("processing tipset", "height", tipset.Height(), "key", tipset.Key().String()) api refunderNodeApi
wallet address.Address
percentExtra int
dryRun bool
}
func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*MinersRefund, error) {
cids := tipset.Cids() cids := tipset.Cids()
if len(cids) == 0 { if len(cids) == 0 {
return fmt.Errorf("no cids in tipset") log.Errorw("no cids in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil, fmt.Errorf("no cids in tipset")
} }
msgs, err := api.ChainGetParentMessages(ctx, cids[0]) msgs, err := r.api.ChainGetParentMessages(ctx, cids[0])
if err != nil { if err != nil {
log.Errorw("failed to get tipset parent messages", "err", err) log.Errorw("failed to get tipset parent messages", "err", err, "height", tipset.Height(), "key", tipset.Key())
return nil return nil, nil
} }
recps, err := api.ChainGetParentReceipts(ctx, cids[0]) recps, err := r.api.ChainGetParentReceipts(ctx, cids[0])
if err != nil { if err != nil {
log.Errorw("failed to get tipset parent receipts", "err", err) log.Errorw("failed to get tipset parent receipts", "err", err, "height", tipset.Height(), "key", tipset.Key())
return nil return nil, nil
} }
if len(msgs) != len(recps) { if len(msgs) != len(recps) {
log.Errorw("message length does not match receipts length", "messages", len(msgs), "receipts", len(recps)) log.Errorw("message length does not match receipts length", "height", tipset.Height(), "key", tipset.Key(), "messages", len(msgs), "receipts", len(recps))
return nil return nil, nil
} }
refunds := NewMinersRefund() refunds := NewMinersRefund()
count := 0 refundValue := types.NewInt(0)
for i, msg := range msgs { for i, msg := range msgs {
m := msg.Message m := msg.Message
a, err := api.StateGetActor(ctx, m.To, tipset.Key()) a, err := r.api.StateGetActor(ctx, m.To, tipset.Key())
if err != nil { if err != nil {
log.Warnw("failed to look up state actor", "actor", m.To) log.Warnw("failed to look up state actor", "height", tipset.Height(), "key", tipset.Key(), "actor", m.To)
continue continue
} }
@ -251,39 +327,87 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS
continue continue
} }
// we only care to look at PreCommitSector messages var messageMethod string
if m.Method != builtin.MethodsMiner.PreCommitSector {
switch m.Method {
case builtin.MethodsMiner.ProveCommitSector:
messageMethod = "ProveCommitSector"
if recps[i].ExitCode != exitcode.Ok {
log.Debugw("skipping non-ok exitcode message", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "exitcode", recps[i].ExitCode)
continue
}
var proveCommitSector miner.ProveCommitSectorParams
if err := proveCommitSector.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil {
log.Warnw("failed to decode provecommit params", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To)
continue
}
// We use the parent tipset key because precommit information is removed when ProveCommitSector is executed
precommitChainInfo, err := r.api.StateSectorPreCommitInfo(ctx, m.To, proveCommitSector.SectorNumber, tipset.Parents())
if err != nil {
log.Warnw("failed to get precommit info for sector", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
continue
}
precommitTipset, err := r.api.ChainGetTipSetByHeight(ctx, precommitChainInfo.PreCommitEpoch, tipset.Key())
if err != nil {
log.Warnf("failed to lookup precommit epoch", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
continue
}
collateral, err := r.api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitChainInfo.Info, precommitTipset.Key())
if err != nil {
log.Warnw("failed to get initial pledge collateral", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
}
collateral = big.Sub(collateral, precommitChainInfo.PreCommitDeposit)
if collateral.LessThan(big.Zero()) {
log.Debugw("skipping zero pledge collateral difference", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
continue
}
refundValue = collateral
case builtin.MethodsMiner.PreCommitSector:
messageMethod = "PreCommitSector"
if recps[i].ExitCode != exitcode.Ok {
log.Debugw("skipping non-ok exitcode message", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "exitcode", recps[i].ExitCode)
continue
}
var precommitInfo miner.SectorPreCommitInfo
if err := precommitInfo.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil {
log.Warnw("failed to decode precommit params", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To)
continue
}
collateral, err := r.api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitInfo, tipset.Key())
if err != nil {
log.Warnw("failed to calculate initial pledge collateral", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", precommitInfo.SectorNumber)
continue
}
refundValue = collateral
default:
continue continue
} }
if recps[i].ExitCode != exitcode.Ok { if r.percentExtra > 0 {
log.Debugw("skipping non-ok exitcode message", "cid", msg.Cid.String(), "exitcode", recps[i].ExitCode) refundValue = types.BigAdd(refundValue, types.BigDiv(types.BigMul(refundValue, types.NewInt(100)), types.NewInt(uint64(r.percentExtra))))
} }
var precommitInfo miner.SectorPreCommitInfo log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue)
if err := precommitInfo.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil {
log.Warnw("failed to decode precommit params", "err", err)
continue
}
refundValue, err := api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitInfo, tipset.Key())
if err != nil {
log.Warnw("failed to calculate", "err", err)
continue
}
if percentExtra > 0 {
refundValue = types.BigAdd(refundValue, types.BigDiv(refundValue, types.NewInt(100*uint64(percentExtra))))
}
log.Infow("processing message", "from", m.From, "to", m.To, "value", m.Value.String(), "gas_fee_cap", m.GasFeeCap.String(), "gas_premium", m.GasPremium.String(), "gas_used", fmt.Sprintf("%d", recps[i].GasUsed), "refund", refundValue.String())
count = count + 1
refunds.Track(m.From, refundValue) refunds.Track(m.From, refundValue)
} }
return refunds, nil
}
func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) error {
if refunds.Count() == 0 { if refunds.Count() == 0 {
log.Debugw("no messages to refund in tipset") log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil return nil
} }
@ -294,15 +418,15 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS
refundValue := refunds.GetRefund(maddr) refundValue := refunds.GetRefund(maddr)
// We want to try and ensure these messages get mined quickly // We want to try and ensure these messages get mined quickly
gasPremium, err := api.GasEstimateGasPremium(ctx, 0, wallet, 0, tipset.Key()) gasPremium, err := r.api.GasEstimateGasPremium(ctx, 0, r.wallet, 0, tipset.Key())
if err != nil { if err != nil {
log.Warnw("failed to estimate gas premium", "err", err) log.Warnw("failed to estimate gas premium", "err", err, "height", tipset.Height(), "key", tipset.Key())
continue continue
} }
msg := &types.Message{ msg := &types.Message{
Value: refundValue, Value: refundValue,
From: wallet, From: r.wallet,
To: maddr, To: maddr,
GasPremium: gasPremium, GasPremium: gasPremium,
@ -312,32 +436,34 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS
messages = append(messages, msg) messages = append(messages, msg)
} }
balance, err := api.WalletBalance(ctx, wallet) balance, err := r.api.WalletBalance(ctx, r.wallet)
if err != nil { if err != nil {
log.Errorw("failed to get wallet balance", "err", err, "height", tipset.Height(), "key", tipset.Key())
return xerrors.Errorf("failed to get wallet balance :%w", err) return xerrors.Errorf("failed to get wallet balance :%w", err)
} }
// Calculate the minimum balance as the total refund we need to issue plus 5% to cover fees // Calculate the minimum balance as the total refund we need to issue plus 5% to cover fees
minBalance := types.BigAdd(refundSum, types.BigDiv(refundSum, types.NewInt(500))) minBalance := types.BigAdd(refundSum, types.BigDiv(refundSum, types.NewInt(500)))
if balance.LessThan(minBalance) { if balance.LessThan(minBalance) {
log.Errorw("not sufficent funds to cover refunds", "balance", balance.String(), "refund_sum", refundSum.String(), "minimum_required", minBalance.String()) log.Errorw("not sufficent funds to cover refunds", "balance", balance, "refund_sum", refundSum, "minimum_required", minBalance)
return xerrors.Errorf("wallet does not have enough balance to cover refund") return xerrors.Errorf("wallet does not have enough balance to cover refund")
} }
failures := 0 failures := 0
refundSum.SetUint64(0) refundSum.SetUint64(0)
for _, msg := range messages { for _, msg := range messages {
if _, err = api.MpoolPushMessage(ctx, msg, nil); err != nil { if !r.dryRun {
log.Errorw("failed to MpoolPushMessage", "err", err, "msg", msg) if _, err = r.api.MpoolPushMessage(ctx, msg, nil); err != nil {
failures = failures + 1 log.Errorw("failed to MpoolPushMessage", "err", err, "msg", msg)
continue failures = failures + 1
continue
}
} }
refundSum = types.BigAdd(refundSum, msg.Value) refundSum = types.BigAdd(refundSum, msg.Value)
} }
log.Infow("tipset stats", "messages_sent", len(messages)-failures, "refund_sum", refundSum.String(), "messages_failures", failures) log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
return nil return nil
} }

View File

@ -48,10 +48,11 @@ var genesisNewCmd = &cli.Command{
return xerrors.New("seed genesis new [genesis.json]") return xerrors.New("seed genesis new [genesis.json]")
} }
out := genesis.Template{ out := genesis.Template{
Accounts: []genesis.Actor{}, Accounts: []genesis.Actor{},
Miners: []genesis.Miner{}, Miners: []genesis.Miner{},
VerifregRootKey: gen.DefaultVerifregRootkeyActor, VerifregRootKey: gen.DefaultVerifregRootkeyActor,
NetworkName: cctx.String("network-name"), RemainderAccount: gen.DefaultRemainderAccountActor,
NetworkName: cctx.String("network-name"),
} }
if out.NetworkName == "" { if out.NetworkName == "" {
out.NetworkName = "localnet-" + uuid.New().String() out.NetworkName = "localnet-" + uuid.New().String()

View File

@ -9,7 +9,7 @@ import (
) )
var configCmd = &cli.Command{ var configCmd = &cli.Command{
Name: "config", Name: "config",
Usage: "Output default configuration", Usage: "Output default configuration",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
comm, err := config.ConfigComment(config.DefaultStorageMiner()) comm, err := config.ConfigComment(config.DefaultStorageMiner())

View File

@ -26,10 +26,12 @@ type ErrBadCommD struct{ error }
type ErrExpiredTicket struct{ error } type ErrExpiredTicket struct{ error }
type ErrBadTicket struct{ error } type ErrBadTicket struct{ error }
type ErrPrecommitOnChain struct{ error } type ErrPrecommitOnChain struct{ error }
type ErrSectorNumberAllocated struct{ error }
type ErrBadSeed struct{ error } type ErrBadSeed struct{ error }
type ErrInvalidProof struct{ error } type ErrInvalidProof struct{ error }
type ErrNoPrecommit struct{ error } type ErrNoPrecommit struct{ error }
type ErrCommitWaitFailed struct{ error }
func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
tok, height, err := api.ChainHead(ctx) tok, height, err := api.ChainHead(ctx)
@ -87,6 +89,9 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t
pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok) pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok)
if err != nil { if err != nil {
if err == ErrSectorAllocated {
return &ErrSectorNumberAllocated{err}
}
return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)} return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)}
} }
@ -106,6 +111,16 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
} }
pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok) pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok)
if err == ErrSectorAllocated {
// not much more we can check here, basically try to wait for commit,
// and hope that this will work
if si.CommitMessage != nil {
return &ErrCommitWaitFailed{err}
}
return err
}
if err != nil { if err != nil {
return xerrors.Errorf("getting precommit info: %w", err) return xerrors.Errorf("getting precommit info: %w", err)
} }

View File

@ -108,6 +108,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryPreCommitWait{}, PreCommitWait), on(SectorRetryPreCommitWait{}, PreCommitWait),
on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorRetryPreCommit{}, PreCommitting), on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryCommitWait{}, CommitWait),
), ),
FinalizeFailed: planOne( FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector), on(SectorRetryFinalize{}, FinalizeSector),
@ -225,6 +226,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
*/ */
m.stats.updateSector(m.minerSector(state.SectorNumber), state.State)
switch state.State { switch state.State {
// Happy path // Happy path
case Empty: case Empty:
@ -315,6 +318,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
state.State = SealPreCommit1Failed state.State = SealPreCommit1Failed
case SectorCommitFailed: case SectorCommitFailed:
state.State = CommitFailed state.State = CommitFailed
case SectorRetryCommitWait:
state.State = CommitWait
default: default:
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
} }
@ -328,10 +333,25 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
log.Errorf("loading sector list: %+v", err) log.Errorf("loading sector list: %+v", err)
} }
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting the sealing delay: %w", err)
}
for _, sector := range trackedSectors { for _, sector := range trackedSectors {
if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil { if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil {
log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err) log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err)
} }
if sector.State == WaitDeals {
if cfg.WaitDealsDelay > 0 {
timer := time.NewTimer(cfg.WaitDealsDelay)
go func() {
<-timer.C
m.StartPacking(sector.SectorNumber)
}()
}
}
} }
// TODO: Grab on-chain sector set and diff with trackedSectors // TODO: Grab on-chain sector set and diff with trackedSectors

View File

@ -252,6 +252,10 @@ func (evt SectorRetryInvalidProof) apply(state *SectorInfo) {
state.InvalidProofs++ state.InvalidProofs++
} }
type SectorRetryCommitWait struct{}
func (evt SectorRetryCommitWait) apply(state *SectorInfo) {}
// Faults // Faults
type SectorFaulty struct{} type SectorFaulty struct{}

View File

@ -3,6 +3,8 @@ package sealing
import ( import (
"testing" "testing"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -25,8 +27,14 @@ type test struct {
} }
func TestHappyPath(t *testing.T) { func TestHappyPath(t *testing.T) {
ma, _ := address.NewIDAddress(55151)
m := test{ m := test{
s: &Sealing{}, s: &Sealing{
maddr: ma,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
},
t: t, t: t,
state: &SectorInfo{State: Packing}, state: &SectorInfo{State: Packing},
} }
@ -60,8 +68,14 @@ func TestHappyPath(t *testing.T) {
} }
func TestSeedRevert(t *testing.T) { func TestSeedRevert(t *testing.T) {
ma, _ := address.NewIDAddress(55151)
m := test{ m := test{
s: &Sealing{}, s: &Sealing{
maddr: ma,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
},
t: t, t: t,
state: &SectorInfo{State: Packing}, state: &SectorInfo{State: Packing},
} }
@ -101,8 +115,14 @@ func TestSeedRevert(t *testing.T) {
} }
func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) {
ma, _ := address.NewIDAddress(55151)
m := test{ m := test{
s: &Sealing{}, s: &Sealing{
maddr: ma,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
},
t: t, t: t,
state: &SectorInfo{State: Committing}, state: &SectorInfo{State: Committing},
} }

View File

@ -31,6 +31,17 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, exist
} }
func (m *Sealing) PledgeSector() error { func (m *Sealing) PledgeSector() error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}
if cfg.MaxSealingSectors > 0 {
if m.stats.curSealing() > cfg.MaxSealingSectors {
return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
}
}
go func() { go func() {
ctx := context.TODO() // we can't use the context from command which invokes ctx := context.TODO() // we can't use the context from command which invokes
// this, as we run everything here async, and it's cancelled when the // this, as we run everything here async, and it's cancelled when the

View File

@ -0,0 +1,18 @@
package sealiface
import "time"
// this has to be in a separate package to not make lotus API depend on filecoin-ffi
type Config struct {
// 0 = no limit
MaxWaitDealsSectors uint64
// includes failed, 0 = no limit
MaxSealingSectors uint64
// includes failed, 0 = no limit
MaxSealingSectorsForDeals uint64
WaitDealsDelay time.Duration
}

View File

@ -2,7 +2,9 @@ package sealing
import ( import (
"context" "context"
"errors"
"io" "io"
"math"
"sync" "sync"
"time" "time"
@ -33,9 +35,14 @@ type SectorLocation struct {
Partition uint64 Partition uint64
} }
var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain")
type SealingAPI interface { type SealingAPI interface {
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error) StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
// Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error)
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
@ -70,7 +77,9 @@ type Sealing struct {
upgradeLk sync.Mutex upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{} toUpgrade map[abi.SectorNumber]struct{}
getSealDelay GetSealingDelayFunc stats SectorStats
getConfig GetSealingConfigFunc
} }
type FeeConfig struct { type FeeConfig struct {
@ -80,7 +89,7 @@ type FeeConfig struct {
type UnsealedSectorMap struct { type UnsealedSectorMap struct {
infos map[abi.SectorNumber]UnsealedSectorInfo infos map[abi.SectorNumber]UnsealedSectorInfo
mux sync.Mutex lk sync.Mutex
} }
type UnsealedSectorInfo struct { type UnsealedSectorInfo struct {
@ -90,7 +99,7 @@ type UnsealedSectorInfo struct {
pieceSizes []abi.UnpaddedPieceSize pieceSizes []abi.UnpaddedPieceSize
} }
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gsd GetSealingDelayFunc) *Sealing { func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc) *Sealing {
s := &Sealing{ s := &Sealing{
api: api, api: api,
feeCfg: fc, feeCfg: fc,
@ -103,11 +112,15 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
pcp: pcp, pcp: pcp,
unsealedInfoMap: UnsealedSectorMap{ unsealedInfoMap: UnsealedSectorMap{
infos: make(map[abi.SectorNumber]UnsealedSectorInfo), infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
mux: sync.Mutex{}, lk: sync.Mutex{},
}, },
toUpgrade: map[abi.SectorNumber]struct{}{}, toUpgrade: map[abi.SectorNumber]struct{}{},
getSealDelay: gsd, getConfig: gc,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
} }
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
@ -137,18 +150,18 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
return 0, 0, xerrors.Errorf("piece cannot fit into a sector") return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
} }
m.unsealedInfoMap.mux.Lock() m.unsealedInfoMap.lk.Lock()
sid, pads, err := m.getSectorAndPadding(size) sid, pads, err := m.getSectorAndPadding(size)
if err != nil { if err != nil {
m.unsealedInfoMap.mux.Unlock() m.unsealedInfoMap.lk.Unlock()
return 0, 0, xerrors.Errorf("getting available sector: %w", err) return 0, 0, xerrors.Errorf("getting available sector: %w", err)
} }
for _, p := range pads { for _, p := range pads {
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil) err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
if err != nil { if err != nil {
m.unsealedInfoMap.mux.Unlock() m.unsealedInfoMap.lk.Unlock()
return 0, 0, xerrors.Errorf("writing pads: %w", err) return 0, 0, xerrors.Errorf("writing pads: %w", err)
} }
} }
@ -157,12 +170,15 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
err = m.addPiece(ctx, sid, size, r, &d) err = m.addPiece(ctx, sid, size, r, &d)
if err != nil { if err != nil {
m.unsealedInfoMap.mux.Unlock() m.unsealedInfoMap.lk.Unlock()
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
} }
m.unsealedInfoMap.mux.Unlock() startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(m.sealer.SectorSize())
if m.unsealedInfoMap.infos[sid].numDeals == getDealPerSectorLimit(m.sealer.SectorSize()) {
m.unsealedInfoMap.lk.Unlock()
if startPacking {
if err := m.StartPacking(sid); err != nil { if err := m.StartPacking(sid); err != nil {
return 0, 0, xerrors.Errorf("start packing: %w", err) return 0, 0, xerrors.Errorf("start packing: %w", err)
} }
@ -171,7 +187,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
return sid, offset, nil return sid, offset, nil
} }
// Caller should hold m.unsealedInfoMap.mux // Caller should hold m.unsealedInfoMap.lk
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error { func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
log.Infof("Adding piece to sector %d", sectorID) log.Infof("Adding piece to sector %d", sectorID)
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
@ -206,7 +222,7 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorRemove{}) return m.sectors.Send(uint64(sid), SectorRemove{})
} }
// Caller should NOT hold m.unsealedInfoMap.mux // Caller should NOT hold m.unsealedInfoMap.lk
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
log.Infof("Starting packing sector %d", sectorID) log.Infof("Starting packing sector %d", sectorID)
err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
@ -214,14 +230,14 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
return err return err
} }
m.unsealedInfoMap.mux.Lock() m.unsealedInfoMap.lk.Lock()
delete(m.unsealedInfoMap.infos, sectorID) delete(m.unsealedInfoMap.infos, sectorID)
m.unsealedInfoMap.mux.Unlock() m.unsealedInfoMap.lk.Unlock()
return nil return nil
} }
// Caller should hold m.unsealedInfoMap.mux // Caller should hold m.unsealedInfoMap.lk
func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
ss := abi.PaddedPieceSize(m.sealer.SectorSize()) ss := abi.PaddedPieceSize(m.sealer.SectorSize())
for k, v := range m.unsealedInfoMap.infos { for k, v := range m.unsealedInfoMap.infos {
@ -231,7 +247,7 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum
} }
} }
ns, err := m.newSector() ns, err := m.newDealSector()
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
@ -245,8 +261,66 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum
return ns, nil, nil return ns, nil, nil
} }
// newSector creates a new sector for deal storage // newDealSector creates a new sector for deal storage
func (m *Sealing) newSector() (abi.SectorNumber, error) { func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
// First make sure we don't have too many 'open' sectors
cfg, err := m.getConfig()
if err != nil {
return 0, xerrors.Errorf("getting config: %w", err)
}
if cfg.MaxSealingSectorsForDeals > 0 {
if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
return 0, xerrors.Errorf("too many sectors sealing")
}
}
if cfg.MaxWaitDealsSectors > 0 {
// run in a loop because we have to drop the map lock here for a bit
tries := 0
// we have to run in a loop as we're dropping unsealedInfoMap.lk
// to actually call StartPacking. When we do that, another entry can
// get added to unsealedInfoMap.
for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
if tries > 10 {
// whatever...
break
}
if tries > 0 {
m.unsealedInfoMap.lk.Unlock()
time.Sleep(time.Second)
m.unsealedInfoMap.lk.Lock()
}
tries++
var mostStored abi.PaddedPieceSize = math.MaxUint64
var best abi.SectorNumber = math.MaxUint64
for sn, info := range m.unsealedInfoMap.infos {
if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0
best = sn
}
}
if best == math.MaxUint64 {
// probably not possible, but who knows
break
}
m.unsealedInfoMap.lk.Unlock()
if err := m.StartPacking(best); err != nil {
log.Error("newDealSector StartPacking error: %+v", err)
continue // let's pretend this is fine
}
m.unsealedInfoMap.lk.Lock()
}
}
// Now actually create a new sector
sid, err := m.sc.Next() sid, err := m.sc.Next()
if err != nil { if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err) return 0, xerrors.Errorf("getting sector number: %w", err)
@ -272,13 +346,13 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) {
return 0, xerrors.Errorf("starting the sector fsm: %w", err) return 0, xerrors.Errorf("starting the sector fsm: %w", err)
} }
sd, err := m.getSealDelay() cf, err := m.getConfig()
if err != nil { if err != nil {
return 0, xerrors.Errorf("getting the sealing delay: %w", err) return 0, xerrors.Errorf("getting the sealing delay: %w", err)
} }
if sd > 0 { if cf.WaitDealsDelay > 0 {
timer := time.NewTimer(sd) timer := time.NewTimer(cf.WaitDealsDelay)
go func() { go func() {
<-timer.C <-timer.C
m.StartPacking(sid) m.StartPacking(sid)

View File

@ -36,3 +36,14 @@ const (
RemoveFailed SectorState = "RemoveFailed" RemoveFailed SectorState = "RemoveFailed"
Removed SectorState = "Removed" Removed SectorState = "Removed"
) )
func toStatState(st SectorState) statSectorState {
switch st {
case Empty, WaitDeals, Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitWait, FinalizeSector:
return sstSealing
case Proving, Removed, Removing:
return sstProving
}
return sstFailed
}

View File

@ -85,6 +85,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
return ctx.Send(SectorRetryPreCommit{}) return ctx.Send(SectorRetryPreCommit{})
case *ErrPrecommitOnChain: case *ErrPrecommitOnChain:
// noop // noop
case *ErrSectorNumberAllocated:
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
// TODO: check if the sector is committed (not sure how we'd end up here)
return nil
default: default:
return xerrors.Errorf("checkPrecommit sanity check error: %w", err) return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
} }
@ -158,6 +162,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
case *ErrPrecommitOnChain: case *ErrPrecommitOnChain:
// noop, this is expected // noop, this is expected
case *ErrSectorNumberAllocated:
// noop, already committed?
default: default:
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err) return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
} }
@ -186,6 +192,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorRetryPreCommitWait{}) return ctx.Send(SectorRetryPreCommitWait{})
case *ErrNoPrecommit: case *ErrNoPrecommit:
return ctx.Send(SectorRetryPreCommit{}) return ctx.Send(SectorRetryPreCommit{})
case *ErrCommitWaitFailed:
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorRetryCommitWait{})
default: default:
return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err) return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err)
} }

View File

@ -123,6 +123,14 @@ func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo)
}) })
} }
// TODO: We should probably invoke this method in most (if not all) state transition failures after handlePreCommitting
func (m *Sealing) remarkForUpgrade(sid abi.SectorNumber) {
err := m.MarkForUpgrade(sid)
if err != nil {
log.Errorf("error re-marking sector %d as for upgrade: %+v", sid, err)
}
}
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.api.ChainHead(ctx.Context()) tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil { if err != nil {
@ -149,6 +157,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrPrecommitOnChain: case *ErrPrecommitOnChain:
return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
case *ErrSectorNumberAllocated:
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
// TODO: check if the sector is committed (not sure how we'd end up here)
return nil
default: default:
return xerrors.Errorf("checkPrecommit sanity check error: %w", err) return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
} }
@ -193,6 +205,9 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit) log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit)
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.PreCommitSector, deposit, m.feeCfg.MaxPreCommitGasFee, enc.Bytes()) mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.PreCommitSector, deposit, m.feeCfg.MaxPreCommitGasFee, enc.Bytes())
if err != nil { if err != nil {
if params.ReplaceCapacity {
m.remarkForUpgrade(params.ReplaceSectorNumber)
}
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
} }
@ -204,7 +219,7 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit message was nil")}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit message was nil")})
} }
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts // would be ideal to just use the events.Called handler, but it wouldn't be able to handle individual message timeouts
log.Info("Sector precommitted: ", sector.SectorNumber) log.Info("Sector precommitted: ", sector.SectorNumber)
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage) mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
if err != nil { if err != nil {
@ -275,6 +290,20 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
} }
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
if sector.CommitMessage != nil {
log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber)
ml, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage)
if err != nil {
log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err)
}
if ml != nil {
// some weird retry paths can lead here
return ctx.Send(SectorRetryCommitWait{})
}
}
log.Info("scheduling seal proof computation...") log.Info("scheduling seal proof computation...")
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD) log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)

45
extern/storage-sealing/stats.go vendored Normal file
View File

@ -0,0 +1,45 @@
package sealing
import (
"sync"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type statSectorState int
const (
sstSealing statSectorState = iota
sstFailed
sstProving
nsst
)
type SectorStats struct {
lk sync.Mutex
bySector map[abi.SectorID]statSectorState
totals [nsst]uint64
}
func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
ss.lk.Lock()
defer ss.lk.Unlock()
oldst, found := ss.bySector[id]
if found {
ss.totals[oldst]--
}
sst := toStatState(st)
ss.bySector[id] = sst
ss.totals[sst]++
}
// return the number of sectors currently in the sealing pipeline
func (ss *SectorStats) curSealing() uint64 {
ss.lk.Lock()
defer ss.lk.Unlock()
return ss.totals[sstSealing] + ss.totals[sstFailed]
}

View File

@ -3,8 +3,6 @@ package sealing
import ( import (
"bytes" "bytes"
"context" "context"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
@ -14,6 +12,7 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
) )
// Piece is a tuple of piece and deal info // Piece is a tuple of piece and deal info
@ -188,7 +187,7 @@ type MessageReceipt struct {
GasUsed int64 GasUsed int64
} }
type GetSealingDelayFunc func() (time.Duration, error) type GetSealingConfigFunc func() (sealiface.Config, error)
func (mr *MessageReceipt) Equals(o *MessageReceipt) bool { func (mr *MessageReceipt) Equals(o *MessageReceipt) bool {
return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed

View File

@ -57,18 +57,18 @@ func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreC
params.ReplaceSectorDeadline = loc.Deadline params.ReplaceSectorDeadline = loc.Deadline
params.ReplaceSectorPartition = loc.Partition params.ReplaceSectorPartition = loc.Partition
ri, err := m.GetSectorInfo(*replace) ri, err := m.api.StateSectorGetInfo(ctx, m.maddr, *replace, nil)
if err != nil { if err != nil {
log.Errorf("error calling GetSectorInfo for replaced sector: %+v", err) log.Errorf("error calling StateSectorGetInfo for replaced sector: %+v", err)
return big.Zero() return big.Zero()
} }
if params.Expiration < ri.PreCommitInfo.Expiration { if params.Expiration < ri.Expiration {
// TODO: Some limit on this // TODO: Some limit on this
params.Expiration = ri.PreCommitInfo.Expiration params.Expiration = ri.Expiration
} }
return ri.PreCommitDeposit return ri.InitialPledge
} }
return big.Zero() return big.Zero()

View File

@ -79,5 +79,6 @@ type Template struct {
NetworkName string NetworkName string
Timestamp uint64 `json:",omitempty"` Timestamp uint64 `json:",omitempty"`
VerifregRootKey Actor VerifregRootKey Actor
RemainderAccount Actor
} }

2
go.mod
View File

@ -38,7 +38,7 @@ require (
github.com/filecoin-project/go-statestore v0.1.0 github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/sector-storage v0.0.0-20200810171746-eac70842d8e0 // indirect github.com/filecoin-project/sector-storage v0.0.0-20200810171746-eac70842d8e0 // indirect
github.com/filecoin-project/specs-actors v0.9.2 github.com/filecoin-project/specs-actors v0.9.3
github.com/filecoin-project/specs-storage v0.1.1-0.20200730063404-f7db367e9401 github.com/filecoin-project/specs-storage v0.1.1-0.20200730063404-f7db367e9401
github.com/filecoin-project/storage-fsm v0.0.0-20200805013058-9d9ea4e6331f github.com/filecoin-project/storage-fsm v0.0.0-20200805013058-9d9ea4e6331f
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1

1
go.sum
View File

@ -277,6 +277,7 @@ github.com/filecoin-project/specs-actors v0.8.7-0.20200811203034-272d022c1923/go
github.com/filecoin-project/specs-actors v0.9.2 h1:0JG0QLHw8pO6BPqPRe9eQxQW60biHAQsx1rlQ9QbzZ0= github.com/filecoin-project/specs-actors v0.9.2 h1:0JG0QLHw8pO6BPqPRe9eQxQW60biHAQsx1rlQ9QbzZ0=
github.com/filecoin-project/specs-actors v0.9.2/go.mod h1:YasnVUOUha0DN5wB+twl+V8LlDKVNknRG00kTJpsfFA= github.com/filecoin-project/specs-actors v0.9.2/go.mod h1:YasnVUOUha0DN5wB+twl+V8LlDKVNknRG00kTJpsfFA=
github.com/filecoin-project/specs-actors v0.9.3 h1:Fi75G/UQ7R4eiIwnN+S6bBQ9LqKivyJdw62jJzTi6aE= github.com/filecoin-project/specs-actors v0.9.3 h1:Fi75G/UQ7R4eiIwnN+S6bBQ9LqKivyJdw62jJzTi6aE=
github.com/filecoin-project/specs-actors v0.9.3/go.mod h1:YasnVUOUha0DN5wB+twl+V8LlDKVNknRG00kTJpsfFA=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY= github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k= github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/specs-storage v0.1.1-0.20200730063404-f7db367e9401 h1:jLzN1hwO5WpKPu8ASbW8fs1FUCsOWNvoBXzQhv+8/E8= github.com/filecoin-project/specs-storage v0.1.1-0.20200730063404-f7db367e9401 h1:jLzN1hwO5WpKPu8ASbW8fs1FUCsOWNvoBXzQhv+8/E8=

View File

@ -514,7 +514,7 @@ func (c *ClientNodeAdapter) GetMinerInfo(ctx context.Context, addr address.Addre
return nil, err return nil, err
} }
out := utils.NewStorageProviderInfo(addr, mi.Worker, mi.SectorSize, mi.PeerId, mi.Multiaddrs) out := utils.NewStorageProviderInfo(addr, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs)
return &out, nil return &out, nil
} }

View File

@ -328,8 +328,8 @@ func Online() Option {
Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc), Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc),
Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc), Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc),
Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc), Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc),
Override(new(dtypes.SetSealingDelayFunc), modules.NewSetSealDelayFunc), Override(new(dtypes.SetSealingConfigFunc), modules.NewSetSealConfigFunc),
Override(new(dtypes.GetSealingDelayFunc), modules.NewGetSealDelayFunc), Override(new(dtypes.GetSealingConfigFunc), modules.NewGetSealConfigFunc),
Override(new(dtypes.SetExpectedSealDurationFunc), modules.NewSetExpectedSealDurationFunc), Override(new(dtypes.SetExpectedSealDurationFunc), modules.NewSetExpectedSealDurationFunc),
Override(new(dtypes.GetExpectedSealDurationFunc), modules.NewGetExpectedSealDurationFunc), Override(new(dtypes.GetExpectedSealDurationFunc), modules.NewGetExpectedSealDurationFunc),
), ),

View File

@ -31,10 +31,9 @@ type StorageMiner struct {
Common Common
Dealmaking DealmakingConfig Dealmaking DealmakingConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig Storage sectorstorage.SealerConfig
Fees MinerFeeConfig Fees MinerFeeConfig
SealingDelay Duration
} }
type DealmakingConfig struct { type DealmakingConfig struct {
@ -48,6 +47,19 @@ type DealmakingConfig struct {
Filter string Filter string
} }
type SealingConfig struct {
// 0 = no limit
MaxWaitDealsSectors uint64
// includes failed, 0 = no limit
MaxSealingSectors uint64
// includes failed, 0 = no limit
MaxSealingSectorsForDeals uint64
WaitDealsDelay Duration
}
type MinerFeeConfig struct { type MinerFeeConfig struct {
MaxPreCommitGasFee types.FIL MaxPreCommitGasFee types.FIL
MaxCommitGasFee types.FIL MaxCommitGasFee types.FIL
@ -131,6 +143,13 @@ func DefaultStorageMiner() *StorageMiner {
cfg := &StorageMiner{ cfg := &StorageMiner{
Common: defCommon(), Common: defCommon(),
Sealing: SealingConfig{
MaxWaitDealsSectors: 2, // 64G with 32G sectors
MaxSealingSectors: 0,
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: Duration(time.Hour),
},
Storage: sectorstorage.SealerConfig{ Storage: sectorstorage.SealerConfig{
AllowAddPiece: true, AllowAddPiece: true,
AllowPreCommit1: true, AllowPreCommit1: true,
@ -158,8 +177,6 @@ func DefaultStorageMiner() *StorageMiner {
MaxCommitGasFee: types.FIL(types.BigDiv(types.FromFil(1), types.NewInt(20))), MaxCommitGasFee: types.FIL(types.BigDiv(types.FromFil(1), types.NewInt(20))),
MaxWindowPoStGasFee: types.FIL(types.FromFil(50)), MaxWindowPoStGasFee: types.FIL(types.FromFil(50)),
}, },
SealingDelay: Duration(time.Hour),
} }
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345" cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"

View File

@ -137,7 +137,7 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
return nil, xerrors.New("data doesn't fit in a sector") return nil, xerrors.New("data doesn't fit in a sector")
} }
providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, mi.PeerId, mi.Multiaddrs) providerInfo := utils.NewStorageProviderInfo(params.Miner, mi.Worker, mi.SectorSize, *mi.PeerId, mi.Multiaddrs)
dealStart := params.DealStartEpoch dealStart := params.DealStartEpoch
if dealStart <= 0 { // unset, or explicitly 'epoch undefined' if dealStart <= 0 { // unset, or explicitly 'epoch undefined'
@ -255,7 +255,7 @@ func (a *API) ClientMinerQueryOffer(ctx context.Context, miner address.Address,
} }
rp := rm.RetrievalPeer{ rp := rm.RetrievalPeer{
Address: miner, Address: miner,
ID: mi.PeerId, ID: *mi.PeerId,
} }
return a.makeRetrievalQuery(ctx, rp, root, piece, rm.QueryParams{}), nil return a.makeRetrievalQuery(ctx, rp, root, piece, rm.QueryParams{}), nil
} }
@ -400,7 +400,27 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
return out, nil return out, nil
} }
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
for {
select {
case evt, ok := <-events:
if !ok { // done successfully
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
}
func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
events := make(chan marketevents.RetrievalEvent) events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events) go a.clientRetrieve(ctx, order, ref, events)
return events, nil return events, nil
@ -423,7 +443,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
} }
order.MinerPeer = retrievalmarket.RetrievalPeer{ order.MinerPeer = retrievalmarket.RetrievalPeer{
ID: mi.PeerId, ID: *mi.PeerId,
Address: order.Miner, Address: order.Miner,
} }
} }

View File

@ -77,7 +77,7 @@ func (a *GasAPI) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64,
ts := a.Chain.GetHeaviestTipSet() ts := a.Chain.GetHeaviestTipSet()
for i := uint64(0); i < nblocksincl*2; i++ { for i := uint64(0); i < nblocksincl*2; i++ {
if len(ts.Parents().Cids()) == 0 { if ts.Height() == 0 {
break // genesis break // genesis
} }

View File

@ -62,8 +62,8 @@ type StorageMinerAPI struct {
SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc
ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc
SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc
SetSealingDelayFunc dtypes.SetSealingDelayFunc SetSealingConfigFunc dtypes.SetSealingConfigFunc
GetSealingDelayFunc dtypes.GetSealingDelayFunc GetSealingConfigFunc dtypes.GetSealingConfigFunc
GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc
SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc
} }
@ -232,11 +232,22 @@ func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.Se
} }
func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error { func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
return sm.SetSealingDelayFunc(delay) cfg, err := sm.GetSealingConfigFunc()
if err != nil {
return xerrors.Errorf("get config: %w", err)
}
cfg.WaitDealsDelay = delay
return sm.SetSealingConfigFunc(cfg)
} }
func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) { func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
return sm.GetSealingDelayFunc() cfg, err := sm.GetSealingConfigFunc()
if err != nil {
return 0, err
}
return cfg.WaitDealsDelay, nil
} }
func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error { func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error {

View File

@ -9,6 +9,8 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
) )
type MinerAddress address.Address type MinerAddress address.Address
@ -56,10 +58,10 @@ type ConsiderOfflineRetrievalDealsConfigFunc func() (bool, error)
type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error
// SetSealingDelay sets how long a sector waits for more deals before sealing begins. // SetSealingDelay sets how long a sector waits for more deals before sealing begins.
type SetSealingDelayFunc func(time.Duration) error type SetSealingConfigFunc func(sealiface.Config) error
// GetSealingDelay returns how long a sector waits for more deals before sealing begins. // GetSealingDelay returns how long a sector waits for more deals before sealing begins.
type GetSealingDelayFunc func() (time.Duration, error) type GetSealingConfigFunc func() (sealiface.Config, error)
// SetExpectedSealDurationFunc is a function which is used to set how long sealing is expected to take. // SetExpectedSealDurationFunc is a function which is used to set how long sealing is expected to take.
// Deals that would need to start earlier than this duration will be rejected. // Deals that would need to start earlier than this duration will be rejected.

View File

@ -48,6 +48,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
lapi "github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
@ -141,8 +142,8 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
return &sidsc{sc} return &sidsc{sc}
} }
func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) { func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err
@ -593,19 +594,28 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se
}, nil }, nil
} }
func NewSetSealDelayFunc(r repo.LockedRepo) (dtypes.SetSealingDelayFunc, error) { func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) {
return func(delay time.Duration) (err error) { return func(cfg sealiface.Config) (err error) {
err = mutateCfg(r, func(cfg *config.StorageMiner) { err = mutateCfg(r, func(c *config.StorageMiner) {
cfg.SealingDelay = config.Duration(delay) c.Sealing = config.SealingConfig{
MaxWaitDealsSectors: cfg.MaxWaitDealsSectors,
MaxSealingSectors: cfg.MaxSealingSectors,
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
}
}) })
return return
}, nil }, nil
} }
func NewGetSealDelayFunc(r repo.LockedRepo) (dtypes.GetSealingDelayFunc, error) { func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
return func() (out time.Duration, err error) { return func() (out sealiface.Config, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) { err = readCfg(r, func(cfg *config.StorageMiner) {
out = time.Duration(cfg.SealingDelay) out = sealiface.Config{
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
}
}) })
return return
}, nil }, nil

View File

@ -212,10 +212,11 @@ func builder(t *testing.T, nFull int, storage []test.StorageMiner) ([]test.TestN
genms = append(genms, *genm) genms = append(genms, *genm)
} }
templ := &genesis.Template{ templ := &genesis.Template{
Accounts: genaccs, Accounts: genaccs,
Miners: genms, Miners: genms,
Timestamp: uint64(time.Now().Unix() - 10000), // some time sufficiently far in the past Timestamp: uint64(time.Now().Unix() - 10000), // some time sufficiently far in the past
VerifregRootKey: gen.DefaultVerifregRootkeyActor, VerifregRootKey: gen.DefaultVerifregRootkeyActor,
RemainderAccount: gen.DefaultRemainderAccountActor,
} }
// END PRESEAL SECTION // END PRESEAL SECTION
@ -351,7 +352,7 @@ func mockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner) ([]test
genaccs = append(genaccs, genesis.Actor{ genaccs = append(genaccs, genesis.Actor{
Type: genesis.TAccount, Type: genesis.TAccount,
Balance: big.Mul(big.NewInt(400_000_000_000), types.NewInt(build.FilecoinPrecision)), Balance: big.Mul(big.NewInt(400_000_000), types.NewInt(build.FilecoinPrecision)),
Meta: (&genesis.AccountMeta{Owner: wk.Address}).ActorMeta(), Meta: (&genesis.AccountMeta{Owner: wk.Address}).ActorMeta(),
}) })
@ -362,10 +363,11 @@ func mockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner) ([]test
genms = append(genms, *genm) genms = append(genms, *genm)
} }
templ := &genesis.Template{ templ := &genesis.Template{
Accounts: genaccs, Accounts: genaccs,
Miners: genms, Miners: genms,
Timestamp: uint64(time.Now().Unix()) - (build.BlockDelaySecs * 20000), Timestamp: uint64(time.Now().Unix()) - (build.BlockDelaySecs * 20000),
VerifregRootKey: gen.DefaultVerifregRootkeyActor, VerifregRootKey: gen.DefaultVerifregRootkeyActor,
RemainderAccount: gen.DefaultRemainderAccountActor,
} }
// END PRESEAL SECTION // END PRESEAL SECTION

View File

@ -108,6 +108,27 @@ func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (seal
}, nil }, nil
} }
func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*sealing.MsgLookup, error) {
wmsg, err := s.delegate.StateSearchMsg(ctx, c)
if err != nil {
return nil, err
}
if wmsg == nil {
return nil, nil
}
return &sealing.MsgLookup{
Receipt: sealing.MessageReceipt{
ExitCode: wmsg.Receipt.ExitCode,
Return: wmsg.Receipt.Return,
GasUsed: wmsg.Receipt.GasUsed,
},
TipSetTok: wmsg.TipSet.Bytes(),
Height: wmsg.Height,
}, nil
}
func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) { func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) {
tsk, err := types.TipSetKeyFromBytes(tok) tsk, err := types.TipSetKeyFromBytes(tok)
if err != nil { if err != nil {
@ -186,7 +207,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a
return nil, xerrors.Errorf("checking if sector is allocated: %w", err) return nil, xerrors.Errorf("checking if sector is allocated: %w", err)
} }
if set { if set {
return nil, xerrors.Errorf("sectorNumber is allocated") return nil, sealing.ErrSectorAllocated
} }
return nil, nil return nil, nil

View File

@ -43,8 +43,8 @@ type Miner struct {
maddr address.Address maddr address.Address
worker address.Address worker address.Address
getSealDelay dtypes.GetSealingDelayFunc getSealConfig dtypes.GetSealingConfigFunc
sealing *sealing.Sealing sealing *sealing.Sealing
} }
type storageMinerApi interface { type storageMinerApi interface {
@ -60,6 +60,7 @@ type storageMinerApi interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error)
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
@ -84,7 +85,7 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc, feeCfg config.MinerFeeConfig) (*Miner, error) { func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig) (*Miner, error) {
m := &Miner{ m := &Miner{
api: api, api: api,
feeCfg: feeCfg, feeCfg: feeCfg,
@ -94,9 +95,9 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
sc: sc, sc: sc,
verif: verif, verif: verif,
maddr: maddr, maddr: maddr,
worker: worker, worker: worker,
getSealDelay: gsd, getSealConfig: gsd,
} }
return m, nil return m, nil
@ -120,7 +121,7 @@ func (m *Miner) Run(ctx context.Context) error {
evts := events.NewEvents(ctx, m.api) evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api) adaptedAPI := NewSealingAPIAdapter(m.api)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod) pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay)) m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig))
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function

View File

@ -88,7 +88,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
if notifs == nil { if notifs == nil {
notifs, err = s.api.ChainNotify(ctx) notifs, err = s.api.ChainNotify(ctx)
if err != nil { if err != nil {
log.Errorf("ChainNotify error: %+v") log.Errorf("ChainNotify error: %+v", err)
build.Clock.Sleep(10 * time.Second) build.Clock.Sleep(10 * time.Second)
continue continue