Merge remote-tracking branch 'origin/next' into feat/client-multi-bstore

This commit is contained in:
Łukasz Magiera 2020-07-08 22:48:20 +02:00
commit 66237415c5
52 changed files with 1418 additions and 345 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/lotus/chain/types"
@ -301,6 +302,10 @@ type FullNode interface {
// StateCompute is a flexible command that applies the given messages on the given tipset.
// The messages are run as though the VM were at the provided height.
StateCompute(context.Context, abi.ChainEpoch, []*types.Message, types.TipSetKey) (*ComputeStateOutput, error)
// StateVerifiedClientStatus returns the data cap for the given address.
// Returns nil if there is no entry in the data cap table for the
// address.
StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*verifreg.DataCap, error)
// MethodGroup: Msig
// The Msig methods are used to interact with multisig wallets on the

View File

@ -3,12 +3,14 @@ package api
import (
"bytes"
"context"
"time"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -35,16 +37,22 @@ type StorageMiner interface {
SectorsRefs(context.Context) (map[string][]SealedRef, error)
// SectorStartSealing can be called on sectors in Empty on WaitDeals states
// SectorStartSealing can be called on sectors in Empty or WaitDeals states
// to trigger sealing early
SectorStartSealing(context.Context, abi.SectorNumber) error
// SectorSetSealDelay sets the time that a newly-created sector
// waits for more deals before it starts sealing
SectorSetSealDelay(context.Context, time.Duration) error
// SectorGetSealDelay gets the time that a newly-created sector
// waits for more deals before it starts sealing
SectorGetSealDelay(context.Context) (time.Duration, error)
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
SectorRemove(context.Context, abi.SectorNumber) error
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
StorageLocal(ctx context.Context) (map[stores.ID]string, error)
StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error)
StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error)
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error

View File

@ -3,6 +3,7 @@ package apistruct
import (
"context"
"io"
"time"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network"
@ -11,7 +12,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
@ -19,6 +20,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-storage/storage"
@ -157,6 +159,7 @@ type FullNodeStruct struct {
StateMinerSectorCount func(context.Context, address.Address, types.TipSetKey) (api.MinerSectors, error) `perm:"read"`
StateListMessages func(ctx context.Context, match *types.Message, tsk types.TipSetKey, toht abi.ChainEpoch) ([]cid.Cid, error) `perm:"read"`
StateCompute func(context.Context, abi.ChainEpoch, []*types.Message, types.TipSetKey) (*api.ComputeStateOutput, error) `perm:"read"`
StateVerifiedClientStatus func(context.Context, address.Address, types.TipSetKey) (*verifreg.DataCap, error) `perm:"read"`
MsigGetAvailableBalance func(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) `perm:"read"`
MsigCreate func(context.Context, uint64, []address.Address, types.BigInt, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"`
@ -207,6 +210,8 @@ type StorageMinerStruct struct {
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"`
SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"`
SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
@ -216,8 +221,8 @@ type StorageMinerStruct struct {
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageStat func(context.Context, stores.ID) (stores.FsStat, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, stores.FsStat) error `perm:"admin"`
StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"`
StorageDropSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"`
@ -695,6 +700,10 @@ func (c *FullNodeStruct) StateCompute(ctx context.Context, height abi.ChainEpoch
return c.Internal.StateCompute(ctx, height, msgs, tsk)
}
func (c *FullNodeStruct) StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*verifreg.DataCap, error) {
return c.Internal.StateVerifiedClientStatus(ctx, addr, tsk)
}
func (c *FullNodeStruct) MsigGetAvailableBalance(ctx context.Context, a address.Address, tsk types.TipSetKey) (types.BigInt, error) {
return c.Internal.MsigGetAvailableBalance(ctx, a, tsk)
}
@ -803,6 +812,14 @@ func (c *StorageMinerStruct) SectorStartSealing(ctx context.Context, number abi.
return c.Internal.SectorStartSealing(ctx, number)
}
func (c *StorageMinerStruct) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
return c.Internal.SectorSetSealDelay(ctx, delay)
}
func (c *StorageMinerStruct) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
return c.Internal.SectorGetSealDelay(ctx)
}
func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
return c.Internal.SectorsUpdate(ctx, id, state)
}
@ -823,7 +840,7 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (map[uint64]storif
return c.Internal.WorkerStats(ctx)
}
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st stores.FsStat) error {
func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error {
return c.Internal.StorageAttach(ctx, si, st)
}
@ -847,7 +864,7 @@ func (c *StorageMinerStruct) StorageLocal(ctx context.Context) (map[stores.ID]st
return c.Internal.StorageLocal(ctx)
}
func (c *StorageMinerStruct) StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
func (c *StorageMinerStruct) StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) {
return c.Internal.StorageStat(ctx, id)
}

View File

@ -78,7 +78,7 @@ func TestCCUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal(err)
}
makeDeal(t, ctx, 6, client, miner, false)
makeDeal(t, ctx, 6, client, miner, false, false)
// Validate upgrade

View File

@ -37,7 +37,7 @@ func init() {
build.InsecurePoStValidation = true
}
func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport bool) {
func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool) {
_ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background()
@ -67,7 +67,7 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport
}
}()
makeDeal(t, ctx, 6, client, miner, carExport)
makeDeal(t, ctx, 6, client, miner, carExport, fastRet)
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
@ -105,15 +105,15 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
}
}()
makeDeal(t, ctx, 6, client, miner, false)
makeDeal(t, ctx, 7, client, miner, false)
makeDeal(t, ctx, 6, client, miner, false, false)
makeDeal(t, ctx, 7, client, miner, false, false)
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}
func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNodeAPI, miner TestStorageNode, carExport bool) {
func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNodeAPI, miner TestStorageNode, carExport, fastRet bool) {
data := make([]byte, 1600)
rand.New(rand.NewSource(int64(rseed))).Read(data)
@ -125,7 +125,7 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
fmt.Println("FILE CID: ", fcid)
deal := startDeal(t, ctx, miner, client, fcid)
deal := startDeal(t, ctx, miner, client, fcid, fastRet)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
@ -136,7 +136,7 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
testRetrieval(t, ctx, err, client, fcid, carExport, data)
}
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid) *cid.Cid {
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid {
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
@ -152,6 +152,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
Miner: maddr,
EpochPrice: types.NewInt(1000000),
MinBlocksDuration: 100,
FastRetrieval: fastRet,
})
if err != nil {
t.Fatalf("%+v", err)

View File

@ -189,7 +189,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
}
}()
deal := startDeal(t, ctx, provider, client, fcid)
deal := startDeal(t, ctx, provider, client, fcid, false)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)

View File

@ -30,7 +30,7 @@ func (bbr BadBlockReason) Linked(reason string, i ...interface{}) BadBlockReason
if bbr.OriginalReason != nil {
or = bbr.OriginalReason
}
return BadBlockReason{Reason: reason, OriginalReason: or}
return BadBlockReason{Reason: fmt.Sprintf(reason, i...), OriginalReason: or}
}
func (bbr BadBlockReason) String() string {

View File

@ -485,13 +485,15 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
for tid, matchFns := range me.matchers {
var matched bool
var once bool
for _, matchFn := range matchFns {
ok, err := matchFn(msg)
matchOne, ok, err := matchFn(msg)
if err != nil {
log.Errorf("event matcher failed: %s", err)
continue
}
matched = ok
once = matchOne
if matched {
break
@ -500,7 +502,9 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
if matched {
res[tid] = msg
break
if once {
break
}
}
}
})
@ -548,7 +552,7 @@ func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Mes
// `curH`-`ts.Height` = `confidence`
type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
type MsgMatchFunc func(msg *types.Message) (bool, error)
type MsgMatchFunc func(msg *types.Message) (matchOnce bool, matched bool, err error)
// Called registers a callback which is triggered when a specified method is
// called on an actor, or a timeout is reached.

View File

@ -561,9 +561,9 @@ func TestAtChainedConfidenceNull(t *testing.T) {
require.Equal(t, false, reverted)
}
func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (bool, error) {
return func(msg *types.Message) (bool, error) {
return to == msg.To && m == msg.Method, nil
func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (matchOnce bool, matched bool, err error) {
return func(msg *types.Message) (matchOnce bool, matched bool, err error) {
return true, to == msg.To && m == msg.Method, nil
}
}

View File

@ -0,0 +1,25 @@
package state
import (
"context"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
)
type contextStore struct {
ctx context.Context
cst *cbor.BasicIpldStore
}
func (cs *contextStore) Context() context.Context {
return cs.ctx
}
func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
return cs.cst.Get(ctx, c, out)
}
func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cs.cst.Put(ctx, v)
}

View File

@ -0,0 +1,55 @@
package state
import (
"github.com/filecoin-project/specs-actors/actors/util/adt"
typegen "github.com/whyrusleeping/cbor-gen"
)
// AdtArrayDiff generalizes adt.Array diffing by accepting a Deferred type that can unmarshalled to its corresponding struct
// in an interface implantation.
// Add should be called when a new k,v is added to the array
// Modify should be called when a value is modified in the array
// Remove should be called when a value is removed from the array
type AdtArrayDiff interface {
Add(key uint64, val *typegen.Deferred) error
Modify(key uint64, from, to *typegen.Deferred) error
Remove(key uint64, val *typegen.Deferred) error
}
// TODO Performance can be improved by diffing the underlying IPLD graph, e.g. https://github.com/ipfs/go-merkledag/blob/749fd8717d46b4f34c9ce08253070079c89bc56d/dagutils/diff.go#L104
// CBOR Marshaling will likely be the largest performance bottleneck here.
// DiffAdtArray accepts two *adt.Array's and an AdtArrayDiff implementation. It does the following:
// - All values that exist in preArr and not in curArr are passed to AdtArrayDiff.Remove()
// - All values that exist in curArr nnd not in prevArr are passed to adtArrayDiff.Add()
// - All values that exist in preArr and in curArr are passed to AdtArrayDiff.Modify()
// - It is the responsibility of AdtArrayDiff.Modify() to determine if the values it was passed have been modified.
func DiffAdtArray(preArr, curArr *adt.Array, out AdtArrayDiff) error {
prevVal := new(typegen.Deferred)
if err := preArr.ForEach(prevVal, func(i int64) error {
curVal := new(typegen.Deferred)
found, err := curArr.Get(uint64(i), curVal)
if err != nil {
return err
}
if !found {
if err := out.Remove(uint64(i), prevVal); err != nil {
return err
}
return nil
}
if err := out.Modify(uint64(i), prevVal, curVal); err != nil {
return err
}
return curArr.Delete(uint64(i))
}); err != nil {
return err
}
curVal := new(typegen.Deferred)
return curArr.ForEach(curVal, func(i int64) error {
return out.Add(uint64(i), curVal)
})
}

View File

@ -0,0 +1,155 @@
package state
import (
"bytes"
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
typegen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/specs-actors/actors/runtime"
"github.com/filecoin-project/specs-actors/actors/util/adt"
)
func TestDiffAdtArray(t *testing.T) {
ctxstoreA := newContextStore()
ctxstoreB := newContextStore()
arrA := adt.MakeEmptyArray(ctxstoreA)
arrB := adt.MakeEmptyArray(ctxstoreB)
require.NoError(t, arrA.Set(0, runtime.CBORBytes([]byte{0}))) // delete
require.NoError(t, arrA.Set(1, runtime.CBORBytes([]byte{0}))) // modify
require.NoError(t, arrB.Set(1, runtime.CBORBytes([]byte{1})))
require.NoError(t, arrA.Set(2, runtime.CBORBytes([]byte{1}))) // delete
require.NoError(t, arrA.Set(3, runtime.CBORBytes([]byte{0}))) // noop
require.NoError(t, arrB.Set(3, runtime.CBORBytes([]byte{0})))
require.NoError(t, arrA.Set(4, runtime.CBORBytes([]byte{0}))) // modify
require.NoError(t, arrB.Set(4, runtime.CBORBytes([]byte{6})))
require.NoError(t, arrB.Set(5, runtime.CBORBytes{8})) // add
require.NoError(t, arrB.Set(6, runtime.CBORBytes{9})) // add
changes := new(TestAdtDiff)
assert.NoError(t, DiffAdtArray(arrA, arrB, changes))
assert.NotNil(t, changes)
assert.Equal(t, 2, len(changes.Added))
// keys 5 and 6 were added
assert.EqualValues(t, uint64(5), changes.Added[0].key)
assert.EqualValues(t, []byte{8}, changes.Added[0].val)
assert.EqualValues(t, uint64(6), changes.Added[1].key)
assert.EqualValues(t, []byte{9}, changes.Added[1].val)
assert.Equal(t, 2, len(changes.Modified))
// keys 1 and 4 were modified
assert.EqualValues(t, uint64(1), changes.Modified[0].From.key)
assert.EqualValues(t, []byte{0}, changes.Modified[0].From.val)
assert.EqualValues(t, uint64(1), changes.Modified[0].To.key)
assert.EqualValues(t, []byte{1}, changes.Modified[0].To.val)
assert.EqualValues(t, uint64(4), changes.Modified[1].From.key)
assert.EqualValues(t, []byte{0}, changes.Modified[1].From.val)
assert.EqualValues(t, uint64(4), changes.Modified[1].To.key)
assert.EqualValues(t, []byte{6}, changes.Modified[1].To.val)
assert.Equal(t, 2, len(changes.Removed))
// keys 0 and 2 were deleted
assert.EqualValues(t, uint64(0), changes.Removed[0].key)
assert.EqualValues(t, []byte{0}, changes.Removed[0].val)
assert.EqualValues(t, uint64(2), changes.Removed[1].key)
assert.EqualValues(t, []byte{1}, changes.Removed[1].val)
}
type adtDiffResult struct {
key uint64
val runtime.CBORBytes
}
type TestAdtDiff struct {
Added []adtDiffResult
Modified []TestAdtDiffModified
Removed []adtDiffResult
}
var _ AdtArrayDiff = &TestAdtDiff{}
type TestAdtDiffModified struct {
From adtDiffResult
To adtDiffResult
}
func (t *TestAdtDiff) Add(key uint64, val *typegen.Deferred) error {
v := new(runtime.CBORBytes)
err := v.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
t.Added = append(t.Added, adtDiffResult{
key: key,
val: *v,
})
return nil
}
func (t *TestAdtDiff) Modify(key uint64, from, to *typegen.Deferred) error {
vFrom := new(runtime.CBORBytes)
err := vFrom.UnmarshalCBOR(bytes.NewReader(from.Raw))
if err != nil {
return err
}
vTo := new(runtime.CBORBytes)
err = vTo.UnmarshalCBOR(bytes.NewReader(to.Raw))
if err != nil {
return err
}
if !bytes.Equal(*vFrom, *vTo) {
t.Modified = append(t.Modified, TestAdtDiffModified{
From: adtDiffResult{
key: key,
val: *vFrom,
},
To: adtDiffResult{
key: key,
val: *vTo,
},
})
}
return nil
}
func (t *TestAdtDiff) Remove(key uint64, val *typegen.Deferred) error {
v := new(runtime.CBORBytes)
err := v.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
t.Removed = append(t.Removed, adtDiffResult{
key: key,
val: *v,
})
return nil
}
func newContextStore() *contextStore {
ctx := context.Background()
bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
store := cbornode.NewCborStore(bs)
return &contextStore{
ctx: ctx,
cst: store,
}
}

View File

@ -1,19 +1,27 @@
package state
import (
"bytes"
"context"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
typegen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
)
// UserData is the data returned from the DiffFunc
// UserData is the data returned from the DiffTipSetKeyFunc
type UserData interface{}
// ChainAPI abstracts out calls made by this class to external APIs
@ -35,22 +43,22 @@ func NewStatePredicates(api ChainAPI) *StatePredicates {
}
}
// DiffFunc check if there's a change form oldState to newState, and returns
// DiffTipSetKeyFunc check if there's a change form oldState to newState, and returns
// - changed: was there a change
// - user: user-defined data representing the state change
// - err
type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error)
type DiffTipSetKeyFunc func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error)
type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
type DiffActorStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
// OnActorStateChanged calls diffStateFunc when the state changes for the given actor
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc {
return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key())
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffActorStateFunc) DiffTipSetKeyFunc {
return func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState)
if err != nil {
return false, nil, err
}
newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key())
newActor, err := sp.api.StateGetActor(ctx, addr, newState)
if err != nil {
return false, nil, err
}
@ -65,7 +73,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu
type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error)
// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc {
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState market.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
@ -142,3 +150,115 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDeal
return false, nil, nil
}
}
type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error)
func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(minerAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState miner.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
return false, nil, err
}
var newState miner.State
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
return false, nil, err
}
return diffMinerActorState(ctx, &oldState, &newState)
})
}
type MinerSectorChanges struct {
Added []miner.SectorOnChainInfo
Extended []SectorExtensions
Removed []miner.SectorOnChainInfo
}
var _ AdtArrayDiff = &MinerSectorChanges{}
type SectorExtensions struct {
From miner.SectorOnChainInfo
To miner.SectorOnChainInfo
}
func (m *MinerSectorChanges) Add(key uint64, val *typegen.Deferred) error {
si := new(miner.SectorOnChainInfo)
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, *si)
return nil
}
func (m *MinerSectorChanges) Modify(key uint64, from, to *typegen.Deferred) error {
siFrom := new(miner.SectorOnChainInfo)
err := siFrom.UnmarshalCBOR(bytes.NewReader(from.Raw))
if err != nil {
return err
}
siTo := new(miner.SectorOnChainInfo)
err = siTo.UnmarshalCBOR(bytes.NewReader(to.Raw))
if err != nil {
return err
}
if siFrom.Expiration != siTo.Expiration {
m.Extended = append(m.Extended, SectorExtensions{
From: *siFrom,
To: *siTo,
})
}
return nil
}
func (m *MinerSectorChanges) Remove(key uint64, val *typegen.Deferred) error {
si := new(miner.SectorOnChainInfo)
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, *si)
return nil
}
func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) {
ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
sectorChanges := &MinerSectorChanges{
Added: []miner.SectorOnChainInfo{},
Extended: []SectorExtensions{},
Removed: []miner.SectorOnChainInfo{},
}
// no sector changes
if oldState.Sectors.Equals(newState.Sectors) {
return false, nil, nil
}
oldSectors, err := adt.AsArray(ctxStore, oldState.Sectors)
if err != nil {
return false, nil, err
}
newSectors, err := adt.AsArray(ctxStore, newState.Sectors)
if err != nil {
return false, nil, err
}
if err := DiffAdtArray(oldSectors, newSectors, sectorChanges); err != nil {
return false, nil, err
}
// nothing changed
if len(sectorChanges.Added)+len(sectorChanges.Extended)+len(sectorChanges.Removed) == 0 {
return false, nil, nil
}
return true, sectorChanges, nil
}
}

View File

@ -4,23 +4,26 @@ import (
"context"
"testing"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-hamt-ipld"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-hamt-ipld"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
tutils "github.com/filecoin-project/specs-actors/support/testing"
"github.com/filecoin-project/lotus/chain/types"
)
var dummyCid cid.Cid
@ -104,12 +107,12 @@ func TestPredicates(t *testing.T) {
diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds)))
// Diff a state against itself: expect no change
changed, _, err := diffFn(ctx, oldState, oldState)
changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
// Diff old state against new state
changed, val, err := diffFn(ctx, oldState, newState)
changed, val, err := diffFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, changed)
@ -130,7 +133,7 @@ func TestPredicates(t *testing.T) {
// Diff with non-existent deal.
noDeal := []abi.DealID{3}
diffNoDealFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(noDeal)))
changed, _, err = diffNoDealFn(ctx, oldState, newState)
changed, _, err = diffNoDealFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.False(t, changed)
@ -141,7 +144,7 @@ func TestPredicates(t *testing.T) {
t.Fatal("No state change so this should not be called")
return false, nil, nil
})
changed, _, err = actorDiffFn(ctx, oldState, oldState)
changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
@ -156,6 +159,87 @@ func TestPredicates(t *testing.T) {
require.False(t, changed)
}
func TestMinerSectorChange(t *testing.T) {
ctx := context.Background()
bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
store := cbornode.NewCborStore(bs)
nextID := uint64(0)
nextIDAddrF := func() address.Address {
defer func() { nextID++ }()
return tutils.NewIDAddr(t, nextID)
}
owner, worker := nextIDAddrF(), nextIDAddrF()
si0 := newSectorOnChainInfo(0, tutils.MakeCID("0"), big.NewInt(0), abi.ChainEpoch(0), abi.ChainEpoch(10))
si1 := newSectorOnChainInfo(1, tutils.MakeCID("1"), big.NewInt(1), abi.ChainEpoch(1), abi.ChainEpoch(11))
si2 := newSectorOnChainInfo(2, tutils.MakeCID("2"), big.NewInt(2), abi.ChainEpoch(2), abi.ChainEpoch(11))
oldMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si0, si1, si2})
si3 := newSectorOnChainInfo(3, tutils.MakeCID("3"), big.NewInt(3), abi.ChainEpoch(3), abi.ChainEpoch(12))
// 0 delete
// 1 extend
// 2 same
// 3 added
si1Ext := si1
si1Ext.Expiration++
newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3})
minerAddr := nextIDAddrF()
oldState, err := mockTipset(minerAddr, 1)
require.NoError(t, err)
newState, err := mockTipset(minerAddr, 2)
require.NoError(t, err)
api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC})
api.setActor(newState.Key(), &types.Actor{Head: newMinerC})
preds := NewStatePredicates(api)
minerDiffFn := preds.OnMinerActorChange(minerAddr, preds.OnMinerSectorChange())
change, val, err := minerDiffFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok := val.(*MinerSectorChanges)
require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1)
require.Equal(t, sectorChanges.Added[0], si3)
require.Equal(t, len(sectorChanges.Removed), 1)
require.Equal(t, sectorChanges.Removed[0], si0)
require.Equal(t, len(sectorChanges.Extended), 1)
require.Equal(t, sectorChanges.Extended[0].From, si1)
require.Equal(t, sectorChanges.Extended[0].To, si1Ext)
change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, change)
require.Nil(t, val)
change, val, err = minerDiffFn(ctx, newState.Key(), oldState.Key())
require.NoError(t, err)
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok = val.(*MinerSectorChanges)
require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1)
require.Equal(t, sectorChanges.Added[0], si0)
require.Equal(t, len(sectorChanges.Removed), 1)
require.Equal(t, sectorChanges.Removed[0], si3)
require.Equal(t, len(sectorChanges.Extended), 1)
require.Equal(t, sectorChanges.Extended[0].To, si1)
require.Equal(t, sectorChanges.Extended[0].From, si1Ext)
}
func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: miner,
@ -170,7 +254,7 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error)
}
func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
rootCid := createAMT(ctx, t, store, deals)
rootCid := createDealAMT(ctx, t, store, deals)
state := createEmptyMarketState(t, store)
state.States = rootCid
@ -188,7 +272,7 @@ func createEmptyMarketState(t *testing.T, store *cbornode.BasicIpldStore) *marke
return market.ConstructState(emptyArrayCid, emptyMap, emptyMap)
}
func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
root := amt.NewAMT(store)
for dealID, dealState := range deals {
err := root.Set(ctx, uint64(dealID), dealState)
@ -198,3 +282,77 @@ func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore
require.NoError(t, err)
return rootCid
}
func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid {
rootCid := createSectorsAMT(ctx, t, store, sectors)
state := createEmptyMinerState(ctx, t, store, owner, worker)
state.Sectors = rootCid
stateC, err := store.Put(ctx, state)
require.NoError(t, err)
return stateC
}
func createEmptyMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address) *miner.State {
emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO())
require.NoError(t, err)
emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5)))
require.NoError(t, err)
emptyDeadlines := miner.ConstructDeadlines()
emptyDeadlinesCid, err := store.Put(context.Background(), emptyDeadlines)
require.NoError(t, err)
minerInfo := emptyMap
state, err := miner.ConstructState(minerInfo, 123, emptyArrayCid, emptyMap, emptyDeadlinesCid)
require.NoError(t, err)
return state
}
func createSectorsAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, sectors []miner.SectorOnChainInfo) cid.Cid {
root := amt.NewAMT(store)
for _, sector := range sectors {
sector := sector
err := root.Set(ctx, uint64(sector.SectorNumber), &sector)
require.NoError(t, err)
}
rootCid, err := root.Flush(ctx)
require.NoError(t, err)
return rootCid
}
// returns a unique SectorOnChainInfo with each invocation with SectorNumber set to `sectorNo`.
func newSectorOnChainInfo(sectorNo abi.SectorNumber, sealed cid.Cid, weight big.Int, activation, expiration abi.ChainEpoch) miner.SectorOnChainInfo {
info := newSectorPreCommitInfo(sectorNo, sealed, expiration)
return miner.SectorOnChainInfo{
SectorNumber: info.SectorNumber,
SealProof: info.SealProof,
SealedCID: info.SealedCID,
DealIDs: info.DealIDs,
Expiration: info.Expiration,
Activation: activation,
DealWeight: weight,
VerifiedDealWeight: weight,
InitialPledge: big.Zero(),
}
}
const (
sectorSealRandEpochValue = abi.ChainEpoch(1)
)
// returns a unique SectorPreCommitInfo with each invocation with SectorNumber set to `sectorNo`.
func newSectorPreCommitInfo(sectorNo abi.SectorNumber, sealed cid.Cid, expiration abi.ChainEpoch) *miner.SectorPreCommitInfo {
return &miner.SectorPreCommitInfo{
SealProof: abi.RegisteredSealProof_StackedDrg32GiBV1,
SectorNumber: sectorNo,
SealedCID: sealed,
SealRandEpoch: sectorSealRandEpochValue,
DealIDs: nil,
Expiration: expiration,
}
}

View File

@ -34,11 +34,11 @@ func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd
}
func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc {
return func(msg *types.Message) (bool, error) {
return func(msg *types.Message) (matchOnce bool, matched bool, err error) {
if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) {
return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)
return true, false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)
}
return inmsg.Equals(msg), nil
return true, inmsg.Equals(msg), nil
}
}

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
@ -325,6 +326,14 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}
journal.Add("sync", map[string]interface{}{
"op": "headChange",
"from": r.old.Key(),
"to": r.new.Key(),
"rev": len(revert),
"apply": len(apply),
})
// reverse the apply array
for i := len(apply)/2 - 1; i >= 0; i-- {
opp := len(apply) - 1 - i

View File

@ -536,7 +536,7 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet)
futures = append(futures, async.Err(func() error {
if err := syncer.ValidateBlock(ctx, b); err != nil {
if isPermanent(err) {
syncer.bad.Add(b.Cid(), BadBlockReason{Reason: err.Error()})
syncer.bad.Add(b.Cid(), NewBadBlockReason([]cid.Cid{b.Cid()}, err.Error()))
}
return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
}

View File

@ -146,7 +146,7 @@ func (ss *syscallShim) VerifyConsensusFault(a, b, extra []byte) (*runtime.Consen
// (3) return if no consensus fault by now
if consensusFault == nil {
return consensusFault, nil
return nil, xerrors.Errorf("no consensus fault detected")
}
// else

View File

@ -374,6 +374,27 @@ var clientDealCmd = &cli.Command{
ref.TransferType = storagemarket.TTManual
}
// Check if the address is a verified client
dcap, err := api.StateVerifiedClientStatus(ctx, a, types.EmptyTSK)
if err != nil {
return err
}
isVerified := dcap != nil
// If the user has explicitly set the --verified-deal flag
if cctx.IsSet("verified-deal") {
// If --verified-deal is true, but the address is not a verified
// client, return an error
verifiedDealParam := cctx.Bool("verified-deal")
if verifiedDealParam && !isVerified {
return xerrors.Errorf("address %s does not have verified client status", a)
}
// Override the default
isVerified = verifiedDealParam
}
proposal, err := api.ClientStartDeal(ctx, &lapi.StartDealParams{
Data: ref,
Wallet: a,
@ -382,7 +403,7 @@ var clientDealCmd = &cli.Command{
MinBlocksDuration: uint64(dur),
DealStartEpoch: abi.ChainEpoch(cctx.Int64("start-epoch")),
FastRetrieval: cctx.Bool("fast-retrieval"),
VerifiedDeal: cctx.Bool("verified-deal"),
VerifiedDeal: isVerified,
})
if err != nil {
return err
@ -455,8 +476,8 @@ var clientRetrieveCmd = &cli.Command{
ArgsUsage: "[dataCid outputPath]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "address",
Usage: "address to use for transactions",
Name: "from",
Usage: "address to send transactions from",
},
&cli.BoolFlag{
Name: "car",
@ -481,8 +502,8 @@ var clientRetrieveCmd = &cli.Command{
ctx := ReqContext(cctx)
var payer address.Address
if cctx.String("address") != "" {
payer, err = address.NewFromString(cctx.String("address"))
if cctx.String("from") != "" {
payer, err = address.NewFromString(cctx.String("from"))
} else {
payer, err = fapi.WalletDefaultAddress(ctx)
}
@ -568,7 +589,7 @@ var clientQueryAskCmd = &cli.Command{
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
fmt.Println("Usage: query-ask [address]")
fmt.Println("Usage: query-ask [minerAddress]")
return nil
}

View File

@ -58,7 +58,7 @@ var msigCreateCmd = &cli.Command{
Value: "0",
},
&cli.StringFlag{
Name: "sender",
Name: "from",
Usage: "account to send the create message from",
},
},
@ -85,7 +85,7 @@ var msigCreateCmd = &cli.Command{
// get the address we're going to use to create the multisig (can be one of the above, as long as they have funds)
var sendAddr address.Address
if send := cctx.String("sender"); send == "" {
if send := cctx.String("from"); send == "" {
defaddr, err := api.WalletDefaultAddress(ctx)
if err != nil {
return err

View File

@ -790,8 +790,8 @@ var stateComputeStateCmd = &cli.Command{
Usage: "Perform state computations",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "height",
Usage: "set the height to compute state at",
Name: "vm-height",
Usage: "set the height that the vm will see",
},
&cli.BoolFlag{
Name: "apply-mpool-messages",
@ -820,7 +820,7 @@ var stateComputeStateCmd = &cli.Command{
return err
}
h := abi.ChainEpoch(cctx.Uint64("height"))
h := abi.ChainEpoch(cctx.Uint64("vm-height"))
if h == 0 {
if ts == nil {
head, err := api.ChainHead(ctx)

View File

@ -612,7 +612,7 @@ func runSeals(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, numSectors int, par
if !skipunseal {
log.Infof("[%d] Unsealing sector", i)
{
p, done, err := sbfs.AcquireSector(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, stores.FTUnsealed, stores.FTNone, true)
p, done, err := sbfs.AcquireSector(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, stores.FTUnsealed, stores.FTNone, stores.PathSealing)
if err != nil {
return xerrors.Errorf("acquire unsealed sector for removing: %w", err)
}

View File

@ -3,20 +3,18 @@ package main
import (
"context"
"database/sql"
"fmt"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/libp2p/go-libp2p-core/peer"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
_ "github.com/lib/pq"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
)
@ -25,8 +23,7 @@ type storage struct {
headerLk sync.Mutex
// stateful miner data
minerSectors map[cid.Cid]struct{}
genesisTs *types.TipSet
}
func openStorage(dbSource string) (*storage, error) {
@ -37,10 +34,7 @@ func openStorage(dbSource string) (*storage, error) {
db.SetMaxOpenConns(1350)
ms := make(map[cid.Cid]struct{})
ms[cid.Undef] = struct{}{}
st := &storage{db: db, minerSectors: ms}
st := &storage{db: db}
return st, st.setup()
}
@ -313,6 +307,19 @@ create table if not exists miner_sectors_heads
);
create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED');
create table if not exists miner_sector_events
(
miner_id text not null,
sector_id bigint not null,
state_root text not null,
event miner_sector_event_type not null,
constraint miner_sector_events_pk
primary key (sector_id, event, miner_id, state_root)
)
/*
create or replace function miner_tips(epoch bigint)
returns table (head text,
@ -600,12 +607,6 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo)
return tx.Commit()
}
type minerSectorUpdate struct {
minerState *minerStateInfo
tskey types.TipSetKey
oldSector cid.Cid
}
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
tx, err := st.db.Begin()
if err != nil {
@ -621,26 +622,8 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return err
}
var updateMiners []*minerSectorUpdate
for tsk, miners := range minerTips {
for _, miners := range minerTips {
for _, miner := range miners {
sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr)
if err != nil {
panic(err)
}
if sectorCID == cid.Undef {
continue
}
if _, found := st.minerSectors[sectorCID]; !found {
// schedule miner table update
updateMiners = append(updateMiners, &minerSectorUpdate{
minerState: miner,
tskey: tsk,
oldSector: sectorCID,
})
}
st.minerSectors[sectorCID] = struct{}{}
log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String())
if _, err := stmt.Exec(
miner.addr.String(),
miner.state.Sectors.String(),
@ -660,94 +643,153 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return tx.Commit()
}
type sectorUpdate struct {
terminationEpoch abi.ChainEpoch
terminated bool
expirationEpoch abi.ChainEpoch
sectorID abi.SectorNumber
minerID address.Address
}
func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
log.Debugw("updating miners constant sector table", "#tipsets", len(minerTips))
pred := state.NewStatePredicates(api)
eventTx, err := st.db.Begin()
if err != nil {
return err
}
return st.updateMinerSectors(updateMiners, api)
}
type deletedSector struct {
deletedSector miner_spec.SectorOnChainInfo
miner address.Address
tskey types.TipSetKey
}
if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error {
log.Info("updating miners constant sector table")
var deletedSectors []*deletedSector
for _, miner := range miners {
s := &apiIpldStore{context.TODO(), api}
newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors)
if err != nil {
log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors)
return err
eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `)
if err != nil {
return err
}
var sectorUpdates []sectorUpdate
// TODO consider performing the miner sector diffing in parallel and performing the database update after.
for _, miners := range minerTips {
for _, miner := range miners {
// special case genesis miners
if miner.tsKey == st.genesisTs.Key() {
sectors, err := api.StateMinerSectors(context.TODO(), miner.addr, nil, true, miner.tsKey)
if err != nil {
log.Debugw("failed to get miner info for genesis", "miner", miner.addr.String())
continue
}
for _, sector := range sectors {
if _, err := eventStmt.Exec(sector.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
}
} else {
sectorDiffFn := pred.OnMinerActorChange(miner.addr, pred.OnMinerSectorChange())
changed, val, err := sectorDiffFn(context.TODO(), miner.parentTsKey, miner.tsKey)
if err != nil {
log.Debugw("error getting miner sector diff", "miner", miner.addr, "error", err)
continue
}
if !changed {
continue
}
changes := val.(*state.MinerSectorChanges)
log.Debugw("sector changes for miner", "miner", miner.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", miner.parentTsKey, "newState", miner.tsKey)
for _, extended := range changes.Extended {
if _, err := eventStmt.Exec(extended.To.SectorNumber, "EXTENDED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
sectorUpdates = append(sectorUpdates, sectorUpdate{
terminationEpoch: 0,
terminated: false,
expirationEpoch: extended.To.Expiration,
sectorID: extended.To.SectorNumber,
minerID: miner.addr,
})
log.Debugw("sector extended", "miner", miner.addr.String(), "sector", extended.To.SectorNumber, "old", extended.To.Expiration, "new", extended.From.Expiration)
}
curTs, err := api.ChainGetTipSet(context.TODO(), miner.tsKey)
if err != nil {
return err
}
for _, removed := range changes.Removed {
// decide if they were terminated or extended
if removed.Expiration > curTs.Height() {
if _, err := eventStmt.Exec(removed.SectorNumber, "TERMINATED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
log.Debugw("sector terminated", "miner", miner.addr.String(), "sector", removed.SectorNumber, "old", "sectorExpiration", removed.Expiration, "terminationEpoch", curTs.Height())
sectorUpdates = append(sectorUpdates, sectorUpdate{
terminationEpoch: curTs.Height(),
terminated: true,
expirationEpoch: removed.Expiration,
sectorID: removed.SectorNumber,
minerID: miner.addr,
})
}
if _, err := eventStmt.Exec(removed.SectorNumber, "EXPIRED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
log.Debugw("sector removed", "miner", miner.addr.String(), "sector", removed.SectorNumber, "old", "sectorExpiration", removed.Expiration, "currEpoch", curTs.Height())
}
for _, added := range changes.Added {
if _, err := eventStmt.Exec(miner.addr.String(), added.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil {
return err
}
}
}
}
}
if err := eventStmt.Close(); err != nil {
return err
}
oldSectors, err := adt.AsArray(s, miner.oldSector)
if err != nil {
log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String())
return err
}
if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
var oldSecInfo miner_spec.SectorOnChainInfo
var newSecInfo miner_spec.SectorOnChainInfo
// if we cannot find an old sector in the new list then it was removed.
if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error {
found, err := newSectors.Get(uint64(oldSecInfo.SectorNumber), &newSecInfo)
if err != nil {
log.Warnw("new sectors get", "error", err)
if err := eventTx.Commit(); err != nil {
return err
}
updateTx, err := st.db.Begin()
if err != nil {
return err
}
updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`)
if err != nil {
return err
}
for _, update := range sectorUpdates {
if update.terminated {
if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
if !found {
log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.SectorNumber, "tipset", miner.tskey.String())
deletedSectors = append(deletedSectors, &deletedSector{
deletedSector: oldSecInfo,
miner: miner.minerState.addr,
tskey: miner.tskey,
})
} else {
if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
return nil
}); err != nil {
log.Warnw("old sectors foreach", "error", err)
return err
}
if len(deletedSectors) > 0 {
log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors))
}
}
// now we have all the sectors that were removed, update the database
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`)
if err != nil {
return err
}
for _, ds := range deletedSectors {
ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey)
if err != nil {
log.Warnw("get tipset", "error", err)
return err
}
// TODO validate this shits right
if ts.Height() >= ds.deletedSector.Expiration {
// means it expired, do nothing
log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.SectorNumber)
continue
}
log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.SectorNumber)
// means it was terminated.
if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.SectorNumber)); err != nil {
return err
}
}
if err := stmt.Close(); err != nil {
if err := updateStmt.Close(); err != nil {
return err
}
defer log.Info("update miner sectors complete")
return tx.Commit()
return updateTx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
@ -1252,27 +1294,3 @@ func (st *storage) refreshViews() error {
func (st *storage) close() error {
return st.db.Close()
}
func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) {
queryStr := fmt.Sprintf(`
SELECT miner_sectors_cid
FROM miner_sectors_heads
LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot
WHERE miner_id = '%s'
ORDER BY blocks.height DESC
LIMIT 1;
`,
miner.String())
var cidstr string
err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr)
switch {
case err == sql.ErrNoRows:
log.Warnf("no miner with miner_id: %s in table", miner)
return cid.Undef, nil
case err != nil:
return cid.Undef, err
default:
return cid.Decode(cidstr)
}
}

View File

@ -59,6 +59,10 @@ type minerStateInfo struct {
act types.Actor
stateroot cid.Cid
// calculating changes
tsKey types.TipSetKey
parentTsKey types.TipSetKey
// miner specific
state miner.State
info *miner.MinerInfo
@ -71,9 +75,10 @@ type minerStateInfo struct {
}
type actorInfo struct {
stateroot cid.Cid
tsKey types.TipSetKey
state string
stateroot cid.Cid
tsKey types.TipSetKey
parentTsKey types.TipSetKey
state string
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
@ -169,6 +174,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
if len(bh.Parents) == 0 { // genesis case
genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh})
st.genesisTs = genesisTs
aadrs, err := api.StateListActors(ctx, genesisTs.Key())
if err != nil {
log.Error(err)
@ -201,9 +208,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
tsKey: genesisTs.Key(),
state: string(state),
stateroot: bh.ParentStateRoot,
tsKey: genesisTs.Key(),
parentTsKey: genesisTs.Key(),
state: string(state),
}
addressToID[addr] = address.Undef
alk.Unlock()
@ -237,7 +245,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
ast, err := api.StateReadState(ctx, addr, pts.Key())
if err != nil {
log.Error(err)
return
@ -256,9 +263,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
// a change occurred for the actor with address `addr` and state `act` at tipset `pts`.
actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
tsKey: pts.Key(),
stateroot: bh.ParentStateRoot,
state: string(state),
tsKey: pts.Key(),
parentTsKey: pts.Parents(),
}
addressToID[addr] = address.Undef
alk.Unlock()
@ -314,6 +322,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
act: actor,
stateroot: c.stateroot,
tsKey: c.tsKey,
parentTsKey: c.parentTsKey,
state: miner.State{},
info: nil,
@ -430,6 +441,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return
}
log.Info("updating miner sectors heads")
if err := st.updateMinerSectors(minerTips, api); err != nil {
log.Error(err)
return
}
log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil {

View File

@ -186,7 +186,7 @@ func presealSector(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, sid abi.Sector
}
func presealSectorFake(sbfs *basicfs.Provider, sid abi.SectorID, spt abi.RegisteredSealProof, ssize abi.SectorSize) (*genesis.PreSeal, error) {
paths, done, err := sbfs.AcquireSector(context.TODO(), sid, 0, stores.FTSealed|stores.FTCache, true)
paths, done, err := sbfs.AcquireSector(context.TODO(), sid, 0, stores.FTSealed|stores.FTCache, stores.PathSealing)
if err != nil {
return nil, xerrors.Errorf("acquire unsealed sector: %w", err)
}

View File

@ -3,10 +3,10 @@ package main
import (
"bytes"
"fmt"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
@ -299,30 +299,15 @@ var verifRegCheckClientCmd = &cli.Command{
defer closer()
ctx := lcli.ReqContext(cctx)
act, err := api.StateGetActor(ctx, builtin.VerifiedRegistryActorAddr, types.EmptyTSK)
dcap, err := api.StateVerifiedClientStatus(ctx, caddr, types.EmptyTSK)
if err != nil {
return err
}
apibs := apibstore.NewAPIBlockstore(api)
cst := cbor.NewCborStore(apibs)
var st verifreg.State
if err := cst.Get(ctx, act.Head, &st); err != nil {
return err
if dcap == nil {
return xerrors.Errorf("client %s is not a verified client", err)
}
vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients)
if err != nil {
return err
}
var dcap verifreg.DataCap
if err := vh.Find(ctx, string(caddr.Bytes()), &dcap); err != nil {
return err
}
fmt.Println(dcap)
fmt.Println(*dcap)
return nil
},

View File

@ -44,7 +44,7 @@ var infoCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
maddr, err := nodeApi.ActorAddress(ctx)
maddr, err := getActorAddress(ctx, nodeApi, cctx.String("actor"))
if err != nil {
return err
}

View File

@ -1,12 +1,16 @@
package main
import (
"context"
"os"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
@ -62,6 +66,12 @@ func main() {
Version: build.UserVersion(),
EnableBashCompletion: true,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "actor",
Value: "",
Usage: "specify other actor to check state for (read only)",
Aliases: []string{"a"},
},
&cli.StringFlag{
Name: "repo",
EnvVars: []string{"LOTUS_PATH"},
@ -85,3 +95,19 @@ func main() {
os.Exit(1)
}
}
func getActorAddress(ctx context.Context, nodeAPI api.StorageMiner, overrideMaddr string) (maddr address.Address, err error) {
if overrideMaddr != "" {
maddr, err = address.NewFromString(overrideMaddr)
if err != nil {
return maddr, err
}
}
maddr, err = nodeAPI.ActorAddress(ctx)
if err != nil {
return maddr, xerrors.Errorf("getting actor address: %w", err)
}
return maddr, nil
}

View File

@ -51,9 +51,9 @@ var provingFaultsCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
maddr, err := nodeApi.ActorAddress(ctx)
maddr, err := getActorAddress(ctx, nodeApi, cctx.String("actor"))
if err != nil {
return xerrors.Errorf("getting actor address: %w", err)
return err
}
var mas miner.State
@ -120,9 +120,9 @@ var provingInfoCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
maddr, err := nodeApi.ActorAddress(ctx)
maddr, err := getActorAddress(ctx, nodeApi, cctx.String("actor"))
if err != nil {
return xerrors.Errorf("getting actor address: %w", err)
return err
}
head, err := api.ChainHead(ctx)
@ -140,11 +140,6 @@ var provingInfoCmd = &cli.Command{
return xerrors.Errorf("getting miner deadlines: %w", err)
}
curDeadlineSectors, err := deadlines.Due[cd.Index].Count()
if err != nil {
return xerrors.Errorf("counting deadline sectors: %w", err)
}
var mas miner.State
{
mact, err := api.StateGetActor(ctx, maddr, types.EmptyTSK)
@ -203,7 +198,15 @@ var provingInfoCmd = &cli.Command{
fmt.Printf("New Sectors: %d\n\n", newSectors)
fmt.Printf("Deadline Index: %d\n", cd.Index)
fmt.Printf("Deadline Sectors: %d\n", curDeadlineSectors)
if cd.Index < uint64(len(deadlines.Due)) {
curDeadlineSectors, err := deadlines.Due[cd.Index].Count()
if err != nil {
return xerrors.Errorf("counting deadline sectors: %w", err)
}
fmt.Printf("Deadline Sectors: %d\n", curDeadlineSectors)
}
fmt.Printf("Deadline Open: %s\n", epochTime(cd.CurrentEpoch, cd.Open))
fmt.Printf("Deadline Close: %s\n", epochTime(cd.CurrentEpoch, cd.Close))
fmt.Printf("Deadline Challenge: %s\n", epochTime(cd.CurrentEpoch, cd.Challenge))
@ -243,9 +246,9 @@ var provingDeadlinesCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
maddr, err := nodeApi.ActorAddress(ctx)
maddr, err := getActorAddress(ctx, nodeApi, cctx.String("actor"))
if err != nil {
return xerrors.Errorf("getting actor address: %w", err)
return err
}
deadlines, err := api.StateMinerDeadlines(ctx, maddr, types.EmptyTSK)

View File

@ -30,6 +30,7 @@ var sectorsCmd = &cli.Command{
sectorsRemoveCmd,
sectorsMarkForUpgradeCmd,
sectorsStartSealCmd,
sectorsSealDelayCmd,
},
}
@ -293,6 +294,32 @@ var sectorsStartSealCmd = &cli.Command{
},
}
var sectorsSealDelayCmd = &cli.Command{
Name: "set-seal-delay",
Usage: "Set the time, in minutes, that a new sector waits for deals before sealing starts",
ArgsUsage: "<minutes>",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.Args().Len() != 1 {
return xerrors.Errorf("must pass duration in minutes")
}
hs, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
if err != nil {
return xerrors.Errorf("could not parse sector number: %w", err)
}
delay := hs * uint64(time.Minute)
return nodeApi.SectorSetSealDelay(ctx, time.Duration(delay))
},
}
var sectorsUpdateCmd = &cli.Command{
Name: "update-state",
Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery",

View File

@ -18,11 +18,12 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/sector-storage/stores"
)
const metaFile = "sectorstore.json"
@ -145,7 +146,7 @@ var storageListCmd = &cli.Command{
type fsInfo struct {
stores.ID
sectors []stores.Decl
stat stores.FsStat
stat fsutil.FsStat
}
sorted := make([]fsInfo, 0, len(st))
@ -197,13 +198,13 @@ var storageListCmd = &cli.Command{
percCol = color.FgYellow
}
var barCols = uint64(50)
var barCols = int64(50)
set := (st.Capacity - st.Available) * barCols / st.Capacity
bar := strings.Repeat("|", int(set)) + strings.Repeat(" ", int(barCols-set))
fmt.Printf("\t[%s] %s/%s %s\n", color.New(percCol).Sprint(bar),
types.SizeStr(types.NewInt(st.Capacity-st.Available)),
types.SizeStr(types.NewInt(st.Capacity)),
types.SizeStr(types.NewInt(uint64(st.Capacity-st.Available))),
types.SizeStr(types.NewInt(uint64(st.Capacity))),
color.New(percCol).Sprintf("%d%%", usedPercent))
fmt.Printf("\t%s; %s; %s\n",
color.YellowString("Unsealed: %d", cnt[0]),

View File

@ -63,6 +63,7 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut
srv := &http.Server{Handler: http.DefaultServeMux}
sigCh := make(chan os.Signal, 2)
shutdownDone := make(chan struct{})
go func() {
select {
case <-sigCh:
@ -77,10 +78,17 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut
log.Errorf("graceful shutting down failed: %s", err)
}
log.Warn("Graceful shutdown successful")
_ = log.Sync() //nolint:errcheck
close(shutdownDone)
}()
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
return srv.Serve(manet.NetListener(lst))
err = srv.Serve(manet.NetListener(lst))
if err == http.ErrServerClosed {
<-shutdownDone
return nil
}
return err
}
func handleImport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) {

15
go.mod
View File

@ -10,6 +10,7 @@ require (
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/coreos/go-systemd/v22 v22.0.0
github.com/dgraph-io/badger/v2 v2.0.3
github.com/docker/go-units v0.4.0
github.com/drand/drand v0.9.2-0.20200616080806-a94e9c1636a4
github.com/drand/kyber v1.1.0
@ -18,20 +19,20 @@ require (
github.com/filecoin-project/filecoin-ffi v0.26.1-0.20200508175440-05b30afeb00d
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef
github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200424220931-6263827e49f2
github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.3.0
github.com/filecoin-project/go-data-transfer v0.4.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
github.com/filecoin-project/go-fil-markets v0.3.2
github.com/filecoin-project/go-fil-markets v0.4.0
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/filecoin-project/sector-storage v0.0.0-20200701092105-a2de752a3324
github.com/filecoin-project/sector-storage v0.0.0-20200708195134-e3b9ba01c287
github.com/filecoin-project/specs-actors v0.7.1
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
github.com/filecoin-project/storage-fsm v0.0.0-20200701221241-171e0d0e4bf9
github.com/filecoin-project/storage-fsm v0.0.0-20200707194229-bc5e298e2b4c
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-kit/kit v0.10.0
github.com/go-ole/go-ole v1.2.4 // indirect
@ -47,12 +48,12 @@ require (
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-cidutil v0.0.2
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ds-badger2 v0.1.0
github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e
github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-ds-measure v0.1.0
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c
github.com/ipfs/go-hamt-ipld v0.1.1-0.20200605182717-0310ad2b0b1f
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-chunker v0.0.5

32
go.sum
View File

@ -225,18 +225,18 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mo
github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38 h1:B2gUde2DlfCb5YMYNVems2orobxC3KhrX3migym1IOQ=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1 h1:xuHlrdznafh7ul5t4xEncnA4qgpQvJZEw+mr98eqHXw=
github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.3.0 h1:BwBrrXu9Unh9JjjX4GAc5FfzUNioor/aATIjfc7JTBg=
github.com/filecoin-project/go-data-transfer v0.3.0/go.mod h1:cONglGP4s/d+IUQw5mWZrQK+FQATQxr3AXzi4dRh0l4=
github.com/filecoin-project/go-data-transfer v0.4.0 h1:xiC0qVZten8VtqEs5rRjyz2n/nZ8prbZSWvAr1V+CBE=
github.com/filecoin-project/go-data-transfer v0.4.0/go.mod h1:5ksROBkSREsb2O4h5vBcGMr9lXTpfeyjHo8o0yxf6FQ=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA=
github.com/filecoin-project/go-fil-markets v0.3.2 h1:fvNgdTTIVtckBu61wxbKYSMJzedoFFIKYJagiCDFCiM=
github.com/filecoin-project/go-fil-markets v0.3.2/go.mod h1:e/IofcotbwH7ftgeK+TjjdjFsrCDWrh5vvnr7k1OSH8=
github.com/filecoin-project/go-fil-markets v0.4.0 h1:toDPViYyQOHtUs6jl0KB9EzgdfCxXR11dZO/rqWbFtU=
github.com/filecoin-project/go-fil-markets v0.4.0/go.mod h1:VAH6h+sWuhPAsSwAS9Kecx8MI/dIjFkrLO8jJUmLWQc=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
@ -247,8 +247,8 @@ github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 h
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200619205156-c7bf525c06ef h1:qFXGHKLq49qFmvXjvhvQ2eU3jVk2Z0QaKYQpO5S3SF0=
github.com/filecoin-project/go-statemachine v0.0.0-20200619205156-c7bf525c06ef/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9 h1:NagIOq5osclBprc95ILEnGCOpubuhalqwWvayYJmXLQ=
github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
@ -256,20 +256,21 @@ github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/
github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM=
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 h1:NfYQRmVRe0LzlNbK5Ket3vbBOwFD5TvtcNtfo/Sd8mg=
github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY=
github.com/filecoin-project/sector-storage v0.0.0-20200701092105-a2de752a3324 h1:MmxTkkhQMGWH3fr4BPpGoFQocG1dTvAAbkL3VEaZcsY=
github.com/filecoin-project/sector-storage v0.0.0-20200701092105-a2de752a3324/go.mod h1:r12d7tsmJKz8QDGoCvl65Ay2al6mOgDqxAGUxbyrgMs=
github.com/filecoin-project/sector-storage v0.0.0-20200708195134-e3b9ba01c287 h1:lMzTlms23AnWKx7+Bq0nMqSCnuE1tcMkMwtT8zfS7gE=
github.com/filecoin-project/sector-storage v0.0.0-20200708195134-e3b9ba01c287/go.mod h1:salgVdX7qeXFo/xaiEQE29J4pPkjn71T0kt0n+VDBzo=
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
github.com/filecoin-project/specs-actors v0.6.1/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
github.com/filecoin-project/specs-actors v0.7.0/go.mod h1:+z0htZu/wLBDbOLcQTKKUEC2rkUTFzL2KJ/bRAVWkws=
github.com/filecoin-project/specs-actors v0.7.1 h1:/zW++MN4gGIPvG+s0zmSI97k0Z/aaeiREjLC10gQbco=
github.com/filecoin-project/specs-actors v0.7.1/go.mod h1:+z0htZu/wLBDbOLcQTKKUEC2rkUTFzL2KJ/bRAVWkws=
github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sgOVdJbvFjOnD5w94=
github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/storage-fsm v0.0.0-20200701221241-171e0d0e4bf9 h1:X6TkCA+aT0TJxjL8S8agEVjqHBVgIe9WrvdHlYcNW3M=
github.com/filecoin-project/storage-fsm v0.0.0-20200701221241-171e0d0e4bf9/go.mod h1:SXO4VnXG056B/lXHL8HZv54eMqlsyynm+v93BlLwlOY=
github.com/filecoin-project/storage-fsm v0.0.0-20200707194229-bc5e298e2b4c h1:F6guH363a+fpew1zkgoez4/U0RqW4ph6GVXR23lVwng=
github.com/filecoin-project/storage-fsm v0.0.0-20200707194229-bc5e298e2b4c/go.mod h1:SXO4VnXG056B/lXHL8HZv54eMqlsyynm+v93BlLwlOY=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
@ -504,6 +505,8 @@ github.com/ipfs/go-ds-badger v0.2.3 h1:J27YvAcpuA5IvZUbeBxOcQgqnYHUPxoygc6Qxxkod
github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBRn4FS6UHUk=
github.com/ipfs/go-ds-badger2 v0.1.0 h1:784py6lXkwlVF+K6XSuqmdMgy5l8GI6k60ngBokb9Fg=
github.com/ipfs/go-ds-badger2 v0.1.0/go.mod h1:pbR1p817OZbdId9EvLOhKBgUVTM3BMCSTan78lDDVaw=
github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e h1:Xi1nil8K2lBOorBS6Ys7+hmUCzH8fr3U9ipdL/IrcEI=
github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e/go.mod h1:lJnws7amT9Ehqzta0gwMrRsURU04caT0iRPr1W8AsOU=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
@ -515,8 +518,8 @@ github.com/ipfs/go-filestore v1.0.0 h1:QR7ekKH+q2AGiWDc7W2Q0qHuYSRZGUJqUn0GsegEP
github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPiFOdcuu9SM=
github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0=
github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y=
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103 h1:SD+bXod/pOWKJCGj0tG140ht8Us5k+3JBcHw0PVYTho=
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c h1:fCW8JzwvBMfODvdliK+s3ziYZPD/5FAzluahZYXVg3k=
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200131012125-dd88a59d3f2e/go.mod h1:9aQJu/i/TaRDW6jqB5U217dLIDopn50wxLdHXM2CTfE=
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242/go.mod h1:kq3Pi+UP3oHhAdKexE+kHHYRKMoFNuGero0R7q3hWGg=
github.com/ipfs/go-hamt-ipld v0.1.1-0.20200501020327-d53d20a7063e h1:Klv6s+kbuhh0JVpGFmFK2t6AtZxJfAnVneQHh1DlFOo=
@ -1478,6 +1481,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=

146
journal/journal.go Normal file
View File

@ -0,0 +1,146 @@
package journal
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
)
func InitializeSystemJournal(dir string) error {
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
j, err := OpenFSJournal(dir)
if err != nil {
return err
}
currentJournal = j
return nil
}
func Add(sys string, val interface{}) {
if currentJournal == nil {
log.Warn("no journal configured")
return
}
currentJournal.AddEntry(sys, val)
}
var log = logging.Logger("journal")
var currentJournal Journal
type Journal interface {
AddEntry(system string, obj interface{})
Close() error
}
// fsJournal is a basic journal backed by files on a filesystem
type fsJournal struct {
fi *os.File
fSize int64
lk sync.Mutex
journalDir string
incoming chan *JournalEntry
journalSizeLimit int64
closing chan struct{}
}
func OpenFSJournal(dir string) (*fsJournal, error) {
fsj := &fsJournal{
journalDir: dir,
incoming: make(chan *JournalEntry, 32),
journalSizeLimit: 1 << 30,
closing: make(chan struct{}),
}
if err := fsj.rollJournalFile(); err != nil {
return nil, err
}
go fsj.runLoop()
return fsj, nil
}
type JournalEntry struct {
System string
Timestamp time.Time
Val interface{}
}
func (fsj *fsJournal) putEntry(je *JournalEntry) error {
b, err := json.Marshal(je)
if err != nil {
return err
}
n, err := fsj.fi.Write(append(b, '\n'))
if err != nil {
return err
}
fsj.fSize += int64(n)
if fsj.fSize >= fsj.journalSizeLimit {
fsj.rollJournalFile()
}
return nil
}
func (fsj *fsJournal) rollJournalFile() error {
if fsj.fi != nil {
fsj.fi.Close()
}
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}
fsj.fi = nfi
fsj.fSize = 0
return nil
}
func (fsj *fsJournal) runLoop() {
for {
select {
case je := <-fsj.incoming:
if err := fsj.putEntry(je); err != nil {
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
}
case <-fsj.closing:
fsj.fi.Close()
return
}
}
}
func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
je := &JournalEntry{
System: system,
Timestamp: time.Now(),
Val: obj,
}
select {
case fsj.incoming <- je:
case <-fsj.closing:
log.Warnw("journal closed but tried to log event", "entry", je)
}
}
func (fsj *fsJournal) Close() error {
close(fsj.closing)
return nil
}

View File

@ -6,7 +6,6 @@ import (
"bytes"
"context"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/state"
"golang.org/x/xerrors"
@ -292,44 +291,44 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
var sectorNumber abi.SectorNumber
var sectorFound bool
matchEvent := func(msg *types.Message) (bool, error) {
matchEvent := func(msg *types.Message) (matchOnce bool, matched bool, err error) {
if msg.To != provider {
return false, nil
return true, false, nil
}
switch msg.Method {
case builtin.MethodsMiner.PreCommitSector:
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
return true, false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
for _, did := range params.DealIDs {
if did == abi.DealID(dealId) {
sectorNumber = params.SectorNumber
sectorFound = true
return false, nil
return true, false, nil
}
}
return false, nil
return true, false, nil
case builtin.MethodsMiner.ProveCommitSector:
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
return true, false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
if !sectorFound {
return false, nil
return true, false, nil
}
if params.SectorNumber != sectorNumber {
return false, nil
return true, false, nil
}
return true, nil
return false, true, nil
default:
return false, nil
return true, false, nil
}
}
@ -341,16 +340,18 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
}
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
var sd *api.MarketDeal
head, err := c.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("client: failed to get chain head: %w", err)
}
sd, err := c.StateMarketStorageDeal(ctx, dealID, head.Key())
if err != nil {
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err = stmgr.GetStorageDeal(ctx, c.StateManager, dealID, ts)
if err != nil {
return false, false, xerrors.Errorf("client: failed to look up deal on chain: %w", err)
}
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts.Height() {
onDealExpired(nil)
@ -371,17 +372,18 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
// Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts2.Height() {
onDealExpired(nil)
return false, nil
}
// Timeout waiting for state change
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
changedDeals, ok := states.(state.ChangedDeals)
if !ok {
panic("Expected state.ChangedDeals")
@ -415,9 +417,12 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs, newTs)
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil {
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err)
}

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
@ -25,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/markets/utils"
@ -33,7 +35,7 @@ import (
sealing "github.com/filecoin-project/storage-fsm"
)
var log = logging.Logger("provideradapter")
var log = logging.Logger("storageadapter")
type ProviderNodeAdapter struct {
api.FullNode
@ -93,6 +95,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
StartEpoch: deal.ClientDealProposal.Proposal.StartEpoch,
EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch,
},
KeepUnsealed: deal.FastRetrieval,
})
if err != nil {
return xerrors.Errorf("AddPiece failed: %s", err)
@ -279,44 +282,44 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide
var sectorNumber abi.SectorNumber
var sectorFound bool
matchEvent := func(msg *types.Message) (bool, error) {
matchEvent := func(msg *types.Message) (matchOnce bool, matched bool, err error) {
if msg.To != provider {
return false, nil
return true, false, nil
}
switch msg.Method {
case builtin.MethodsMiner.PreCommitSector:
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
return true, false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
for _, did := range params.DealIDs {
if did == abi.DealID(dealID) {
sectorNumber = params.SectorNumber
sectorFound = true
return false, nil
return true, false, nil
}
}
return false, nil
return true, false, nil
case builtin.MethodsMiner.ProveCommitSector:
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
return true, false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
if !sectorFound {
return false, nil
return true, false, nil
}
if params.SectorNumber != sectorNumber {
return false, nil
return true, false, nil
}
return true, nil
return false, true, nil
default:
return false, nil
return true, false, nil
}
}
@ -345,4 +348,102 @@ func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid,
return cb(receipt.Receipt.ExitCode, receipt.Receipt.Return, nil)
}
func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (*verifreg.DataCap, error) {
tsk, err := types.TipSetKeyFromBytes(encodedTs)
if err != nil {
return nil, err
}
return n.StateVerifiedClientStatus(ctx, addr, tsk)
}
func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
head, err := n.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("client: failed to get chain head: %w", err)
}
sd, err := n.StateMarketStorageDeal(ctx, dealID, head.Key())
if err != nil {
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts.Height() {
onDealExpired(nil)
return true, false, nil
}
// If there is no deal assume it's already been slashed
if sd.State.SectorStartEpoch < 0 {
onDealSlashed(ts.Height(), nil)
return true, false, nil
}
// No events have occurred yet, so return
// done: false, more: true (keep listening for events)
return false, true, nil
}
// Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts2.Height() {
onDealExpired(nil)
return false, nil
}
// Timeout waiting for state change
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
changedDeals, ok := states.(state.ChangedDeals)
if !ok {
panic("Expected state.ChangedDeals")
}
deal, ok := changedDeals[dealID]
if !ok {
// No change to deal
return true, nil
}
// Deal was slashed
if deal.To == nil {
onDealSlashed(ts2.Height(), nil)
return false, nil
}
return true, nil
}
// Called when there was a chain reorg and the state change was reverted
revert := func(ctx context.Context, ts *types.TipSet) error {
// TODO: Is it ok to just ignore this?
log.Warn("deal state reverted; TODO: actually handle this!")
return nil
}
// Watch for state changes to the deal
preds := state.NewStatePredicates(n)
dealDiff := preds.OnStorageMarketActorChanged(
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err)
}
return nil
}
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}

View File

@ -119,6 +119,7 @@ const (
ExtractApiKey
HeadMetricsKey
RunPeerTaggerKey
JournalKey
SetApiEndpointKey
@ -150,6 +151,7 @@ func defaults() []Option {
Override(new(record.Validator), modules.RecordValidator),
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
Override(new(dtypes.ShutdownChan), make(chan struct{})),
Override(JournalKey, modules.SetupJournal),
// Filecoin modules
@ -322,6 +324,8 @@ func Online() Option {
Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc),
Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc),
Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc),
Override(new(dtypes.SetSealingDelayFunc), modules.NewSetSealDelayFunc),
Override(new(dtypes.GetSealingDelayFunc), modules.NewGetSealDelayFunc),
),
)
}

View File

@ -29,8 +29,9 @@ type FullNode struct {
type StorageMiner struct {
Common
Dealmaking DealmakingConfig
Storage sectorstorage.SealerConfig
Dealmaking DealmakingConfig
Storage sectorstorage.SealerConfig
SealingDelay Duration
}
type DealmakingConfig struct {
@ -132,6 +133,8 @@ func DefaultStorageMiner() *StorageMiner {
ConsiderOfflineRetrievalDeals: true,
PieceCidBlocklist: []cid.Cid{},
},
SealingDelay: Duration(time.Hour),
}
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"

View File

@ -107,7 +107,7 @@ func (a *API) ClientStartDeal(ctx context.Context, params *api.StartDealParams)
md, err := a.StateMinerProvingDeadline(ctx, params.Miner, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("failed getting peer ID: %w", err)
return nil, xerrors.Errorf("failed getting miner's deadline info: %w", err)
}
rt, err := ffiwrapper.SealProofTypeFromSectorSize(mi.SectorSize)

View File

@ -24,6 +24,7 @@ import (
samsig "github.com/filecoin-project/specs-actors/actors/builtin/multisig"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
@ -832,3 +833,35 @@ func (a *StateAPI) StateMinerAvailableBalance(ctx context.Context, maddr address
return types.BigAdd(st.GetAvailableBalance(act.Balance), vested), nil
}
// StateVerifiedClientStatus returns the data cap for the given address.
// Returns nil if there is no entry in the data cap table for the
// address.
func (a *StateAPI) StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*verifreg.DataCap, error) {
act, err := a.StateGetActor(ctx, builtin.VerifiedRegistryActorAddr, tsk)
if err != nil {
return nil, err
}
cst := cbor.NewCborStore(a.StateManager.ChainStore().Blockstore())
var st verifreg.State
if err := cst.Get(ctx, act.Head, &st); err != nil {
return nil, err
}
vh, err := hamt.LoadNode(ctx, cst, st.VerifiedClients)
if err != nil {
return nil, err
}
var dcap verifreg.DataCap
if err := vh.Find(ctx, string(addr.Bytes()), &dcap); err != nil {
if err == hamt.ErrNotFound {
return nil, nil
}
return nil, err
}
return &dcap, nil
}

View File

@ -3,12 +3,13 @@ package impl
import (
"context"
"encoding/json"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"net/http"
"os"
"strconv"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"time"
"github.com/filecoin-project/go-address"
storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
@ -53,6 +54,8 @@ type StorageMinerAPI struct {
SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc
ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc
SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc
SetSealingDelayFunc dtypes.SetSealingDelayFunc
GetSealingDelayFunc dtypes.GetSealingDelayFunc
}
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
@ -173,7 +176,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
return out, nil
}
func (sm *StorageMinerAPI) StorageStat(ctx context.Context, id stores.ID) (stores.FsStat, error) {
func (sm *StorageMinerAPI) StorageStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) {
return sm.StorageMgr.FsStat(ctx, id)
}
@ -181,6 +184,14 @@ func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.Se
return sm.Miner.StartPackingSector(number)
}
func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
return sm.SetSealingDelayFunc(delay)
}
func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
return sm.GetSealingDelayFunc()
}
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
return sm.Miner.ForceSectorState(ctx, id, sealing.SectorState(state))
}

View File

@ -3,11 +3,15 @@ package modules
import (
"context"
"github.com/filecoin-project/lotus/lib/bufbstore"
"time"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host"
"go.uber.org/fx"
graphsyncimpl "github.com/filecoin-project/go-data-transfer/impl/graphsync"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
@ -69,9 +73,26 @@ func RegisterClientValidator(crv dtypes.ClientRequestValidator, dtm dtypes.Clien
// NewClientGraphsyncDataTransfer returns a data transfer manager that just
// uses the clients's Client DAG service for transfers
func NewClientGraphsyncDataTransfer(h host.Host, gs dtypes.Graphsync, ds dtypes.MetadataDS) dtypes.ClientDataTransfer {
func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Graphsync, ds dtypes.MetadataDS) (dtypes.ClientDataTransfer, error) {
sc := storedcounter.New(ds, datastore.NewKey("/datatransfer/client/counter"))
return graphsyncimpl.NewGraphSyncDataTransfer(h, gs, sc)
net := dtnet.NewFromLibp2pHost(h)
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/client/transfers"))
transport := dtgstransport.NewTransport(h.ID(), gs)
dt, err := dtimpl.NewDataTransfer(dtDs, net, transport, sc)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return dt.Start(ctx)
},
OnStop: func(context.Context) error {
return dt.Stop()
},
})
return dt, nil
}
// NewClientDealStore creates a statestore for the client to store its deals
@ -90,7 +111,7 @@ func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientReques
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) {
net := smnet.NewFromLibp2pHost(h)
c, err := storageimpl.NewClient(net, ibs, dataTransfer, discovery, deals, scn)
c, err := storageimpl.NewClient(net, ibs, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second))
if err != nil {
return nil, err
}

View File

@ -6,6 +6,7 @@ import (
"errors"
"io"
"io/ioutil"
"path/filepath"
"github.com/gbrlsnchs/jwt/v3"
logging "github.com/ipfs/go-log/v2"
@ -18,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/addrutil"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
@ -93,3 +95,7 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
func DrandBootstrap() (dtypes.DrandBootstrap, error) {
return build.DrandBootstrap()
}
func SetupJournal(lr repo.LockedRepo) error {
return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal"))
}

View File

@ -1,10 +1,10 @@
package dtypes
import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"time"
)
type MinerAddress address.Address
@ -50,3 +50,9 @@ type ConsiderOfflineRetrievalDealsConfigFunc func() (bool, error)
// SetConsiderOfflineRetrievalDealsConfigFunc is a function which is used to
// disable or enable retrieval deal acceptance.
type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error
// SetSealingDelay sets how long a sector waits for more deals before sealing begins.
type SetSealingDelayFunc func(time.Duration) error
// GetSealingDelay returns how long a sector waits for more deals before sealing begins.
type GetSealingDelayFunc func() (time.Duration, error)

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"time"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
@ -24,7 +25,9 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -134,7 +137,7 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
return &sidsc{sc}
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier) (*storage.Miner, error) {
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
@ -157,7 +160,7 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h
return nil, err
}
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif)
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd)
if err != nil {
return nil, err
}
@ -208,9 +211,26 @@ func RegisterProviderValidator(mrv dtypes.ProviderRequestValidator, dtm dtypes.P
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
// uses the provider's Staging DAG service for transfers
func NewProviderDAGServiceDataTransfer(h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) dtypes.ProviderDataTransfer {
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) (dtypes.ProviderDataTransfer, error) {
sc := storedcounter.New(ds, datastore.NewKey("/datatransfer/provider/counter"))
return dtgraphsync.NewGraphSyncDataTransfer(h, gs, sc)
net := dtnet.NewFromLibp2pHost(h)
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/provider/transfers"))
transport := dtgstransport.NewTransport(h.ID(), gs)
dt, err := dtimpl.NewDataTransfer(dtDs, net, transport, sc)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return dt.Start(ctx)
},
OnStop: func(context.Context) error {
return dt.Stop()
},
})
return dt, nil
}
// NewProviderDealStore creates a statestore for the client to store its deals
@ -525,6 +545,24 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se
}, nil
}
func NewSetSealDelayFunc(r repo.LockedRepo) (dtypes.SetSealingDelayFunc, error) {
return func(delay time.Duration) (err error) {
err = mutateCfg(r, func(cfg *config.StorageMiner) {
cfg.SealingDelay = config.Duration(delay)
})
return
}, nil
}
func NewGetSealDelayFunc(r repo.LockedRepo) (dtypes.GetSealingDelayFunc, error) {
return func() (out time.Duration, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) {
out = time.Duration(cfg.SealingDelay)
})
return
}, nil
}
func readCfg(r repo.LockedRepo, accessor func(*config.StorageMiner)) error {
raw, err := r.Config()
if err != nil {

View File

@ -459,10 +459,10 @@ func TestAPIDealFlow(t *testing.T) {
logging.SetLogLevel("storageminer", "ERROR")
t.Run("TestDealFlow", func(t *testing.T) {
test.TestDealFlow(t, mockSbBuilder, 10*time.Millisecond, false)
test.TestDealFlow(t, mockSbBuilder, 10*time.Millisecond, false, false)
})
t.Run("WithExportedCAR", func(t *testing.T) {
test.TestDealFlow(t, mockSbBuilder, 10*time.Millisecond, true)
test.TestDealFlow(t, mockSbBuilder, 10*time.Millisecond, true, false)
})
t.Run("TestDoubleDealFlow", func(t *testing.T) {
test.TestDoubleDealFlow(t, mockSbBuilder, 10*time.Millisecond)
@ -480,7 +480,13 @@ func TestAPIDealFlowReal(t *testing.T) {
logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR")
test.TestDealFlow(t, builder, time.Second, false)
t.Run("basic", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, false)
})
t.Run("fast-retrieval", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, true)
})
}
func TestDealMining(t *testing.T) {

View File

@ -20,6 +20,7 @@ import (
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/lotus/chain/types"
@ -344,8 +345,16 @@ func (fsr *fsLockedRepo) SetStorage(c func(*stores.StorageConfig)) error {
return config.WriteStorageFile(fsr.join(fsStorageConfig), sc)
}
func (fsr *fsLockedRepo) Stat(path string) (stores.FsStat, error) {
return stores.Stat(path)
func (fsr *fsLockedRepo) Stat(path string) (fsutil.FsStat, error) {
return fsutil.Statfs(path)
}
func (fsr *fsLockedRepo) DiskUsage(path string) (int64, error) {
si, err := fsutil.FileSize(path)
if err != nil {
return 0, err
}
return si.OnDisk, nil
}
func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {

View File

@ -9,9 +9,10 @@ import (
"path/filepath"
"strconv"
dgbadger "github.com/dgraph-io/badger/v2"
badger "github.com/ipfs/go-ds-badger2"
levelds "github.com/ipfs/go-ds-leveldb"
"github.com/ipfs/go-ds-measure"
measure "github.com/ipfs/go-ds-measure"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
)
@ -31,7 +32,8 @@ var fsMultiDatastores = map[string]dsCtor{
func badgerDs(path string) (datastore.Batching, error) {
opts := badger.DefaultOptions
opts.Truncate = true
opts.Options = dgbadger.DefaultOptions("").WithTruncate(true).
WithValueThreshold(1 << 10)
return badger.NewDatastore(path, &opts)
}

View File

@ -3,11 +3,12 @@ package repo
import (
"errors"
"github.com/filecoin-project/sector-storage/stores"
"github.com/ipfs/go-datastore"
"github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/lotus/chain/types"
)
@ -44,7 +45,8 @@ type LockedRepo interface {
GetStorage() (stores.StorageConfig, error)
SetStorage(func(*stores.StorageConfig)) error
Stat(path string) (stores.FsStat, error)
Stat(path string) (fsutil.FsStat, error)
DiskUsage(path string) (int64, error)
// SetAPIEndpoint sets the endpoint of the current API
// so it can be read by API clients

View File

@ -14,6 +14,8 @@ import (
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/sector-storage/stores"
@ -77,8 +79,16 @@ func (lmem *lockedMemRepo) SetStorage(c func(*stores.StorageConfig)) error {
return nil
}
func (lmem *lockedMemRepo) Stat(path string) (stores.FsStat, error) {
return stores.Stat(path)
func (lmem *lockedMemRepo) Stat(path string) (fsutil.FsStat, error) {
return fsutil.Statfs(path)
}
func (lmem *lockedMemRepo) DiskUsage(path string) (int64, error) {
si, err := fsutil.FileSize(path)
if err != nil {
return 0, err
}
return si.OnDisk, nil
}
func (lmem *lockedMemRepo) Path() string {

View File

@ -41,7 +41,8 @@ type Miner struct {
maddr address.Address
worker address.Address
sealing *sealing.Sealing
getSealDelay dtypes.GetSealingDelayFunc
sealing *sealing.Sealing
}
type storageMinerApi interface {
@ -77,7 +78,7 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier) (*Miner, error) {
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*Miner, error) {
m := &Miner{
api: api,
h: h,
@ -86,8 +87,9 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
sc: sc,
verif: verif,
maddr: maddr,
worker: worker,
maddr: maddr,
worker: worker,
getSealDelay: gsd,
}
return m, nil
@ -106,7 +108,7 @@ func (m *Miner) Run(ctx context.Context) error {
evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
m.sealing = sealing.New(adaptedAPI, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp)
m.sealing = sealing.New(adaptedAPI, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay))
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function