This ensures we can't end up decoding nil bitfields from clients when we are not expecting them. Part of a larger tracked issue (the issue reference was lost in extraction); please see that issue for details and leave any comments there.
972 lines
26 KiB
972 lines
26 KiB
package processor
import (
cw_util ""
// setupMiners creates the miner-related database objects inside a single
// transaction: the miner_info, sector_precommit_info, sector_info,
// miner_power and miner_sector_events tables, plus the
// miner_sector_event_type enum when pg_type has no entry for it.
//
// It returns the error from beginning the transaction, from executing the
// DDL, or from committing.
//
// NOTE(review): this view of the function is missing closing braces and parts
// of the SQL text (parentheses, the enum values, the DO block's BEGIN/END);
// restore them from the upstream file before relying on it.
// NOTE(review): no tx.Rollback() is visible on the Exec error path - confirm
// the transaction is not leaked.
func (p *Processor) setupMiners() error {
tx, err := p.db.Begin()
if err != nil {
return err
// All DDL is sent as one multi-statement Exec; every table uses
// "create table if not exists".
if _, err := tx.Exec(`
create table if not exists miner_info
miner_id text not null,
owner_addr text not null,
worker_addr text not null,
peer_id text,
sector_size text not null,
constraint miner_info_pk
primary key (miner_id)
create table if not exists sector_precommit_info
miner_id text not null,
sector_id bigint not null,
sealed_cid text not null,
state_root text not null,
seal_rand_epoch bigint not null,
expiration_epoch bigint not null,
precommit_deposit text not null,
precommit_epoch bigint not null,
deal_weight text not null,
verified_deal_weight text not null,
is_replace_capacity bool not null,
replace_sector_deadline bigint,
replace_sector_partition bigint,
replace_sector_number bigint,
unique (miner_id, sector_id),
constraint sector_precommit_info_pk
primary key (miner_id, sector_id, sealed_cid)
create table if not exists sector_info
miner_id text not null,
sector_id bigint not null,
sealed_cid text not null,
state_root text not null,
activation_epoch bigint not null,
expiration_epoch bigint not null,
deal_weight text not null,
verified_deal_weight text not null,
initial_pledge text not null,
constraint sector_info_pk
primary key (miner_id, sector_id, sealed_cid)
* captures miner-specific power state for any given stateroot
create table if not exists miner_power
miner_id text not null,
state_root text not null,
raw_bytes_power text not null,
quality_adjusted_power text not null,
constraint miner_power_pk
primary key (miner_id, state_root)
DO $$
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN
CREATE TYPE miner_sector_event_type AS ENUM
create table if not exists miner_sector_events
miner_id text not null,
sector_id bigint not null,
state_root text not null,
event miner_sector_event_type not null,
constraint miner_sector_events_pk
primary key (sector_id, event, miner_id, state_root)
`); err != nil {
return err
return tx.Commit()
// SectorLifecycleEvent is the string label carried in MinerSectorsEvent.Event.
type SectorLifecycleEvent string
// Event labels emitted by the functions in this file.
// NOTE(review): these are untyped string constants rather than
// SectorLifecycleEvent values; they presumably mirror the labels of the
// miner_sector_event_type SQL enum - confirm against the schema.
const (
// PreCommitAdded is emitted by storeMinerPreCommitInfo for added pre-commits.
PreCommitAdded = "PRECOMMIT_ADDED"
// PreCommitExpired is emitted by storeMinerPreCommitInfo for removed
// pre-commits whose sector is not found in the miner's sectors array.
PreCommitExpired = "PRECOMMIT_EXPIRED"
// CommitCapacityAdded is emitted by storeMinerSectorInfo for added sectors
// that have no deal IDs.
CommitCapacityAdded = "COMMIT_CAPACITY_ADDED"
// SectorAdded is emitted by storeMinerSectorInfo for added sectors that have
// at least one deal ID.
SectorAdded = "SECTOR_ADDED"
// SectorExpired is emitted by diffMinerPartitions from PartitionStatus.Expired.
SectorExpired = "SECTOR_EXPIRED"
// SectorExtended is emitted by storeMinerSectorInfo for changes.Extended.
SectorExtended = "SECTOR_EXTENDED"
// SectorFaulted is emitted by diffMinerPartitions from PartitionStatus.Faulted.
SectorFaulted = "SECTOR_FAULTED"
// SectorRecovering is emitted by diffMinerPartitions from PartitionStatus.InRecovery.
SectorRecovering = "SECTOR_RECOVERING"
// SectorRecovered is emitted by diffMinerPartitions from PartitionStatus.Recovered.
SectorRecovered = "SECTOR_RECOVERED"
// SectorTerminated is emitted by diffMinerPartitions from PartitionStatus.Terminated.
SectorTerminated = "SECTOR_TERMINATED"
// MinerSectorsEvent reports that one lifecycle event applies to a set of
// sectors of a single miner. Producers in this file send it over channels to
// storeMinerSectorEvents.
type MinerSectorsEvent struct {
// MinerID is the miner's address (producers set it from m.common.addr).
MinerID address.Address
// SectorIDs lists the sector numbers the event applies to.
SectorIDs []uint64
// StateRoot is set by producers from m.common.stateroot.
StateRoot cid.Cid
// Event is the lifecycle label, e.g. PreCommitAdded or SectorFaulted.
Event SectorLifecycleEvent
// SectorDealEvent links a miner's sector to the deals it contains.
// storeMinerPreCommitInfo sends one for every added pre-commit that carries
// at least one deal ID.
type SectorDealEvent struct {
// MinerID is the miner's address.
MinerID address.Address
// SectorID is the pre-committed sector's number.
SectorID uint64
// DealIDs are the deal IDs taken from the pre-commit info.
DealIDs []abi.DealID
// PartitionStatus is the result of diffPartition: per-category bitfields of
// the sectors that changed between a previous and a current partition.
type PartitionStatus struct {
// Terminated is "all removed sectors minus Expired" as computed in diffPartition.
Terminated abi.BitField
// Expired is built in diffPartition from removed sectors minus the
// early-terminated bitfields, merged across entries.
Expired abi.BitField
// Faulted is intended to hold sectors faulty in the current partition but
// not in the previous one (per the comment in diffPartition).
Faulted abi.BitField
// InRecovery holds sectors in Recoveries of the current partition but not
// of the previous one.
InRecovery abi.BitField
// Recovered holds sectors that were in the previous partition's Faults and
// are in the current partition's active sectors.
Recovered abi.BitField
// minerActorInfo bundles everything processMiners collects for one miner
// actor at one tipset.
type minerActorInfo struct {
// common is the generic actor entry this miner was built from.
common actorInfo
// state is the miner actor state decoded from CBOR read at the actor's Head.
state miner.State
// tracked by power actor
// Both values are copied from the miner's claim in the power actor's claims
// map; they keep their zero value when no claim is found.
rawPower big.Int
qalPower big.Int
func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips) error {
minerChanges, err := p.processMiners(ctx, minerTips)
if err != nil {
log.Fatalw("Failed to process miner actors", "error", err)
if err := p.persistMiners(ctx, minerChanges); err != nil {
log.Fatalw("Failed to persist miner actors", "error", err)
return nil
func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSetKey][]actorInfo) ([]minerActorInfo, error) {
start := time.Now()
defer func() {
log.Debugw("Processed Miners", "duration", time.Since(start).String())
var out []minerActorInfo
// TODO add parallel calls if this becomes slow
for tipset, miners := range minerTips {
// get the power actors claims map
minersClaims, err := getPowerActorClaimsMap(ctx, p.node, tipset)
if err != nil {
return nil, err
// Get miner raw and quality power
for _, act := range miners {
var mi minerActorInfo
mi.common = act
var claim power.Claim
// get miner claim from power actors claim map and store if found, else the miner had no claim at
// this tipset
found, err := minersClaims.Get(adt.AddrKey(act.addr), &claim)
if err != nil {
return nil, err
if found {
mi.qalPower = claim.QualityAdjPower
mi.rawPower = claim.RawBytePower
// Get the miner state info
astb, err := p.node.ChainReadObj(ctx, act.act.Head)
if err != nil {
log.Warnw("failed to find miner actor state", "address", act.addr, "error", err)
if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
return nil, err
out = append(out, mi)
return out, nil
// persistMiners stores everything derived from miners by running the
// individual store/diff steps concurrently in one errgroup and returning the
// group's first error.
//
// Three producer goroutines (pre-commit info, sector info, partition diffs)
// feed MinerSectorsEvent channels that storeMinerSectorEvents drains.
//
// NOTE(review): the context derived by errgroup.WithContext is discarded, so
// a failure in one step does not cancel the others through ctx.
func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Miners", "duration", time.Since(start).String())
grp, _ := errgroup.WithContext(ctx)
grp.Go(func() error {
if err := p.storeMinersPower(miners); err != nil {
return err
return nil
grp.Go(func() error {
if err := p.storeMinersActorInfoState(ctx, miners); err != nil {
return err
return nil
// 8 is arbitrary, idk what a good value here is.
preCommitEvents := make(chan *MinerSectorsEvent, 8)
sectorEvents := make(chan *MinerSectorsEvent, 8)
partitionEvents := make(chan *MinerSectorsEvent, 8)
// The deal-event channel is stored on the Processor; its reader is not in
// this function.
p.sectorDealEvents = make(chan *SectorDealEvent, 8)
// Consumer: drains all three event channels until each is closed.
grp.Go(func() error {
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
grp.Go(func() error {
// NOTE(review): the body of this deferred function is not visible in this
// view. The two sibling producers close their channel on exit; presumably
// this one closes preCommitEvents (and possibly p.sectorDealEvents) -
// confirm, otherwise the consumer above never finishes.
defer func() {
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents)
grp.Go(func() error {
defer close(sectorEvents)
return p.storeMinerSectorInfo(ctx, miners, sectorEvents)
grp.Go(func() error {
defer close(partitionEvents)
return p.getMinerPartitionsDifferences(ctx, miners, partitionEvents)
return grp.Wait()
// storeMinerPreCommitInfo records pre-commit changes for every miner.
//
// Added pre-commits are copied into a temp table (spi) and then inserted into
// sector_precommit_info with "on conflict do nothing". For each miner it also
// sends: a SectorDealEvent per added pre-commit that has deal IDs, one
// PreCommitAdded event, and one PreCommitExpired event listing removed
// pre-commits whose sector is not found in the miner's sectors array.
//
// NOTE(review): this view is missing closing braces, the stmt.Exec argument
// lists and (presumably) `continue` statements after the "address not found"
// and `changes == nil` checks - restore from upstream before editing logic.
// NOTE(review): no tx.Rollback() is visible on the error paths - confirm the
// transaction is not leaked.
func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent, sectorDeals chan<- *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
// Stage rows in a temp table that is dropped when the transaction commits.
if _, err := tx.Exec(`create temp table spi (like sector_precommit_info excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for sector_precommit_info: %w", err)
stmt, err := tx.Prepare(`copy spi (miner_id, sector_id, sealed_cid, state_root, seal_rand_epoch, expiration_epoch, precommit_deposit, precommit_epoch, deal_weight, verified_deal_weight, is_replace_capacity, replace_sector_deadline, replace_sector_partition, replace_sector_number) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err)
for _, m := range miners {
// The sectors array is used below to tell expired pre-commits from proven ones.
minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors)
if err != nil {
return err
changes, err := p.getMinerPreCommitChanges(ctx, m)
if err != nil {
// Errors containing "address not found" are tolerated; anything else aborts.
if strings.Contains(err.Error(), "address not found") {
} else {
return err
// A nil result means the pre-commits did not change for this miner.
if changes == nil {
preCommitAdded := make([]uint64, len(changes.Added))
for i, added := range changes.Added {
if len(added.Info.DealIDs) > 0 {
sectorDeals <- &SectorDealEvent{
MinerID: m.common.addr,
SectorID: uint64(added.Info.SectorNumber),
DealIDs: added.Info.DealIDs,
// Replace-capacity pre-commits fill the three replace_* columns; the other
// branch writes nil for them.
if added.Info.ReplaceCapacity {
if _, err := stmt.Exec(
); err != nil {
return err
} else {
if _, err := stmt.Exec(
nil, // replace deadline
nil, // replace partition
nil, // replace sector
); err != nil {
return err
preCommitAdded[i] = uint64(added.Info.SectorNumber)
if len(preCommitAdded) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitAdded,
Event: PreCommitAdded,
var preCommitExpired []uint64
for _, removed := range changes.Removed {
var sector miner.SectorOnChainInfo
// NOTE(review): `§or` below is an HTML-entity garbling of `&sector`
// (the out-parameter declared on the previous line).
if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), §or); err != nil {
return err
} else if !found {
preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber))
if len(preCommitExpired) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitExpired,
Event: PreCommitExpired,
if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close sector precommit info statement: %w", err)
// Move staged rows into the real table, ignoring rows that already exist.
if _, err := tx.Exec(`insert into sector_precommit_info select * from spi on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into sector precommit info table: %w", err)
if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit sector precommit info: %w", err)
return nil
// storeMinerSectorInfo records sector changes for every miner.
//
// Added sectors are copied into a temp table (si) and then inserted into
// sector_info with "on conflict do nothing". For each miner it sends three
// events: CommitCapacityAdded (added sectors without deal IDs), SectorAdded
// (added sectors with deal IDs) and SectorExtended (changes.Extended).
//
// NOTE(review): this view is missing closing braces, the stmt.Exec argument
// list and (presumably) `continue` statements after the "address not found"
// and `changes == nil` checks - restore from upstream before editing logic.
// NOTE(review): the three events are sent even when their SectorIDs slice is
// empty, and no tx.Rollback() is visible on the error paths.
func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
// Stage rows in a temp table that is dropped when the transaction commits.
if _, err := tx.Exec(`create temp table si (like sector_info excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for sector_: %w", err)
stmt, err := tx.Prepare(`copy si (miner_id, sector_id, sealed_cid, state_root, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, initial_pledge) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err)
for _, m := range miners {
changes, err := p.getMinerSectorChanges(ctx, m)
if err != nil {
// Errors containing "address not found" are tolerated; anything else aborts.
if strings.Contains(err.Error(), "address not found") {
} else {
return err
// A nil result means the sectors did not change for this miner.
if changes == nil {
var sectorsAdded []uint64
var ccAdded []uint64
var extended []uint64
for _, added := range changes.Added {
// add the sector to the table
if _, err := stmt.Exec(
); err != nil {
return err
// Sectors without deals are classified as committed capacity.
if len(added.DealIDs) == 0 {
ccAdded = append(ccAdded, uint64(added.SectorNumber))
} else {
sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber))
for _, mod := range changes.Extended {
extended = append(extended, uint64(mod.To.SectorNumber))
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: ccAdded,
Event: CommitCapacityAdded,
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: sectorsAdded,
Event: SectorAdded,
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: extended,
Event: SectorExtended,
if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close sector info statement: %w", err)
// Move staged rows into the real table, ignoring rows that already exist.
if _, err := tx.Exec(`insert into sector_info select * from si on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into sector info table: %w", err)
if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit sector info: %w", err)
return nil
func (p *Processor) getMinerPartitionsDifferences(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error {
grp, ctx := errgroup.WithContext(ctx)
for _, m := range miners {
m := m
grp.Go(func() error {
if err := p.diffMinerPartitions(ctx, m, events); err != nil {
if strings.Contains(err.Error(), "address not found") {
return nil
return err
return nil
return grp.Wait()
// storeMinerSectorEvents drains the three event channels until each is
// closed, copies one row per (event, sector ID) into a temp table (mse) and
// then inserts the rows into miner_sector_events with "on conflict do
// nothing".
//
// Each channel is read by its own goroutine, which in turn starts one
// goroutine per received event to execute the copy statement.
//
// NOTE(review): this view is missing closing braces and the stmt.Exec
// argument lists - restore from upstream before editing logic.
// NOTE(review): the same prepared statement is executed from many goroutines;
// whether that is safe depends on the driver - confirm.
// NOTE(review): no tx.Rollback() is visible on the error paths.
func (p *Processor) storeMinerSectorEvents(ctx context.Context, sectorEvents, preCommitEvents, partitionEvents <-chan *MinerSectorsEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
// Stage rows in a temp table that is dropped when the transaction commits.
if _, err := tx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for sector_: %w", err)
stmt, err := tx.Prepare(`copy mse (miner_id, sector_id, event, state_root) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err)
grp, ctx := errgroup.WithContext(ctx)
// Reader for sector events.
grp.Go(func() error {
innerGrp, _ := errgroup.WithContext(ctx)
for mse := range sectorEvents {
mse := mse
innerGrp.Go(func() error {
for _, sid := range mse.SectorIDs {
if _, err := stmt.Exec(
); err != nil {
return err
return nil
return innerGrp.Wait()
// Reader for pre-commit events.
grp.Go(func() error {
innerGrp, _ := errgroup.WithContext(ctx)
for mse := range preCommitEvents {
mse := mse
innerGrp.Go(func() error {
for _, sid := range mse.SectorIDs {
if _, err := stmt.Exec(
); err != nil {
return err
return nil
return innerGrp.Wait()
// Reader for partition events.
grp.Go(func() error {
innerGrp, _ := errgroup.WithContext(ctx)
for mse := range partitionEvents {
mse := mse
// NOTE(review): this uses grp.Go while the two readers above use
// innerGrp.Go, so innerGrp.Wait() below waits on nothing and work is added
// to grp from inside one of its own goroutines - looks like a typo for
// innerGrp.Go; confirm.
grp.Go(func() error {
for _, sid := range mse.SectorIDs {
if _, err := stmt.Exec(
); err != nil {
return err
return nil
return innerGrp.Wait()
if err := grp.Wait(); err != nil {
return err
if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close sector event statement: %w", err)
// Move staged rows into the real table, ignoring rows that already exist.
if _, err := tx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into sector event table: %w", err)
if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit sector events: %w", err)
return nil
func (p *Processor) getMinerStateAt(ctx context.Context, maddr address.Address, tskey types.TipSetKey) (miner.State, error) {
prevActor, err := p.node.StateGetActor(ctx, maddr, tskey)
if err != nil {
return miner.State{}, err
var out miner.State
// Get the miner state info
astb, err := p.node.ChainReadObj(ctx, prevActor.Head)
if err != nil {
return miner.State{}, err
if err := out.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
return miner.State{}, err
return out, nil
func (p *Processor) getMinerPreCommitChanges(ctx context.Context, m minerActorInfo) (*state.MinerPreCommitChanges, error) {
pred := state.NewStatePredicates(p.node)
changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerPreCommitChange())(ctx, m.common.parentTsKey, m.common.tsKey)
if err != nil {
return nil, xerrors.Errorf("Failed to diff miner precommit amt: %w", err)
if !changed {
return nil, nil
out := val.(*state.MinerPreCommitChanges)
return out, nil
func (p *Processor) getMinerSectorChanges(ctx context.Context, m minerActorInfo) (*state.MinerSectorChanges, error) {
pred := state.NewStatePredicates(p.node)
changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange())(ctx, m.common.parentTsKey, m.common.tsKey)
if err != nil {
return nil, xerrors.Errorf("Failed to diff miner sectors amt: %w", err)
if !changed {
return nil, nil
out := val.(*state.MinerSectorChanges)
return out, nil
func (p *Processor) diffMinerPartitions(ctx context.Context, m minerActorInfo, events chan<- *MinerSectorsEvent) error {
prevMiner, err := p.getMinerStateAt(ctx, m.common.addr, m.common.tsKey)
if err != nil {
return err
dlIdx := prevMiner.CurrentDeadline
curMiner := m.state
// load the old deadline
prevDls, err := prevMiner.LoadDeadlines(p.ctxStore)
if err != nil {
return err
var prevDl miner.Deadline
if err := p.ctxStore.Get(ctx, prevDls.Due[dlIdx], &prevDl); err != nil {
return err
prevPartitions, err := prevDl.PartitionsArray(p.ctxStore)
if err != nil {
return err
// load the new deadline
curDls, err := curMiner.LoadDeadlines(p.ctxStore)
if err != nil {
return err
var curDl miner.Deadline
if err := p.ctxStore.Get(ctx, curDls.Due[dlIdx], &curDl); err != nil {
return err
curPartitions, err := curDl.PartitionsArray(p.ctxStore)
if err != nil {
return err
// TODO this can be optimized by inspecting the miner state for partitions that have changed and only inspecting those.
var prevPart miner.Partition
if err := prevPartitions.ForEach(&prevPart, func(i int64) error {
var curPart miner.Partition
if found, err := curPartitions.Get(uint64(i), &curPart); err != nil {
return err
} else if !found {
log.Fatal("I don't know what this means, are partitions ever removed?")
partitionDiff, err := p.diffPartition(prevPart, curPart)
if err != nil {
return err
recovered, err := partitionDiff.Recovered.All(miner.SectorsMax)
if err != nil {
return err
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: recovered,
Event: SectorRecovered,
inRecovery, err := partitionDiff.InRecovery.All(miner.SectorsMax)
if err != nil {
return err
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: inRecovery,
Event: SectorRecovering,
faulted, err := partitionDiff.Faulted.All(miner.SectorsMax)
if err != nil {
return err
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: faulted,
Event: SectorFaulted,
terminated, err := partitionDiff.Terminated.All(miner.SectorsMax)
if err != nil {
return err
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: terminated,
Event: SectorTerminated,
expired, err := partitionDiff.Expired.All(miner.SectorsMax)
if err != nil {
return err
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: expired,
Event: SectorExpired,
return nil
}); err != nil {
return err
return nil
func (p *Processor) diffPartition(prevPart, curPart miner.Partition) (*PartitionStatus, error) {
// all the sectors that were in previous but not in current
allRemovedSectors, err := bitfield.SubtractBitField(prevPart.Sectors, curPart.Sectors)
if err != nil {
return nil, err
// list of sectors that were terminated before their expiration.
terminatedEarlyArr, err := adt.AsArray(p.ctxStore, curPart.EarlyTerminated)
if err != nil {
return nil, err
expired := bitfield.New()
var bf abi.BitField
if err := terminatedEarlyArr.ForEach(&bf, func(i int64) error {
// expired = all removals - termination
expirations, err := bitfield.SubtractBitField(allRemovedSectors, bf)
if err != nil {
return err
// merge with expired sectors from other epochs
expired, err = bitfield.MergeBitFields(expirations, expired)
if err != nil {
return nil
return nil
}); err != nil {
return nil, err
// terminated = all removals - expired
terminated, err := bitfield.SubtractBitField(allRemovedSectors, expired)
if err != nil {
return nil, err
// faults in current but not previous
faults, err := bitfield.SubtractBitField(curPart.Recoveries, prevPart.Recoveries)
if err != nil {
return nil, err
// recoveries in current but not previous
inRecovery, err := bitfield.SubtractBitField(curPart.Recoveries, prevPart.Recoveries)
if err != nil {
return nil, err
// all current good sectors
newActiveSectors, err := curPart.ActiveSectors()
if err != nil {
return nil, err
// sectors that were previously fault and are now currently active are considered recovered.
recovered, err := bitfield.IntersectBitField(prevPart.Faults, newActiveSectors)
if err != nil {
return nil, err
return &PartitionStatus{
Terminated: terminated,
Expired: expired,
Faulted: faults,
InRecovery: inRecovery,
Recovered: recovered,
}, nil
// storeMinersActorInfoState stores one miner_info row per miner.
//
// For each miner it asks the node for StateMinerInfo at m.common.tsKey,
// copies a row into a temp table (mi) and finally inserts the staged rows
// into miner_info with "on conflict do nothing". The elapsed time is logged
// at debug level when the function returns.
//
// NOTE(review): this view is missing closing braces, the stmt.Exec argument
// list and (presumably) a `continue` after the "address not found" check -
// restore from upstream before editing logic.
// NOTE(review): no tx.Rollback() is visible on the error paths.
func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Debugw("Stored Miners Actor State", "duration", time.Since(start).String())
tx, err := p.db.Begin()
if err != nil {
return err
// Stage rows in a temp table that is dropped when the transaction commits.
if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size) from STDIN`)
if err != nil {
return err
for _, m := range miners {
mi, err := p.node.StateMinerInfo(ctx, m.common.addr, m.common.tsKey)
if err != nil {
// Errors containing "address not found" are tolerated; anything else aborts.
if strings.Contains(err.Error(), "address not found") {
} else {
return err
if _, err := stmt.Exec(
); err != nil {
log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err)
return xerrors.Errorf("failed to store miner state: %w", err)
if err := stmt.Close(); err != nil {
return err
// Move staged rows into the real table, ignoring rows that already exist.
if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
return tx.Commit()
// storeMinersPower stores one miner_power row per miner.
//
// Rows are copied into a temp table (mp) and then inserted into miner_power
// with "on conflict do nothing". The elapsed time is logged at debug level
// when the function returns.
//
// NOTE(review): this view is missing closing braces and the stmt.Exec
// argument list - restore from upstream before editing logic.
// NOTE(review): in the visible code a failed stmt.Exec is only logged; no
// return follows it - confirm whether continuing is intended.
// NOTE(review): no tx.Rollback() is visible on the error paths.
func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Debugw("Stored Miners Power", "duration", time.Since(start).String())
tx, err := p.db.Begin()
if err != nil {
return xerrors.Errorf("begin miner_power tx: %w", err)
// Stage rows in a temp table that is dropped when the transaction commits.
if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil {
return xerrors.Errorf("prep miner_power temp: %w", err)
stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`)
if err != nil {
return xerrors.Errorf("prepare tmp miner_power: %w", err)
for _, m := range miners {
if _, err := stmt.Exec(
); err != nil {
log.Errorw("failed to store miner power", "miner", m.common.addr, "stateroot", m.common.stateroot, "error", err)
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared miner_power: %w", err)
// Move staged rows into the real table, ignoring rows that already exist.
if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert miner_power from tmp: %w", err)
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit miner_power tx: %w", err)
return nil
// load the power actor state clam as an adt.Map at the tipset `ts`.
func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) {
powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts)
if err != nil {
return nil, err
powerRaw, err := api.ChainReadObj(ctx, powerActor.Head)
if err != nil {
return nil, err
var powerActorState power.State
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil {
return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err)
s := cw_util.NewAPIIpldStore(ctx, api)
return adt.AsMap(s, powerActorState.Claims)