diff --git a/cmd/lotus-chainwatch/mpool.go b/cmd/lotus-chainwatch/mpool.go deleted file mode 100644 index 74ffa8771..000000000 --- a/cmd/lotus-chainwatch/mpool.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "context" - "time" - - "github.com/ipfs/go-cid" - - aapi "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" -) - -func subMpool(ctx context.Context, api aapi.FullNode, st *storage) { - sub, err := api.MpoolSub(ctx) - if err != nil { - return - } - - for { - var updates []aapi.MpoolUpdate - - select { - case update := <-sub: - updates = append(updates, update) - case <-ctx.Done(): - return - } - - loop: - for { - time.Sleep(10 * time.Millisecond) - select { - case update := <-sub: - updates = append(updates, update) - default: - break loop - } - } - - msgs := map[cid.Cid]*types.Message{} - for _, v := range updates { - if v.Type != aapi.MpoolAdd { - continue - } - - msgs[v.Message.Message.Cid()] = &v.Message.Message - } - - log.Debugf("Processing %d mpool updates", len(msgs)) - - err := st.storeMessages(msgs) - if err != nil { - log.Error(err) - } - - if err := st.storeMpoolInclusions(updates); err != nil { - log.Error(err) - } - } -} diff --git a/cmd/lotus-chainwatch/processor/common_actors.go b/cmd/lotus-chainwatch/processor/common_actors.go new file mode 100644 index 000000000..853390fb6 --- /dev/null +++ b/cmd/lotus-chainwatch/processor/common_actors.go @@ -0,0 +1,299 @@ +package processor + +import ( + "bytes" + "context" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + typegen "github.com/whyrusleeping/cbor-gen" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/builtin" + _init "github.com/filecoin-project/specs-actors/actors/builtin/init" + "github.com/filecoin-project/specs-actors/actors/util/adt" + + "github.com/filecoin-project/lotus/chain/types" + cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" +) + +func (p *Processor) setupCommonActors() error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create table if not exists id_address_map +( + id text not null, + address text not null, + constraint id_address_map_pk + primary key (id, address) +); + +create unique index if not exists id_address_map_id_uindex + on id_address_map (id); + +create unique index if not exists id_address_map_address_uindex + on id_address_map (address); + +create table if not exists actors + ( + id text not null + constraint id_address_map_actors_id_fk + references id_address_map (id), + code text not null, + head text not null, + nonce int not null, + balance text not null, + stateroot text + ); + +create index if not exists actors_id_index + on actors (id); + +create index if not exists id_address_map_address_index + on id_address_map (address); + +create index if not exists id_address_map_id_index + on id_address_map (id); + +create or replace function actor_tips(epoch bigint) + returns table (id text, + code text, + head text, + nonce int, + balance text, + stateroot text, + height bigint, + parentstateroot text) as +$body$ + select distinct on (id) * from actors + inner join state_heights sh on sh.parentstateroot = stateroot + where height < $1 + order by id, height desc; +$body$ language sql; + +create table if not exists actor_states +( + head text not null, + code text not null, + state json not null +); + +create unique index if not exists actor_states_head_code_uindex + on actor_states (head, code); + +create index if not exists actor_states_head_index + on actor_states (head); + +create index if not exists actor_states_code_head_index + on actor_states (head, code); + +`); err != nil { + return err + } + + return tx.Commit() +} + +func (p *Processor) HandleCommonActorsChanges(ctx context.Context, actors map[cid.Cid]ActorTips) error { + if err := p.storeActorAddresses(ctx, actors); err != nil { + return err + } + + grp, _ := errgroup.WithContext(ctx) + + grp.Go(func() error { + if err := p.storeActorHeads(actors); err != nil { + return err + } + return nil + }) + + grp.Go(func() error { + if err := p.storeActorStates(actors); err != nil { + return err + } + return nil + }) + + return grp.Wait() +} + +func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error { + start := time.Now() + defer func() { + log.Infow("Stored Actor Addresses", "duration", time.Since(start).String()) + }() + + addressToID := map[address.Address]address.Address{} + // HACK until genesis storage is figured out: + addressToID[builtin.SystemActorAddr] = builtin.SystemActorAddr + addressToID[builtin.InitActorAddr] = builtin.InitActorAddr + addressToID[builtin.RewardActorAddr] = builtin.RewardActorAddr + addressToID[builtin.CronActorAddr] = builtin.CronActorAddr + addressToID[builtin.StoragePowerActorAddr] = builtin.StoragePowerActorAddr + addressToID[builtin.StorageMarketActorAddr] = builtin.StorageMarketActorAddr + addressToID[builtin.VerifiedRegistryActorAddr] = builtin.VerifiedRegistryActorAddr + addressToID[builtin.BurntFundsActorAddr] = builtin.BurntFundsActorAddr + initActor, err := p.node.StateGetActor(ctx, builtin.InitActorAddr, types.EmptyTSK) + if err != nil { + return err + } + + initActorRaw, err := p.node.ChainReadObj(ctx, initActor.Head) + if err != nil { + return err + } + + var initActorState _init.State + if err := initActorState.UnmarshalCBOR(bytes.NewReader(initActorRaw)); err != nil { + return err + } + ctxStore := cw_util.NewAPIIpldStore(ctx, p.node) + addrMap, err := adt.AsMap(ctxStore, initActorState.AddressMap) + if err != nil { + return err + } + // gross.. + var actorID typegen.CborInt + if err := addrMap.ForEach(&actorID, func(key string) error { + longAddr, err := address.NewFromBytes([]byte(key)) + if err != nil { + return err + } + shortAddr, err := address.NewIDAddress(uint64(actorID)) + if err != nil { + return err + } + addressToID[longAddr] = shortAddr + return nil + }); err != nil { + return err + } + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create temp table iam (like id_address_map excluding constraints) on commit drop; +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `) + if err != nil { + return err + } + + for a, i := range addressToID { + if i == address.Undef { + continue + } + if _, err := stmt.Exec( + i.String(), + a.String(), + ); err != nil { + return err + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error { + start := time.Now() + defer func() { + log.Infow("Stored Actor Heads", "duration", time.Since(start).String()) + }() + // Basic + tx, err := p.db.Begin() + if err != nil { + return err + } + if _, err := tx.Exec(` + create temp table a (like actors excluding constraints) on commit drop; + `); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `) + if err != nil { + return err + } + + for code, actTips := range actors { + for _, actorInfo := range actTips { + for _, a := range actorInfo { + if _, err := stmt.Exec(a.addr.String(), code.String(), a.act.Head.String(), a.act.Nonce, a.act.Balance.String(), a.stateroot.String()); err != nil { + return err + } + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error { + start := time.Now() + defer func() { + log.Infow("Stored Actor States", "duration", time.Since(start).String()) + }() + // States + tx, err := p.db.Begin() + if err != nil { + return err + } + if _, err := tx.Exec(` + create temp table a (like actor_states excluding constraints) on commit drop; + `); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy a (head, code, state) from stdin `) + if err != nil { + return err + } + + for code, actTips := range actors { + for _, actorInfo := range actTips { + for _, a := range actorInfo { + if _, err := stmt.Exec(a.act.Head.String(), code.String(), a.state); err != nil { + return err + } + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} diff --git a/cmd/lotus-chainwatch/processor/market.go b/cmd/lotus-chainwatch/processor/market.go new file mode 100644 index 000000000..fd07e7d0a --- /dev/null +++ b/cmd/lotus-chainwatch/processor/market.go @@ -0,0 +1,301 @@ +package processor + +import ( + "context" + "strconv" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/events/state" +) + +func (p *Processor) setupMarket() error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create table if not exists market_deal_proposals +( + deal_id bigint not null, + + state_root text not null, + + piece_cid text not null, + padded_piece_size bigint not null, + unpadded_piece_size bigint not null, + is_verified bool not null, + + client_id text not null, + provider_id text not null, + + start_epoch bigint not null, + end_epoch bigint not null, + slashed_epoch bigint, + storage_price_per_epoch text not null, + + provider_collateral text not null, + client_collateral text not null, + + constraint market_deal_proposal_pk + primary key (deal_id) +); + +create table if not exists market_deal_states +( + deal_id bigint not null, + + sector_start_epoch bigint not null, + last_update_epoch bigint not null, + slash_epoch bigint not null, + + state_root text not null, + + unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch), + + constraint market_deal_states_pk + primary key (deal_id, state_root) + +); + +`); err != nil { + return err + } + + return tx.Commit() +} + +type marketActorInfo struct { + common actorInfo +} + +func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTips) error { + marketChanges, err := p.processMarket(ctx, marketTips) + if err != nil { + log.Fatalw("Failed to process market actors", "error", err) + } + + if err := p.persistMarket(ctx, marketChanges); err != nil { + log.Fatalw("Failed to persist market actors", "error", err) + } + + if err := p.updateMarket(ctx, marketChanges); err != nil { + log.Fatalw("Failed to update market actors", "error", err) + } + return nil +} + +func (p *Processor) processMarket(ctx context.Context, marketTips ActorTips) ([]marketActorInfo, error) { + start := time.Now() + defer func() { + log.Infow("Processed Market", "duration", time.Since(start).String()) + }() + + var out []marketActorInfo + for _, markets := range marketTips { + for _, mt := range markets { + // NB: here is where we can extract the market state when we need it. + out = append(out, marketActorInfo{common: mt}) + } + } + return out, nil +} + +func (p *Processor) persistMarket(ctx context.Context, info []marketActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Persisted Market", "duration", time.Since(start).String()) + }() + + grp, ctx := errgroup.WithContext(ctx) + + grp.Go(func() error { + if err := p.storeMarketActorDealProposals(ctx, info); err != nil { + return xerrors.Errorf("Failed to store marker deal proposals: %w", err) + } + return nil + }) + + grp.Go(func() error { + if err := p.storeMarketActorDealStates(info); err != nil { + return xerrors.Errorf("Failed to store marker deal states: %w", err) + } + return nil + }) + + return grp.Wait() + +} + +func (p *Processor) updateMarket(ctx context.Context, info []marketActorInfo) error { + if err := p.updateMarketActorDealProposals(ctx, info); err != nil { + return xerrors.Errorf("Failed to update market info: %w", err) + } + return nil +} + +func (p *Processor) storeMarketActorDealStates(marketTips []marketActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Stored Market Deal States", "duration", time.Since(start).String()) + }() + tx, err := p.db.Begin() + if err != nil { + return err + } + if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil { + return err + } + stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`) + if err != nil { + return err + } + for _, mt := range marketTips { + dealStates, err := p.node.StateMarketDeals(context.TODO(), mt.common.tsKey) + if err != nil { + return err + } + + for dealID, ds := range dealStates { + id, err := strconv.ParseUint(dealID, 10, 64) + if err != nil { + return err + } + + if _, err := stmt.Exec( + id, + ds.State.SectorStartEpoch, + ds.State.LastUpdatedEpoch, + ds.State.SlashEpoch, + mt.common.stateroot.String(), + ); err != nil { + return err + } + + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil { + return err + } + + return tx.Commit() +} + +func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTips []marketActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Stored Market Deal Proposals", "duration", time.Since(start).String()) + }() + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) + if err != nil { + return err + } + + // insert in sorted order (lowest height -> highest height) since dealid is pk of table. + for _, mt := range marketTips { + dealStates, err := p.node.StateMarketDeals(ctx, mt.common.tsKey) + if err != nil { + return err + } + + for dealID, ds := range dealStates { + id, err := strconv.ParseUint(dealID, 10, 64) + if err != nil { + return err + } + + if _, err := stmt.Exec( + id, + mt.common.stateroot.String(), + ds.Proposal.PieceCID.String(), + ds.Proposal.PieceSize, + ds.Proposal.PieceSize.Unpadded(), + ds.Proposal.VerifiedDeal, + ds.Proposal.Client.String(), + ds.Proposal.Provider.String(), + ds.Proposal.StartEpoch, + ds.Proposal.EndEpoch, + nil, // slashed_epoch + ds.Proposal.StoragePricePerEpoch.String(), + ds.Proposal.ProviderCollateral.String(), + ds.Proposal.ClientCollateral.String(), + ); err != nil { + return err + } + + } + } + if err := stmt.Close(); err != nil { + return err + } + if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil { + return err + } + + return tx.Commit() + +} + +func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Updated Market Deal Proposals", "duration", time.Since(start).String()) + }() + pred := state.NewStatePredicates(p.node) + + tx, err := p.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`) + if err != nil { + return err + } + + for _, mt := range marketTip { + stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged())) + + changed, val, err := stateDiff(ctx, mt.common.parentTsKey, mt.common.tsKey) + if err != nil { + log.Warnw("error getting market deal state diff", "error", err) + } + if !changed { + continue + } + changes, ok := val.(*state.MarketDealStateChanges) + if !ok { + return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val) + } + + for _, modified := range changes.Modified { + if modified.From.SlashEpoch != modified.To.SlashEpoch { + if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil { + return err + } + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + return tx.Commit() +} diff --git a/cmd/lotus-chainwatch/processor/messages.go b/cmd/lotus-chainwatch/processor/messages.go new file mode 100644 index 000000000..b7f80d133 --- /dev/null +++ b/cmd/lotus-chainwatch/processor/messages.go @@ -0,0 +1,316 @@ +package processor + +import ( + "context" + "sync" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/parmap" +) + +func (p *Processor) setupMessages() error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create table if not exists messages +( + cid text not null + constraint messages_pk + primary key, + "from" text not null, + "to" text not null, + nonce bigint not null, + value text not null, + gasprice bigint not null, + gaslimit bigint not null, + method bigint, + params bytea +); + +create unique index if not exists messages_cid_uindex + on messages (cid); + +create index if not exists messages_from_index + on messages ("from"); + +create index if not exists messages_to_index + on messages ("to"); + +create table if not exists block_messages +( + block text not null + constraint blocks_block_cids_cid_fk + references block_cids (cid), + message text not null, + constraint block_messages_pk + primary key (block, message) +); + +create table if not exists mpool_messages +( + msg text not null + constraint mpool_messages_pk + primary key + constraint mpool_messages_messages_cid_fk + references messages, + add_ts int not null +); + +create unique index if not exists mpool_messages_msg_uindex + on mpool_messages (msg); + +create table if not exists receipts +( + msg text not null, + state text not null, + idx int not null, + exit int not null, + gas_used int not null, + return bytea, + constraint receipts_pk + primary key (msg, state) +); + +create index if not exists receipts_msg_state_index + on receipts (msg, state); +`); err != nil { + return err + } + + return tx.Commit() +} + +func (p *Processor) HandleMessageChanges(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error { + if err := p.persistMessagesAndReceipts(ctx, blocks); err != nil { + return err + } + return nil +} + +func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error { + messages, inclusions := p.fetchMessages(ctx, blocks) + receipts := p.fetchParentReceipts(ctx, blocks) + + grp, _ := errgroup.WithContext(ctx) + + grp.Go(func() error { + return p.storeMessages(messages) + }) + + grp.Go(func() error { + return p.storeMsgInclusions(inclusions) + }) + + grp.Go(func() error { + return p.storeReceipts(receipts) + }) + + return grp.Wait() +} + +func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error { + start := time.Now() + defer func() { + log.Infow("Persisted Receipts", "duration", time.Since(start).String()) + }() + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create temp table recs (like receipts excluding constraints) on commit drop; +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `) + if err != nil { + return err + } + + for c, m := range recs { + if _, err := stmt.Exec( + c.msg.String(), + c.state.String(), + c.idx, + m.ExitCode, + m.GasUsed, + m.Return, + ); err != nil { + return err + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { + start := time.Now() + defer func() { + log.Infow("Persisted Message Inclusions", "duration", time.Since(start).String()) + }() + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create temp table mi (like block_messages excluding constraints) on commit drop; +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `) + if err != nil { + return err + } + + for b, msgs := range incls { + for _, msg := range msgs { + if _, err := stmt.Exec( + b.String(), + msg.String(), + ); err != nil { + return err + } + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error { + start := time.Now() + defer func() { + log.Debugw("Persisted Messages", "duration", time.Since(start).String()) + }() + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create temp table msgs (like messages excluding constraints) on commit drop; +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) from stdin `) + if err != nil { + return err + } + + for c, m := range msgs { + if _, err := stmt.Exec( + c.String(), + m.From.String(), + m.To.String(), + m.Nonce, + m.Value.String(), + m.GasPrice.String(), + m.GasLimit, + m.Method, + m.Params, + ); err != nil { + return err + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) fetchMessages(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) { + var lk sync.Mutex + messages := map[cid.Cid]*types.Message{} + inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs + + parmap.Par(50, parmap.MapArr(blocks), func(header *types.BlockHeader) { + msgs, err := p.node.ChainGetBlockMessages(ctx, header.Cid()) + if err != nil { + panic(err) + } + + vmm := make([]*types.Message, 0, len(msgs.Cids)) + for _, m := range msgs.BlsMessages { + vmm = append(vmm, m) + } + + for _, m := range msgs.SecpkMessages { + vmm = append(vmm, &m.Message) + } + + lk.Lock() + for _, message := range vmm { + messages[message.Cid()] = message + inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid()) + } + lk.Unlock() + }) + + return messages, inclusions +} + +type mrec struct { + msg cid.Cid + state cid.Cid + idx int +} + +func (p *Processor) fetchParentReceipts(ctx context.Context, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt { + var lk sync.Mutex + out := map[mrec]*types.MessageReceipt{} + + parmap.Par(50, parmap.MapArr(toSync), func(header *types.BlockHeader) { + recs, err := p.node.ChainGetParentReceipts(ctx, header.Cid()) + if err != nil { + panic(err) + } + msgs, err := p.node.ChainGetParentMessages(ctx, header.Cid()) + if err != nil { + panic(err) + } + + lk.Lock() + for i, r := range recs { + out[mrec{ + msg: msgs[i].Cid, + state: header.ParentStateRoot, + idx: i, + }] = r + } + lk.Unlock() + }) + + return out +} diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go new file mode 100644 index 000000000..a76f88dfc --- /dev/null +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -0,0 +1,640 @@ +package processor + +import ( + "bytes" + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/events/state" + "github.com/filecoin-project/specs-actors/actors/abi" + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" + + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/util/adt" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" +) + +func (p *Processor) setupMiners() error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create table if not exists miner_sectors +( + miner_id text not null, + sector_id bigint not null, + + activation_epoch bigint not null, + expiration_epoch bigint not null, + termination_epoch bigint, + + deal_weight text not null, + verified_deal_weight text not null, + seal_cid text not null, + seal_rand_epoch bigint not null, + constraint miner_sectors_pk + primary key (miner_id, sector_id) +); + +create index if not exists miner_sectors_miner_sectorid_index + on miner_sectors (miner_id, sector_id); + +create table if not exists miner_info +( + miner_id text not null, + owner_addr text not null, + worker_addr text not null, + peer_id text, + sector_size text not null, + + precommit_deposits text not null, + locked_funds text not null, + next_deadline_process_faults bigint not null, + constraint miner_info_pk + primary key (miner_id) +); + +/* +* captures miner-specific power state for any given stateroot +*/ +create table if not exists miner_power +( + miner_id text not null, + state_root text not null, + raw_bytes_power text not null, + quality_adjusted_power text not null, + constraint miner_power_pk + primary key (miner_id, state_root) +); + +/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ +create table if not exists miner_sectors_heads +( + miner_id text not null, + miner_sectors_cid text not null, + + state_root text not null, + + constraint miner_sectors_heads_pk + primary key (miner_id,miner_sectors_cid) + +); + +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN + CREATE TYPE miner_sector_event_type AS ENUM + ( + 'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED' + ); + END IF; +END$$; + +create table if not exists miner_sector_events +( + miner_id text not null, + sector_id bigint not null, + state_root text not null, + event miner_sector_event_type not null, + + constraint miner_sector_events_pk + primary key (sector_id, event, miner_id, state_root) +) +`); err != nil { + return err + } + + return tx.Commit() +} + +type minerActorInfo struct { + common actorInfo + + state miner.State + + // tracked by power actor + rawPower big.Int + qalPower big.Int +} + +type sectorUpdate struct { + terminationEpoch abi.ChainEpoch + terminated bool + + expirationEpoch abi.ChainEpoch + + sectorID abi.SectorNumber + minerID address.Address +} + +func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips) error { + minerChanges, err := p.processMiners(ctx, minerTips) + if err != nil { + log.Fatalw("Failed to process miner actors", "error", err) + } + + if err := p.persistMiners(ctx, minerChanges); err != nil { + log.Fatalw("Failed to persist miner actors", "error", err) + } + + if err := p.updateMiners(ctx, minerChanges); err != nil { + log.Fatalw("Failed to update miner actors", "error", err) + } + return nil +} + +func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSetKey][]actorInfo) ([]minerActorInfo, error) { + start := time.Now() + defer func() { + log.Infow("Processed Miners", "duration", time.Since(start).String()) + }() + + var out []minerActorInfo + // TODO add parallel calls if this becomes slow + for tipset, miners := range minerTips { + // get the power actors claims map + minersClaims, err := getPowerActorClaimsMap(ctx, p.node, tipset) + if err != nil { + return nil, err + } + + // Get miner raw and quality power + for _, act := range miners { + var mi minerActorInfo + mi.common = act + + var claim power.Claim + // get miner claim from power actors claim map and store if found, else the miner had no claim at + // this tipset + found, err := minersClaims.Get(adt.AddrKey(act.addr), &claim) + if err != nil { + return nil, err + } + if found { + mi.qalPower = claim.QualityAdjPower + mi.rawPower = claim.RawBytePower + } + + // Get the miner state info + astb, err := p.node.ChainReadObj(ctx, act.act.Head) + if err != nil { + log.Warnw("failed to find miner actor state", "address", act.addr, "error", err) + continue + } + if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { + return nil, err + } + out = append(out, mi) + } + } + return out, nil +} + +func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Persisted Miners", "duration", time.Since(start).String()) + }() + + grp, _ := errgroup.WithContext(ctx) + + grp.Go(func() error { + if err := p.storeMinersActorState(miners); err != nil { + return err + } + return nil + }) + + grp.Go(func() error { + if err := p.storeMinersPower(miners); err != nil { + return err + } + return nil + }) + + grp.Go(func() error { + if err := p.storeMinersSectorState(miners); err != nil { + return err + } + return nil + }) + + grp.Go(func() error { + if err := p.storeMinersSectorHeads(miners); err != nil { + return err + } + return nil + }) + + return grp.Wait() +} + +func (p *Processor) storeMinersActorState(miners []minerActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Stored Miners Actor State", "duration", time.Since(start).String()) + }() + + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`) + if err != nil { + return err + } + for _, m := range miners { + var pid string + if len(m.state.Info.PeerId) != 0 { + peerid, err := peer.IDFromBytes(m.state.Info.PeerId) + if err != nil { + // this should "never happen", but if it does we should still store info about the miner. + log.Warnw("failed to decode peerID", "peerID (bytes)", m.state.Info.PeerId, "miner", m.common.addr, "tipset", m.common.tsKey.String()) + } else { + pid = peerid.String() + } + } + if _, err := stmt.Exec( + m.common.addr.String(), + m.state.Info.Owner.String(), + m.state.Info.Worker.String(), + pid, + m.state.Info.SectorSize.ShortString(), + m.state.PreCommitDeposits.String(), + m.state.LockedFunds.String(), + m.state.NextDeadlineToProcessFaults, + ); err != nil { + log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err) + return xerrors.Errorf("failed to store miner state: %w", err) + } + + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) storeMinersPower(miners []minerActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Stored Miners Power", "duration", time.Since(start).String()) + }() + + tx, err := p.db.Begin() + if err != nil { + return xerrors.Errorf("begin miner_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep miner_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp miner_power: %w", err) + } + + for _, m := range miners { + if _, err := stmt.Exec( + m.common.addr.String(), + m.common.stateroot.String(), + m.rawPower.String(), + m.qalPower.String(), + ); err != nil { + log.Errorw("failed to store miner power", "miner", m.common.addr, "stateroot", m.common.stateroot, "error", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared miner_power: %w", err) + } + + if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert miner_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit miner_power tx: %w", err) + } + + return nil + +} + +func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Stored Miners Sector State", "duration", time.Since(start).String()) + }() + + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`) + if err != nil { + return err + } + + grp, ctx := errgroup.WithContext(context.TODO()) + for _, m := range miners { + m := m + grp.Go(func() error { + sectors, err := p.node.StateMinerSectors(ctx, m.common.addr, nil, true, m.common.tsKey) + if err != nil { + log.Debugw("Failed to load sectors", "tipset", m.common.tsKey.String(), "miner", m.common.addr.String(), "error", err) + } + + for _, sector := range sectors { + if _, err := stmt.Exec( + m.common.addr.String(), + uint64(sector.ID), + int64(sector.Info.ActivationEpoch), + int64(sector.Info.Info.Expiration), + sector.Info.DealWeight.String(), + sector.Info.VerifiedDealWeight.String(), + sector.Info.Info.SealedCID.String(), + int64(sector.Info.Info.SealRandEpoch), + ); err != nil { + return err + } + } + return nil + }) + } + + if err := grp.Wait(); err != nil { + return err + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Stored Miners Sector Heads", "duration", time.Since(start).String()) + }() + + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`) + if err != nil { + return err + } + + for _, m := range miners { + if _, err := stmt.Exec( + m.common.addr.String(), + m.state.Sectors.String(), + m.common.stateroot.String(), + ); err != nil { + log.Errorw("failed to store miners sectors head", "state", m.state, "info", m.state.Info, "error", err) + return err + } + + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (p *Processor) updateMiners(ctx context.Context, miners []minerActorInfo) error { + // TODO when/if there is more than one update operation here use an errgroup as is done in persistMiners + if err := p.updateMinersSectors(ctx, miners); err != nil { + return err + } + return nil +} + +func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error { + log.Infow("Updating Miners Sectors", "#miners", len(miners)) + start := time.Now() + defer func() { + log.Infow("Updated Miners Sectors", "duration", time.Since(start).String()) + }() + + pred := state.NewStatePredicates(p.node) + + eventTx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) + if err != nil { + return err + } + + var updateWg sync.WaitGroup + updateWg.Add(1) + sectorUpdatesCh := make(chan sectorUpdate) + var sectorUpdates []sectorUpdate + go func() { + for u := range sectorUpdatesCh { + sectorUpdates = append(sectorUpdates, u) + } + updateWg.Done() + }() + + minerGrp, ctx := errgroup.WithContext(ctx) + complete := 0 + for _, m := range miners { + m := m + minerGrp.Go(func() error { + // special case genesis miners + sectorDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange()) + changed, val, err := sectorDiffFn(ctx, m.common.parentTsKey, m.common.tsKey) + if err != nil { + if strings.Contains(err.Error(), "address not found") { + return nil + } + log.Errorw("error getting miner sector diff", "miner", m.common.addr, "error", err) + return err + } + if !changed { + complete++ + return nil + } + changes, ok := val.(*state.MinerSectorChanges) + if !ok { + log.Fatalw("Developer Error") + } + log.Debugw("sector changes for miner", "miner", m.common.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", m.common.parentTsKey, "newState", m.common.tsKey) + + for _, extended := range changes.Extended { + if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", m.common.addr.String(), m.common.stateroot.String()); err != nil { + return err + } + sectorUpdatesCh <- sectorUpdate{ + terminationEpoch: 0, + terminated: false, + expirationEpoch: extended.To.Info.Expiration, + sectorID: extended.From.Info.SectorNumber, + minerID: m.common.addr, + } + + log.Infow("sector extended", "miner", m.common.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration) + } + curTs, err := p.node.ChainGetTipSet(ctx, m.common.tsKey) + if err != nil { + return err + } + + for _, removed := range changes.Removed { + log.Infow("removed", "miner", m.common.addr) + // decide if they were terminated or extended + if removed.Info.Expiration > curTs.Height() { + if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", m.common.addr.String(), m.common.stateroot.String()); err != nil { + return err + } + log.Infow("sector terminated", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height()) + sectorUpdatesCh <- sectorUpdate{ + terminationEpoch: curTs.Height(), + terminated: true, + expirationEpoch: removed.Info.Expiration, + sectorID: removed.Info.SectorNumber, + minerID: m.common.addr, + } + + } + if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", m.common.addr.String(), m.common.stateroot.String()); err != nil { + return err + } + log.Infow("sector removed", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height()) + } + + for _, added := range changes.Added { + if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", m.common.addr.String(), m.common.stateroot.String()); err != nil { + return err + } + } + complete++ + log.Debugw("Update Done", "complete", complete, "added", len(changes.Added), "removed", len(changes.Removed), "modified", len(changes.Extended)) + return nil + }) + } + if err := minerGrp.Wait(); err != nil { + return err + } + close(sectorUpdatesCh) + // wait for the update channel to be drained + updateWg.Wait() + + if err := eventStmt.Close(); err != nil { + return err + } + + if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + if err := eventTx.Commit(); err != nil { + return err + } + + updateTx, err := p.db.Begin() + if err != nil { + return err + } + + updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`) + if err != nil { + return err + } + + for _, update := range sectorUpdates { + if update.terminated { + if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { + return err + } + } else { + if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { + return err + } + } + } + + if err := updateStmt.Close(); err != nil { + return err + } + + return updateTx.Commit() +} + +// load the power actor state clam as an adt.Map at the tipset `ts`. +func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) { + powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts) + if err != nil { + return nil, err + } + + powerRaw, err := api.ChainReadObj(ctx, powerActor.Head) + if err != nil { + return nil, err + } + + var powerActorState power.State + if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil { + return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err) + } + + s := cw_util.NewAPIIpldStore(ctx, api) + return adt.AsMap(s, powerActorState.Claims) +} diff --git a/cmd/lotus-chainwatch/processor/mpool.go b/cmd/lotus-chainwatch/processor/mpool.go new file mode 100644 index 000000000..ad75baddc --- /dev/null +++ b/cmd/lotus-chainwatch/processor/mpool.go @@ -0,0 +1,103 @@ +package processor + +import ( + "context" + "time" + + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" +) + +func (p *Processor) subMpool(ctx context.Context) { + sub, err := p.node.MpoolSub(ctx) + if err != nil { + return + } + + for { + var updates []api.MpoolUpdate + + select { + case update := <-sub: + updates = append(updates, update) + case <-ctx.Done(): + return + } + + loop: + for { + time.Sleep(10 * time.Millisecond) + select { + case update := <-sub: + updates = append(updates, update) + default: + break loop + } + } + + msgs := map[cid.Cid]*types.Message{} + for _, v := range updates { + if v.Type != api.MpoolAdd { + continue + } + + msgs[v.Message.Message.Cid()] = &v.Message.Message + } + + log.Debugf("Processing %d mpool updates", len(msgs)) + + err := p.storeMessages(msgs) + if err != nil { + log.Error(err) + } + + if err := p.storeMpoolInclusions(updates); err != nil { + log.Error(err) + } + } +} + +func (p *Processor) storeMpoolInclusions(msgs []api.MpoolUpdate) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` + create temp table mi (like mpool_messages excluding constraints) on commit drop; + `); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi (msg, add_ts) from stdin `) + if err != nil { + return err + } + + for _, msg := range msgs { + if msg.Type != api.MpoolAdd { + continue + } + + if _, err := stmt.Exec( + msg.Message.Message.Cid().String(), + time.Now().Unix(), + ); err != nil { + return err + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into mpool_messages select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go new file mode 100644 index 000000000..e44172822 --- /dev/null +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -0,0 +1,326 @@ +package processor + +import ( + "context" + "database/sql" + "encoding/json" + "sync" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/parmap" +) + +var log = logging.Logger("processor") + +type Processor struct { + db *sql.DB + + node api.FullNode + + // number of blocks processed at a time + batch int +} + +type ActorTips map[types.TipSetKey][]actorInfo + +type actorInfo struct { + act types.Actor + + stateroot cid.Cid + height abi.ChainEpoch // so that we can walk the actor changes in chronological order. + + tsKey types.TipSetKey + parentTsKey types.TipSetKey + + addr address.Address + state string +} + +func NewProcessor(db *sql.DB, node api.FullNode, batch int) *Processor { + return &Processor{ + db: db, + node: node, + batch: batch, + } +} + +func (p *Processor) setupSchemas() error { + if err := p.setupMarket(); err != nil { + return err + } + + if err := p.setupMiners(); err != nil { + return err + } + + if err := p.setupRewards(); err != nil { + return err + } + + if err := p.setupMessages(); err != nil { + return err + } + + if err := p.setupCommonActors(); err != nil { + return err + } + + return nil +} + +func (p *Processor) Start(ctx context.Context) { + log.Info("Starting Processor") + + if err := p.setupSchemas(); err != nil { + log.Fatalw("Failed to setup processor", "error", err) + } + + go p.subMpool(ctx) + + // main processor loop + go func() { + for { + select { + case <-ctx.Done(): + log.Infow("Stopping Processor...") + return + default: + toProcess, err := p.unprocessedBlocks(ctx, p.batch) + if err != nil { + log.Fatalw("Failed to get unprocessed blocks", "error", err) + } + + // TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where + // before doing "normal" processing. + + actorChanges, err := p.collectActorChanges(ctx, toProcess) + if err != nil { + log.Fatalw("Failed to collect actor changes", "error", err) + } + + grp, ctx := errgroup.WithContext(ctx) + + grp.Go(func() error { + if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil { + return xerrors.Errorf("Failed to handle market changes: %w", err) + } + return nil + }) + + grp.Go(func() error { + if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil { + return xerrors.Errorf("Failed to handle miner changes: %w", err) + } + return nil + }) + + grp.Go(func() error { + if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID]); err != nil { + return xerrors.Errorf("Failed to handle reward changes: %w", err) + } + return nil + }) + + grp.Go(func() error { + if err := p.HandleMessageChanges(ctx, toProcess); err != nil { + return xerrors.Errorf("Failed to handle message changes: %w", err) + } + return nil + }) + + grp.Go(func() error { + if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil { + return xerrors.Errorf("Failed to handle common actor changes: %w", err) + } + return nil + }) + + if err := grp.Wait(); err != nil { + log.Errorw("Failed to handle actor changes...retrying", "error", err) + continue + } + + if err := p.markBlocksProcessed(ctx, toProcess); err != nil { + log.Fatalw("Failed to mark blocks as processed", "error", err) + } + + if err := p.refreshViews(); err != nil { + log.Errorw("Failed to refresh views", "error", err) + } + } + } + }() + +} + +func (p *Processor) refreshViews() error { + if _, err := p.db.Exec(`refresh materialized view state_heights`); err != nil { + return err + } + + return nil +} + +func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) { + start := time.Now() + defer func() { + log.Infow("Collected Actor Changes", "duration", time.Since(start).String()) + }() + // ActorCode - > tipset->[]actorInfo + out := map[cid.Cid]ActorTips{} + var outMu sync.Mutex + + // map of addresses to changed actors + var changes map[string]types.Actor + actorsSeen := map[cid.Cid]struct{}{} + + // collect all actor state that has changes between block headers + paDone := 0 + parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) { + paDone++ + if paDone%100 == 0 { + log.Infow("Collecting actor changes", "done", paDone, "percent", (paDone*100)/len(toProcess)) + } + + pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) + if err != nil { + panic(err) + } + + // collect all actors that had state changes between the blockheader parent-state and its grandparent-state. + // TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider + // a separate strategy for deleted actors + changes, err = p.node.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) + if err != nil { + panic(err) + } + + // record the state of all actors that have changed + for a, act := range changes { + act := act + a := a + + addr, err := address.NewFromString(a) + if err != nil { + panic(err) + } + + ast, err := p.node.StateReadState(ctx, addr, pts.Key()) + if err != nil { + panic(err) + } + + // TODO look here for an empty state, maybe thats a sign the actor was deleted? + + state, err := json.Marshal(ast.State) + if err != nil { + panic(err) + } + + outMu.Lock() + if _, ok := actorsSeen[act.Head]; !ok { + _, ok := out[act.Code] + if !ok { + out[act.Code] = map[types.TipSetKey][]actorInfo{} + } + out[act.Code][pts.Key()] = append(out[act.Code][pts.Key()], actorInfo{ + act: act, + stateroot: bh.ParentStateRoot, + height: bh.Height, + tsKey: pts.Key(), + parentTsKey: pts.Parents(), + addr: addr, + state: string(state), + }) + } + actorsSeen[act.Head] = struct{}{} + outMu.Unlock() + } + }) + return out, nil +} + +func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) { + start := time.Now() + defer func() { + log.Infow("Gathered Blocks to process", "duration", time.Since(start).String()) + }() + rows, err := p.db.Query(` +with toProcess as ( + select blocks.cid, blocks.height, rank() over (order by height) as rnk + from blocks + left join blocks_synced bs on blocks.cid = bs.cid + where bs.processed_at is null and blocks.height > 0 +) +select cid +from toProcess +where rnk <= $1 +`, batch) + if err != nil { + return nil, xerrors.Errorf("Failed to query for unprocessed blocks: %w", err) + } + out := map[cid.Cid]*types.BlockHeader{} + + // TODO consider parallel execution here for getting the blocks from the api as is done in fetchMessages() + for rows.Next() { + if rows.Err() != nil { + return nil, err + } + var c string + if err := rows.Scan(&c); err != nil { + return nil, xerrors.Errorf("Failed to scan unprocessed blocks: %w", err) + } + ci, err := cid.Parse(c) + if err != nil { + return nil, xerrors.Errorf("Failed to parse unprocessed blocks: %w", err) + } + bh, err := p.node.ChainGetBlock(ctx, ci) + if err != nil { + // this is a pretty serious issue. + return nil, xerrors.Errorf("Failed to get block header %s: %w", ci.String(), err) + } + out[ci] = bh + } + return out, rows.Close() +} + +func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error { + start := time.Now() + defer func() { + log.Infow("Marked blocks as Processed", "duration", time.Since(start).String()) + }() + tx, err := p.db.Begin() + if err != nil { + return err + } + + processedAt := time.Now().Unix() + stmt, err := tx.Prepare(`update blocks_synced set processed_at=$1 where cid=$2`) + if err != nil { + return err + } + + for c := range processed { + if _, err := stmt.Exec(processedAt, c.String()); err != nil { + return err + } + } + + if err := stmt.Close(); err != nil { + return err + } + + return tx.Commit() +} diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go new file mode 100644 index 000000000..8d09d6586 --- /dev/null +++ b/cmd/lotus-chainwatch/processor/reward.go @@ -0,0 +1,144 @@ +package processor + +import ( + "bytes" + "context" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/reward" +) + +type rewardActorInfo struct { + common actorInfo + + baselinePower big.Int +} + +func (p *Processor) setupRewards() error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +/* +* captures chain-specific power state for any given stateroot +*/ +create table if not exists chain_power +( + state_root text not null + constraint chain_power_pk + primary key, + baseline_power text not null +); +`); err != nil { + return err + } + + return tx.Commit() + +} + +func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips) error { + rewardChanges, err := p.processRewardActors(ctx, rewardTips) + if err != nil { + log.Fatalw("Failed to process reward actors", "error", err) + } + + if err := p.persistRewardActors(ctx, rewardChanges); err != nil { + return err + } + + return nil +} + +func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips) ([]rewardActorInfo, error) { + start := time.Now() + defer func() { + log.Infow("Processed Reward Actors", "duration", time.Since(start).String()) + }() + + var out []rewardActorInfo + for tipset, rewards := range rewardTips { + for _, act := range rewards { + var rw rewardActorInfo + rw.common = act + + // get reward actor states at each tipset once for all updates + rewardActor, err := p.node.StateGetActor(ctx, builtin.RewardActorAddr, tipset) + if err != nil { + return nil, xerrors.Errorf("get reward state (@ %s): %w", rw.common.stateroot.String(), err) + } + + rewardStateRaw, err := p.node.ChainReadObj(ctx, rewardActor.Head) + if err != nil { + return nil, xerrors.Errorf("read state obj (@ %s): %w", rw.common.stateroot.String(), err) + } + + var rewardActorState reward.State + if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { + return nil, xerrors.Errorf("unmarshal state (@ %s): %w", rw.common.stateroot.String(), err) + } + + rw.baselinePower = rewardActorState.BaselinePower + out = append(out, rw) + } + } + return out, nil +} + +func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Persisted Reward Actors", "duration", time.Since(start).String()) + }() + + if err := p.storeChainPower(rewards); err != nil { + return err + } + + return nil +} + +func (p *Processor) storeChainPower(rewards []rewardActorInfo) error { + tx, err := p.db.Begin() + if err != nil { + return xerrors.Errorf("begin chain_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp chain_power: %w", err) + } + + for _, rewardState := range rewards { + if _, err := stmt.Exec( + rewardState.common.stateroot.String(), + rewardState.baselinePower.String(), + ); err != nil { + log.Errorw("failed to store chain power", "state_root", rewardState.common.stateroot, "error", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared chain_power: %w", err) + } + + if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit chain_power tx: %w", err) + } + + return nil +} diff --git a/cmd/lotus-chainwatch/blockssub.go b/cmd/lotus-chainwatch/syncer/blockssub.go similarity index 50% rename from cmd/lotus-chainwatch/blockssub.go rename to cmd/lotus-chainwatch/syncer/blockssub.go index c569f1885..04b78da0e 100644 --- a/cmd/lotus-chainwatch/blockssub.go +++ b/cmd/lotus-chainwatch/syncer/blockssub.go @@ -1,25 +1,24 @@ -package main +package syncer import ( "context" + "time" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" - - aapi "github.com/filecoin-project/lotus/api" ) -func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) { - sub, err := api.SyncIncomingBlocks(ctx) +func (s *Syncer) subBlocks(ctx context.Context) { + sub, err := s.node.SyncIncomingBlocks(ctx) if err != nil { log.Error(err) return } for bh := range sub { - err := st.storeHeaders(map[cid.Cid]*types.BlockHeader{ + err := s.storeHeaders(map[cid.Cid]*types.BlockHeader{ bh.Cid(): bh, - }, false) + }, false, time.Now()) if err != nil { log.Errorf("%+v", err) } diff --git a/cmd/lotus-chainwatch/syncer/sync.go b/cmd/lotus-chainwatch/syncer/sync.go new file mode 100644 index 000000000..83577ebf5 --- /dev/null +++ b/cmd/lotus-chainwatch/syncer/sync.go @@ -0,0 +1,446 @@ +package syncer + +import ( + "container/list" + "context" + "database/sql" + "sync" + "time" + + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" +) + +var log = logging.Logger("syncer") + +type Syncer struct { + db *sql.DB + + headerLk sync.Mutex + node api.FullNode +} + +func NewSyncer(db *sql.DB, node api.FullNode) *Syncer { + return &Syncer{ + db: db, + node: node, + } +} + +func (s *Syncer) setupSchemas() error { + tx, err := s.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create table if not exists block_cids +( + cid text not null + constraint block_cids_pk + primary key +); + +create unique index if not exists block_cids_cid_uindex + on block_cids (cid); + +create table if not exists blocks_synced +( + cid text not null + constraint blocks_synced_pk + primary key + constraint blocks_block_cids_cid_fk + references block_cids (cid), + synced_at int not null, + processed_at int +); + +create unique index if not exists blocks_synced_cid_uindex + on blocks_synced (cid,processed_at); + +create table if not exists block_parents +( + block text not null + constraint blocks_block_cids_cid_fk + references block_cids (cid), + parent text not null +); + +create unique index if not exists block_parents_block_parent_uindex + on block_parents (block, parent); + +create table if not exists drand_entries +( + round bigint not null + constraint drand_entries_pk + primary key, + data bytea not null +); +create unique index if not exists drand_entries_round_uindex + on drand_entries (round); + +create table if not exists block_drand_entries +( + round bigint not null + constraint block_drand_entries_drand_entries_round_fk + references drand_entries (round), + block text not null + constraint blocks_block_cids_cid_fk + references block_cids (cid) +); +create unique index if not exists block_drand_entries_round_uindex + on block_drand_entries (round, block); + +create table if not exists blocks +( + cid text not null + constraint blocks_pk + primary key + constraint blocks_block_cids_cid_fk + references block_cids (cid), + parentWeight numeric not null, + parentStateRoot text not null, + height bigint not null, + miner text not null, + timestamp bigint not null, + ticket bytea not null, + eprof bytea, + forksig bigint not null +); + +create unique index if not exists block_cid_uindex + on blocks (cid,height); + +create materialized view if not exists state_heights + as select distinct height, parentstateroot from blocks; + +create index if not exists state_heights_index + on state_heights (height); + +create index if not exists state_heights_height_index + on state_heights (parentstateroot); +`); err != nil { + return err + } + + return tx.Commit() +} + +func (s *Syncer) Start(ctx context.Context) { + log.Info("Starting Syncer") + + if err := s.setupSchemas(); err != nil { + log.Fatal(err) + } + + // doing the initial sync here lets us avoid the HCCurrent case in the switch + head, err := s.node.ChainHead(ctx) + if err != nil { + log.Fatalw("Failed to get chain head form lotus", "error", err) + } + + unsynced, err := s.unsyncedBlocks(ctx, head, time.Unix(0, 0)) + if err != nil { + log.Fatalw("failed to gather unsynced blocks", "error", err) + } + + if err := s.storeHeaders(unsynced, true, time.Now()); err != nil { + log.Fatalw("failed to store unsynced blocks", "error", err) + } + + // continue to keep the block headers table up to date. + notifs, err := s.node.ChainNotify(ctx) + if err != nil { + log.Fatal(err) + } + + lastSynced := time.Now() + go func() { + for notif := range notifs { + for _, change := range notif { + switch change.Type { + case store.HCApply: + unsynced, err := s.unsyncedBlocks(ctx, change.Val, lastSynced) + if err != nil { + log.Errorw("failed to gather unsynced blocks", "error", err) + } + + if len(unsynced) == 0 { + continue + } + + if err := s.storeHeaders(unsynced, true, lastSynced); err != nil { + // so this is pretty bad, need some kind of retry.. + // for now just log an error and the blocks will be attempted again on next notifi + log.Errorw("failed to store unsynced blocks", "error", err) + } + + lastSynced = time.Now() + case store.HCRevert: + log.Debug("revert todo") + } + } + } + }() +} + +func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since time.Time) (map[cid.Cid]*types.BlockHeader, error) { + // get a list of blocks we have already synced in the past 3 mins. This ensures we aren't returning the entire + // table every time. + lookback := since.Add(-(time.Minute * 3)) + log.Debugw("Gathering unsynced blocks", "since", lookback.String()) + hasList, err := s.syncedBlocks(lookback) + if err != nil { + return nil, err + } + + // build a list of blocks that we have not synced. + toVisit := list.New() + for _, header := range head.Blocks() { + toVisit.PushBack(header) + } + + toSync := map[cid.Cid]*types.BlockHeader{} + + for toVisit.Len() > 0 { + bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) + _, has := hasList[bh.Cid()] + if _, seen := toSync[bh.Cid()]; seen || has { + continue + } + + toSync[bh.Cid()] = bh + if len(toSync)%500 == 10 { + log.Infow("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height) + } + + if len(bh.Parents) == 0 { + continue + } + + pts, err := s.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) + if err != nil { + log.Error(err) + continue + } + + for _, header := range pts.Blocks() { + toVisit.PushBack(header) + } + } + log.Debugw("Gathered unsynced blocks", "count", len(toSync)) + return toSync, nil +} + +func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) { + // timestamp is used to return a configurable amount of rows based on when they were last added. + rws, err := s.db.Query(`select cid FROM blocks_synced where synced_at > $1`, timestamp.Unix()) + if err != nil { + return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) + } + out := map[cid.Cid]struct{}{} + + for rws.Next() { + var c string + if err := rws.Scan(&c); err != nil { + return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err) + } + + ci, err := cid.Parse(c) + if err != nil { + return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err) + } + + out[ci] = struct{}{} + } + return out, nil +} + +func (s *Syncer) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool, timestamp time.Time) error { + s.headerLk.Lock() + defer s.headerLk.Unlock() + if len(bhs) == 0 { + return nil + } + log.Debugw("Storing Headers", "count", len(bhs)) + + tx, err := s.db.Begin() + if err != nil { + return xerrors.Errorf("begin: %w", err) + } + + if _, err := tx.Exec(` + +create temp table bc (like block_cids excluding constraints) on commit drop; +create temp table de (like drand_entries excluding constraints) on commit drop; +create temp table bde (like block_drand_entries excluding constraints) on commit drop; +create temp table tbp (like block_parents excluding constraints) on commit drop; +create temp table bs (like blocks_synced excluding constraints) on commit drop; +create temp table b (like blocks excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + { + stmt, err := tx.Prepare(`copy bc (cid) from STDIN`) + if err != nil { + return err + } + + for _, bh := range bhs { + if _, err := stmt.Exec(bh.Cid().String()); err != nil { + log.Error(err) + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_cids select * from bc on conflict do nothing `); err != nil { + return xerrors.Errorf("drand entries put: %w", err) + } + } + + { + stmt, err := tx.Prepare(`copy de (round, data) from STDIN`) + if err != nil { + return err + } + + for _, bh := range bhs { + for _, ent := range bh.BeaconEntries { + if _, err := stmt.Exec(ent.Round, ent.Data); err != nil { + log.Error(err) + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into drand_entries select * from de on conflict do nothing `); err != nil { + return xerrors.Errorf("drand entries put: %w", err) + } + } + + { + stmt, err := tx.Prepare(`copy bde (round, block) from STDIN`) + if err != nil { + return err + } + + for _, bh := range bhs { + for _, ent := range bh.BeaconEntries { + if _, err := stmt.Exec(ent.Round, bh.Cid().String()); err != nil { + log.Error(err) + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_drand_entries select * from bde on conflict do nothing `); err != nil { + return xerrors.Errorf("block drand entries put: %w", err) + } + } + + { + stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`) + if err != nil { + return err + } + + for _, bh := range bhs { + for _, parent := range bh.Parents { + if _, err := stmt.Exec(bh.Cid().String(), parent.String()); err != nil { + log.Error(err) + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil { + return xerrors.Errorf("parent put: %w", err) + } + } + + if sync { + + stmt, err := tx.Prepare(`copy bs (cid, synced_at) from stdin `) + if err != nil { + return err + } + + for _, bh := range bhs { + if _, err := stmt.Exec(bh.Cid().String(), timestamp.Unix()); err != nil { + log.Error(err) + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil { + return xerrors.Errorf("syncd put: %w", err) + } + } + + stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", ticket, eprof, forksig) from stdin`) + if err != nil { + return err + } + + for _, bh := range bhs { + var eprof interface{} + if bh.ElectionProof != nil { + eprof = bh.ElectionProof.VRFProof + } + + if bh.Ticket == nil { + log.Warnf("got a block with nil ticket") + + bh.Ticket = &types.Ticket{ + VRFProof: []byte{}, + } + } + + if _, err := stmt2.Exec( + bh.Cid().String(), + bh.ParentWeight.String(), + bh.ParentStateRoot.String(), + bh.Height, + bh.Miner.String(), + bh.Timestamp, + bh.Ticket.VRFProof, + eprof, + bh.ForkSignaling); err != nil { + log.Error(err) + } + } + + if err := stmt2.Close(); err != nil { + return xerrors.Errorf("s2 close: %w", err) + } + + if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil { + return xerrors.Errorf("blk put: %w", err) + } + + return tx.Commit() +} diff --git a/cmd/lotus-chainwatch/util/contextStore.go b/cmd/lotus-chainwatch/util/contextStore.go new file mode 100644 index 000000000..bd812581b --- /dev/null +++ b/cmd/lotus-chainwatch/util/contextStore.go @@ -0,0 +1,51 @@ +package util + +import ( + "bytes" + "context" + "fmt" + + "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + + "github.com/filecoin-project/lotus/api" +) + +// TODO extract this to a common location in lotus and reuse the code + +// APIIpldStore is required for AMT and HAMT access. +type APIIpldStore struct { + ctx context.Context + api api.FullNode +} + +func NewAPIIpldStore(ctx context.Context, api api.FullNode) *APIIpldStore { + return &APIIpldStore{ + ctx: ctx, + api: api, + } +} + +func (ht *APIIpldStore) Context() context.Context { + return ht.ctx +} + +func (ht *APIIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { + raw, err := ht.api.ChainReadObj(ctx, c) + if err != nil { + return err + } + + cu, ok := out.(cbg.CBORUnmarshaler) + if ok { + if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil { + return err + } + return nil + } + return fmt.Errorf("Object does not implement CBORUnmarshaler: %T", out) +} + +func (ht *APIIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { + return cid.Undef, fmt.Errorf("Put is not implemented on APIIpldStore") +} diff --git a/go.mod b/go.mod index 1e2e5e869..60b0aaf3f 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e github.com/kelseyhightower/envconfig v1.4.0 - github.com/lib/pq v1.2.0 + github.com/lib/pq v1.7.0 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p v0.10.0 github.com/libp2p/go-libp2p-connmgr v0.2.4 @@ -118,6 +118,7 @@ require ( go.uber.org/multierr v1.5.0 go.uber.org/zap v1.15.0 go4.org v0.0.0-20190313082347-94abd6928b1d // indirect + golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 diff --git a/go.sum b/go.sum index 35885247f..9324d35e9 100644 --- a/go.sum +++ b/go.sum @@ -702,8 +702,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= -github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY= +github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=