Merge remote-tracking branch 'origin/next' into feat/upgrade-markets-0.4.0

This commit is contained in:
Łukasz Magiera 2020-07-08 14:59:00 +02:00
commit 2b9c05d395
16 changed files with 672 additions and 190 deletions

View File

@ -30,7 +30,7 @@ func (bbr BadBlockReason) Linked(reason string, i ...interface{}) BadBlockReason
if bbr.OriginalReason != nil {
or = bbr.OriginalReason
}
return BadBlockReason{Reason: reason, OriginalReason: or}
return BadBlockReason{Reason: fmt.Sprintf(reason, i...), OriginalReason: or}
}
func (bbr BadBlockReason) String() string {

View File

@ -2,18 +2,24 @@ package state
import (
"context"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/types"
)
// UserData is the data returned from the DiffFunc
// UserData is the data returned from the DiffTipSetKeyFunc
type UserData interface{}
// ChainAPI abstracts out calls made by this class to external APIs
@ -35,22 +41,22 @@ func NewStatePredicates(api ChainAPI) *StatePredicates {
}
}
// DiffFunc check if there's a change form oldState to newState, and returns
// DiffTipSetKeyFunc check if there's a change form oldState to newState, and returns
// - changed: was there a change
// - user: user-defined data representing the state change
// - err
type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error)
type DiffTipSetKeyFunc func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error)
type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
type DiffActorStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error)
// OnActorStateChanged calls diffStateFunc when the state changes for the given actor
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc {
return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key())
func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffActorStateFunc) DiffTipSetKeyFunc {
return func(ctx context.Context, oldState, newState types.TipSetKey) (changed bool, user UserData, err error) {
oldActor, err := sp.api.StateGetActor(ctx, addr, oldState)
if err != nil {
return false, nil, err
}
newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key())
newActor, err := sp.api.StateGetActor(ctx, addr, newState)
if err != nil {
return false, nil, err
}
@ -65,7 +71,7 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu
type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error)
// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc {
func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState market.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
@ -142,3 +148,123 @@ func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDeal
return false, nil, nil
}
}
type DiffMinerActorStateFunc func(ctx context.Context, oldState *miner.State, newState *miner.State) (changed bool, user UserData, err error)
func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMinerActorState DiffMinerActorStateFunc) DiffTipSetKeyFunc {
return sp.OnActorStateChanged(minerAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) {
var oldState miner.State
if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil {
return false, nil, err
}
var newState miner.State
if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil {
return false, nil, err
}
return diffMinerActorState(ctx, &oldState, &newState)
})
}
type MinerSectorChanges struct {
Added []miner.SectorOnChainInfo
Extended []SectorExtensions
Removed []miner.SectorOnChainInfo
}
type SectorExtensions struct {
From miner.SectorOnChainInfo
To miner.SectorOnChainInfo
}
func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) {
ctxStore := &contextStore{
ctx: context.TODO(),
cst: sp.cst,
}
sectorChanges := &MinerSectorChanges{
Added: []miner.SectorOnChainInfo{},
Extended: []SectorExtensions{},
Removed: []miner.SectorOnChainInfo{},
}
// no sector changes
if oldState.Sectors.Equals(newState.Sectors) {
return false, nil, nil
}
oldSectors, err := adt.AsArray(ctxStore, oldState.Sectors)
if err != nil {
return false, nil, err
}
newSectors, err := adt.AsArray(ctxStore, newState.Sectors)
if err != nil {
return false, nil, err
}
var osi miner.SectorOnChainInfo
// find all sectors that were extended or removed
if err := oldSectors.ForEach(&osi, func(i int64) error {
var nsi miner.SectorOnChainInfo
found, err := newSectors.Get(uint64(osi.SectorNumber), &nsi)
if err != nil {
return err
}
if !found {
sectorChanges.Removed = append(sectorChanges.Removed, osi)
return nil
}
if nsi.Expiration != osi.Expiration {
sectorChanges.Extended = append(sectorChanges.Extended, SectorExtensions{
From: osi,
To: nsi,
})
}
// we don't update miners state filed with `newSectors.Root()` so this operation is safe.
if err := newSectors.Delete(uint64(osi.SectorNumber)); err != nil {
return err
}
return nil
}); err != nil {
return false, nil, err
}
// all sectors that remain in newSectors are new
var nsi miner.SectorOnChainInfo
if err := newSectors.ForEach(&nsi, func(i int64) error {
sectorChanges.Added = append(sectorChanges.Added, nsi)
return nil
}); err != nil {
return false, nil, err
}
// nothing changed
if len(sectorChanges.Added)+len(sectorChanges.Extended)+len(sectorChanges.Removed) == 0 {
return false, nil, nil
}
return true, sectorChanges, nil
}
}
type contextStore struct {
ctx context.Context
cst *cbor.BasicIpldStore
}
func (cs *contextStore) Context() context.Context {
return cs.ctx
}
func (cs *contextStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
return cs.cst.Get(ctx, c, out)
}
func (cs *contextStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cs.cst.Put(ctx, v)
}

View File

@ -4,23 +4,26 @@ import (
"context"
"testing"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-hamt-ipld"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-hamt-ipld"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
tutils "github.com/filecoin-project/specs-actors/support/testing"
"github.com/filecoin-project/lotus/chain/types"
)
var dummyCid cid.Cid
@ -104,12 +107,12 @@ func TestPredicates(t *testing.T) {
diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds)))
// Diff a state against itself: expect no change
changed, _, err := diffFn(ctx, oldState, oldState)
changed, _, err := diffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
// Diff old state against new state
changed, val, err := diffFn(ctx, oldState, newState)
changed, val, err := diffFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, changed)
@ -130,7 +133,7 @@ func TestPredicates(t *testing.T) {
// Diff with non-existent deal.
noDeal := []abi.DealID{3}
diffNoDealFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(noDeal)))
changed, _, err = diffNoDealFn(ctx, oldState, newState)
changed, _, err = diffNoDealFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.False(t, changed)
@ -141,7 +144,7 @@ func TestPredicates(t *testing.T) {
t.Fatal("No state change so this should not be called")
return false, nil, nil
})
changed, _, err = actorDiffFn(ctx, oldState, oldState)
changed, _, err = actorDiffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, changed)
@ -156,6 +159,87 @@ func TestPredicates(t *testing.T) {
require.False(t, changed)
}
func TestMinerSectorChange(t *testing.T) {
ctx := context.Background()
bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
store := cbornode.NewCborStore(bs)
nextID := uint64(0)
nextIDAddrF := func() address.Address {
defer func() { nextID++ }()
return tutils.NewIDAddr(t, nextID)
}
owner, worker := nextIDAddrF(), nextIDAddrF()
si0 := newSectorOnChainInfo(0, tutils.MakeCID("0"), big.NewInt(0), abi.ChainEpoch(0), abi.ChainEpoch(10))
si1 := newSectorOnChainInfo(1, tutils.MakeCID("1"), big.NewInt(1), abi.ChainEpoch(1), abi.ChainEpoch(11))
si2 := newSectorOnChainInfo(2, tutils.MakeCID("2"), big.NewInt(2), abi.ChainEpoch(2), abi.ChainEpoch(11))
oldMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si0, si1, si2})
si3 := newSectorOnChainInfo(3, tutils.MakeCID("3"), big.NewInt(3), abi.ChainEpoch(3), abi.ChainEpoch(12))
// 0 delete
// 1 extend
// 2 same
// 3 added
si1Ext := si1
si1Ext.Expiration++
newMinerC := createMinerState(ctx, t, store, owner, worker, []miner.SectorOnChainInfo{si1Ext, si2, si3})
minerAddr := nextIDAddrF()
oldState, err := mockTipset(minerAddr, 1)
require.NoError(t, err)
newState, err := mockTipset(minerAddr, 2)
require.NoError(t, err)
api := newMockAPI(bs)
api.setActor(oldState.Key(), &types.Actor{Head: oldMinerC})
api.setActor(newState.Key(), &types.Actor{Head: newMinerC})
preds := NewStatePredicates(api)
minerDiffFn := preds.OnMinerActorChange(minerAddr, preds.OnMinerSectorChange())
change, val, err := minerDiffFn(ctx, oldState.Key(), newState.Key())
require.NoError(t, err)
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok := val.(*MinerSectorChanges)
require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1)
require.Equal(t, sectorChanges.Added[0], si3)
require.Equal(t, len(sectorChanges.Removed), 1)
require.Equal(t, sectorChanges.Removed[0], si0)
require.Equal(t, len(sectorChanges.Extended), 1)
require.Equal(t, sectorChanges.Extended[0].From, si1)
require.Equal(t, sectorChanges.Extended[0].To, si1Ext)
change, val, err = minerDiffFn(ctx, oldState.Key(), oldState.Key())
require.NoError(t, err)
require.False(t, change)
require.Nil(t, val)
change, val, err = minerDiffFn(ctx, newState.Key(), oldState.Key())
require.NoError(t, err)
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok = val.(*MinerSectorChanges)
require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1)
require.Equal(t, sectorChanges.Added[0], si0)
require.Equal(t, len(sectorChanges.Removed), 1)
require.Equal(t, sectorChanges.Removed[0], si3)
require.Equal(t, len(sectorChanges.Extended), 1)
require.Equal(t, sectorChanges.Extended[0].To, si1)
require.Equal(t, sectorChanges.Extended[0].From, si1Ext)
}
func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) {
return types.NewTipSet([]*types.BlockHeader{{
Miner: miner,
@ -170,7 +254,7 @@ func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error)
}
func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
rootCid := createAMT(ctx, t, store, deals)
rootCid := createDealAMT(ctx, t, store, deals)
state := createEmptyMarketState(t, store)
state.States = rootCid
@ -188,7 +272,7 @@ func createEmptyMarketState(t *testing.T, store *cbornode.BasicIpldStore) *marke
return market.ConstructState(emptyArrayCid, emptyMap, emptyMap)
}
func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
func createDealAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid {
root := amt.NewAMT(store)
for dealID, dealState := range deals {
err := root.Set(ctx, uint64(dealID), dealState)
@ -198,3 +282,77 @@ func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore
require.NoError(t, err)
return rootCid
}
func createMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address, sectors []miner.SectorOnChainInfo) cid.Cid {
rootCid := createSectorsAMT(ctx, t, store, sectors)
state := createEmptyMinerState(ctx, t, store, owner, worker)
state.Sectors = rootCid
stateC, err := store.Put(ctx, state)
require.NoError(t, err)
return stateC
}
func createEmptyMinerState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, owner, worker address.Address) *miner.State {
emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO())
require.NoError(t, err)
emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5)))
require.NoError(t, err)
emptyDeadlines := miner.ConstructDeadlines()
emptyDeadlinesCid, err := store.Put(context.Background(), emptyDeadlines)
require.NoError(t, err)
minerInfo := emptyMap
state, err := miner.ConstructState(minerInfo, 123, emptyArrayCid, emptyMap, emptyDeadlinesCid)
require.NoError(t, err)
return state
}
func createSectorsAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, sectors []miner.SectorOnChainInfo) cid.Cid {
root := amt.NewAMT(store)
for _, sector := range sectors {
sector := sector
err := root.Set(ctx, uint64(sector.SectorNumber), &sector)
require.NoError(t, err)
}
rootCid, err := root.Flush(ctx)
require.NoError(t, err)
return rootCid
}
// returns a unique SectorOnChainInfo with each invocation with SectorNumber set to `sectorNo`.
func newSectorOnChainInfo(sectorNo abi.SectorNumber, sealed cid.Cid, weight big.Int, activation, expiration abi.ChainEpoch) miner.SectorOnChainInfo {
info := newSectorPreCommitInfo(sectorNo, sealed, expiration)
return miner.SectorOnChainInfo{
SectorNumber: info.SectorNumber,
SealProof: info.SealProof,
SealedCID: info.SealedCID,
DealIDs: info.DealIDs,
Expiration: info.Expiration,
Activation: activation,
DealWeight: weight,
VerifiedDealWeight: weight,
InitialPledge: big.Zero(),
}
}
const (
sectorSealRandEpochValue = abi.ChainEpoch(1)
)
// returns a unique SectorPreCommitInfo with each invocation with SectorNumber set to `sectorNo`.
func newSectorPreCommitInfo(sectorNo abi.SectorNumber, sealed cid.Cid, expiration abi.ChainEpoch) *miner.SectorPreCommitInfo {
return &miner.SectorPreCommitInfo{
SealProof: abi.RegisteredSealProof_StackedDrg32GiBV1,
SectorNumber: sectorNo,
SealedCID: sealed,
SealRandEpoch: sectorSealRandEpochValue,
DealIDs: nil,
Expiration: expiration,
}
}

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
@ -325,6 +326,14 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}
journal.Add("sync", map[string]interface{}{
"op": "headChange",
"from": r.old.Key(),
"to": r.new.Key(),
"rev": len(revert),
"apply": len(apply),
})
// reverse the apply array
for i := len(apply)/2 - 1; i >= 0; i-- {
opp := len(apply) - 1 - i

View File

@ -536,7 +536,7 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet)
futures = append(futures, async.Err(func() error {
if err := syncer.ValidateBlock(ctx, b); err != nil {
if isPermanent(err) {
syncer.bad.Add(b.Cid(), BadBlockReason{Reason: err.Error()})
syncer.bad.Add(b.Cid(), NewBadBlockReason([]cid.Cid{b.Cid()}, err.Error()))
}
return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
}

View File

@ -418,8 +418,8 @@ var clientRetrieveCmd = &cli.Command{
ArgsUsage: "[dataCid outputPath]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "address",
Usage: "address to use for transactions",
Name: "from",
Usage: "address to send transactions from",
},
&cli.BoolFlag{
Name: "car",
@ -444,8 +444,8 @@ var clientRetrieveCmd = &cli.Command{
ctx := ReqContext(cctx)
var payer address.Address
if cctx.String("address") != "" {
payer, err = address.NewFromString(cctx.String("address"))
if cctx.String("from") != "" {
payer, err = address.NewFromString(cctx.String("from"))
} else {
payer, err = fapi.WalletDefaultAddress(ctx)
}
@ -531,7 +531,7 @@ var clientQueryAskCmd = &cli.Command{
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
fmt.Println("Usage: query-ask [address]")
fmt.Println("Usage: query-ask [minerAddress]")
return nil
}

View File

@ -58,7 +58,7 @@ var msigCreateCmd = &cli.Command{
Value: "0",
},
&cli.StringFlag{
Name: "sender",
Name: "from",
Usage: "account to send the create message from",
},
},
@ -85,7 +85,7 @@ var msigCreateCmd = &cli.Command{
// get the address we're going to use to create the multisig (can be one of the above, as long as they have funds)
var sendAddr address.Address
if send := cctx.String("sender"); send == "" {
if send := cctx.String("from"); send == "" {
defaddr, err := api.WalletDefaultAddress(ctx)
if err != nil {
return err

View File

@ -790,8 +790,8 @@ var stateComputeStateCmd = &cli.Command{
Usage: "Perform state computations",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "height",
Usage: "set the height to compute state at",
Name: "vm-height",
Usage: "set the height that the vm will see",
},
&cli.BoolFlag{
Name: "apply-mpool-messages",
@ -820,7 +820,7 @@ var stateComputeStateCmd = &cli.Command{
return err
}
h := abi.ChainEpoch(cctx.Uint64("height"))
h := abi.ChainEpoch(cctx.Uint64("vm-height"))
if h == 0 {
if ts == nil {
head, err := api.ChainHead(ctx)

View File

@ -3,20 +3,18 @@ package main
import (
"context"
"database/sql"
"fmt"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/libp2p/go-libp2p-core/peer"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
_ "github.com/lib/pq"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
)
@ -25,8 +23,7 @@ type storage struct {
headerLk sync.Mutex
// stateful miner data
minerSectors map[cid.Cid]struct{}
genesisTs *types.TipSet
}
func openStorage(dbSource string) (*storage, error) {
@ -37,10 +34,7 @@ func openStorage(dbSource string) (*storage, error) {
db.SetMaxOpenConns(1350)
ms := make(map[cid.Cid]struct{})
ms[cid.Undef] = struct{}{}
st := &storage{db: db, minerSectors: ms}
st := &storage{db: db}
return st, st.setup()
}
@ -313,6 +307,19 @@ create table if not exists miner_sectors_heads
);
create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED');
create table if not exists miner_sector_events
(
miner_id text not null,
sector_id bigint not null,
state_root text not null,
event miner_sector_event_type not null,
constraint miner_sector_events_pk
primary key (sector_id, event, miner_id, state_root)
)
/*
create or replace function miner_tips(epoch bigint)
returns table (head text,
@ -600,12 +607,6 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo)
return tx.Commit()
}
type minerSectorUpdate struct {
minerState *minerStateInfo
tskey types.TipSetKey
oldSector cid.Cid
}
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
tx, err := st.db.Begin()
if err != nil {
@ -621,26 +622,8 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return err
}
var updateMiners []*minerSectorUpdate
for tsk, miners := range minerTips {
for _, miners := range minerTips {
for _, miner := range miners {
sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr)
if err != nil {
panic(err)
}
if sectorCID == cid.Undef {
continue
}
if _, found := st.minerSectors[sectorCID]; !found {
// schedule miner table update
updateMiners = append(updateMiners, &minerSectorUpdate{
minerState: miner,
tskey: tsk,
oldSector: sectorCID,
})
}
st.minerSectors[sectorCID] = struct{}{}
log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String())
if _, err := stmt.Exec(
miner.addr.String(),
miner.state.Sectors.String(),
@ -660,94 +643,153 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return tx.Commit()
}
type sectorUpdate struct {
terminationEpoch abi.ChainEpoch
terminated bool
expirationEpoch abi.ChainEpoch
sectorID abi.SectorNumber
minerID address.Address
}
func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
log.Debugw("updating miners constant sector table", "#tipsets", len(minerTips))
pred := state.NewStatePredicates(api)
eventTx, err := st.db.Begin()
if err != nil {
return err
}
return st.updateMinerSectors(updateMiners, api)
}
type deletedSector struct {
deletedSector miner_spec.SectorOnChainInfo
miner address.Address
tskey types.TipSetKey
}
if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error {
log.Info("updating miners constant sector table")
var deletedSectors []*deletedSector
eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `)
if err != nil {
return err
}
var sectorUpdates []sectorUpdate
// TODO consider performing the miner sector diffing in parallel and performing the database update after.
for _, miners := range minerTips {
for _, miner := range miners {
s := &apiIpldStore{context.TODO(), api}
newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors)
// special case genesis miners
if miner.tsKey == st.genesisTs.Key() {
sectors, err := api.StateMinerSectors(context.TODO(), miner.addr, nil, true, miner.tsKey)
if err != nil {
log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors)
return err
}
oldSectors, err := adt.AsArray(s, miner.oldSector)
if err != nil {
log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String())
return err
}
var oldSecInfo miner_spec.SectorOnChainInfo
var newSecInfo miner_spec.SectorOnChainInfo
// if we cannot find an old sector in the new list then it was removed.
if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error {
found, err := newSectors.Get(uint64(oldSecInfo.SectorNumber), &newSecInfo)
if err != nil {
log.Warnw("new sectors get", "error", err)
return err
}
if !found {
log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.SectorNumber, "tipset", miner.tskey.String())
deletedSectors = append(deletedSectors, &deletedSector{
deletedSector: oldSecInfo,
miner: miner.minerState.addr,
tskey: miner.tskey,
})
}
return nil
}); err != nil {
log.Warnw("old sectors foreach", "error", err)
return err
}
if len(deletedSectors) > 0 {
log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors))
}
}
// now we have all the sectors that were removed, update the database
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`)
if err != nil {
return err
}
for _, ds := range deletedSectors {
ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey)
if err != nil {
log.Warnw("get tipset", "error", err)
return err
}
// TODO validate this shits right
if ts.Height() >= ds.deletedSector.Expiration {
// means it expired, do nothing
log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.SectorNumber)
log.Debugw("failed to get miner info for genesis", "miner", miner.addr.String())
continue
}
log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.SectorNumber)
// means it was terminated.
if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.SectorNumber)); err != nil {
for _, sector := range sectors {
if _, err := eventStmt.Exec(sector.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
}
} else {
sectorDiffFn := pred.OnMinerActorChange(miner.addr, pred.OnMinerSectorChange())
changed, val, err := sectorDiffFn(context.TODO(), miner.parentTsKey, miner.tsKey)
if err != nil {
log.Debugw("error getting miner sector diff", "miner", miner.addr, "error", err)
continue
}
if !changed {
continue
}
changes := val.(*state.MinerSectorChanges)
log.Debugw("sector changes for miner", "miner", miner.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", miner.parentTsKey, "newState", miner.tsKey)
for _, extended := range changes.Extended {
if _, err := eventStmt.Exec(extended.To.SectorNumber, "EXTENDED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
sectorUpdates = append(sectorUpdates, sectorUpdate{
terminationEpoch: 0,
terminated: false,
expirationEpoch: extended.To.Expiration,
sectorID: extended.To.SectorNumber,
minerID: miner.addr,
})
log.Debugw("sector extended", "miner", miner.addr.String(), "sector", extended.To.SectorNumber, "old", extended.To.Expiration, "new", extended.From.Expiration)
}
curTs, err := api.ChainGetTipSet(context.TODO(), miner.tsKey)
if err != nil {
return err
}
for _, removed := range changes.Removed {
// decide if they were terminated or extended
if removed.Expiration > curTs.Height() {
if _, err := eventStmt.Exec(removed.SectorNumber, "TERMINATED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
log.Debugw("sector terminated", "miner", miner.addr.String(), "sector", removed.SectorNumber, "old", "sectorExpiration", removed.Expiration, "terminationEpoch", curTs.Height())
sectorUpdates = append(sectorUpdates, sectorUpdate{
terminationEpoch: curTs.Height(),
terminated: true,
expirationEpoch: removed.Expiration,
sectorID: removed.SectorNumber,
minerID: miner.addr,
})
}
if _, err := eventStmt.Exec(removed.SectorNumber, "EXPIRED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
log.Debugw("sector removed", "miner", miner.addr.String(), "sector", removed.SectorNumber, "old", "sectorExpiration", removed.Expiration, "currEpoch", curTs.Height())
}
for _, added := range changes.Added {
if _, err := eventStmt.Exec(miner.addr.String(), added.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil {
return err
}
}
}
}
}
if err := eventStmt.Close(); err != nil {
return err
}
if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
if err := eventTx.Commit(); err != nil {
return err
}
updateTx, err := st.db.Begin()
if err != nil {
return err
}
updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`)
if err != nil {
return err
}
for _, update := range sectorUpdates {
if update.terminated {
if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
} else {
if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
if err := updateStmt.Close(); err != nil {
return err
}
defer log.Info("update miner sectors complete")
return tx.Commit()
return updateTx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
@ -1252,27 +1294,3 @@ func (st *storage) refreshViews() error {
func (st *storage) close() error {
return st.db.Close()
}
func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) {
queryStr := fmt.Sprintf(`
SELECT miner_sectors_cid
FROM miner_sectors_heads
LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot
WHERE miner_id = '%s'
ORDER BY blocks.height DESC
LIMIT 1;
`,
miner.String())
var cidstr string
err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr)
switch {
case err == sql.ErrNoRows:
log.Warnf("no miner with miner_id: %s in table", miner)
return cid.Undef, nil
case err != nil:
return cid.Undef, err
default:
return cid.Decode(cidstr)
}
}

View File

@ -59,6 +59,10 @@ type minerStateInfo struct {
act types.Actor
stateroot cid.Cid
// calculating changes
tsKey types.TipSetKey
parentTsKey types.TipSetKey
// miner specific
state miner.State
info *miner.MinerInfo
@ -73,6 +77,7 @@ type minerStateInfo struct {
type actorInfo struct {
stateroot cid.Cid
tsKey types.TipSetKey
parentTsKey types.TipSetKey
state string
}
@ -169,6 +174,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
if len(bh.Parents) == 0 { // genesis case
genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh})
st.genesisTs = genesisTs
aadrs, err := api.StateListActors(ctx, genesisTs.Key())
if err != nil {
log.Error(err)
@ -203,6 +210,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
tsKey: genesisTs.Key(),
parentTsKey: genesisTs.Key(),
state: string(state),
}
addressToID[addr] = address.Undef
@ -237,7 +245,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
ast, err := api.StateReadState(ctx, addr, pts.Key())
if err != nil {
log.Error(err)
return
@ -259,6 +266,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
stateroot: bh.ParentStateRoot,
state: string(state),
tsKey: pts.Key(),
parentTsKey: pts.Parents(),
}
addressToID[addr] = address.Undef
alk.Unlock()
@ -314,6 +322,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
act: actor,
stateroot: c.stateroot,
tsKey: c.tsKey,
parentTsKey: c.parentTsKey,
state: miner.State{},
info: nil,
@ -430,6 +441,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return
}
log.Info("updating miner sectors heads")
if err := st.updateMinerSectors(minerTips, api); err != nil {
log.Error(err)
return
}
log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil {

146
journal/journal.go Normal file
View File

@ -0,0 +1,146 @@
package journal
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
)
func InitializeSystemJournal(dir string) error {
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
j, err := OpenFSJournal(dir)
if err != nil {
return err
}
currentJournal = j
return nil
}
func Add(sys string, val interface{}) {
if currentJournal == nil {
log.Warn("no journal configured")
return
}
currentJournal.AddEntry(sys, val)
}
var log = logging.Logger("journal")
var currentJournal Journal
type Journal interface {
AddEntry(system string, obj interface{})
Close() error
}
// fsJournal is a basic journal backed by files on a filesystem
type fsJournal struct {
fi *os.File
fSize int64
lk sync.Mutex
journalDir string
incoming chan *JournalEntry
journalSizeLimit int64
closing chan struct{}
}
func OpenFSJournal(dir string) (*fsJournal, error) {
fsj := &fsJournal{
journalDir: dir,
incoming: make(chan *JournalEntry, 32),
journalSizeLimit: 1 << 30,
closing: make(chan struct{}),
}
if err := fsj.rollJournalFile(); err != nil {
return nil, err
}
go fsj.runLoop()
return fsj, nil
}
type JournalEntry struct {
System string
Timestamp time.Time
Val interface{}
}
func (fsj *fsJournal) putEntry(je *JournalEntry) error {
b, err := json.Marshal(je)
if err != nil {
return err
}
n, err := fsj.fi.Write(append(b, '\n'))
if err != nil {
return err
}
fsj.fSize += int64(n)
if fsj.fSize >= fsj.journalSizeLimit {
fsj.rollJournalFile()
}
return nil
}
func (fsj *fsJournal) rollJournalFile() error {
if fsj.fi != nil {
fsj.fi.Close()
}
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}
fsj.fi = nfi
fsj.fSize = 0
return nil
}
func (fsj *fsJournal) runLoop() {
for {
select {
case je := <-fsj.incoming:
if err := fsj.putEntry(je); err != nil {
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
}
case <-fsj.closing:
fsj.fi.Close()
return
}
}
}
func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
je := &JournalEntry{
System: system,
Timestamp: time.Now(),
Val: obj,
}
select {
case fsj.incoming <- je:
case <-fsj.closing:
log.Warnw("journal closed but tried to log event", "entry", je)
}
}
func (fsj *fsJournal) Close() error {
close(fsj.closing)
return nil
}

View File

@ -415,7 +415,7 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs, newTs)
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err)

View File

@ -430,7 +430,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs, newTs)
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err)

View File

@ -119,6 +119,7 @@ const (
ExtractApiKey
HeadMetricsKey
RunPeerTaggerKey
JournalKey
SetApiEndpointKey
@ -150,6 +151,7 @@ func defaults() []Option {
Override(new(record.Validator), modules.RecordValidator),
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
Override(new(dtypes.ShutdownChan), make(chan struct{})),
Override(JournalKey, modules.SetupJournal),
// Filecoin modules

View File

@ -6,6 +6,7 @@ import (
"errors"
"io"
"io/ioutil"
"path/filepath"
"github.com/gbrlsnchs/jwt/v3"
logging "github.com/ipfs/go-log/v2"
@ -18,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/addrutil"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
@ -93,3 +95,7 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
func DrandBootstrap() (dtypes.DrandBootstrap, error) {
return build.DrandBootstrap()
}
func SetupJournal(lr repo.LockedRepo) error {
return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal"))
}