Merge remote-tracking branch 'origin/master' into next
This commit is contained in:
commit
c1233291bc
@ -52,7 +52,7 @@ type FullNode interface {
|
|||||||
// the specified block.
|
// the specified block.
|
||||||
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
|
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
|
||||||
|
|
||||||
// ChainGetParentReceipts returns messages stored in parent tipset of the
|
// ChainGetParentMessages returns messages stored in parent tipset of the
|
||||||
// specified block.
|
// specified block.
|
||||||
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]Message, error)
|
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]Message, error)
|
||||||
|
|
||||||
@ -311,7 +311,7 @@ type FullNode interface {
|
|||||||
|
|
||||||
// MsigGetAvailableBalance returns the portion of a multisig's balance that can be withdrawn or spent
|
// MsigGetAvailableBalance returns the portion of a multisig's balance that can be withdrawn or spent
|
||||||
MsigGetAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
|
MsigGetAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
|
||||||
// MsigGetAvailableBalance creates a multisig wallet
|
// MsigCreate creates a multisig wallet
|
||||||
// It takes the following params: <required number of senders>, <approving addresses>, <unlock duration>
|
// It takes the following params: <required number of senders>, <approving addresses>, <unlock duration>
|
||||||
//<initial balance>, <sender address of the create msg>, <gas price>
|
//<initial balance>, <sender address of the create msg>, <gas price>
|
||||||
MsigCreate(context.Context, uint64, []address.Address, abi.ChainEpoch, types.BigInt, address.Address, types.BigInt) (cid.Cid, error)
|
MsigCreate(context.Context, uint64, []address.Address, abi.ChainEpoch, types.BigInt, address.Address, types.BigInt) (cid.Cid, error)
|
||||||
|
@ -112,6 +112,10 @@ func NewTipSet(blks []*BlockHeader) (*TipSet, error) {
|
|||||||
return nil, fmt.Errorf("cannot create tipset with mismatching heights")
|
return nil, fmt.Errorf("cannot create tipset with mismatching heights")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(blks[0].Parents) != len(b.Parents) {
|
||||||
|
return nil, fmt.Errorf("cannot create tipset with mismatching number of parents")
|
||||||
|
}
|
||||||
|
|
||||||
for i, cid := range b.Parents {
|
for i, cid := range b.Parents {
|
||||||
if cid != blks[0].Parents[i] {
|
if cid != blks[0].Parents[i] {
|
||||||
return nil, fmt.Errorf("cannot create tipset with mismatching parents")
|
return nil, fmt.Errorf("cannot create tipset with mismatching parents")
|
||||||
|
@ -51,7 +51,8 @@ var msigCreateCmd = &cli.Command{
|
|||||||
ArgsUsage: "[address1 address2 ...]",
|
ArgsUsage: "[address1 address2 ...]",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
&cli.Int64Flag{
|
&cli.Int64Flag{
|
||||||
Name: "required",
|
Name: "required",
|
||||||
|
Usage: "number of required approvals (uses number of signers provided if omitted)",
|
||||||
},
|
},
|
||||||
&cli.StringFlag{
|
&cli.StringFlag{
|
||||||
Name: "value",
|
Name: "value",
|
||||||
|
@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -330,8 +331,15 @@ create table if not exists miner_sectors_heads
|
|||||||
primary key (miner_id,miner_sectors_cid)
|
primary key (miner_id,miner_sectors_cid)
|
||||||
|
|
||||||
);
|
);
|
||||||
|
DO $$
|
||||||
create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED');
|
BEGIN
|
||||||
|
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN
|
||||||
|
CREATE TYPE miner_sector_event_type AS ENUM
|
||||||
|
(
|
||||||
|
'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED'
|
||||||
|
);
|
||||||
|
END IF;
|
||||||
|
END$$;
|
||||||
|
|
||||||
create table if not exists miner_sector_events
|
create table if not exists miner_sector_events
|
||||||
(
|
(
|
||||||
@ -342,7 +350,50 @@ create table if not exists miner_sector_events
|
|||||||
|
|
||||||
constraint miner_sector_events_pk
|
constraint miner_sector_events_pk
|
||||||
primary key (sector_id, event, miner_id, state_root)
|
primary key (sector_id, event, miner_id, state_root)
|
||||||
)
|
);
|
||||||
|
|
||||||
|
create table if not exists market_deal_proposals
|
||||||
|
(
|
||||||
|
deal_id bigint not null,
|
||||||
|
|
||||||
|
state_root text not null,
|
||||||
|
|
||||||
|
piece_cid text not null,
|
||||||
|
padded_piece_size bigint not null,
|
||||||
|
unpadded_piece_size bigint not null,
|
||||||
|
is_verified bool not null,
|
||||||
|
|
||||||
|
client_id text not null,
|
||||||
|
provider_id text not null,
|
||||||
|
|
||||||
|
start_epoch bigint not null,
|
||||||
|
end_epoch bigint not null,
|
||||||
|
slashed_epoch bigint,
|
||||||
|
storage_price_per_epoch text not null,
|
||||||
|
|
||||||
|
provider_collateral text not null,
|
||||||
|
client_collateral text not null,
|
||||||
|
|
||||||
|
constraint market_deal_proposal_pk
|
||||||
|
primary key (deal_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
create table if not exists market_deal_states
|
||||||
|
(
|
||||||
|
deal_id bigint not null,
|
||||||
|
|
||||||
|
sector_start_epoch bigint not null,
|
||||||
|
last_update_epoch bigint not null,
|
||||||
|
slash_epoch bigint not null,
|
||||||
|
|
||||||
|
state_root text not null,
|
||||||
|
|
||||||
|
unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch),
|
||||||
|
|
||||||
|
constraint market_deal_states_pk
|
||||||
|
primary key (deal_id, state_root)
|
||||||
|
|
||||||
|
);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
create or replace function miner_tips(epoch bigint)
|
create or replace function miner_tips(epoch bigint)
|
||||||
@ -852,7 +903,7 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, added := range changes.Added {
|
for _, added := range changes.Added {
|
||||||
if _, err := eventStmt.Exec(miner.addr.String(), added.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil {
|
if _, err := eventStmt.Exec(added.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -900,6 +951,172 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
|
|||||||
return updateTx.Commit()
|
return updateTx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
log.Infow("Stored Market Deal States", "duration", time.Since(start).String())
|
||||||
|
}()
|
||||||
|
tx, err := st.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, th := range tipHeights {
|
||||||
|
mt := marketTips[th.tsKey]
|
||||||
|
dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for dealID, ds := range dealStates {
|
||||||
|
id, err := strconv.ParseUint(dealID, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := stmt.Exec(
|
||||||
|
id,
|
||||||
|
ds.State.SectorStartEpoch,
|
||||||
|
ds.State.LastUpdatedEpoch,
|
||||||
|
ds.State.SlashEpoch,
|
||||||
|
mt.stateroot.String(),
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
log.Infow("Stored Market Deal Proposals", "duration", time.Since(start).String())
|
||||||
|
}()
|
||||||
|
tx, err := st.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil {
|
||||||
|
return xerrors.Errorf("prep temp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert in sorted order (lowest height -> highest height) since dealid is pk of table.
|
||||||
|
for _, th := range tipHeights {
|
||||||
|
mt := marketTips[th.tsKey]
|
||||||
|
dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for dealID, ds := range dealStates {
|
||||||
|
id, err := strconv.ParseUint(dealID, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := stmt.Exec(
|
||||||
|
id,
|
||||||
|
mt.stateroot.String(),
|
||||||
|
ds.Proposal.PieceCID.String(),
|
||||||
|
ds.Proposal.PieceSize,
|
||||||
|
ds.Proposal.PieceSize.Unpadded(),
|
||||||
|
ds.Proposal.VerifiedDeal,
|
||||||
|
ds.Proposal.Client.String(),
|
||||||
|
ds.Proposal.Provider.String(),
|
||||||
|
ds.Proposal.StartEpoch,
|
||||||
|
ds.Proposal.EndEpoch,
|
||||||
|
nil, // slashed_epoch
|
||||||
|
ds.Proposal.StoragePricePerEpoch.String(),
|
||||||
|
ds.Proposal.ProviderCollateral.String(),
|
||||||
|
ds.Proposal.ClientCollateral.String(),
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx.Commit()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (st *storage) updateMarketActorDealProposals(marketTip map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
log.Infow("Updated Market Deal Proposals", "duration", time.Since(start).String())
|
||||||
|
}()
|
||||||
|
pred := state.NewStatePredicates(api)
|
||||||
|
|
||||||
|
tx, err := st.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, th := range tipHeights {
|
||||||
|
mt := marketTip[th.tsKey]
|
||||||
|
stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged()))
|
||||||
|
|
||||||
|
changed, val, err := stateDiff(context.TODO(), mt.parentTsKey, mt.tsKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnw("error getting market deal state diff", "error", err)
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
changes, ok := val.(*state.MarketDealStateChanges)
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, modified := range changes.Modified {
|
||||||
|
if modified.From.SlashEpoch != modified.To.SlashEpoch {
|
||||||
|
if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
|
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
|
||||||
st.headerLk.Lock()
|
st.headerLk.Lock()
|
||||||
defer st.headerLk.Unlock()
|
defer st.headerLk.Unlock()
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -18,6 +19,7 @@ import (
|
|||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
||||||
@ -81,6 +83,20 @@ type minerStateInfo struct {
|
|||||||
psize uint64
|
psize uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type marketStateInfo struct {
|
||||||
|
// common
|
||||||
|
act types.Actor
|
||||||
|
stateroot cid.Cid
|
||||||
|
|
||||||
|
// calculating changes
|
||||||
|
// calculating changes
|
||||||
|
tsKey types.TipSetKey
|
||||||
|
parentTsKey types.TipSetKey
|
||||||
|
|
||||||
|
// market actor specific
|
||||||
|
state market.State
|
||||||
|
}
|
||||||
|
|
||||||
type actorInfo struct {
|
type actorInfo struct {
|
||||||
stateroot cid.Cid
|
stateroot cid.Cid
|
||||||
tsKey types.TipSetKey
|
tsKey types.TipSetKey
|
||||||
@ -88,6 +104,11 @@ type actorInfo struct {
|
|||||||
state string
|
state string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type tipsetKeyHeight struct {
|
||||||
|
height abi.ChainEpoch
|
||||||
|
tsKey types.TipSetKey
|
||||||
|
}
|
||||||
|
|
||||||
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
|
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
|
||||||
var alk sync.Mutex
|
var alk sync.Mutex
|
||||||
|
|
||||||
@ -169,6 +190,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
|
|
||||||
log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch)
|
log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch)
|
||||||
|
|
||||||
|
// relate tipset keys to height so they may be processed in ascending order.
|
||||||
|
var tipHeights []tipsetKeyHeight
|
||||||
|
tipsSeen := make(map[types.TipSetKey]struct{})
|
||||||
// map of addresses to changed actors
|
// map of addresses to changed actors
|
||||||
var changes map[string]types.Actor
|
var changes map[string]types.Actor
|
||||||
// collect all actor state that has changes between block headers
|
// collect all actor state that has changes between block headers
|
||||||
@ -276,9 +300,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
parentTsKey: pts.Parents(),
|
parentTsKey: pts.Parents(),
|
||||||
}
|
}
|
||||||
addressToID[addr] = address.Undef
|
addressToID[addr] = address.Undef
|
||||||
|
if _, ok := tipsSeen[pts.Key()]; !ok {
|
||||||
|
tipHeights = append(tipHeights, tipsetKeyHeight{
|
||||||
|
height: pts.Height(),
|
||||||
|
tsKey: pts.Key(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
tipsSeen[pts.Key()] = struct{}{}
|
||||||
alk.Unlock()
|
alk.Unlock()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
// sort tipHeights in ascending order.
|
||||||
|
sort.Slice(tipHeights, func(i, j int) bool {
|
||||||
|
return tipHeights[i].height < tipHeights[j].height
|
||||||
|
})
|
||||||
|
|
||||||
// map of tipset to reward state
|
// map of tipset to reward state
|
||||||
rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
|
rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
|
||||||
@ -313,23 +348,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
|
|
||||||
log.Infof("Getting actor change info")
|
log.Infof("Getting actor change info")
|
||||||
|
|
||||||
|
// highly likely that the market actor will change at every epoch
|
||||||
|
marketActorChanges := make(map[types.TipSetKey]*marketStateInfo, len(changes))
|
||||||
|
|
||||||
minerChanges := 0
|
minerChanges := 0
|
||||||
for addr, m := range actors {
|
for addr, m := range actors {
|
||||||
for actor, c := range m {
|
for actor, c := range m {
|
||||||
// reward actor
|
// only want actors with head change events
|
||||||
if actor.Code == builtin.RewardActorCodeID {
|
if _, found := headsSeen[actor.Head]; found {
|
||||||
rewardTips[c.tsKey] = &rewardStateInfo{
|
|
||||||
stateroot: c.stateroot,
|
|
||||||
baselinePower: big.Zero(),
|
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
headsSeen[actor.Head] = struct{}{}
|
||||||
|
|
||||||
// miner actors with head change events
|
switch actor.Code {
|
||||||
if actor.Code == builtin.StorageMinerActorCodeID {
|
case builtin.StorageMarketActorCodeID:
|
||||||
if _, found := headsSeen[actor.Head]; found {
|
marketActorChanges[c.tsKey] = &marketStateInfo{
|
||||||
continue
|
act: actor,
|
||||||
|
stateroot: c.stateroot,
|
||||||
|
tsKey: c.tsKey,
|
||||||
|
parentTsKey: c.parentTsKey,
|
||||||
|
state: market.State{},
|
||||||
}
|
}
|
||||||
|
case builtin.StorageMinerActorCodeID:
|
||||||
minerChanges++
|
minerChanges++
|
||||||
|
|
||||||
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
|
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
|
||||||
@ -346,10 +386,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
rawPower: big.Zero(),
|
rawPower: big.Zero(),
|
||||||
qalPower: big.Zero(),
|
qalPower: big.Zero(),
|
||||||
})
|
})
|
||||||
|
// reward actor
|
||||||
headsSeen[actor.Head] = struct{}{}
|
case builtin.RewardActorCodeID:
|
||||||
|
rewardTips[c.tsKey] = &rewardStateInfo{
|
||||||
|
stateroot: c.stateroot,
|
||||||
|
baselinePower: big.Zero(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -439,6 +483,21 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
})
|
})
|
||||||
log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingStartedAt).String(), "processed", minerChanges)
|
log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingStartedAt).String(), "processed", minerChanges)
|
||||||
|
|
||||||
|
log.Info("Getting market actor info")
|
||||||
|
// TODO: consider taking the min of the array length and using that for concurrency param, e.g:
|
||||||
|
// concurrency := math.Min(len(marketActorChanges), 50)
|
||||||
|
parmap.Par(50, parmap.MapArr(marketActorChanges), func(mrktInfo *marketStateInfo) {
|
||||||
|
astb, err := api.ChainReadObj(ctx, mrktInfo.act.Head)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := mrktInfo.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
log.Info("Getting receipts")
|
log.Info("Getting receipts")
|
||||||
|
|
||||||
receipts := fetchParentReceipts(ctx, api, toSync)
|
receipts := fetchParentReceipts(ctx, api, toSync)
|
||||||
@ -500,6 +559,24 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info("Storing market actor deal proposal info")
|
||||||
|
if err := st.storeMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Storing market actor deal state info")
|
||||||
|
if err := st.storeMarketActorDealStates(marketActorChanges, tipHeights, api); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Updating market actor deal proposal info")
|
||||||
|
if err := st.updateMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("Storing messages")
|
log.Infof("Storing messages")
|
||||||
|
|
||||||
if err := st.storeMessages(msgs); err != nil {
|
if err := st.storeMessages(msgs); err != nil {
|
||||||
|
86
lib/cachebs/cachebs.go
Normal file
86
lib/cachebs/cachebs.go
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
package cachebs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
|
block "github.com/ipfs/go-block-format"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
|
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("cachebs")
|
||||||
|
|
||||||
|
type CacheBS struct {
|
||||||
|
cache *lru.ARCCache
|
||||||
|
bs bstore.Blockstore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBufferedBstore(base blockstore.Blockstore, size int) *CacheBS {
|
||||||
|
c, err := lru.NewARC(size)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return &CacheBS{
|
||||||
|
cache: c,
|
||||||
|
bs: base,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ (bstore.Blockstore) = &CacheBS{}
|
||||||
|
|
||||||
|
func (bs *CacheBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||||
|
return bs.bs.AllKeysChan(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *CacheBS) DeleteBlock(c cid.Cid) error {
|
||||||
|
bs.cache.Remove(c)
|
||||||
|
|
||||||
|
return bs.bs.DeleteBlock(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *CacheBS) Get(c cid.Cid) (block.Block, error) {
|
||||||
|
v, ok := bs.cache.Get(c)
|
||||||
|
if ok {
|
||||||
|
return v.(block.Block), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := bs.bs.Get(c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bs.cache.Add(c, out)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *CacheBS) GetSize(c cid.Cid) (int, error) {
|
||||||
|
return bs.bs.GetSize(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *CacheBS) Put(blk block.Block) error {
|
||||||
|
bs.cache.Add(blk.Cid(), blk)
|
||||||
|
|
||||||
|
return bs.bs.Put(blk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *CacheBS) Has(c cid.Cid) (bool, error) {
|
||||||
|
if bs.cache.Contains(c) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return bs.bs.Has(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *CacheBS) HashOnRead(hor bool) {
|
||||||
|
bs.bs.HashOnRead(hor)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *CacheBS) PutMany(blks []block.Block) error {
|
||||||
|
for _, blk := range blks {
|
||||||
|
bs.cache.Add(blk.Cid(), blk)
|
||||||
|
}
|
||||||
|
return bs.bs.PutMany(blks)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user