2020-07-14 20:02:44 +00:00
package processor
import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)
func ( p * Processor ) setupMiners ( ) error {
tx , err := p . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( `
create table if not exists miner_info
(
miner_id text not null ,
owner_addr text not null ,
worker_addr text not null ,
peer_id text ,
sector_size text not null ,
precommit_deposits text not null ,
locked_funds text not null ,
next_deadline_process_faults bigint not null ,
constraint miner_info_pk
primary key ( miner_id )
) ;
/ *
* captures miner - specific power state for any given stateroot
* /
create table if not exists miner_power
(
miner_id text not null ,
state_root text not null ,
raw_bytes_power text not null ,
quality_adjusted_power text not null ,
constraint miner_power_pk
primary key ( miner_id , state_root )
) ;
2020-07-16 00:22:48 +00:00
create table if not exists miner_precommits
(
miner_id text not null ,
sector_id bigint not null ,
precommit_deposit text not null ,
precommit_epoch text not null ,
constraint miner_precommits_pk
primary key ( miner_id , sector_id )
) ;
create table if not exists miner_sectors
(
miner_id text not null ,
sector_id bigint not null ,
activation_epoch bigint not null ,
expiration_epoch bigint not null ,
termination_epoch bigint ,
deal_weight text not null ,
verified_deal_weight text not null ,
seal_cid text not null ,
seal_rand_epoch bigint not null ,
constraint miner_sectors_pk
primary key ( miner_id , sector_id )
) ;
2020-07-14 20:02:44 +00:00
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
miner_id text not null ,
miner_sectors_cid text not null ,
state_root text not null ,
constraint miner_sectors_heads_pk
primary key ( miner_id , miner_sectors_cid )
) ;
2020-07-16 00:22:48 +00:00
2020-07-14 20:02:44 +00:00
DO $ $
BEGIN
IF NOT EXISTS ( SELECT 1 FROM pg_type WHERE typname = ' miner_sector_event_type ' ) THEN
CREATE TYPE miner_sector_event_type AS ENUM
(
2020-07-16 00:22:48 +00:00
' PRECOMMIT ' , ' COMMIT ' , ' EXTENDED ' , ' EXPIRED ' , ' TERMINATED '
2020-07-14 20:02:44 +00:00
) ;
END IF ;
END $ $ ;
create table if not exists miner_sector_events
(
miner_id text not null ,
sector_id bigint not null ,
state_root text not null ,
event miner_sector_event_type not null ,
constraint miner_sector_events_pk
primary key ( sector_id , event , miner_id , state_root )
2020-07-16 00:22:48 +00:00
) ;
create materialized view if not exists miner_sectors_view as
select ms . miner_id , ms . sector_id , mp . precommit_epoch , ms . activation_epoch , ms . expiration_epoch , ms . termination_epoch , ms . deal_weight , ms . verified_deal_weight
from miner_sectors ms
left join miner_precommits mp on ms . sector_id = mp . sector_id and ms . miner_id = mp . miner_id
2020-07-14 20:02:44 +00:00
` ) ; err != nil {
return err
}
return tx . Commit ( )
}
type minerActorInfo struct {
common actorInfo
state miner . State
// tracked by power actor
rawPower big . Int
qalPower big . Int
}
type sectorUpdate struct {
terminationEpoch abi . ChainEpoch
terminated bool
expirationEpoch abi . ChainEpoch
sectorID abi . SectorNumber
minerID address . Address
}
func ( p * Processor ) HandleMinerChanges ( ctx context . Context , minerTips ActorTips ) error {
minerChanges , err := p . processMiners ( ctx , minerTips )
if err != nil {
log . Fatalw ( "Failed to process miner actors" , "error" , err )
}
if err := p . persistMiners ( ctx , minerChanges ) ; err != nil {
log . Fatalw ( "Failed to persist miner actors" , "error" , err )
}
if err := p . updateMiners ( ctx , minerChanges ) ; err != nil {
log . Fatalw ( "Failed to update miner actors" , "error" , err )
}
return nil
}
func ( p * Processor ) processMiners ( ctx context . Context , minerTips map [ types . TipSetKey ] [ ] actorInfo ) ( [ ] minerActorInfo , error ) {
start := time . Now ( )
defer func ( ) {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Processed Miners" , "duration" , time . Since ( start ) . String ( ) )
2020-07-14 20:02:44 +00:00
} ( )
var out [ ] minerActorInfo
// TODO add parallel calls if this becomes slow
for tipset , miners := range minerTips {
// get the power actors claims map
minersClaims , err := getPowerActorClaimsMap ( ctx , p . node , tipset )
if err != nil {
return nil , err
}
// Get miner raw and quality power
for _ , act := range miners {
var mi minerActorInfo
mi . common = act
var claim power . Claim
// get miner claim from power actors claim map and store if found, else the miner had no claim at
// this tipset
found , err := minersClaims . Get ( adt . AddrKey ( act . addr ) , & claim )
if err != nil {
return nil , err
}
if found {
mi . qalPower = claim . QualityAdjPower
mi . rawPower = claim . RawBytePower
}
// Get the miner state info
astb , err := p . node . ChainReadObj ( ctx , act . act . Head )
if err != nil {
log . Warnw ( "failed to find miner actor state" , "address" , act . addr , "error" , err )
continue
}
if err := mi . state . UnmarshalCBOR ( bytes . NewReader ( astb ) ) ; err != nil {
return nil , err
}
out = append ( out , mi )
}
}
return out , nil
}
func ( p * Processor ) persistMiners ( ctx context . Context , miners [ ] minerActorInfo ) error {
start := time . Now ( )
defer func ( ) {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Persisted Miners" , "duration" , time . Since ( start ) . String ( ) )
2020-07-14 20:02:44 +00:00
} ( )
grp , _ := errgroup . WithContext ( ctx )
grp . Go ( func ( ) error {
if err := p . storeMinersActorState ( miners ) ; err != nil {
return err
}
return nil
} )
grp . Go ( func ( ) error {
if err := p . storeMinersPower ( miners ) ; err != nil {
return err
}
return nil
} )
grp . Go ( func ( ) error {
2020-07-16 00:22:48 +00:00
if err := p . storeMinersSectorState ( ctx , miners ) ; err != nil {
2020-07-14 20:02:44 +00:00
return err
}
return nil
} )
grp . Go ( func ( ) error {
if err := p . storeMinersSectorHeads ( miners ) ; err != nil {
return err
}
return nil
} )
2020-07-16 00:22:48 +00:00
grp . Go ( func ( ) error {
if err := p . storeMinersPreCommitState ( ctx , miners ) ; err != nil {
return err
}
return nil
} )
2020-07-14 20:02:44 +00:00
return grp . Wait ( )
}
func ( p * Processor ) storeMinersActorState ( miners [ ] minerActorInfo ) error {
start := time . Now ( )
defer func ( ) {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Stored Miners Actor State" , "duration" , time . Since ( start ) . String ( ) )
2020-07-14 20:02:44 +00:00
} ( )
tx , err := p . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( ` create temp table mi (like miner_info excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN ` )
if err != nil {
return err
}
for _ , m := range miners {
var pid string
if len ( m . state . Info . PeerId ) != 0 {
peerid , err := peer . IDFromBytes ( m . state . Info . PeerId )
if err != nil {
// this should "never happen", but if it does we should still store info about the miner.
log . Warnw ( "failed to decode peerID" , "peerID (bytes)" , m . state . Info . PeerId , "miner" , m . common . addr , "tipset" , m . common . tsKey . String ( ) )
} else {
pid = peerid . String ( )
}
}
if _ , err := stmt . Exec (
m . common . addr . String ( ) ,
m . state . Info . Owner . String ( ) ,
m . state . Info . Worker . String ( ) ,
pid ,
m . state . Info . SectorSize . ShortString ( ) ,
m . state . PreCommitDeposits . String ( ) ,
m . state . LockedFunds . String ( ) ,
m . state . NextDeadlineToProcessFaults ,
) ; err != nil {
log . Errorw ( "failed to store miner state" , "state" , m . state , "info" , m . state . Info , "error" , err )
return xerrors . Errorf ( "failed to store miner state: %w" , err )
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into miner_info select * from mi on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
return tx . Commit ( )
}
func ( p * Processor ) storeMinersPower ( miners [ ] minerActorInfo ) error {
start := time . Now ( )
defer func ( ) {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Stored Miners Power" , "duration" , time . Since ( start ) . String ( ) )
2020-07-14 20:02:44 +00:00
} ( )
tx , err := p . db . Begin ( )
if err != nil {
return xerrors . Errorf ( "begin miner_power tx: %w" , err )
}
if _ , err := tx . Exec ( ` create temp table mp (like miner_power excluding constraints) on commit drop ` ) ; err != nil {
return xerrors . Errorf ( "prep miner_power temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN ` )
if err != nil {
return xerrors . Errorf ( "prepare tmp miner_power: %w" , err )
}
for _ , m := range miners {
if _ , err := stmt . Exec (
m . common . addr . String ( ) ,
m . common . stateroot . String ( ) ,
m . rawPower . String ( ) ,
m . qalPower . String ( ) ,
) ; err != nil {
log . Errorw ( "failed to store miner power" , "miner" , m . common . addr , "stateroot" , m . common . stateroot , "error" , err )
}
}
if err := stmt . Close ( ) ; err != nil {
return xerrors . Errorf ( "close prepared miner_power: %w" , err )
}
if _ , err := tx . Exec ( ` insert into miner_power select * from mp on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "insert miner_power from tmp: %w" , err )
}
if err := tx . Commit ( ) ; err != nil {
return xerrors . Errorf ( "commit miner_power tx: %w" , err )
}
return nil
}
2020-07-16 00:22:48 +00:00
func ( p * Processor ) storeMinersSectorState ( ctx context . Context , miners [ ] minerActorInfo ) error {
2020-07-14 20:02:44 +00:00
start := time . Now ( )
defer func ( ) {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Stored Miners Sector State" , "duration" , time . Since ( start ) . String ( ) )
2020-07-14 20:02:44 +00:00
} ( )
tx , err := p . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( ` create temp table ms (like miner_sectors excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN ` )
if err != nil {
return err
}
2020-07-16 00:22:48 +00:00
grp , ctx := errgroup . WithContext ( ctx )
2020-07-14 20:02:44 +00:00
for _ , m := range miners {
m := m
grp . Go ( func ( ) error {
sectors , err := p . node . StateMinerSectors ( ctx , m . common . addr , nil , true , m . common . tsKey )
if err != nil {
log . Debugw ( "Failed to load sectors" , "tipset" , m . common . tsKey . String ( ) , "miner" , m . common . addr . String ( ) , "error" , err )
}
for _ , sector := range sectors {
if _ , err := stmt . Exec (
m . common . addr . String ( ) ,
uint64 ( sector . ID ) ,
int64 ( sector . Info . ActivationEpoch ) ,
int64 ( sector . Info . Info . Expiration ) ,
sector . Info . DealWeight . String ( ) ,
sector . Info . VerifiedDealWeight . String ( ) ,
sector . Info . Info . SealedCID . String ( ) ,
int64 ( sector . Info . Info . SealRandEpoch ) ,
) ; err != nil {
return err
}
}
return nil
} )
}
if err := grp . Wait ( ) ; err != nil {
return err
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into miner_sectors select * from ms on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
return tx . Commit ( )
}
func ( p * Processor ) storeMinersSectorHeads ( miners [ ] minerActorInfo ) error {
start := time . Now ( )
defer func ( ) {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Stored Miners Sector Heads" , "duration" , time . Since ( start ) . String ( ) )
2020-07-14 20:02:44 +00:00
} ( )
tx , err := p . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( ` create temp table msh (like miner_sectors_heads excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy msh (miner_id, miner_sectors_cid, state_root) from STDIN ` )
if err != nil {
return err
}
for _ , m := range miners {
if _ , err := stmt . Exec (
m . common . addr . String ( ) ,
m . state . Sectors . String ( ) ,
m . common . stateroot . String ( ) ,
) ; err != nil {
log . Errorw ( "failed to store miners sectors head" , "state" , m . state , "info" , m . state . Info , "error" , err )
return err
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into miner_sectors_heads select * from msh on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
return tx . Commit ( )
}
2020-07-16 00:22:48 +00:00
func ( p * Processor ) storeMinersPreCommitState ( ctx context . Context , miners [ ] minerActorInfo ) error {
start := time . Now ( )
defer func ( ) {
log . Infow ( "Stored Miners Precommit State" , "duration" , time . Since ( start ) . String ( ) )
} ( )
precommitTx , err := p . db . Begin ( )
if err != nil {
return err
}
if _ , err := precommitTx . Exec ( ` create temp table mp (like miner_precommits excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
precommitStmt , err := precommitTx . Prepare ( ` copy mp (miner_id, sector_id, precommit_deposit, precommit_epoch) from STDIN ` )
if err != nil {
return err
}
for _ , m := range miners {
m := m
pcMap , err := adt . AsMap ( cw_util . NewAPIIpldStore ( ctx , p . node ) , m . state . PreCommittedSectors )
if err != nil {
return err
}
precommit := new ( miner . SectorPreCommitOnChainInfo )
if err := pcMap . ForEach ( precommit , func ( key string ) error {
if _ , err := precommitStmt . Exec (
m . common . addr . String ( ) ,
precommit . Info . SectorNumber ,
precommit . PreCommitDeposit . String ( ) ,
precommit . PreCommitEpoch ,
) ; err != nil {
return err
}
return nil
} ) ; err != nil {
return err
}
}
if err := precommitStmt . Close ( ) ; err != nil {
return err
}
if _ , err := precommitTx . Exec ( ` insert into miner_precommits select * from mp on conflict do nothing ` ) ; err != nil {
return err
}
return precommitTx . Commit ( )
}
2020-07-14 20:02:44 +00:00
func ( p * Processor ) updateMiners ( ctx context . Context , miners [ ] minerActorInfo ) error {
// TODO when/if there is more than one update operation here use an errgroup as is done in persistMiners
if err := p . updateMinersSectors ( ctx , miners ) ; err != nil {
return err
}
2020-07-16 00:22:48 +00:00
if err := p . updateMinersPrecommits ( ctx , miners ) ; err != nil {
return err
}
2020-07-14 20:02:44 +00:00
return nil
}
2020-07-16 00:22:48 +00:00
func ( p * Processor ) updateMinersPrecommits ( ctx context . Context , miners [ ] minerActorInfo ) error {
start := time . Now ( )
defer func ( ) {
log . Infow ( "Updated Miner Precommits" , "duration" , time . Since ( start ) . String ( ) )
} ( )
pred := state . NewStatePredicates ( p . node )
eventTx , err := p . db . Begin ( )
if err != nil {
return err
}
if _ , err := eventTx . Exec ( ` create temp table mse (like miner_sector_events excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
eventStmt , err := eventTx . Prepare ( ` copy mse (sector_id, event, miner_id, state_root) from STDIN ` )
if err != nil {
return err
}
for _ , m := range miners {
pcDiffFn := pred . OnMinerActorChange ( m . common . addr , pred . OnMinerPreCommitChange ( ) )
changed , val , err := pcDiffFn ( ctx , m . common . parentTsKey , m . common . tsKey )
if err != nil {
if strings . Contains ( err . Error ( ) , "address not found" ) {
continue
}
log . Errorw ( "error getting miner precommit diff" , "miner" , m . common . addr , "error" , err )
return err
}
if ! changed {
continue
}
changes , ok := val . ( * state . MinerPreCommitChanges )
if ! ok {
log . Fatal ( "Developer Error" )
}
for _ , added := range changes . Added {
if _ , err := eventStmt . Exec ( added . Info . SectorNumber , "PRECOMMIT" , m . common . addr . String ( ) , m . common . stateroot . String ( ) ) ; err != nil {
return err
}
}
}
if err := eventStmt . Close ( ) ; err != nil {
return err
}
if _ , err := eventTx . Exec ( ` insert into miner_sector_events select * from mse on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
return eventTx . Commit ( )
}
2020-07-14 20:02:44 +00:00
func ( p * Processor ) updateMinersSectors ( ctx context . Context , miners [ ] minerActorInfo ) error {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Updating Miners Sectors" , "#miners" , len ( miners ) )
2020-07-14 20:02:44 +00:00
start := time . Now ( )
defer func ( ) {
2020-07-16 14:45:31 +00:00
log . Debugw ( "Updated Miners Sectors" , "duration" , time . Since ( start ) . String ( ) )
2020-07-14 20:02:44 +00:00
} ( )
pred := state . NewStatePredicates ( p . node )
eventTx , err := p . db . Begin ( )
if err != nil {
return err
}
if _ , err := eventTx . Exec ( ` create temp table mse (like miner_sector_events excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
eventStmt , err := eventTx . Prepare ( ` copy mse (sector_id, event, miner_id, state_root) from STDIN ` )
if err != nil {
return err
}
var updateWg sync . WaitGroup
updateWg . Add ( 1 )
sectorUpdatesCh := make ( chan sectorUpdate )
var sectorUpdates [ ] sectorUpdate
go func ( ) {
for u := range sectorUpdatesCh {
sectorUpdates = append ( sectorUpdates , u )
}
updateWg . Done ( )
} ( )
minerGrp , ctx := errgroup . WithContext ( ctx )
complete := 0
for _ , m := range miners {
m := m
2020-07-16 00:22:48 +00:00
if m . common . tsKey == p . genesisTs . Key ( ) {
genSectors , err := p . node . StateMinerSectors ( ctx , m . common . addr , nil , true , p . genesisTs . Key ( ) )
if err != nil {
return err
}
for _ , sector := range genSectors {
if _ , err := eventStmt . Exec ( sector . ID , "COMMIT" , m . common . addr . String ( ) , m . common . stateroot . String ( ) ) ; err != nil {
return err
}
}
complete ++
continue
}
2020-07-14 20:02:44 +00:00
minerGrp . Go ( func ( ) error {
// special case genesis miners
sectorDiffFn := pred . OnMinerActorChange ( m . common . addr , pred . OnMinerSectorChange ( ) )
changed , val , err := sectorDiffFn ( ctx , m . common . parentTsKey , m . common . tsKey )
if err != nil {
if strings . Contains ( err . Error ( ) , "address not found" ) {
return nil
}
log . Errorw ( "error getting miner sector diff" , "miner" , m . common . addr , "error" , err )
return err
}
if ! changed {
complete ++
return nil
}
changes , ok := val . ( * state . MinerSectorChanges )
if ! ok {
log . Fatalw ( "Developer Error" )
}
log . Debugw ( "sector changes for miner" , "miner" , m . common . addr . String ( ) , "Added" , len ( changes . Added ) , "Extended" , len ( changes . Extended ) , "Removed" , len ( changes . Removed ) , "oldState" , m . common . parentTsKey , "newState" , m . common . tsKey )
for _ , extended := range changes . Extended {
if _ , err := eventStmt . Exec ( extended . To . Info . SectorNumber , "EXTENDED" , m . common . addr . String ( ) , m . common . stateroot . String ( ) ) ; err != nil {
return err
}
sectorUpdatesCh <- sectorUpdate {
terminationEpoch : 0 ,
terminated : false ,
expirationEpoch : extended . To . Info . Expiration ,
sectorID : extended . From . Info . SectorNumber ,
minerID : m . common . addr ,
}
2020-07-16 14:45:31 +00:00
log . Debugw ( "sector extended" , "miner" , m . common . addr . String ( ) , "sector" , extended . To . Info . SectorNumber , "old" , extended . To . Info . Expiration , "new" , extended . From . Info . Expiration )
2020-07-14 20:02:44 +00:00
}
curTs , err := p . node . ChainGetTipSet ( ctx , m . common . tsKey )
if err != nil {
return err
}
for _ , removed := range changes . Removed {
2020-07-16 14:45:31 +00:00
log . Debugw ( "removed" , "miner" , m . common . addr )
2020-07-14 20:02:44 +00:00
// decide if they were terminated or extended
if removed . Info . Expiration > curTs . Height ( ) {
if _ , err := eventStmt . Exec ( removed . Info . SectorNumber , "TERMINATED" , m . common . addr . String ( ) , m . common . stateroot . String ( ) ) ; err != nil {
return err
}
2020-07-16 14:45:31 +00:00
log . Debugw ( "sector terminated" , "miner" , m . common . addr . String ( ) , "sector" , removed . Info . SectorNumber , "old" , "sectorExpiration" , removed . Info . Expiration , "terminationEpoch" , curTs . Height ( ) )
2020-07-14 20:02:44 +00:00
sectorUpdatesCh <- sectorUpdate {
terminationEpoch : curTs . Height ( ) ,
terminated : true ,
expirationEpoch : removed . Info . Expiration ,
sectorID : removed . Info . SectorNumber ,
minerID : m . common . addr ,
}
}
if _ , err := eventStmt . Exec ( removed . Info . SectorNumber , "EXPIRED" , m . common . addr . String ( ) , m . common . stateroot . String ( ) ) ; err != nil {
return err
}
2020-07-16 14:45:31 +00:00
log . Debugw ( "sector removed" , "miner" , m . common . addr . String ( ) , "sector" , removed . Info . SectorNumber , "old" , "sectorExpiration" , removed . Info . Expiration , "currEpoch" , curTs . Height ( ) )
2020-07-14 20:02:44 +00:00
}
for _ , added := range changes . Added {
2020-07-16 00:22:48 +00:00
if _ , err := eventStmt . Exec ( added . Info . SectorNumber , "COMMIT" , m . common . addr . String ( ) , m . common . stateroot . String ( ) ) ; err != nil {
2020-07-14 20:02:44 +00:00
return err
}
}
complete ++
log . Debugw ( "Update Done" , "complete" , complete , "added" , len ( changes . Added ) , "removed" , len ( changes . Removed ) , "modified" , len ( changes . Extended ) )
return nil
} )
}
if err := minerGrp . Wait ( ) ; err != nil {
return err
}
close ( sectorUpdatesCh )
// wait for the update channel to be drained
updateWg . Wait ( )
if err := eventStmt . Close ( ) ; err != nil {
return err
}
if _ , err := eventTx . Exec ( ` insert into miner_sector_events select * from mse on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
if err := eventTx . Commit ( ) ; err != nil {
return err
}
updateTx , err := p . db . Begin ( )
if err != nil {
return err
}
updateStmt , err := updateTx . Prepare ( ` UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4 ` )
if err != nil {
return err
}
for _ , update := range sectorUpdates {
if update . terminated {
if _ , err := updateStmt . Exec ( update . terminationEpoch , update . expirationEpoch , update . minerID . String ( ) , update . sectorID ) ; err != nil {
return err
}
} else {
if _ , err := updateStmt . Exec ( nil , update . expirationEpoch , update . minerID . String ( ) , update . sectorID ) ; err != nil {
return err
}
}
}
if err := updateStmt . Close ( ) ; err != nil {
return err
}
return updateTx . Commit ( )
}
// load the power actor state clam as an adt.Map at the tipset `ts`.
func getPowerActorClaimsMap ( ctx context . Context , api api . FullNode , ts types . TipSetKey ) ( * adt . Map , error ) {
powerActor , err := api . StateGetActor ( ctx , builtin . StoragePowerActorAddr , ts )
if err != nil {
return nil , err
}
powerRaw , err := api . ChainReadObj ( ctx , powerActor . Head )
if err != nil {
return nil , err
}
var powerActorState power . State
if err := powerActorState . UnmarshalCBOR ( bytes . NewReader ( powerRaw ) ) ; err != nil {
return nil , fmt . Errorf ( "failed to unmarshal power actor state: %w" , err )
}
s := cw_util . NewAPIIpldStore ( ctx , api )
return adt . AsMap ( s , powerActorState . Claims )
}