lotus/chain/store/store.go

1056 lines
23 KiB
Go
Raw Normal View History

2019-07-26 04:54:22 +00:00
package store
2019-07-05 14:29:17 +00:00
import (
2020-01-16 18:05:07 +00:00
"bytes"
2019-07-05 14:29:17 +00:00
"context"
"crypto/sha256"
"encoding/binary"
"encoding/json"
2020-01-16 18:05:07 +00:00
"io"
2019-07-05 14:29:17 +00:00
"sync"
"github.com/filecoin-project/go-address"
2020-02-08 02:18:32 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"go.opencensus.io/trace"
"go.uber.org/multierr"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/vm"
2019-09-30 23:55:35 +00:00
amt "github.com/filecoin-project/go-amt-ipld/v2"
2020-02-08 02:18:32 +00:00
"github.com/filecoin-project/lotus/chain/types"
2019-07-08 12:51:45 +00:00
lru "github.com/hashicorp/golang-lru"
2019-07-26 04:54:22 +00:00
block "github.com/ipfs/go-block-format"
2020-01-16 18:05:07 +00:00
"github.com/ipfs/go-blockservice"
car "github.com/ipfs/go-car"
2019-07-05 14:29:17 +00:00
"github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
2020-01-16 18:05:07 +00:00
blockstore "github.com/ipfs/go-ipfs-blockstore"
2019-07-05 14:29:17 +00:00
bstore "github.com/ipfs/go-ipfs-blockstore"
2020-02-04 22:19:05 +00:00
cbor "github.com/ipfs/go-ipld-cbor"
2020-01-16 18:05:07 +00:00
format "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
2020-01-16 18:05:07 +00:00
dag "github.com/ipfs/go-merkledag"
2019-09-17 01:56:37 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
2019-07-05 14:29:17 +00:00
pubsub "github.com/whyrusleeping/pubsub"
2019-09-17 01:56:37 +00:00
"golang.org/x/xerrors"
2019-07-05 14:29:17 +00:00
)
2019-07-26 04:54:22 +00:00
var log = logging.Logger("chainstore")
2019-07-05 14:29:17 +00:00
var chainHeadKey = dstore.NewKey("head")
2019-07-05 14:29:17 +00:00
type ChainStore struct {
bs bstore.Blockstore
ds dstore.Datastore
2019-07-05 14:29:17 +00:00
heaviestLk sync.Mutex
2019-07-26 04:54:22 +00:00
heaviest *types.TipSet
2019-07-05 14:29:17 +00:00
bestTips *pubsub.PubSub
2019-09-17 22:43:47 +00:00
pubLk sync.Mutex
2019-07-05 14:29:17 +00:00
tstLk sync.Mutex
2020-02-08 02:18:32 +00:00
tipsets map[abi.ChainEpoch][]cid.Cid
reorgCh chan<- reorg
2019-07-26 04:54:22 +00:00
headChangeNotifs []func(rev, app []*types.TipSet) error
mmCache *lru.ARCCache
2019-12-16 19:22:56 +00:00
tsCache *lru.ARCCache
2020-01-13 20:47:27 +00:00
vmcalls *types.VMSyscalls
2019-07-05 14:29:17 +00:00
}
2020-01-13 20:47:27 +00:00
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls *types.VMSyscalls) *ChainStore {
c, _ := lru.NewARC(2048)
2019-12-16 19:22:56 +00:00
tsc, _ := lru.NewARC(4096)
cs := &ChainStore{
2019-07-05 14:29:17 +00:00
bs: bs,
ds: ds,
bestTips: pubsub.New(64),
2020-02-08 02:18:32 +00:00
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
mmCache: c,
2019-12-16 19:22:56 +00:00
tsCache: tsc,
2020-01-13 20:47:27 +00:00
vmcalls: vmcalls,
2019-07-05 14:29:17 +00:00
}
cs.reorgCh = cs.reorgWorker(context.TODO())
hcnf := func(rev, app []*types.TipSet) error {
2019-09-17 22:43:47 +00:00
cs.pubLk.Lock()
defer cs.pubLk.Unlock()
2019-09-18 11:01:52 +00:00
notif := make([]*HeadChange, len(rev)+len(app))
for i, r := range rev {
notif[i] = &HeadChange{
Type: HCRevert,
Val: r,
2019-09-18 11:01:52 +00:00
}
}
2019-09-18 11:01:52 +00:00
for i, r := range app {
notif[i+len(rev)] = &HeadChange{
Type: HCApply,
Val: r,
2019-09-18 11:01:52 +00:00
}
}
2019-09-18 11:01:52 +00:00
cs.bestTips.Pub(notif, "headchange")
return nil
}
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf)
return cs
2019-07-05 14:29:17 +00:00
}
func (cs *ChainStore) Load() error {
head, err := cs.ds.Get(chainHeadKey)
2019-07-24 21:10:27 +00:00
if err == dstore.ErrNotFound {
log.Warn("no previous chain state found")
return nil
}
if err != nil {
return xerrors.Errorf("failed to load chain state from datastore: %w", err)
}
var tscids []cid.Cid
if err := json.Unmarshal(head, &tscids); err != nil {
return xerrors.Errorf("failed to unmarshal stored chain head: %w", err)
}
2019-12-16 19:22:56 +00:00
ts, err := cs.LoadTipSet(types.NewTipSetKey(tscids...))
if err != nil {
2019-09-30 23:55:35 +00:00
return xerrors.Errorf("loading tipset: %w", err)
}
cs.heaviest = ts
return nil
}
2019-07-26 04:54:22 +00:00
func (cs *ChainStore) writeHead(ts *types.TipSet) error {
data, err := json.Marshal(ts.Cids())
if err != nil {
return xerrors.Errorf("failed to marshal tipset: %w", err)
}
if err := cs.ds.Put(chainHeadKey, data); err != nil {
return xerrors.Errorf("failed to write chain head to datastore: %w", err)
}
return nil
}
const (
2019-09-17 22:43:47 +00:00
HCRevert = "revert"
HCApply = "apply"
HCCurrent = "current"
)
type HeadChange struct {
Type string
2019-07-26 04:54:22 +00:00
Val *types.TipSet
}
2019-09-18 11:01:52 +00:00
func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*HeadChange {
2019-09-17 22:43:47 +00:00
cs.pubLk.Lock()
subch := cs.bestTips.Sub("headchange")
2019-09-17 22:43:47 +00:00
head := cs.GetHeaviestTipSet()
cs.pubLk.Unlock()
2019-09-18 11:01:52 +00:00
out := make(chan []*HeadChange, 16)
out <- []*HeadChange{{
2019-09-17 22:43:47 +00:00
Type: HCCurrent,
Val: head,
2019-09-18 11:01:52 +00:00
}}
2019-09-17 22:43:47 +00:00
go func() {
defer close(out)
var unsubOnce sync.Once
for {
select {
case val, ok := <-subch:
if !ok {
2019-08-31 01:03:10 +00:00
log.Warn("chain head sub exit loop")
return
}
2019-09-18 11:01:52 +00:00
if len(out) > 0 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}
2019-08-31 01:03:10 +00:00
select {
2019-09-18 11:01:52 +00:00
case out <- val.([]*HeadChange):
2019-08-31 01:03:10 +00:00
case <-ctx.Done():
}
case <-ctx.Done():
unsubOnce.Do(func() {
go cs.bestTips.Unsub(subch)
})
}
}
}()
return out
}
2019-07-26 04:54:22 +00:00
func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) {
cs.headChangeNotifs = append(cs.headChangeNotifs, f)
2019-07-05 14:29:17 +00:00
}
func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
ts, err := types.NewTipSet([]*types.BlockHeader{b})
if err != nil {
return err
2019-07-05 14:29:17 +00:00
}
2019-10-15 04:33:29 +00:00
if err := cs.PutTipSet(context.TODO(), ts); err != nil {
2019-07-05 14:29:17 +00:00
return err
}
return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes())
2019-07-05 14:29:17 +00:00
}
2019-10-15 04:33:29 +00:00
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
for _, b := range ts.Blocks() {
2019-11-12 10:18:46 +00:00
if err := cs.PersistBlockHeaders(b); err != nil {
2019-07-05 14:29:17 +00:00
return err
}
}
expanded, err := cs.expandTipset(ts.Blocks()[0])
if err != nil {
return xerrors.Errorf("errored while expanding tipset: %w", err)
}
log.Debugf("expanded %s into %s\n", ts.Cids(), expanded.Cids())
2019-10-15 04:33:29 +00:00
if err := cs.MaybeTakeHeavierTipSet(ctx, expanded); err != nil {
return xerrors.Errorf("MaybeTakeHeavierTipSet failed in PutTipSet: %w", err)
}
2019-07-05 14:29:17 +00:00
return nil
}
2019-10-15 04:33:29 +00:00
func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error {
2019-07-05 14:29:17 +00:00
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
2019-10-15 04:33:29 +00:00
w, err := cs.Weight(ctx, ts)
if err != nil {
return err
}
heaviestW, err := cs.Weight(ctx, cs.heaviest)
if err != nil {
return err
}
if w.GreaterThan(heaviestW) {
2019-07-31 07:13:49 +00:00
// TODO: don't do this for initial sync. Now that we don't have a
// difference between 'bootstrap sync' and 'caught up' sync, we need
// some other heuristic.
return cs.takeHeaviestTipSet(ctx, ts)
2019-10-10 03:50:50 +00:00
}
return nil
}
type reorg struct {
old *types.TipSet
new *types.TipSet
}
func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
out := make(chan reorg, 32)
go func() {
defer log.Warn("reorgWorker quit")
for {
select {
case r := <-out:
revert, apply, err := cs.ReorgOps(r.old, r.new)
if err != nil {
log.Error("computing reorg ops failed: ", err)
continue
}
// reverse the apply array
for i := len(apply)/2 - 1; i >= 0; i-- {
opp := len(apply) - 1 - i
apply[i], apply[opp] = apply[opp], apply[i]
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
log.Error("head change func errored (BAD): ", err)
}
}
case <-ctx.Done():
return
}
}
}()
return out
}
func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error {
2019-12-05 05:14:19 +00:00
_, span := trace.StartSpan(ctx, "takeHeaviestTipSet")
defer span.End()
if cs.heaviest != nil { // buf
if len(cs.reorgCh) > 0 {
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
2019-10-10 03:50:50 +00:00
}
cs.reorgCh <- reorg{
old: cs.heaviest,
new: ts,
}
2019-10-10 03:50:50 +00:00
} else {
2019-10-16 08:01:41 +00:00
log.Warnf("no heaviest tipset found, using %s", ts.Cids())
2019-10-10 03:50:50 +00:00
}
span.AddAttributes(trace.BoolAttribute("newHead", true))
2019-12-03 23:04:52 +00:00
log.Infof("New heaviest tipset! %s (height=%d)", ts.Cids(), ts.Height())
2019-10-10 03:50:50 +00:00
cs.heaviest = ts
2019-10-10 03:50:50 +00:00
if err := cs.writeHead(ts); err != nil {
log.Errorf("failed to write chain head: %s", err)
return nil
2019-07-05 14:29:17 +00:00
}
2019-10-10 03:50:50 +00:00
2019-07-05 14:29:17 +00:00
return nil
}
2019-10-10 03:50:50 +00:00
// SetHead sets the chainstores current 'best' head node.
// This should only be called if something is broken and needs fixing
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
return cs.takeHeaviestTipSet(context.TODO(), ts)
2019-10-10 03:50:50 +00:00
}
2019-07-26 04:54:22 +00:00
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
for _, c := range ts.Cids() {
2019-07-05 14:29:17 +00:00
has, err := cs.bs.Has(c)
if err != nil {
return false, err
}
if !has {
return false, nil
}
}
return true, nil
}
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
2019-07-05 14:29:17 +00:00
sb, err := cs.bs.Get(c)
if err != nil {
return nil, err
}
return types.DecodeBlock(sb.RawData())
2019-07-05 14:29:17 +00:00
}
2019-12-16 19:22:56 +00:00
func (cs *ChainStore) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
v, ok := cs.tsCache.Get(tsk)
if ok {
return v.(*types.TipSet), nil
}
var blks []*types.BlockHeader
2019-12-16 19:22:56 +00:00
for _, c := range tsk.Cids() {
2019-07-05 14:29:17 +00:00
b, err := cs.GetBlock(c)
if err != nil {
2019-09-30 23:55:35 +00:00
return nil, xerrors.Errorf("get block %s: %w", c, err)
2019-07-05 14:29:17 +00:00
}
blks = append(blks, b)
}
2019-12-16 19:22:56 +00:00
ts, err := types.NewTipSet(blks)
if err != nil {
return nil, err
}
cs.tsCache.Add(tsk, ts)
return ts, nil
2019-07-05 14:29:17 +00:00
}
// returns true if 'a' is an ancestor of 'b'
2019-07-26 04:54:22 +00:00
func (cs *ChainStore) IsAncestorOf(a, b *types.TipSet) (bool, error) {
2019-07-05 14:29:17 +00:00
if b.Height() <= a.Height() {
return false, nil
}
cur := b
for !a.Equals(cur) && cur.Height() > a.Height() {
next, err := cs.LoadTipSet(b.Parents())
if err != nil {
return false, err
}
cur = next
}
return cur.Equals(a), nil
}
2019-07-26 04:54:22 +00:00
func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet, error) {
2019-07-05 14:29:17 +00:00
l, _, err := cs.ReorgOps(a, b)
if err != nil {
return nil, err
}
return cs.LoadTipSet(l[len(l)-1].Parents())
}
2019-07-26 04:54:22 +00:00
func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
2019-07-05 14:29:17 +00:00
left := a
right := b
2019-07-26 04:54:22 +00:00
var leftChain, rightChain []*types.TipSet
2019-07-05 14:29:17 +00:00
for !left.Equals(right) {
if left.Height() > right.Height() {
leftChain = append(leftChain, left)
par, err := cs.LoadTipSet(left.Parents())
if err != nil {
return nil, nil, err
}
left = par
} else {
rightChain = append(rightChain, right)
par, err := cs.LoadTipSet(right.Parents())
if err != nil {
2019-07-31 07:13:49 +00:00
log.Infof("failed to fetch right.Parents: %s", err)
2019-07-05 14:29:17 +00:00
return nil, nil, err
}
right = par
}
}
return leftChain, rightChain, nil
}
2019-07-26 04:54:22 +00:00
func (cs *ChainStore) GetHeaviestTipSet() *types.TipSet {
2019-07-05 14:29:17 +00:00
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
return cs.heaviest
}
2019-10-10 03:04:10 +00:00
func (cs *ChainStore) AddToTipSetTracker(b *types.BlockHeader) error {
cs.tstLk.Lock()
defer cs.tstLk.Unlock()
tss := cs.tipsets[b.Height]
for _, oc := range tss {
if oc == b.Cid() {
2019-09-25 13:38:59 +00:00
log.Debug("tried to add block to tipset tracker that was already there")
return nil
}
}
cs.tipsets[b.Height] = append(tss, b.Cid())
// TODO: do we want to look for slashable submissions here? might as well...
return nil
}
func (cs *ChainStore) PersistBlockHeaders(b ...*types.BlockHeader) error {
2019-11-12 10:18:46 +00:00
sbs := make([]block.Block, len(b))
for i, header := range b {
var err error
2019-11-12 10:18:46 +00:00
sbs[i], err = header.ToStorageBlock()
if err != nil {
return err
}
2019-07-05 14:29:17 +00:00
}
batchSize := 256
calls := len(b) / batchSize
var err error
for i := 0; i <= calls; i++ {
start := batchSize * i
end := start + batchSize
if end > len(b) {
end = len(b)
}
err = multierr.Append(err, cs.bs.PutMany(sbs[start:end]))
}
return err
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
type storable interface {
ToStorageBlock() (block.Block, error)
}
2019-11-19 03:24:48 +00:00
func PutMessage(bs bstore.Blockstore, m storable) (cid.Cid, error) {
2019-07-31 07:13:49 +00:00
b, err := m.ToStorageBlock()
2019-07-05 14:29:17 +00:00
if err != nil {
2019-07-26 04:54:22 +00:00
return cid.Undef, err
2019-07-05 14:29:17 +00:00
}
2019-07-31 07:13:49 +00:00
if err := bs.Put(b); err != nil {
return cid.Undef, err
}
return b.Cid(), nil
}
func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
return PutMessage(cs.bs, m)
2019-07-05 14:29:17 +00:00
}
func (cs *ChainStore) expandTipset(b *types.BlockHeader) (*types.TipSet, error) {
// Hold lock for the whole function for now, if it becomes a problem we can
// fix pretty easily
cs.tstLk.Lock()
defer cs.tstLk.Unlock()
all := []*types.BlockHeader{b}
tsets, ok := cs.tipsets[b.Height]
if !ok {
return types.NewTipSet(all)
}
inclMiners := map[address.Address]bool{b.Miner: true}
for _, bhc := range tsets {
if bhc == b.Cid() {
continue
}
h, err := cs.GetBlock(bhc)
if err != nil {
return nil, xerrors.Errorf("failed to load block (%s) for tipset expansion: %w", bhc, err)
}
if inclMiners[h.Miner] {
log.Warnf("Have multiple blocks from miner %s at height %d in our tipset cache", h.Miner, h.Height)
continue
}
if types.CidArrsEqual(h.Parents, b.Parents) {
all = append(all, h)
inclMiners[h.Miner] = true
}
}
// TODO: other validation...?
return types.NewTipSet(all)
}
2019-10-15 04:33:29 +00:00
func (cs *ChainStore) AddBlock(ctx context.Context, b *types.BlockHeader) error {
2019-11-12 10:18:46 +00:00
if err := cs.PersistBlockHeaders(b); err != nil {
2019-07-05 14:29:17 +00:00
return err
}
ts, err := cs.expandTipset(b)
if err != nil {
return err
}
2019-10-15 04:33:29 +00:00
if err := cs.MaybeTakeHeavierTipSet(ctx, ts); err != nil {
return xerrors.Errorf("MaybeTakeHeavierTipSet failed: %w", err)
}
2019-07-05 14:29:17 +00:00
return nil
}
func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) {
data, err := cs.ds.Get(dstore.NewKey("0"))
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, err
}
c, err := cid.Cast(data)
if err != nil {
return nil, err
}
genb, err := cs.bs.Get(c)
if err != nil {
return nil, err
}
return types.DecodeBlock(genb.RawData())
2019-07-05 14:29:17 +00:00
}
func (cs *ChainStore) GetCMessage(c cid.Cid) (ChainMsg, error) {
m, err := cs.GetMessage(c)
if err == nil {
return m, nil
}
2019-11-24 16:35:50 +00:00
if err != bstore.ErrNotFound {
log.Warn("GetCMessage: unexpected error getting unsigned message: %s", err)
}
return cs.GetSignedMessage(c)
}
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
sb, err := cs.bs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
}
return types.DecodeMessage(sb.RawData())
}
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
2019-07-05 14:29:17 +00:00
sb, err := cs.bs.Get(c)
if err != nil {
2019-07-31 07:13:49 +00:00
log.Errorf("get message get failed: %s: %s", c, err)
2019-07-05 14:29:17 +00:00
return nil, err
}
return types.DecodeSignedMessage(sb.RawData())
2019-07-05 14:29:17 +00:00
}
2019-09-17 01:56:37 +00:00
func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) {
ctx := context.TODO()
bs := cbor.NewCborStore(cs.bs)
a, err := amt.LoadAMT(ctx, bs, root)
2019-07-05 14:29:17 +00:00
if err != nil {
2019-09-17 01:56:37 +00:00
return nil, xerrors.Errorf("amt load: %w", err)
2019-07-05 14:29:17 +00:00
}
var cids []cid.Cid
2019-09-17 01:56:37 +00:00
for i := uint64(0); i < a.Count; i++ {
var c cbg.CborCid
if err := a.Get(ctx, i, &c); err != nil {
2019-09-17 01:56:37 +00:00
return nil, xerrors.Errorf("failed to load cid from amt: %w", err)
2019-07-05 14:29:17 +00:00
}
2019-09-17 01:56:37 +00:00
cids = append(cids, cid.Cid(c))
2019-07-05 14:29:17 +00:00
}
return cids, nil
}
type ChainMsg interface {
Cid() cid.Cid
VMMessage() *types.Message
ToStorageBlock() (block.Block, error)
}
func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]ChainMsg, error) {
applied := make(map[address.Address]uint64)
balances := make(map[address.Address]types.BigInt)
2020-02-04 22:19:05 +00:00
cst := cbor.NewCborStore(cs.bs)
st, err := state.LoadStateTree(cst, ts.Blocks()[0].ParentStateRoot)
if err != nil {
return nil, xerrors.Errorf("failed to load state tree")
}
preloadAddr := func(a address.Address) error {
if _, ok := applied[a]; !ok {
act, err := st.GetActor(a)
if err != nil {
return err
}
applied[a] = act.Nonce
balances[a] = act.Balance
}
return nil
}
var out []ChainMsg
for _, b := range ts.Blocks() {
bms, sms, err := cs.MessagesForBlock(b)
if err != nil {
return nil, xerrors.Errorf("failed to get messages for block: %w", err)
}
cmsgs := make([]ChainMsg, 0, len(bms)+len(sms))
for _, m := range bms {
cmsgs = append(cmsgs, m)
}
for _, sm := range sms {
cmsgs = append(cmsgs, sm)
}
for _, cm := range cmsgs {
m := cm.VMMessage()
if err := preloadAddr(m.From); err != nil {
return nil, err
}
if applied[m.From] != m.Nonce {
continue
}
applied[m.From]++
if balances[m.From].LessThan(m.RequiredFunds()) {
continue
}
balances[m.From] = types.BigSub(balances[m.From], m.RequiredFunds())
out = append(out, cm)
}
}
return out, nil
}
type mmCids struct {
bls []cid.Cid
secpk []cid.Cid
}
func (cs *ChainStore) readMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
o, ok := cs.mmCache.Get(mmc)
if ok {
mmcids := o.(*mmCids)
return mmcids.bls, mmcids.secpk, nil
}
2020-02-04 22:19:05 +00:00
cst := cbor.NewCborStore(cs.bs)
var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
2019-09-06 20:03:28 +00:00
return nil, nil, xerrors.Errorf("failed to load msgmeta: %w", err)
}
2019-09-17 01:56:37 +00:00
blscids, err := cs.readAMTCids(msgmeta.BlsMessages)
if err != nil {
return nil, nil, xerrors.Errorf("loading bls message cids for block: %w", err)
}
2019-09-17 01:56:37 +00:00
secpkcids, err := cs.readAMTCids(msgmeta.SecpkMessages)
if err != nil {
return nil, nil, xerrors.Errorf("loading secpk message cids for block: %w", err)
}
cs.mmCache.Add(mmc, &mmCids{
bls: blscids,
secpk: secpkcids,
})
return blscids, secpkcids, nil
}
func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*HeadChange, error) {
fts, err := cs.LoadTipSet(from)
if err != nil {
return nil, xerrors.Errorf("loading from tipset %s: %w", from, err)
}
tts, err := cs.LoadTipSet(to)
if err != nil {
return nil, xerrors.Errorf("loading to tipset %s: %w", to, err)
}
revert, apply, err := cs.ReorgOps(fts, tts)
if err != nil {
return nil, xerrors.Errorf("error getting tipset branches: %w", err)
}
path := make([]*HeadChange, len(revert)+len(apply))
for i, r := range revert {
path[i] = &HeadChange{Type: HCRevert, Val: r}
}
for j, i := 0, len(apply)-1; i >= 0; j, i = j+1, i-1 {
path[j+len(revert)] = &HeadChange{Type: HCApply, Val: apply[i]}
}
return path, nil
}
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
blscids, secpkcids, err := cs.readMsgMetaCids(b.Messages)
if err != nil {
return nil, nil, err
}
blsmsgs, err := cs.LoadMessagesFromCids(blscids)
if err != nil {
return nil, nil, xerrors.Errorf("loading bls messages for block: %w", err)
}
secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
if err != nil {
return nil, nil, xerrors.Errorf("loading secpk messages for block: %w", err)
}
return blsmsgs, secpkmsgs, nil
2019-07-05 14:29:17 +00:00
}
func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
ctx := context.TODO()
bs := cbor.NewCborStore(cs.bs)
a, err := amt.LoadAMT(ctx, bs, b.ParentMessageReceipts)
if err != nil {
return nil, xerrors.Errorf("amt load: %w", err)
}
var r types.MessageReceipt
if err := a.Get(ctx, uint64(i), &r); err != nil {
return nil, err
}
return &r, nil
}
func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, error) {
msgs := make([]*types.Message, 0, len(cids))
2019-07-31 07:13:49 +00:00
for i, c := range cids {
2019-07-05 14:29:17 +00:00
m, err := cs.GetMessage(c)
if err != nil {
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", err, c, i)
2019-07-05 14:29:17 +00:00
}
msgs = append(msgs, m)
}
return msgs, nil
}
2019-07-18 20:26:04 +00:00
func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, 0, len(cids))
for i, c := range cids {
m, err := cs.GetSignedMessage(c)
if err != nil {
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", err, c, i)
}
msgs = append(msgs, m)
}
return msgs, nil
}
2019-11-19 03:24:48 +00:00
func (cs *ChainStore) Blockstore() bstore.Blockstore {
2019-07-26 04:54:22 +00:00
return cs.bs
}
2020-02-08 02:18:32 +00:00
func ActorStore(ctx context.Context, bs blockstore.Blockstore) adt.Store {
return &astore{
2020-02-08 02:18:37 +00:00
cst: cbor.NewCborStore(bs),
2020-02-08 02:18:32 +00:00
ctx: ctx,
}
}
type astore struct {
cst cbor.IpldStore
ctx context.Context
}
func (a *astore) Context() context.Context {
return a.ctx
}
func (a *astore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
return a.cst.Get(ctx, c, out)
}
func (a *astore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return a.cst.Put(ctx, v)
}
func (cs *ChainStore) Store(ctx context.Context) adt.Store {
return ActorStore(ctx, cs.bs)
}
2020-01-13 20:47:27 +00:00
func (cs *ChainStore) VMSys() *types.VMSyscalls {
return cs.vmcalls
}
func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
var out []*types.FullBlock
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := cs.MessagesForBlock(b)
if err != nil {
2019-08-02 00:57:29 +00:00
// TODO: check for 'not found' errors, and only return nil if this
// is actually a 'not found' error
return nil, nil
}
fb := &types.FullBlock{
Header: b,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
}
out = append(out, fb)
}
return NewFullTipSet(out), nil
}
2019-11-21 22:21:45 +00:00
func drawRandomness(t *types.Ticket, round int64) []byte {
h := sha256.New()
var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], uint64(round))
2019-10-13 09:08:34 +00:00
h.Write(t.VRFProof)
h.Write(buf[:])
2019-09-09 20:03:10 +00:00
return h.Sum(nil)
}
func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, round int64) ([]byte, error) {
2019-12-02 12:51:16 +00:00
_, span := trace.StartSpan(ctx, "store.GetRandomness")
defer span.End()
span.AddAttributes(trace.Int64Attribute("round", round))
for {
2019-12-16 19:22:56 +00:00
nts, err := cs.LoadTipSet(types.NewTipSetKey(blks...))
if err != nil {
return nil, err
}
mtb := nts.MinTicketBlock()
if int64(nts.Height()) <= round {
2019-11-21 22:21:45 +00:00
return drawRandomness(nts.MinTicketBlock().Ticket, round), nil
}
// special case for lookback behind genesis block
// TODO(spec): this is not in the spec, need to sync that
if mtb.Height == 0 {
// round is negative
2019-11-21 22:21:45 +00:00
thash := drawRandomness(mtb.Ticket, round*-1)
// for negative lookbacks, just use the hash of the positive tickethash value
h := sha256.Sum256(thash)
return h[:], nil
}
2019-09-06 20:03:28 +00:00
2019-09-20 12:22:46 +00:00
blks = mtb.Parents
}
}
2019-09-18 03:25:12 +00:00
2020-02-08 02:18:32 +00:00
func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h abi.ChainEpoch, ts *types.TipSet) (*types.TipSet, error) {
2019-09-18 03:25:12 +00:00
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
if h > ts.Height() {
return nil, xerrors.Errorf("looking for tipset with height less than start point")
}
2019-09-19 20:34:18 +00:00
if ts.Height()-h > build.ForkLengthThreshold {
log.Warnf("expensive call to GetTipsetByHeight, seeking %d levels", ts.Height()-h)
}
2019-09-18 03:25:12 +00:00
for {
pts, err := cs.LoadTipSet(ts.Parents())
if err != nil {
return nil, err
}
if h > pts.Height() {
return ts, nil
}
2019-09-18 03:25:12 +00:00
ts = pts
}
}
2019-10-15 04:33:29 +00:00
2020-01-16 18:05:07 +00:00
func recurseLinks(bs blockstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
data, err := bs.Get(root)
if err != nil {
return nil, err
}
top, err := cbg.ScanForLinks(bytes.NewReader(data.RawData()))
if err != nil {
return nil, err
}
in = append(in, top...)
for _, c := range top {
var err error
in, err = recurseLinks(bs, c, in)
if err != nil {
return nil, err
}
}
return in, nil
}
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer) error {
2020-01-21 01:53:55 +00:00
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
2020-01-16 18:05:07 +00:00
bsrv := blockservice.New(cs.bs, nil)
dserv := dag.NewDAGService(bsrv)
return car.WriteCarWithWalker(ctx, dserv, ts.Cids(), w, func(nd format.Node) ([]*format.Link, error) {
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil {
return nil, err
}
var out []*format.Link
for _, p := range b.Parents {
out = append(out, &format.Link{Cid: p})
}
cids, err := recurseLinks(cs.bs, b.Messages, nil)
if err != nil {
return nil, err
}
for _, c := range cids {
out = append(out, &format.Link{Cid: c})
}
if b.Height == 0 {
cids, err := recurseLinks(cs.bs, b.ParentStateRoot, nil)
if err != nil {
return nil, err
}
for _, c := range cids {
out = append(out, &format.Link{Cid: c})
}
}
return out, nil
})
}
2020-01-21 01:53:55 +00:00
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {
header, err := car.LoadCar(cs.Blockstore(), r)
if err != nil {
return nil, xerrors.Errorf("loadcar failed: %w", err)
}
root, err := cs.LoadTipSet(types.NewTipSetKey(header.Roots...))
if err != nil {
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
}
return root, nil
}
2019-10-15 04:33:29 +00:00
type chainRand struct {
cs *ChainStore
blks []cid.Cid
2020-02-08 02:18:32 +00:00
bh abi.ChainEpoch
2019-10-15 04:33:29 +00:00
}
2020-02-08 02:18:32 +00:00
func NewChainRand(cs *ChainStore, blks []cid.Cid, bheight abi.ChainEpoch) vm.Rand {
2019-10-15 04:33:29 +00:00
return &chainRand{
cs: cs,
blks: blks,
bh: bheight,
2019-10-15 04:33:29 +00:00
}
}
func (cr *chainRand) GetRandomness(ctx context.Context, round int64) ([]byte, error) {
return cr.cs.GetRandomness(ctx, cr.blks, round)
2019-10-15 04:33:29 +00:00
}