Merge branch 'master' into badger-viewable

This commit is contained in:
Raúl Kripalani 2020-11-01 13:10:56 +00:00
commit a16d7f221e
51 changed files with 1260 additions and 244 deletions

View File

@ -344,6 +344,8 @@ type FullNode interface {
StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error)
// StateListMessages looks back and returns all messages with a matching to or from address, stopping at the given height.
StateListMessages(ctx context.Context, match *MessageMatch, tsk types.TipSetKey, toht abi.ChainEpoch) ([]cid.Cid, error)
// StateDecodeParams attempts to decode the provided params, based on the recipient actor address and method number.
StateDecodeParams(ctx context.Context, toAddr address.Address, method abi.MethodNum, params []byte, tsk types.TipSetKey) (interface{}, error)
// StateNetworkName returns the name of the network the node is synced to
StateNetworkName(context.Context) (dtypes.NetworkName, error)

View File

@ -70,7 +70,7 @@ type StorageMiner interface {
storiface.WorkerReturn
// SealingSchedDiag dumps internal sealing scheduler state
SealingSchedDiag(context.Context) (interface{}, error)
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error)
stores.SectorIndex

View File

@ -28,5 +28,19 @@ type WorkerAPI interface {
StorageAddLocal(ctx context.Context, path string) error
// SetEnabled marks the worker as enabled/disabled. Not that this setting
// may take a few seconds to propagate to task scheduler
SetEnabled(ctx context.Context, enabled bool) error
Enabled(ctx context.Context) (bool, error)
// WaitQuiet blocks until there are no tasks running
WaitQuiet(ctx context.Context) error
// returns a random UUID of worker session, generated randomly when worker
// process starts
ProcessSession(context.Context) (uuid.UUID, error)
// Like ProcessSession, but returns an error when worker is disabled
Session(context.Context) (uuid.UUID, error)
}

View File

@ -215,6 +215,7 @@ type FullNodeStruct struct {
StateGetReceipt func(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) `perm:"read"`
StateMinerSectorCount func(context.Context, address.Address, types.TipSetKey) (api.MinerSectors, error) `perm:"read"`
StateListMessages func(ctx context.Context, match *api.MessageMatch, tsk types.TipSetKey, toht abi.ChainEpoch) ([]cid.Cid, error) `perm:"read"`
StateDecodeParams func(context.Context, address.Address, abi.MethodNum, []byte, types.TipSetKey) (interface{}, error) `perm:"read"`
StateCompute func(context.Context, abi.ChainEpoch, []*types.Message, types.TipSetKey) (*api.ComputeStateOutput, error) `perm:"read"`
StateVerifierStatus func(context.Context, address.Address, types.TipSetKey) (*abi.StoragePower, error) `perm:"read"`
StateVerifiedClientStatus func(context.Context, address.Address, types.TipSetKey) (*abi.StoragePower, error) `perm:"read"`
@ -321,7 +322,7 @@ type StorageMinerStruct struct {
ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err string) error `perm:"admin" retry:"true"`
ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
SealingSchedDiag func(context.Context) (interface{}, error) `perm:"admin"`
SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
@ -385,7 +386,13 @@ type WorkerStruct struct {
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
SetEnabled func(ctx context.Context, enabled bool) error `perm:"admin"`
Enabled func(ctx context.Context) (bool, error) `perm:"admin"`
WaitQuiet func(ctx context.Context) error `perm:"admin"`
ProcessSession func(context.Context) (uuid.UUID, error) `perm:"admin"`
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
}
}
@ -1014,6 +1021,10 @@ func (c *FullNodeStruct) StateListMessages(ctx context.Context, match *api.Messa
return c.Internal.StateListMessages(ctx, match, tsk, toht)
}
func (c *FullNodeStruct) StateDecodeParams(ctx context.Context, toAddr address.Address, method abi.MethodNum, params []byte, tsk types.TipSetKey) (interface{}, error) {
return c.Internal.StateDecodeParams(ctx, toAddr, method, params, tsk)
}
func (c *FullNodeStruct) StateCompute(ctx context.Context, height abi.ChainEpoch, msgs []*types.Message, tsk types.TipSetKey) (*api.ComputeStateOutput, error) {
return c.Internal.StateCompute(ctx, height, msgs, tsk)
}
@ -1298,8 +1309,8 @@ func (c *StorageMinerStruct) ReturnFetch(ctx context.Context, callID storiface.C
return c.Internal.ReturnFetch(ctx, callID, err)
}
func (c *StorageMinerStruct) SealingSchedDiag(ctx context.Context) (interface{}, error) {
return c.Internal.SealingSchedDiag(ctx)
func (c *StorageMinerStruct) SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) {
return c.Internal.SealingSchedDiag(ctx, doSched)
}
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error {
@ -1544,6 +1555,22 @@ func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
return w.Internal.StorageAddLocal(ctx, path)
}
func (w *WorkerStruct) SetEnabled(ctx context.Context, enabled bool) error {
return w.Internal.SetEnabled(ctx, enabled)
}
func (w *WorkerStruct) Enabled(ctx context.Context) (bool, error) {
return w.Internal.Enabled(ctx)
}
func (w *WorkerStruct) WaitQuiet(ctx context.Context) error {
return w.Internal.WaitQuiet(ctx)
}
func (w *WorkerStruct) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.Internal.ProcessSession(ctx)
}
func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) {
return w.Internal.Session(ctx)
}

View File

@ -0,0 +1,117 @@
package power
import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/chain/actors/adt"
)
type ClaimChanges struct {
Added []ClaimInfo
Modified []ClaimModification
Removed []ClaimInfo
}
type ClaimModification struct {
Miner address.Address
From Claim
To Claim
}
type ClaimInfo struct {
Miner address.Address
Claim Claim
}
func DiffClaims(pre, cur State) (*ClaimChanges, error) {
results := new(ClaimChanges)
prec, err := pre.claims()
if err != nil {
return nil, err
}
curc, err := cur.claims()
if err != nil {
return nil, err
}
if err := adt.DiffAdtMap(prec, curc, &claimDiffer{results, pre, cur}); err != nil {
return nil, err
}
return results, nil
}
type claimDiffer struct {
Results *ClaimChanges
pre, after State
}
func (c *claimDiffer) AsKey(key string) (abi.Keyer, error) {
addr, err := address.NewFromBytes([]byte(key))
if err != nil {
return nil, err
}
return abi.AddrKey(addr), nil
}
func (c *claimDiffer) Add(key string, val *cbg.Deferred) error {
ci, err := c.after.decodeClaim(val)
if err != nil {
return err
}
addr, err := address.NewFromBytes([]byte(key))
if err != nil {
return err
}
c.Results.Added = append(c.Results.Added, ClaimInfo{
Miner: addr,
Claim: ci,
})
return nil
}
func (c *claimDiffer) Modify(key string, from, to *cbg.Deferred) error {
ciFrom, err := c.pre.decodeClaim(from)
if err != nil {
return err
}
ciTo, err := c.after.decodeClaim(to)
if err != nil {
return err
}
addr, err := address.NewFromBytes([]byte(key))
if err != nil {
return err
}
if ciFrom != ciTo {
c.Results.Modified = append(c.Results.Modified, ClaimModification{
Miner: addr,
From: ciFrom,
To: ciTo,
})
}
return nil
}
func (c *claimDiffer) Remove(key string, val *cbg.Deferred) error {
ci, err := c.after.decodeClaim(val)
if err != nil {
return err
}
addr, err := address.NewFromBytes([]byte(key))
if err != nil {
return err
}
c.Results.Removed = append(c.Results.Removed, ClaimInfo{
Miner: addr,
Claim: ci,
})
return nil
}

View File

@ -4,6 +4,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
@ -56,6 +57,11 @@ type State interface {
MinerNominalPowerMeetsConsensusMinimum(address.Address) (bool, error)
ListAllMiners() ([]address.Address, error)
ForEachClaim(func(miner address.Address, claim Claim) error) error
ClaimsChanged(State) (bool, error)
// Diff helpers. Used by Diff* functions internally.
claims() (adt.Map, error)
decodeClaim(*cbg.Deferred) (Claim, error)
}
type Claim struct {

View File

@ -1,9 +1,12 @@
package power
import (
"bytes"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin"
@ -115,3 +118,28 @@ func (s *state0) ForEachClaim(cb func(miner address.Address, claim Claim) error)
})
})
}
func (s *state0) ClaimsChanged(other State) (bool, error) {
other0, ok := other.(*state0)
if !ok {
// treat an upgrade as a change, always
return true, nil
}
return !s.State.Claims.Equals(other0.State.Claims), nil
}
func (s *state0) claims() (adt.Map, error) {
return adt0.AsMap(s.store, s.Claims)
}
func (s *state0) decodeClaim(val *cbg.Deferred) (Claim, error) {
var ci power0.Claim
if err := ci.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil {
return Claim{}, err
}
return fromV0Claim(ci), nil
}
func fromV0Claim(v0 power0.Claim) Claim {
return (Claim)(v0)
}

View File

@ -1,9 +1,12 @@
package power
import (
"bytes"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin"
@ -115,3 +118,31 @@ func (s *state2) ForEachClaim(cb func(miner address.Address, claim Claim) error)
})
})
}
func (s *state2) ClaimsChanged(other State) (bool, error) {
other2, ok := other.(*state2)
if !ok {
// treat an upgrade as a change, always
return true, nil
}
return !s.State.Claims.Equals(other2.State.Claims), nil
}
func (s *state2) claims() (adt.Map, error) {
return adt2.AsMap(s.store, s.Claims)
}
func (s *state2) decodeClaim(val *cbg.Deferred) (Claim, error) {
var ci power2.Claim
if err := ci.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil {
return Claim{}, err
}
return fromV2Claim(ci), nil
}
func fromV2Claim(v2 power2.Claim) Claim {
return Claim{
RawBytePower: v2.RawBytePower,
QualityAdjPower: v2.QualityAdjPower,
}
}

View File

@ -459,7 +459,7 @@ type messageEvents struct {
hcAPI headChangeAPI
lk sync.RWMutex
matchers map[triggerID][]MsgMatchFunc
matchers map[triggerID]MsgMatchFunc
}
func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) messageEvents {
@ -467,7 +467,7 @@ func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) mes
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: map[triggerID][]MsgMatchFunc{},
matchers: make(map[triggerID]MsgMatchFunc),
}
}
@ -482,32 +482,23 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
me.lk.RLock()
defer me.lk.RUnlock()
// For each message in the tipset
res := make(map[triggerID]eventData)
me.messagesForTs(pts, func(msg *types.Message) {
// TODO: provide receipts
for tid, matchFns := range me.matchers {
var matched bool
var once bool
for _, matchFn := range matchFns {
matchOne, ok, err := matchFn(msg)
if err != nil {
log.Errorf("event matcher failed: %s", err)
continue
}
matched = ok
once = matchOne
if matched {
break
}
// Run each trigger's matcher against the message
for tid, matchFn := range me.matchers {
matched, err := matchFn(msg)
if err != nil {
log.Errorf("event matcher failed: %s", err)
continue
}
// If there was a match, include the message in the results for the
// trigger
if matched {
res[tid] = msg
if once {
break
}
}
}
})
@ -555,7 +546,7 @@ func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Mes
// `curH`-`ts.Height` = `confidence`
type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
type MsgMatchFunc func(msg *types.Message) (matchOnce bool, matched bool, err error)
type MsgMatchFunc func(msg *types.Message) (matched bool, err error)
// Called registers a callback which is triggered when a specified method is
// called on an actor, or a timeout is reached.
@ -607,7 +598,7 @@ func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHa
me.lk.Lock()
defer me.lk.Unlock()
me.matchers[id] = append(me.matchers[id], mf)
me.matchers[id] = mf
return nil
}

View File

@ -572,9 +572,9 @@ func TestAtChainedConfidenceNull(t *testing.T) {
require.Equal(t, false, reverted)
}
func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (matchOnce bool, matched bool, err error) {
return func(msg *types.Message) (matchOnce bool, matched bool, err error) {
return true, to == msg.To && m == msg.Method, nil
func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (matched bool, err error) {
return func(msg *types.Message) (matched bool, err error) {
return to == msg.To && m == msg.Method, nil
}
}

View File

@ -0,0 +1,69 @@
package test
import (
"context"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)
type MockAPI struct {
bs blockstore.Blockstore
lk sync.Mutex
ts map[types.TipSetKey]*types.Actor
stateGetActorCalled int
}
func NewMockAPI(bs blockstore.Blockstore) *MockAPI {
return &MockAPI{
bs: bs,
ts: make(map[types.TipSetKey]*types.Actor),
}
}
func (m *MockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
return m.bs.Has(c)
}
func (m *MockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
blk, err := m.bs.Get(c)
if err != nil {
return nil, xerrors.Errorf("blockstore get: %w", err)
}
return blk.RawData(), nil
}
func (m *MockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
m.lk.Lock()
defer m.lk.Unlock()
m.stateGetActorCalled++
return m.ts[tsk], nil
}
func (m *MockAPI) StateGetActorCallCount() int {
m.lk.Lock()
defer m.lk.Unlock()
return m.stateGetActorCalled
}
func (m *MockAPI) ResetCallCounts() {
m.lk.Lock()
defer m.lk.Unlock()
m.stateGetActorCalled = 0
}
func (m *MockAPI) SetActor(tsk types.TipSetKey, act *types.Actor) {
m.lk.Lock()
defer m.lk.Unlock()
m.ts[tsk] = act
}

View File

@ -0,0 +1,32 @@
package test
import (
"context"
"testing"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-actors/v2/actors/util/adt"
"github.com/stretchr/testify/require"
)
func CreateEmptyMarketState(t *testing.T, store adt.Store) *market.State {
emptyArrayCid, err := adt.MakeEmptyArray(store).Root()
require.NoError(t, err)
emptyMap, err := adt.MakeEmptyMap(store).Root()
require.NoError(t, err)
return market.ConstructState(emptyArrayCid, emptyMap, emptyMap)
}
func CreateDealAMT(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState) cid.Cid {
root := adt.MakeEmptyArray(store)
for dealID, dealState := range deals {
err := root.Set(uint64(dealID), dealState)
require.NoError(t, err)
}
rootCid, err := root.Root()
require.NoError(t, err)
return rootCid
}

View File

@ -0,0 +1,27 @@
package test
import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
var dummyCid cid.Cid
func init() {
dummyCid, _ = cid.Parse("bafkqaaa")
}
func MockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: minerAddr,
Height: 5,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
Timestamp: timestamp,
}})
}

View File

@ -4,21 +4,19 @@ import (
"context"
"testing"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/go-bitfield"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
@ -36,39 +34,6 @@ func init() {
dummyCid, _ = cid.Parse("bafkqaaa")
}
type mockAPI struct {
ts map[types.TipSetKey]*types.Actor
bs bstore.Blockstore
}
func newMockAPI(bs bstore.Blockstore) *mockAPI {
return &mockAPI{
bs: bs,
ts: make(map[types.TipSetKey]*types.Actor),
}
}
func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) {
return m.bs.Has(c)
}
func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) {
blk, err := m.bs.Get(c)
if err != nil {
return nil, xerrors.Errorf("blockstore get: %w", err)
}
return blk.RawData(), nil
}
func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return m.ts[tsk], nil
}
func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) {
m.ts[tsk] = act
}
func TestMarketPredicates(t *testing.T) {
ctx := context.Background()
bs := bstore.NewTemporarySync()
@ -177,14 +142,14 @@ func TestMarketPredicates(t *testing.T) {
minerAddr, err := address.NewFromString("t00")
require.NoError(t, err)
oldState, err := mockTipset(minerAddr, 1)
oldState, err := test.MockTipset(minerAddr, 1)
require.NoError(t, err)
newState, err := mockTipset(minerAddr, 2)
newState, err := test.MockTipset(minerAddr, 2)
require.NoError(t, err)
api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC})
api.setActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC})
api := test.NewMockAPI(bs)
api.SetActor(oldState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: oldStateC})
api.SetActor(newState.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: newStateC})
t.Run("deal ID predicate", func(t *testing.T) {
preds := NewStatePredicates(api)
@ -239,7 +204,7 @@ func TestMarketPredicates(t *testing.T) {
t.Fatal("No state change so this should not be called")
return false, nil, nil
})
marketState0 := createEmptyMarketState(t, store)
marketState0 := test.CreateEmptyMarketState(t, store)
marketCid, err := store.Put(ctx, marketState0)
require.NoError(t, err)
marketState, err := market.Load(store, &types.Actor{
@ -352,7 +317,7 @@ func TestMarketPredicates(t *testing.T) {
t.Fatal("No state change so this should not be called")
return false, nil, nil
})
marketState0 := createEmptyMarketState(t, store)
marketState0 := test.CreateEmptyMarketState(t, store)
marketCid, err := store.Put(ctx, marketState0)
require.NoError(t, err)
marketState, err := market.Load(store, &types.Actor{
@ -394,14 +359,14 @@ func TestMinerSectorChange(t *testing.T) {
newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3})
minerAddr := nextIDAddrF()
oldState, err := mockTipset(minerAddr, 1)
oldState, err := test.MockTipset(minerAddr, 1)
require.NoError(t, err)
newState, err := mockTipset(minerAddr, 2)
newState, err := test.MockTipset(minerAddr, 2)
require.NoError(t, err)
api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID})
api.setActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID})
api := test.NewMockAPI(bs)
api.SetActor(oldState.Key(), &types.Actor{Head: oldMinerC, Code: builtin2.StorageMinerActorCodeID})
api.SetActor(newState.Key(), &types.Actor{Head: newMinerC, Code: builtin2.StorageMinerActorCodeID})
preds := NewStatePredicates(api)
@ -449,29 +414,16 @@ func TestMinerSectorChange(t *testing.T) {
require.Equal(t, si1Ext, sectorChanges.Extended[0].From)
}
func mockTipset(minerAddr address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: minerAddr,
Height: 5,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
Timestamp: timestamp,
}})
}
type balance struct {
available abi.TokenAmount
locked abi.TokenAmount
}
func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState, props map[abi.DealID]*market2.DealProposal, balances map[address.Address]balance) cid.Cid {
dealRootCid := createDealAMT(ctx, t, store, deals)
dealRootCid := test.CreateDealAMT(ctx, t, store, deals)
propRootCid := createProposalAMT(ctx, t, store, props)
balancesCids := createBalanceTable(ctx, t, store, balances)
state := createEmptyMarketState(t, store)
state := test.CreateEmptyMarketState(t, store)
state.States = dealRootCid
state.Proposals = propRootCid
state.EscrowTable = balancesCids[0]
@ -482,25 +434,6 @@ func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deal
return stateC
}
func createEmptyMarketState(t *testing.T, store adt2.Store) *market2.State {
emptyArrayCid, err := adt2.MakeEmptyArray(store).Root()
require.NoError(t, err)
emptyMap, err := adt2.MakeEmptyMap(store).Root()
require.NoError(t, err)
return market2.ConstructState(emptyArrayCid, emptyMap, emptyMap)
}
func createDealAMT(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState) cid.Cid {
root := adt2.MakeEmptyArray(store)
for dealID, dealState := range deals {
err := root.Set(uint64(dealID), dealState)
require.NoError(t, err)
}
rootCid, err := root.Root()
require.NoError(t, err)
return rootCid
}
func createProposalAMT(ctx context.Context, t *testing.T, store adt2.Store, props map[abi.DealID]*market2.DealProposal) cid.Cid {
root := adt2.MakeEmptyArray(store)
for dealID, prop := range props {

View File

@ -34,11 +34,11 @@ func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd
}
func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc {
return func(msg *types.Message) (matchOnce bool, matched bool, err error) {
return func(msg *types.Message) (matched bool, err error) {
if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) {
return true, false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)
return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)
}
return true, inmsg.Equals(msg), nil
return inmsg.Equals(msg), nil
}
}

View File

@ -59,8 +59,6 @@ var MaxUntrustedActorPendingMessages = 10
var MaxNonceGap = uint64(4)
var DefaultMaxFee = abi.TokenAmount(types.MustParseFIL("0.007"))
var (
ErrMessageTooBig = errors.New("message too big")
@ -183,9 +181,15 @@ func ComputeMinRBF(curPrem abi.TokenAmount) abi.TokenAmount {
return types.BigAdd(minPrice, types.NewInt(1))
}
func CapGasFee(msg *types.Message, maxFee abi.TokenAmount) {
func CapGasFee(mff dtypes.DefaultMaxFeeFunc, msg *types.Message, maxFee abi.TokenAmount) {
if maxFee.Equals(big.Zero()) {
maxFee = DefaultMaxFee
mf, err := mff()
if err != nil {
log.Errorf("failed to get default max gas fee: %+v", err)
mf = big.Zero()
}
maxFee = mf
}
gl := types.NewInt(uint64(msg.GasLimit))

View File

@ -752,11 +752,10 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
balance = new(big.Int).Sub(balance, required)
value := m.Message.Value.Int
if balance.Cmp(value) >= 0 {
// Note: we only account for the value if the balance doesn't drop below 0
// otherwise the message will fail and the miner can reap the gas rewards
balance = new(big.Int).Sub(balance, value)
if balance.Cmp(value) < 0 {
break
}
balance = new(big.Int).Sub(balance, value)
gasReward := mp.getGasReward(m, baseFee)
rewards = append(rewards, gasReward)

View File

@ -614,6 +614,14 @@ func GetReturnType(ctx context.Context, sm *StateManager, to address.Address, me
return reflect.New(m.Ret.Elem()).Interface().(cbg.CBORUnmarshaler), nil
}
func GetParamType(actCode cid.Cid, method abi.MethodNum) (cbg.CBORUnmarshaler, error) {
m, found := MethodsMap[actCode][method]
if !found {
return nil, fmt.Errorf("unknown method %d for actor %s", method, actCode)
}
return reflect.New(m.Params.Elem()).Interface().(cbg.CBORUnmarshaler), nil
}
func minerHasMinPower(ctx context.Context, sm *StateManager, addr address.Address, ts *types.TipSet) (bool, error) {
pact, err := sm.LoadActor(ctx, power.Address, ts)
if err != nil {

View File

@ -126,6 +126,7 @@ var prices = map[abi.ChainEpoch]Pricelist{
scale: 85639,
},
},
verifyPostDiscount: true,
verifyConsensusFault: 495422,
},
}

View File

@ -90,6 +90,7 @@ type pricelistV0 struct {
computeUnsealedSectorCidBase int64
verifySealBase int64
verifyPostLookup map[abi.RegisteredPoStProof]scalingCost
verifyPostDiscount bool
verifyConsensusFault int64
}
@ -201,7 +202,9 @@ func (pl *pricelistV0) OnVerifyPost(info proof2.WindowPoStVerifyInfo) GasCharge
}
gasUsed := cost.flat + int64(len(info.ChallengedSectors))*cost.scale
gasUsed /= 2 // XXX: this is an artificial discount
if pl.verifyPostDiscount {
gasUsed /= 2 // XXX: this is an artificial discount
}
return newGasCharge("OnVerifyPost", gasUsed, 0).
WithExtra(map[string]interface{}{

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
)
var mpoolCmd = &cli.Command{
@ -434,7 +435,12 @@ var mpoolReplaceCmd = &cli.Command{
msg.GasPremium = big.Max(retm.GasPremium, minRBF)
msg.GasFeeCap = big.Max(retm.GasFeeCap, msg.GasPremium)
messagepool.CapGasFee(&msg, mss.Get().MaxFee)
mff := func() (abi.TokenAmount, error) {
return abi.TokenAmount(config.DefaultDefaultMaxFee), nil
}
messagepool.CapGasFee(mff, &msg, mss.Get().MaxFee)
} else {
msg.GasLimit = cctx.Int64("gas-limit")
msg.GasPremium, err = types.BigFromString(cctx.String("gas-premium"))

View File

@ -15,13 +15,13 @@ import (
"strings"
"time"
"github.com/fatih/color"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/multiformats/go-multiaddr"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
@ -107,13 +107,18 @@ var stateMinerInfo = &cli.Command{
return err
}
availableBalance, err := api.StateMinerAvailableBalance(ctx, addr, ts.Key())
if err != nil {
return xerrors.Errorf("getting miner available balance: %w", err)
}
fmt.Printf("Available Balance: %s\n", types.FIL(availableBalance))
fmt.Printf("Owner:\t%s\n", mi.Owner)
fmt.Printf("Worker:\t%s\n", mi.Worker)
for i, controlAddress := range mi.ControlAddresses {
fmt.Printf("Control %d: \t%s\n", i, controlAddress)
}
fmt.Printf("PeerID:\t%s\n", mi.PeerId)
fmt.Printf("SectorSize:\t%s (%d)\n", types.SizeStr(types.NewInt(uint64(mi.SectorSize))), mi.SectorSize)
fmt.Printf("Multiaddrs: \t")
for _, addr := range mi.Multiaddrs {
a, err := multiaddr.NewMultiaddrBytes(addr)
@ -122,6 +127,26 @@ var stateMinerInfo = &cli.Command{
}
fmt.Printf("%s ", a)
}
fmt.Printf("SectorSize:\t%s (%d)\n", types.SizeStr(types.NewInt(uint64(mi.SectorSize))), mi.SectorSize)
pow, err := api.StateMinerPower(ctx, addr, ts.Key())
if err != nil {
return err
}
rpercI := types.BigDiv(types.BigMul(pow.MinerPower.RawBytePower, types.NewInt(1000000)), pow.TotalPower.RawBytePower)
qpercI := types.BigDiv(types.BigMul(pow.MinerPower.QualityAdjPower, types.NewInt(1000000)), pow.TotalPower.QualityAdjPower)
fmt.Printf("Byte Power: %s / %s (%0.4f%%)\n",
color.BlueString(types.SizeStr(pow.MinerPower.RawBytePower)),
types.SizeStr(pow.TotalPower.RawBytePower),
float64(rpercI.Int64())/10000)
fmt.Printf("Actual Power: %s / %s (%0.4f%%)\n",
color.GreenString(types.DeciStr(pow.MinerPower.QualityAdjPower)),
types.DeciStr(pow.TotalPower.QualityAdjPower),
float64(qpercI.Int64())/10000)
fmt.Println()
cd, err := api.StateMinerProvingDeadline(ctx, addr, ts.Key())
@ -1299,12 +1324,11 @@ func sumGas(changes []*types.GasTrace) types.GasTrace {
}
func JsonParams(code cid.Cid, method abi.MethodNum, params []byte) (string, error) {
methodMeta, found := stmgr.MethodsMap[code][method]
if !found {
return "", fmt.Errorf("method %d not found on actor %s", method, code)
p, err := stmgr.GetParamType(code, method)
if err != nil {
return "", err
}
re := reflect.New(methodMeta.Params.Elem())
p := re.Interface().(cbg.CBORUnmarshaler)
if err := p.UnmarshalCBOR(bytes.NewReader(params)); err != nil {
return "", err
}

View File

@ -0,0 +1,51 @@
package main
import (
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
lcli "github.com/filecoin-project/lotus/cli"
)
var setCmd = &cli.Command{
Name: "set",
Usage: "Manage worker settings",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "enabled",
Usage: "enable/disable new task processing",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if err := api.SetEnabled(ctx, cctx.Bool("enabled")); err != nil {
return xerrors.Errorf("SetEnabled: %w", err)
}
return nil
},
}
var waitQuietCmd = &cli.Command{
Name: "wait-quiet",
Usage: "Block until all running tasks exit",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return api.WaitQuiet(ctx)
},
}

View File

@ -32,6 +32,18 @@ var infoCmd = &cli.Command{
cli.VersionPrinter(cctx)
fmt.Println()
sess, err := api.ProcessSession(ctx)
if err != nil {
return xerrors.Errorf("getting session: %w", err)
}
fmt.Printf("Session: %s\n", sess)
enabled, err := api.Enabled(ctx)
if err != nil {
return xerrors.Errorf("checking worker status: %w", err)
}
fmt.Printf("Enabled: %t", enabled)
info, err := api.Info(ctx)
if err != nil {
return xerrors.Errorf("getting info: %w", err)

View File

@ -58,6 +58,8 @@ func main() {
runCmd,
infoCmd,
storageCmd,
setCmd,
waitQuietCmd,
}
app := &cli.App{
@ -451,14 +453,24 @@ var runCmd = &cli.Command{
return xerrors.Errorf("getting miner session: %w", err)
}
waitQuietCh := func() chan struct{} {
out := make(chan struct{})
go func() {
workerApi.LocalWorker.WaitQuiet()
close(out)
}()
return out
}
go func() {
heartbeats := time.NewTicker(stores.HeartbeatInterval)
defer heartbeats.Stop()
var connected, reconnect bool
var redeclareStorage bool
var readyCh chan struct{}
for {
// If we're reconnecting, redeclare storage first
if reconnect {
if redeclareStorage {
log.Info("Redeclaring local storage")
if err := localStore.Redeclare(ctx); err != nil {
@ -471,14 +483,13 @@ var runCmd = &cli.Command{
}
continue
}
connected = false
}
log.Info("Making sure no local tasks are running")
// TODO: we could get rid of this, but that requires tracking resources for restarted tasks correctly
workerApi.LocalWorker.WaitQuiet()
if readyCh == nil {
log.Info("Making sure no local tasks are running")
readyCh = waitQuietCh()
}
for {
curSession, err := nodeApi.Session(ctx)
@ -489,29 +500,28 @@ var runCmd = &cli.Command{
minerSession = curSession
break
}
if !connected {
if err := nodeApi.WorkerConnect(ctx, "http://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
cancel()
return
}
log.Info("Worker registered successfully, waiting for tasks")
connected = true
}
}
select {
case <-readyCh:
if err := nodeApi.WorkerConnect(ctx, "http://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
cancel()
return
}
log.Info("Worker registered successfully, waiting for tasks")
readyCh = nil
case <-heartbeats.C:
case <-ctx.Done():
return // graceful shutdown
case <-heartbeats.C:
}
}
log.Errorf("LOTUS-MINER CONNECTION LOST")
reconnect = true
redeclareStorage = true
}
}()

View File

@ -2,7 +2,9 @@ package main
import (
"context"
"sync/atomic"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
@ -17,6 +19,8 @@ type worker struct {
localStore *stores.Local
ls stores.LocalStorage
disabled int64
}
func (w *worker) Version(context.Context) (build.Version, error) {
@ -42,4 +46,34 @@ func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
return nil
}
func (w *worker) SetEnabled(ctx context.Context, enabled bool) error {
disabled := int64(1)
if enabled {
disabled = 0
}
atomic.StoreInt64(&w.disabled, disabled)
return nil
}
func (w *worker) Enabled(ctx context.Context) (bool, error) {
return atomic.LoadInt64(&w.disabled) == 0, nil
}
func (w *worker) WaitQuiet(ctx context.Context) error {
w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/
return nil
}
func (w *worker) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.LocalWorker.Session(ctx)
}
func (w *worker) Session(ctx context.Context) (uuid.UUID, error) {
if atomic.LoadInt64(&w.disabled) == 1 {
return uuid.UUID{}, xerrors.Errorf("worker disabled")
}
return w.LocalWorker.Session(ctx)
}
var _ storiface.WorkerCalls = &worker{}

View File

@ -1,17 +1,22 @@
package main
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"github.com/docker/go-units"
"github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
badgerds "github.com/ipfs/go-ds-badger2"
logging "github.com/ipfs/go-log"
"github.com/mitchellh/go-homedir"
"github.com/polydawn/refmt/cbor"
"github.com/urfave/cli/v2"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/backupds"
@ -25,6 +30,7 @@ var datastoreCmd = &cli.Command{
datastoreBackupCmd,
datastoreListCmd,
datastoreGetCmd,
datastoreRewriteCmd,
},
}
@ -288,3 +294,65 @@ func printVal(enc string, val []byte) error {
return nil
}
var datastoreRewriteCmd = &cli.Command{
Name: "rewrite",
Description: "rewrites badger datastore to compact it and possibly change params",
ArgsUsage: "source destination",
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 {
return xerrors.Errorf("expected 2 arguments, got %d", cctx.NArg())
}
fromPath, err := homedir.Expand(cctx.Args().Get(0))
if err != nil {
return xerrors.Errorf("cannot get fromPath: %w", err)
}
toPath, err := homedir.Expand(cctx.Args().Get(1))
if err != nil {
return xerrors.Errorf("cannot get toPath: %w", err)
}
opts := repo.ChainBadgerOptions()
opts.Options = opts.Options.WithSyncWrites(false)
to, err := badgerds.NewDatastore(toPath, &opts)
if err != nil {
return xerrors.Errorf("opennig 'to' datastore: %w", err)
}
opts.Options = opts.Options.WithReadOnly(false)
from, err := badgerds.NewDatastore(fromPath, &opts)
if err != nil {
return xerrors.Errorf("opennig 'from' datastore: %w", err)
}
pr, pw := io.Pipe()
errCh := make(chan error)
go func() {
bw := bufio.NewWriterSize(pw, 64<<20)
_, err := from.DB.Backup(bw, 0)
_ = bw.Flush()
_ = pw.CloseWithError(err)
errCh <- err
}()
go func() {
err := to.DB.Load(pr, 256)
errCh <- err
}()
err = <-errCh
if err != nil {
select {
case nerr := <-errCh:
err = multierr.Append(err, nerr)
default:
}
return err
}
err = <-errCh
if err != nil {
return err
}
return multierr.Append(from.Close(), to.Close())
},
}

View File

@ -0,0 +1,70 @@
package main
import (
"encoding/binary"
"fmt"
"math/rand"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)
var electionCmd = &cli.Command{
Name: "election",
Usage: "commands related to leader election",
Subcommands: []*cli.Command{
electionRunDummy,
},
}
var electionRunDummy = &cli.Command{
Name: "run-dummy",
Usage: "runs dummy elections with given power",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "network-power",
},
&cli.StringFlag{
Name: "miner-power",
},
&cli.Uint64Flag{
Name: "seed",
Value: 0,
},
},
Action: func(cctx *cli.Context) error {
ctx := lcli.ReqContext(cctx)
minerPow, err := types.BigFromString(cctx.String("miner-power"))
if err != nil {
return xerrors.Errorf("decoding miner-power: %w", err)
}
networkPow, err := types.BigFromString(cctx.String("network-power"))
if err != nil {
return xerrors.Errorf("decoding miner-power: %w", err)
}
ep := &types.ElectionProof{}
ep.VRFProof = make([]byte, 32)
seed := cctx.Uint64("seed")
if seed == 0 {
seed = rand.Uint64()
}
binary.BigEndian.PutUint64(ep.VRFProof, seed)
i := uint64(0)
for {
if ctx.Err() != nil {
return ctx.Err()
}
binary.BigEndian.PutUint64(ep.VRFProof[8:], i)
j := ep.ComputeWinCount(minerPow, networkPow)
_, err := fmt.Printf("%t, %d\n", j != 0, j)
if err != nil {
return err
}
i++
}
},
}

View File

@ -46,6 +46,7 @@ func main() {
ledgerCmd,
sectorsCmd,
msgCmd,
electionCmd,
}
app := &cli.App{

View File

@ -59,6 +59,24 @@ func infoCmdAct(cctx *cli.Context) error {
ctx := lcli.ReqContext(cctx)
fmt.Print("Full node: ")
head, err := api.ChainHead(ctx)
if err != nil {
return err
}
switch {
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs
fmt.Printf("[%s]", color.GreenString("sync ok"))
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs
fmt.Printf("[%s]", color.YellowString("sync slow (%s behind)", time.Now().Sub(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)))
default:
fmt.Printf("[%s]", color.RedString("sync behind! (%s behind)", time.Now().Sub(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)))
}
fmt.Println()
maddr, err := getActorAddress(ctx, nodeApi, cctx.String("actor"))
if err != nil {
return err

View File

@ -215,6 +215,11 @@ var sealingJobsCmd = &cli.Command{
var sealingSchedDiagCmd = &cli.Command{
Name: "sched-diag",
Usage: "Dump internal scheduler state",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "force-sched",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
@ -224,7 +229,7 @@ var sealingSchedDiagCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
st, err := nodeApi.SealingSchedDiag(ctx)
st, err := nodeApi.SealingSchedDiag(ctx, cctx.Bool("force-sched"))
if err != nil {
return err
}

View File

@ -140,6 +140,7 @@
* [StateCirculatingSupply](#StateCirculatingSupply)
* [StateCompute](#StateCompute)
* [StateDealProviderCollateralBounds](#StateDealProviderCollateralBounds)
* [StateDecodeParams](#StateDecodeParams)
* [StateGetActor](#StateGetActor)
* [StateGetReceipt](#StateGetReceipt)
* [StateListActors](#StateListActors)
@ -3387,6 +3388,31 @@ Response:
}
```
### StateDecodeParams
StateDecodeParams attempts to decode the provided params, based on the recipient actor address and method number.
Perms: read
Inputs:
```json
[
"f01234",
1,
"Ynl0ZSBhcnJheQ==",
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
]
]
```
Response: `{}`
### StateGetActor
StateGetActor returns the indicated actor's nonce and balance.

View File

@ -679,7 +679,15 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro
return m.storage.FsStat(ctx, id)
}
func (m *Manager) SchedDiag(ctx context.Context) (interface{}, error) {
func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error) {
if doSched {
select {
case m.sched.workerChange <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
}
}
return m.sched.Info(ctx)
}

View File

@ -10,6 +10,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -376,3 +377,77 @@ func TestRestartWorker(t *testing.T) {
require.NoError(t, err)
require.Empty(t, uf)
}
func TestReenableWorker(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
stores.HeartbeatInterval = 5 * time.Millisecond
ctx, done := context.WithCancel(context.Background())
defer done()
ds := datastore.NewMapDatastore()
m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds)
defer cleanup()
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
wds := datastore.NewMapDatastore()
arch := make(chan chan apres)
w := newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
SealProof: 0,
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)
time.Sleep(time.Millisecond * 100)
i, _ := m.sched.Info(ctx)
require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
// disable
atomic.StoreInt64(&w.testDisable, 1)
for i := 0; i < 100; i++ {
if !m.WorkerStats()[w.session].Enabled {
break
}
time.Sleep(time.Millisecond * 3)
}
require.False(t, m.WorkerStats()[w.session].Enabled)
i, _ = m.sched.Info(ctx)
require.Len(t, i.(SchedDiagInfo).OpenWindows, 0)
// reenable
atomic.StoreInt64(&w.testDisable, 0)
for i := 0; i < 100; i++ {
if m.WorkerStats()[w.session].Enabled {
break
}
time.Sleep(time.Millisecond * 3)
}
require.True(t, m.WorkerStats()[w.session].Enabled)
for i := 0; i < 100; i++ {
info, _ := m.sched.Info(ctx)
if len(info.(SchedDiagInfo).OpenWindows) != 0 {
break
}
time.Sleep(time.Millisecond * 3)
}
i, _ = m.sched.Info(ctx)
require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
}

View File

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
@ -217,7 +218,7 @@ type SchedDiagRequestInfo struct {
type SchedDiagInfo struct {
Requests []SchedDiagRequestInfo
OpenWindows []WorkerID
OpenWindows []string
}
func (sh *scheduler) runSched() {
@ -324,7 +325,7 @@ func (sh *scheduler) diag() SchedDiagInfo {
defer sh.workersLk.RUnlock()
for _, window := range sh.openWindows {
out.OpenWindows = append(out.OpenWindows, window.worker)
out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.worker).String())
}
return out

View File

@ -104,14 +104,16 @@ func (sw *schedWorker) handleWorker() {
defer sw.heartbeatTimer.Stop()
for {
sched.workersLk.Lock()
enabled := worker.enabled
sched.workersLk.Unlock()
{
sched.workersLk.Lock()
enabled := worker.enabled
sched.workersLk.Unlock()
// ask for more windows if we need them (non-blocking)
if enabled {
if !sw.requestWindows() {
return // graceful shutdown
// ask for more windows if we need them (non-blocking)
if enabled {
if !sw.requestWindows() {
return // graceful shutdown
}
}
}
@ -123,12 +125,16 @@ func (sw *schedWorker) handleWorker() {
}
// session looks good
if !enabled {
{
sched.workersLk.Lock()
enabled := worker.enabled
worker.enabled = true
sched.workersLk.Unlock()
// we'll send window requests on the next loop
if !enabled {
// go send window requests
break
}
}
// wait for more tasks to be assigned by the main scheduler or for the worker

View File

@ -8,6 +8,7 @@ import (
"reflect"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/elastic/go-sysinfo"
@ -51,8 +52,9 @@ type LocalWorker struct {
acceptTasks map[sealtasks.TaskType]struct{}
running sync.WaitGroup
session uuid.UUID
closing chan struct{}
session uuid.UUID
testDisable int64
closing chan struct{}
}
func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
@ -501,6 +503,10 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
}
func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error) {
if atomic.LoadInt64(&l.testDisable) == 1 {
return uuid.UUID{}, xerrors.Errorf("disabled")
}
select {
case <-l.closing:
return ClosedWorkerID, nil

View File

@ -38,8 +38,9 @@ type ClientNodeAdapter struct {
full.ChainAPI
full.MpoolAPI
fm *market.FundMgr
ev *events.Events
fm *market.FundMgr
ev *events.Events
dsMatcher *dealStateMatcher
}
type clientApi struct {
@ -47,14 +48,16 @@ type clientApi struct {
full.StateAPI
}
func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
capi := &clientApi{chain, stateapi}
return &ClientNodeAdapter{
StateAPI: state,
StateAPI: stateapi,
ChainAPI: chain,
MpoolAPI: mpool,
fm: fm,
ev: events.NewEvents(context.TODO(), &clientApi{chain, state}),
fm: fm,
ev: events.NewEvents(context.TODO(), capi),
dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)),
}
}
@ -263,44 +266,44 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
var sectorNumber abi.SectorNumber
var sectorFound bool
matchEvent := func(msg *types.Message) (matchOnce bool, matched bool, err error) {
matchEvent := func(msg *types.Message) (matched bool, err error) {
if msg.To != provider {
return true, false, nil
return false, nil
}
switch msg.Method {
case miner2.MethodsMiner.PreCommitSector:
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return true, false, xerrors.Errorf("unmarshal pre commit: %w", err)
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
for _, did := range params.DealIDs {
if did == dealId {
sectorNumber = params.SectorNumber
sectorFound = true
return true, false, nil
return false, nil
}
}
return true, false, nil
return false, nil
case miner2.MethodsMiner.ProveCommitSector:
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return true, false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
if !sectorFound {
return true, false, nil
return false, nil
}
if params.SectorNumber != sectorNumber {
return true, false, nil
return false, nil
}
return false, true, nil
return true, nil
default:
return true, false, nil
return false, nil
}
}
@ -389,13 +392,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
}
// Watch for state changes to the deal
preds := state.NewStatePredicates(c)
dealDiff := preds.OnStorageMarketActorChanged(
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
match := c.dsMatcher.matcher(ctx, dealID)
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1

View File

@ -0,0 +1,84 @@
package storageadapter
import (
"context"
"sync"
"github.com/filecoin-project/go-state-types/abi"
actorsmarket "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
)
// dealStateMatcher caches the DealStates for the most recent
// old/new tipset combination
type dealStateMatcher struct {
preds *state.StatePredicates
lk sync.Mutex
oldTsk types.TipSetKey
newTsk types.TipSetKey
oldDealStateRoot actorsmarket.DealStates
newDealStateRoot actorsmarket.DealStates
}
func newDealStateMatcher(preds *state.StatePredicates) *dealStateMatcher {
return &dealStateMatcher{preds: preds}
}
// matcher returns a function that checks if the state of the given dealID
// has changed.
// It caches the DealStates for the most recent old/new tipset combination.
func (mc *dealStateMatcher) matcher(ctx context.Context, dealID abi.DealID) events.StateMatchFunc {
// The function that is called to check if the deal state has changed for
// the target deal ID
dealStateChangedForID := mc.preds.DealStateChangedForIDs([]abi.DealID{dealID})
// The match function is called by the events API to check if there's
// been a state change for the deal with the target deal ID
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
mc.lk.Lock()
defer mc.lk.Unlock()
// Check if we've already fetched the DealStates for the given tipsets
if mc.oldTsk == oldTs.Key() && mc.newTsk == newTs.Key() {
// If we fetch the DealStates and there is no difference between
// them, they are stored as nil. So we can just bail out.
if mc.oldDealStateRoot == nil || mc.newDealStateRoot == nil {
return false, nil, nil
}
// Check if the deal state has changed for the target ID
return dealStateChangedForID(ctx, mc.oldDealStateRoot, mc.newDealStateRoot)
}
// We haven't already fetched the DealStates for the given tipsets, so
// do so now
// Replace dealStateChangedForID with a function that records the
// DealStates so that we can cache them
var oldDealStateRootSaved, newDealStateRootSaved actorsmarket.DealStates
recorder := func(ctx context.Context, oldDealStateRoot, newDealStateRoot actorsmarket.DealStates) (changed bool, user state.UserData, err error) {
// Record DealStates
oldDealStateRootSaved = oldDealStateRoot
newDealStateRootSaved = newDealStateRoot
return dealStateChangedForID(ctx, oldDealStateRoot, newDealStateRoot)
}
// Call the match function
dealDiff := mc.preds.OnStorageMarketActorChanged(
mc.preds.OnDealStateChanged(recorder))
matched, data, err := dealDiff(ctx, oldTs.Key(), newTs.Key())
// Save the recorded DealStates for the tipsets
mc.oldTsk = oldTs.Key()
mc.newTsk = newTs.Key()
mc.oldDealStateRoot = oldDealStateRootSaved
mc.newDealStateRoot = newDealStateRootSaved
return matched, data, err
}
return match
}

View File

@ -0,0 +1,157 @@
package storageadapter
import (
"context"
"testing"
"github.com/filecoin-project/lotus/chain/events"
"golang.org/x/sync/errgroup"
cbornode "github.com/ipfs/go-ipld-cbor"
adt2 "github.com/filecoin-project/specs-actors/v2/actors/util/adt"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
bstore "github.com/filecoin-project/lotus/lib/blockstore"
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
)
func TestDealStateMatcher(t *testing.T) {
ctx := context.Background()
bs := bstore.NewTemporarySync()
store := adt2.WrapStore(ctx, cbornode.NewCborStore(bs))
deal1 := &market2.DealState{
SectorStartEpoch: 1,
LastUpdatedEpoch: 2,
}
deal2 := &market2.DealState{
SectorStartEpoch: 4,
LastUpdatedEpoch: 5,
}
deal3 := &market2.DealState{
SectorStartEpoch: 7,
LastUpdatedEpoch: 8,
}
deals1 := map[abi.DealID]*market2.DealState{
abi.DealID(1): deal1,
}
deals2 := map[abi.DealID]*market2.DealState{
abi.DealID(1): deal2,
}
deals3 := map[abi.DealID]*market2.DealState{
abi.DealID(1): deal3,
}
deal1StateC := createMarketState(ctx, t, store, deals1)
deal2StateC := createMarketState(ctx, t, store, deals2)
deal3StateC := createMarketState(ctx, t, store, deals3)
minerAddr, err := address.NewFromString("t00")
require.NoError(t, err)
ts1, err := test.MockTipset(minerAddr, 1)
require.NoError(t, err)
ts2, err := test.MockTipset(minerAddr, 2)
require.NoError(t, err)
ts3, err := test.MockTipset(minerAddr, 3)
require.NoError(t, err)
api := test.NewMockAPI(bs)
api.SetActor(ts1.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal1StateC})
api.SetActor(ts2.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal2StateC})
api.SetActor(ts3.Key(), &types.Actor{Code: builtin2.StorageMarketActorCodeID, Head: deal3StateC})
t.Run("caching", func(t *testing.T) {
dsm := newDealStateMatcher(state.NewStatePredicates(api))
matcher := dsm.matcher(ctx, abi.DealID(1))
// Call matcher with tipsets that have the same state
ok, stateChange, err := matcher(ts1, ts1)
require.NoError(t, err)
require.False(t, ok)
require.Nil(t, stateChange)
// Should call StateGetActor once for each tipset
require.Equal(t, 2, api.StateGetActorCallCount())
// Call matcher with tipsets that have different state
api.ResetCallCounts()
ok, stateChange, err = matcher(ts1, ts2)
require.NoError(t, err)
require.True(t, ok)
require.NotNil(t, stateChange)
// Should call StateGetActor once for each tipset
require.Equal(t, 2, api.StateGetActorCallCount())
// Call matcher again with the same tipsets as above, should be cached
api.ResetCallCounts()
ok, stateChange, err = matcher(ts1, ts2)
require.NoError(t, err)
require.True(t, ok)
require.NotNil(t, stateChange)
// Should not call StateGetActor (because it should hit the cache)
require.Equal(t, 0, api.StateGetActorCallCount())
// Call matcher with different tipsets, should not be cached
api.ResetCallCounts()
ok, stateChange, err = matcher(ts2, ts3)
require.NoError(t, err)
require.True(t, ok)
require.NotNil(t, stateChange)
// Should call StateGetActor once for each tipset
require.Equal(t, 2, api.StateGetActorCallCount())
})
t.Run("parallel", func(t *testing.T) {
api.ResetCallCounts()
dsm := newDealStateMatcher(state.NewStatePredicates(api))
matcher := dsm.matcher(ctx, abi.DealID(1))
// Call matcher with lots of go-routines in parallel
var eg errgroup.Group
res := make([]struct {
ok bool
stateChange events.StateChange
}, 20)
for i := 0; i < len(res); i++ {
i := i
eg.Go(func() error {
ok, stateChange, err := matcher(ts1, ts2)
res[i].ok = ok
res[i].stateChange = stateChange
return err
})
}
err := eg.Wait()
require.NoError(t, err)
// All go-routines should have got the same (cached) result
for i := 1; i < len(res); i++ {
require.Equal(t, res[i].ok, res[i-1].ok)
require.Equal(t, res[i].stateChange, res[i-1].stateChange)
}
// Only one go-routine should have called StateGetActor
// (once for each tipset)
require.Equal(t, 2, api.StateGetActorCallCount())
})
}
func createMarketState(ctx context.Context, t *testing.T, store adt2.Store, deals map[abi.DealID]*market2.DealState) cid.Cid {
dealRootCid := test.CreateDealAMT(ctx, t, store, deals)
state := test.CreateEmptyMarketState(t, store)
state.States = dealRootCid
stateC, err := store.Put(ctx, state)
require.NoError(t, err)
return stateC
}

View File

@ -51,6 +51,7 @@ type ProviderNodeAdapter struct {
ev *events.Events
publishSpec, addBalanceSpec *api.MessageSendSpec
dsMatcher *dealStateMatcher
}
func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
@ -58,9 +59,10 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDA
na := &ProviderNodeAdapter{
FullNode: full,
dag: dag,
secb: secb,
ev: events.NewEvents(context.TODO(), full),
dag: dag,
secb: secb,
ev: events.NewEvents(context.TODO(), full),
dsMatcher: newDealStateMatcher(state.NewStatePredicates(full)),
}
if fc != nil {
na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)}
@ -307,44 +309,44 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide
var sectorNumber abi.SectorNumber
var sectorFound bool
matchEvent := func(msg *types.Message) (matchOnce bool, matched bool, err error) {
matchEvent := func(msg *types.Message) (matched bool, err error) {
if msg.To != provider {
return true, false, nil
return false, nil
}
switch msg.Method {
case miner.Methods.PreCommitSector:
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return true, false, xerrors.Errorf("unmarshal pre commit: %w", err)
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
for _, did := range params.DealIDs {
if did == dealID {
sectorNumber = params.SectorNumber
sectorFound = true
return true, false, nil
return false, nil
}
}
return true, false, nil
return false, nil
case miner.Methods.ProveCommitSector:
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return true, false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
if !sectorFound {
return true, false, nil
return false, nil
}
if params.SectorNumber != sectorNumber {
return true, false, nil
return false, nil
}
return false, true, nil
return true, nil
default:
return true, false, nil
return false, nil
}
}
@ -461,13 +463,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
}
// Watch for state changes to the deal
preds := state.NewStatePredicates(n)
dealDiff := preds.OnStorageMarketActorChanged(
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
match := n.dsMatcher.matcher(ctx, dealID)
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1

View File

@ -268,6 +268,7 @@ func Online() Option {
Override(new(*chain.Syncer), modules.NewSyncer),
Override(new(exchange.Client), exchange.NewClient),
Override(new(*messagepool.MessagePool), modules.MessagePool),
Override(new(dtypes.DefaultMaxFeeFunc), modules.NewDefaultMaxFeeFunc),
Override(new(modules.Genesis), modules.ErrorGenesis),
Override(new(dtypes.AfterGenesisSet), modules.SetGenesis),

View File

@ -23,6 +23,7 @@ type FullNode struct {
Client Client
Metrics Metrics
Wallet Wallet
Fees FeeConfig
}
// // Common
@ -116,6 +117,10 @@ type Wallet struct {
DisableLocal bool
}
type FeeConfig struct {
DefaultMaxFee types.FIL
}
func defCommon() Common {
return Common{
API: API{
@ -143,10 +148,15 @@ func defCommon() Common {
}
var DefaultDefaultMaxFee = types.MustParseFIL("0.007")
// DefaultFullNode returns the default config
func DefaultFullNode() *FullNode {
return &FullNode{
Common: defCommon(),
Fees: FeeConfig{
DefaultMaxFee: DefaultDefaultMaxFee,
},
}
}

View File

@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
type GasModuleAPI interface {
@ -34,9 +35,10 @@ type GasModuleAPI interface {
// Injection (for example with a thin RPC client).
type GasModule struct {
fx.In
Stmgr *stmgr.StateManager
Chain *store.ChainStore
Mpool *messagepool.MessagePool
Stmgr *stmgr.StateManager
Chain *store.ChainStore
Mpool *messagepool.MessagePool
GetMaxFee dtypes.DefaultMaxFeeFunc
}
var _ GasModuleAPI = (*GasModule)(nil)
@ -291,7 +293,7 @@ func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Messag
msg.GasFeeCap = feeCap
}
messagepool.CapGasFee(msg, spec.Get().MaxFee)
messagepool.CapGasFee(m.GetMaxFee, msg, spec.Get().MaxFee)
return msg, nil
}

View File

@ -491,6 +491,24 @@ func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, ts
}, nil
}
func (a *StateAPI) StateDecodeParams(ctx context.Context, toAddr address.Address, method abi.MethodNum, params []byte, tsk types.TipSetKey) (interface{}, error) {
act, err := a.StateGetActor(ctx, toAddr, tsk)
if err != nil {
return nil, xerrors.Errorf("getting actor: %w", err)
}
paramType, err := stmgr.GetParamType(act.Code, method)
if err != nil {
return nil, xerrors.Errorf("getting params type: %w", err)
}
if err = paramType.UnmarshalCBOR(bytes.NewReader(params)); err != nil {
return nil, err
}
return paramType, nil
}
// This is on StateAPI because miner.Miner requires this, and MinerAPI requires miner.Miner
func (a *StateAPI) MinerGetBaseInfo(ctx context.Context, maddr address.Address, epoch abi.ChainEpoch, tsk types.TipSetKey) (*api.MiningBaseInfo, error) {
return stmgr.MinerGetBaseInfo(ctx, a.StateManager, a.Beacon, tsk, epoch, maddr, a.ProofVerifier)

View File

@ -296,8 +296,8 @@ func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error
return sm.StorageMgr.AddWorker(ctx, w)
}
func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context) (interface{}, error) {
return sm.StorageMgr.SchedDiag(ctx)
func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) {
return sm.StorageMgr.SchedDiag(ctx, doSched)
}
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {

View File

@ -15,11 +15,13 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/addrutil"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
@ -99,9 +101,34 @@ func DrandBootstrap(ds dtypes.DrandSchedule) (dtypes.DrandBootstrap, error) {
addrs, err := addrutil.ParseAddresses(context.TODO(), d.Config.Relays)
if err != nil {
log.Errorf("reoslving drand relays addresses: %+v", err)
return res, nil
continue
}
res = append(res, addrs...)
}
return res, nil
}
func NewDefaultMaxFeeFunc(r repo.LockedRepo) dtypes.DefaultMaxFeeFunc {
return func() (out abi.TokenAmount, err error) {
err = readNodeCfg(r, func(cfg *config.FullNode) {
out = abi.TokenAmount(cfg.Fees.DefaultMaxFee)
})
return
}
}
func readNodeCfg(r repo.LockedRepo, accessor func(node *config.FullNode)) error {
raw, err := r.Config()
if err != nil {
return err
}
cfg, ok := raw.(*config.FullNode)
if !ok {
return xerrors.New("expected config.FullNode")
}
accessor(cfg)
return nil
}

View File

@ -5,6 +5,7 @@ import (
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
)
type MpoolLocker struct {
@ -33,3 +34,5 @@ func (ml *MpoolLocker) TakeLock(ctx context.Context, a address.Address) (func(),
<-lk
}, nil
}
type DefaultMaxFeeFunc func() (abi.TokenAmount, error)

View File

@ -4,14 +4,14 @@ import (
"os"
"path/filepath"
"github.com/ipfs/go-datastore"
dgbadger "github.com/dgraph-io/badger/v2"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
"golang.org/x/xerrors"
dgbadger "github.com/dgraph-io/badger/v2"
"github.com/ipfs/go-datastore"
badger "github.com/ipfs/go-ds-badger2"
levelds "github.com/ipfs/go-ds-leveldb"
measure "github.com/ipfs/go-ds-measure"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
)
type dsCtor func(path string, readonly bool) (datastore.Batching, error)
@ -31,7 +31,6 @@ func badgerDs(path string, readonly bool) (datastore.Batching, error) {
opts.Options = dgbadger.DefaultOptions("").WithTruncate(true).
WithValueThreshold(1 << 10)
return badger.NewDatastore(path, &opts)
}

View File

@ -103,27 +103,27 @@ func (pcs *paymentChannelSettler) revertHandler(ctx context.Context, ts *types.T
return nil
}
func (pcs *paymentChannelSettler) matcher(msg *types.Message) (matchOnce bool, matched bool, err error) {
func (pcs *paymentChannelSettler) matcher(msg *types.Message) (matched bool, err error) {
// Check if this is a settle payment channel message
if msg.Method != paych.Methods.Settle {
return false, false, nil
return false, nil
}
// Check if this payment channel is of concern to this node (i.e. tracked in payment channel store),
// and its inbound (i.e. we're getting vouchers that we may need to redeem)
trackedAddresses, err := pcs.api.PaychList(pcs.ctx)
if err != nil {
return false, false, err
return false, err
}
for _, addr := range trackedAddresses {
if msg.To == addr {
status, err := pcs.api.PaychStatus(pcs.ctx, addr)
if err != nil {
return false, false, err
return false, err
}
if status.Direction == api.PCHInbound {
return false, true, nil
return true, nil
}
}
}
return false, false, nil
return false, nil
}

View File

@ -510,10 +510,10 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
skipCount := uint64(0)
postSkipped := bitfield.New()
var postOut []proof2.PoStProof
somethingToProve := true
somethingToProve := false
for retries := 0; retries < 5; retries++ {
// Retry until we run out of sectors to prove.
for retries := 0; ; retries++ {
var partitions []miner.PoStPartition
var sinfos []proof2.SectorInfo
for partIdx, partition := range batch {
@ -567,7 +567,6 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
if len(sinfos) == 0 {
// nothing to prove for this batch
somethingToProve = false
break
}
@ -585,27 +584,43 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return nil, err
}
var ps []abi.SectorID
postOut, ps, err = s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand))
postOut, ps, err := s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand))
elapsed := time.Since(tsStart)
log.Infow("computing window post", "batch", batchIdx, "elapsed", elapsed)
if err == nil {
// Proof generation successful, stop retrying
params.Partitions = append(params.Partitions, partitions...)
if len(postOut) == 0 {
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
// Proof generation successful, stop retrying
somethingToProve = true
params.Partitions = partitions
params.Proofs = postOut
break
}
// Proof generation failed, so retry
if len(ps) == 0 {
// If we didn't skip any new sectors, we failed
// for some other reason and we need to abort.
return nil, xerrors.Errorf("running window post failed: %w", err)
}
// TODO: maybe mark these as faulty somewhere?
log.Warnw("generate window post skipped sectors", "sectors", ps, "error", err, "try", retries)
// Explicitly make sure we haven't aborted this PoSt
// (GenerateWindowPoSt may or may not check this).
// Otherwise, we could try to continue proving a
// deadline after the deadline has ended.
if ctx.Err() != nil {
log.Warnw("aborting PoSt due to context cancellation", "error", ctx.Err(), "deadline", di.Index)
return nil, ctx.Err()
}
skipCount += uint64(len(ps))
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
@ -617,12 +632,6 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
continue
}
if len(postOut) == 0 {
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
params.Proofs = postOut
posts = append(posts, params)
}