plumb more contexts
This commit is contained in:
parent
908ba3fd5a
commit
8f31112312
@ -99,7 +99,7 @@ func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testMove(t *testing.T, optsF func(string) Options) {
|
func testMove(t *testing.T, optsF func(string) Options) {
|
||||||
ctx := context.TODO()
|
ctx := context.Background()
|
||||||
basePath, err := ioutil.TempDir("", "")
|
basePath, err := ioutil.TempDir("", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -44,7 +44,7 @@ func (s *Suite) RunTests(t *testing.T, prefix string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) {
|
func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) {
|
||||||
ctx := context.TODO()
|
ctx := context.Background()
|
||||||
bs, _ := s.NewBlockstore(t)
|
bs, _ := s.NewBlockstore(t)
|
||||||
if c, ok := bs.(io.Closer); ok {
|
if c, ok := bs.(io.Closer); ok {
|
||||||
defer func() { require.NoError(t, c.Close()) }()
|
defer func() { require.NoError(t, c.Close()) }()
|
||||||
|
@ -304,7 +304,7 @@ func (t *TipSetExecutor) ExecuteTipSet(ctx context.Context, sm *stmgr.StateManag
|
|||||||
|
|
||||||
r := rand.NewStateRand(sm.ChainStore(), ts.Cids(), sm.Beacon())
|
r := rand.NewStateRand(sm.ChainStore(), ts.Cids(), sm.Beacon())
|
||||||
|
|
||||||
blkmsgs, err := sm.ChainStore().BlockMsgsForTipset(ts)
|
blkmsgs, err := sm.ChainStore().BlockMsgsForTipset(ctx, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, cid.Undef, xerrors.Errorf("getting block messages for tipset: %w", err)
|
return cid.Undef, cid.Undef, xerrors.Errorf("getting block messages for tipset: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -171,7 +171,7 @@ func (filec *FilecoinEC) ValidateBlock(ctx context.Context, b *types.FullBlock)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if stateroot != h.ParentStateRoot {
|
if stateroot != h.ParentStateRoot {
|
||||||
msgs, err := filec.store.MessagesForTipset(baseTs)
|
msgs, err := filec.store.MessagesForTipset(ctx, baseTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to load messages for tipset during tipset state mismatch error: ", err)
|
log.Error("failed to load messages for tipset during tipset state mismatch error: ", err)
|
||||||
} else {
|
} else {
|
||||||
@ -528,7 +528,7 @@ func (filec *FilecoinEC) checkBlockMessages(ctx context.Context, b *types.FullBl
|
|||||||
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
|
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := store.PutMessage(tmpbs, m)
|
c, err := store.PutMessage(ctx, tmpbs, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
|
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
|
||||||
}
|
}
|
||||||
@ -562,7 +562,7 @@ func (filec *FilecoinEC) checkBlockMessages(ctx context.Context, b *types.FullBl
|
|||||||
return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err)
|
return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := store.PutMessage(tmpbs, m)
|
c, err := store.PutMessage(ctx, tmpbs, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
|
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
|
||||||
}
|
}
|
||||||
|
@ -59,14 +59,14 @@ func (filec *FilecoinEC) CreateBlock(ctx context.Context, w api.Wallet, bt *api.
|
|||||||
blsSigs = append(blsSigs, msg.Signature)
|
blsSigs = append(blsSigs, msg.Signature)
|
||||||
blsMessages = append(blsMessages, &msg.Message)
|
blsMessages = append(blsMessages, &msg.Message)
|
||||||
|
|
||||||
c, err := filec.sm.ChainStore().PutMessage(&msg.Message)
|
c, err := filec.sm.ChainStore().PutMessage(ctx, &msg.Message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
blsMsgCids = append(blsMsgCids, c)
|
blsMsgCids = append(blsMsgCids, c)
|
||||||
} else if msg.Signature.Type == crypto.SigTypeSecp256k1 {
|
} else if msg.Signature.Type == crypto.SigTypeSecp256k1 {
|
||||||
c, err := filec.sm.ChainStore().PutMessage(msg)
|
c, err := filec.sm.ChainStore().PutMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -172,7 +172,7 @@ func collectChainSegment(ctx context.Context, cs *store.ChainStore, req *validat
|
|||||||
}
|
}
|
||||||
|
|
||||||
if req.options.IncludeMessages {
|
if req.options.IncludeMessages {
|
||||||
bmsgs, bmincl, smsgs, smincl, err := gatherMessages(cs, ts)
|
bmsgs, bmincl, smsgs, smincl, err := gatherMessages(ctx, cs, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("gather messages failed: %w", err)
|
return nil, xerrors.Errorf("gather messages failed: %w", err)
|
||||||
}
|
}
|
||||||
@ -197,14 +197,14 @@ func collectChainSegment(ctx context.Context, cs *store.ChainStore, req *validat
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
|
func gatherMessages(ctx context.Context, cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
|
||||||
blsmsgmap := make(map[cid.Cid]uint64)
|
blsmsgmap := make(map[cid.Cid]uint64)
|
||||||
secpkmsgmap := make(map[cid.Cid]uint64)
|
secpkmsgmap := make(map[cid.Cid]uint64)
|
||||||
var secpkincl, blsincl [][]uint64
|
var secpkincl, blsincl [][]uint64
|
||||||
|
|
||||||
var blscids, secpkcids []cid.Cid
|
var blscids, secpkcids []cid.Cid
|
||||||
for _, block := range ts.Blocks() {
|
for _, block := range ts.Blocks() {
|
||||||
bc, sc, err := cs.ReadMsgMetaCids(block.Messages)
|
bc, sc, err := cs.ReadMsgMetaCids(ctx, block.Messages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
@ -237,12 +237,12 @@ func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [
|
|||||||
secpkincl = append(secpkincl, smi)
|
secpkincl = append(secpkincl, smi)
|
||||||
}
|
}
|
||||||
|
|
||||||
blsmsgs, err := cs.LoadMessagesFromCids(blscids)
|
blsmsgs, err := cs.LoadMessagesFromCids(ctx, blscids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
|
secpkmsgs, err := cs.LoadSignedMessagesFromCids(ctx, secpkcids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -909,12 +909,12 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
|
|||||||
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := mp.api.PutMessage(m); err != nil {
|
if _, err := mp.api.PutMessage(ctx, m); err != nil {
|
||||||
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := mp.api.PutMessage(&m.Message); err != nil {
|
if _, err := mp.api.PutMessage(ctx, &m.Message); err != nil {
|
||||||
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1216,7 +1216,7 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
|
|||||||
|
|
||||||
mp.curTs = pts
|
mp.curTs = pts
|
||||||
|
|
||||||
msgs, err := mp.MessagesForBlocks(ts.Blocks())
|
msgs, err := mp.MessagesForBlocks(ctx, ts.Blocks())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error retrieving messages for reverted block: %s", err)
|
log.Errorf("error retrieving messages for reverted block: %s", err)
|
||||||
merr = multierror.Append(merr, err)
|
merr = multierror.Append(merr, err)
|
||||||
@ -1232,7 +1232,7 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
|
|||||||
mp.curTs = ts
|
mp.curTs = ts
|
||||||
|
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
||||||
log.Errorf("error retrieving messages for block: %s", xerr)
|
log.Errorf("error retrieving messages for block: %s", xerr)
|
||||||
@ -1368,7 +1368,7 @@ func (mp *MessagePool) runHeadChange(ctx context.Context, from *types.TipSet, to
|
|||||||
var merr error
|
var merr error
|
||||||
|
|
||||||
for _, ts := range revert {
|
for _, ts := range revert {
|
||||||
msgs, err := mp.MessagesForBlocks(ts.Blocks())
|
msgs, err := mp.MessagesForBlocks(ctx, ts.Blocks())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error retrieving messages for reverted block: %s", err)
|
log.Errorf("error retrieving messages for reverted block: %s", err)
|
||||||
merr = multierror.Append(merr, err)
|
merr = multierror.Append(merr, err)
|
||||||
@ -1382,7 +1382,7 @@ func (mp *MessagePool) runHeadChange(ctx context.Context, from *types.TipSet, to
|
|||||||
|
|
||||||
for _, ts := range apply {
|
for _, ts := range apply {
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
||||||
log.Errorf("error retrieving messages for block: %s", xerr)
|
log.Errorf("error retrieving messages for block: %s", xerr)
|
||||||
@ -1407,11 +1407,11 @@ type statBucket struct {
|
|||||||
msgs map[uint64]*types.SignedMessage
|
msgs map[uint64]*types.SignedMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
|
func (mp *MessagePool) MessagesForBlocks(ctx context.Context, blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
|
||||||
out := make([]*types.SignedMessage, 0)
|
out := make([]*types.SignedMessage, 0)
|
||||||
|
|
||||||
for _, b := range blks {
|
for _, b := range blks {
|
||||||
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
|
bmsgs, smsgs, err := mp.api.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
|
||||||
}
|
}
|
||||||
|
@ -23,12 +23,12 @@ var (
|
|||||||
|
|
||||||
type Provider interface {
|
type Provider interface {
|
||||||
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
||||||
PutMessage(m types.ChainMsg) (cid.Cid, error)
|
PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error)
|
||||||
PubSubPublish(string, []byte) error
|
PubSubPublish(string, []byte) error
|
||||||
GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
|
GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||||
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
MessagesForBlock(context.Context, *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||||
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
|
MessagesForTipset(context.Context, *types.TipSet) ([]types.ChainMsg, error)
|
||||||
LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
|
LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
|
||||||
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
|
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
|
||||||
IsLite() bool
|
IsLite() bool
|
||||||
@ -66,8 +66,8 @@ func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet)
|
|||||||
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) {
|
func (mpp *mpoolProvider) PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error) {
|
||||||
return mpp.sm.ChainStore().PutMessage(m)
|
return mpp.sm.ChainStore().PutMessage(ctx, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
||||||
@ -103,12 +103,12 @@ func (mpp *mpoolProvider) StateAccountKeyAtFinality(ctx context.Context, addr ad
|
|||||||
return mpp.sm.ResolveToKeyAddressAtFinality(ctx, addr, ts)
|
return mpp.sm.ResolveToKeyAddressAtFinality(ctx, addr, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
func (mpp *mpoolProvider) MessagesForBlock(ctx context.Context, h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
return mpp.sm.ChainStore().MessagesForBlock(h)
|
return mpp.sm.ChainStore().MessagesForBlock(ctx, h)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
|
func (mpp *mpoolProvider) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) {
|
||||||
return mpp.sm.ChainStore().MessagesForTipset(ts)
|
return mpp.sm.ChainStore().MessagesForTipset(ctx, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mpp *mpoolProvider) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
|
func (mpp *mpoolProvider) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
|
||||||
|
@ -20,7 +20,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
msg, err := sm.cs.GetCMessage(mcid)
|
msg, err := sm.cs.GetCMessage(ctx, mcid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
|
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
|
||||||
}
|
}
|
||||||
@ -130,7 +130,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet, mcid cid.Cid, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
|
func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet, mcid cid.Cid, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
|
||||||
msg, err := sm.cs.GetCMessage(mcid)
|
msg, err := sm.cs.GetCMessage(ctx, mcid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
|
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
|
||||||
}
|
}
|
||||||
@ -240,7 +240,7 @@ func (sm *StateManager) tipsetExecutedMessage(ctx context.Context, ts *types.Tip
|
|||||||
return nil, cid.Undef, err
|
return nil, cid.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cm, err := sm.cs.MessagesForTipset(pts)
|
cm, err := sm.cs.MessagesForTipset(ctx, pts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, cid.Undef, err
|
return nil, cid.Undef, err
|
||||||
}
|
}
|
||||||
@ -267,7 +267,7 @@ func (sm *StateManager) tipsetExecutedMessage(ctx context.Context, ts *types.Tip
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pr, err := sm.cs.GetParentReceipt(ts.Blocks()[0], i)
|
pr, err := sm.cs.GetParentReceipt(ctx, ts.Blocks()[0], i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, cid.Undef, err
|
return nil, cid.Undef, err
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,7 @@ func (cs *ChainStore) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi
|
|||||||
seen := make(map[cid.Cid]struct{})
|
seen := make(map[cid.Cid]struct{})
|
||||||
|
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
msg1, msg2, err := cs.MessagesForBlock(b)
|
msg1, msg2, err := cs.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return zero, xerrors.Errorf("error getting messages for: %s: %w", b.Cid(), err)
|
return zero, xerrors.Errorf("error getting messages for: %s: %w", b.Cid(), err)
|
||||||
}
|
}
|
||||||
|
@ -23,25 +23,25 @@ type storable interface {
|
|||||||
ToStorageBlock() (block.Block, error)
|
ToStorageBlock() (block.Block, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func PutMessage(bs bstore.Blockstore, m storable) (cid.Cid, error) {
|
func PutMessage(ctx context.Context, bs bstore.Blockstore, m storable) (cid.Cid, error) {
|
||||||
b, err := m.ToStorageBlock()
|
b, err := m.ToStorageBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := bs.Put(context.TODO(), b); err != nil {
|
if err := bs.Put(ctx, b); err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.Cid(), nil
|
return b.Cid(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
|
func (cs *ChainStore) PutMessage(ctx context.Context, m storable) (cid.Cid, error) {
|
||||||
return PutMessage(cs.chainBlockstore, m)
|
return PutMessage(ctx, cs.chainBlockstore, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
|
func (cs *ChainStore) GetCMessage(ctx context.Context, c cid.Cid) (types.ChainMsg, error) {
|
||||||
m, err := cs.GetMessage(c)
|
m, err := cs.GetMessage(ctx, c)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
@ -49,21 +49,21 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
|
|||||||
log.Warnf("GetCMessage: unexpected error getting unsigned message: %s", err)
|
log.Warnf("GetCMessage: unexpected error getting unsigned message: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return cs.GetSignedMessage(c)
|
return cs.GetSignedMessage(ctx, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
|
func (cs *ChainStore) GetMessage(ctx context.Context, c cid.Cid) (*types.Message, error) {
|
||||||
var msg *types.Message
|
var msg *types.Message
|
||||||
err := cs.chainLocalBlockstore.View(context.TODO(), c, func(b []byte) (err error) {
|
err := cs.chainLocalBlockstore.View(ctx, c, func(b []byte) (err error) {
|
||||||
msg, err = types.DecodeMessage(b)
|
msg, err = types.DecodeMessage(b)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
return msg, err
|
return msg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
|
func (cs *ChainStore) GetSignedMessage(ctx context.Context, c cid.Cid) (*types.SignedMessage, error) {
|
||||||
var msg *types.SignedMessage
|
var msg *types.SignedMessage
|
||||||
err := cs.chainLocalBlockstore.View(context.TODO(), c, func(b []byte) (err error) {
|
err := cs.chainLocalBlockstore.View(ctx, c, func(b []byte) (err error) {
|
||||||
msg, err = types.DecodeSignedMessage(b)
|
msg, err = types.DecodeSignedMessage(b)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -103,7 +103,7 @@ type BlockMessages struct {
|
|||||||
SecpkMessages []types.ChainMsg
|
SecpkMessages []types.ChainMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, error) {
|
func (cs *ChainStore) BlockMsgsForTipset(ctx context.Context, ts *types.TipSet) ([]BlockMessages, error) {
|
||||||
// returned BlockMessages match block order in tipset
|
// returned BlockMessages match block order in tipset
|
||||||
|
|
||||||
applied := make(map[address.Address]uint64)
|
applied := make(map[address.Address]uint64)
|
||||||
@ -142,7 +142,7 @@ func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, err
|
|||||||
var out []BlockMessages
|
var out []BlockMessages
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
|
|
||||||
bms, sms, err := cs.MessagesForBlock(b)
|
bms, sms, err := cs.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to get messages for block: %w", err)
|
return nil, xerrors.Errorf("failed to get messages for block: %w", err)
|
||||||
}
|
}
|
||||||
@ -181,8 +181,8 @@ func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, err
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
|
func (cs *ChainStore) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) {
|
||||||
bmsgs, err := cs.BlockMsgsForTipset(ts)
|
bmsgs, err := cs.BlockMsgsForTipset(ctx, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -206,7 +206,7 @@ type mmCids struct {
|
|||||||
secpk []cid.Cid
|
secpk []cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
|
func (cs *ChainStore) ReadMsgMetaCids(ctx context.Context, mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
|
||||||
o, ok := cs.mmCache.Get(mmc)
|
o, ok := cs.mmCache.Get(mmc)
|
||||||
if ok {
|
if ok {
|
||||||
mmcids := o.(*mmCids)
|
mmcids := o.(*mmCids)
|
||||||
@ -215,7 +215,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
|
|||||||
|
|
||||||
cst := cbor.NewCborStore(cs.chainLocalBlockstore)
|
cst := cbor.NewCborStore(cs.chainLocalBlockstore)
|
||||||
var msgmeta types.MsgMeta
|
var msgmeta types.MsgMeta
|
||||||
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
|
if err := cst.Get(ctx, mmc, &msgmeta); err != nil {
|
||||||
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
|
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -237,18 +237,18 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
|
|||||||
return blscids, secpkcids, nil
|
return blscids, secpkcids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
func (cs *ChainStore) MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
blscids, secpkcids, err := cs.ReadMsgMetaCids(b.Messages)
|
blscids, secpkcids, err := cs.ReadMsgMetaCids(ctx, b.Messages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
blsmsgs, err := cs.LoadMessagesFromCids(blscids)
|
blsmsgs, err := cs.LoadMessagesFromCids(ctx, blscids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, xerrors.Errorf("loading bls messages for block: %w", err)
|
return nil, nil, xerrors.Errorf("loading bls messages for block: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
|
secpkmsgs, err := cs.LoadSignedMessagesFromCids(ctx, secpkcids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, xerrors.Errorf("loading secpk messages for block: %w", err)
|
return nil, nil, xerrors.Errorf("loading secpk messages for block: %w", err)
|
||||||
}
|
}
|
||||||
@ -256,8 +256,7 @@ func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message,
|
|||||||
return blsmsgs, secpkmsgs, nil
|
return blsmsgs, secpkmsgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
|
func (cs *ChainStore) GetParentReceipt(ctx context.Context, b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
|
||||||
ctx := context.TODO()
|
|
||||||
// block headers use adt0, for now.
|
// block headers use adt0, for now.
|
||||||
a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts)
|
a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -274,10 +273,10 @@ func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.Mess
|
|||||||
return &r, nil
|
return &r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, error) {
|
func (cs *ChainStore) LoadMessagesFromCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
|
||||||
msgs := make([]*types.Message, 0, len(cids))
|
msgs := make([]*types.Message, 0, len(cids))
|
||||||
for i, c := range cids {
|
for i, c := range cids {
|
||||||
m, err := cs.GetMessage(c)
|
m, err := cs.GetMessage(ctx, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err)
|
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err)
|
||||||
}
|
}
|
||||||
@ -288,10 +287,10 @@ func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, er
|
|||||||
return msgs, nil
|
return msgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
|
func (cs *ChainStore) LoadSignedMessagesFromCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) {
|
||||||
msgs := make([]*types.SignedMessage, 0, len(cids))
|
msgs := make([]*types.SignedMessage, 0, len(cids))
|
||||||
for i, c := range cids {
|
for i, c := range cids {
|
||||||
m, err := cs.GetSignedMessage(c)
|
m, err := cs.GetSignedMessage(ctx, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err)
|
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err)
|
||||||
}
|
}
|
||||||
|
@ -43,8 +43,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {
|
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
|
||||||
ctx := context.TODO()
|
|
||||||
// TODO: writing only to the state blockstore is incorrect.
|
// TODO: writing only to the state blockstore is incorrect.
|
||||||
// At this time, both the state and chain blockstores are backed by the
|
// At this time, both the state and chain blockstores are backed by the
|
||||||
// universal store. When we physically segregate the stores, we will need
|
// universal store. When we physically segregate the stores, we will need
|
||||||
|
@ -1108,11 +1108,11 @@ func (cs *ChainStore) ActorStore(ctx context.Context) adt.Store {
|
|||||||
return ActorStore(ctx, cs.stateBlockstore)
|
return ActorStore(ctx, cs.stateBlockstore)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
|
func (cs *ChainStore) TryFillTipSet(ctx context.Context, ts *types.TipSet) (*FullTipSet, error) {
|
||||||
var out []*types.FullBlock
|
var out []*types.FullBlock
|
||||||
|
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
bmsgs, smsgs, err := cs.MessagesForBlock(b)
|
bmsgs, smsgs, err := cs.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: check for 'not found' errors, and only return nil if this
|
// TODO: check for 'not found' errors, and only return nil if this
|
||||||
// is actually a 'not found' error
|
// is actually a 'not found' error
|
||||||
|
@ -298,11 +298,11 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
|
|||||||
// into the blockstore.
|
// into the blockstore.
|
||||||
blockstore := bstore.NewMemory()
|
blockstore := bstore.NewMemory()
|
||||||
cst := cbor.NewCborStore(blockstore)
|
cst := cbor.NewCborStore(blockstore)
|
||||||
|
ctx := context.Background()
|
||||||
var bcids, scids []cid.Cid
|
var bcids, scids []cid.Cid
|
||||||
|
|
||||||
for _, m := range fblk.BlsMessages {
|
for _, m := range fblk.BlsMessages {
|
||||||
c, err := store.PutMessage(blockstore, m)
|
c, err := store.PutMessage(ctx, blockstore, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
|
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
|
||||||
}
|
}
|
||||||
@ -310,7 +310,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range fblk.SecpkMessages {
|
for _, m := range fblk.SecpkMessages {
|
||||||
c, err := store.PutMessage(blockstore, m)
|
c, err := store.PutMessage(ctx, blockstore, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
|
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
|
||||||
}
|
}
|
||||||
@ -482,7 +482,7 @@ func (syncer *Syncer) tryLoadFullTipSet(ctx context.Context, tsk types.TipSetKey
|
|||||||
|
|
||||||
fts := &store.FullTipSet{}
|
fts := &store.FullTipSet{}
|
||||||
for _, b := range ts.Blocks() {
|
for _, b := range ts.Blocks() {
|
||||||
bmsgs, smsgs, err := syncer.store.MessagesForBlock(b)
|
bmsgs, smsgs, err := syncer.store.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -965,7 +965,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
|
|||||||
span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
|
span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
|
||||||
|
|
||||||
for i := len(headers) - 1; i >= 0; {
|
for i := len(headers) - 1; i >= 0; {
|
||||||
fts, err := syncer.store.TryFillTipSet(headers[i])
|
fts, err := syncer.store.TryFillTipSet(ctx, headers[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1138,7 +1138,7 @@ func persistMessages(ctx context.Context, bs bstore.Blockstore, bst *exchange.Co
|
|||||||
|
|
||||||
for _, m := range bst.Bls {
|
for _, m := range bst.Bls {
|
||||||
//log.Infof("putting BLS message: %s", m.Cid())
|
//log.Infof("putting BLS message: %s", m.Cid())
|
||||||
if _, err := store.PutMessage(bs, m); err != nil {
|
if _, err := store.PutMessage(ctx, bs, m); err != nil {
|
||||||
log.Errorf("failed to persist messages: %+v", err)
|
log.Errorf("failed to persist messages: %+v", err)
|
||||||
return xerrors.Errorf("BLS message processing failed: %w", err)
|
return xerrors.Errorf("BLS message processing failed: %w", err)
|
||||||
}
|
}
|
||||||
@ -1148,7 +1148,7 @@ func persistMessages(ctx context.Context, bs bstore.Blockstore, bst *exchange.Co
|
|||||||
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.Type)
|
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.Type)
|
||||||
}
|
}
|
||||||
//log.Infof("putting secp256k1 message: %s", m.Cid())
|
//log.Infof("putting secp256k1 message: %s", m.Cid())
|
||||||
if _, err := store.PutMessage(bs, m); err != nil {
|
if _, err := store.PutMessage(ctx, bs, m); err != nil {
|
||||||
log.Errorf("failed to persist messages: %+v", err)
|
log.Errorf("failed to persist messages: %+v", err)
|
||||||
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
|
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -304,7 +304,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
return fmt.Errorf("no CAR file provided for import")
|
return fmt.Errorf("no CAR file provided for import")
|
||||||
}
|
}
|
||||||
|
|
||||||
head, err = cs.Import(carFile)
|
head, err = cs.Import(cctx.Context, carFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -327,7 +327,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("failed to parse head tipset key: %w", err)
|
return xerrors.Errorf("failed to parse head tipset key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
head, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
|
head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -336,7 +336,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
head, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cr.Header.Roots...))
|
head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cr.Header.Roots...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -353,7 +353,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
|
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
|
||||||
return xerrors.Errorf("failed to parse genesis tipset key: %w", err)
|
return xerrors.Errorf("failed to parse genesis tipset key: %w", err)
|
||||||
}
|
}
|
||||||
genesis, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
|
genesis, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
|
||||||
} else {
|
} else {
|
||||||
log.Warnf("getting genesis by height; this will be slow; pass in the genesis tipset through --genesis-tipset")
|
log.Warnf("getting genesis by height; this will be slow; pass in the genesis tipset through --genesis-tipset")
|
||||||
// fallback to the slow path of walking the chain.
|
// fallback to the slow path of walking the chain.
|
||||||
@ -364,7 +364,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = cs.SetGenesis(context.Background(), genesis.Blocks()[0]); err != nil {
|
if err = cs.SetGenesis(cctx.Context, genesis.Blocks()[0]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -375,10 +375,10 @@ var importBenchCmd = &cli.Command{
|
|||||||
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
|
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
|
||||||
return xerrors.Errorf("failed to end genesis tipset key: %w", err)
|
return xerrors.Errorf("failed to end genesis tipset key: %w", err)
|
||||||
}
|
}
|
||||||
end, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
|
end, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
|
||||||
} else if h := cctx.Int64("end-height"); h != 0 {
|
} else if h := cctx.Int64("end-height"); h != 0 {
|
||||||
log.Infof("getting end tipset at height %d...", h)
|
log.Infof("getting end tipset at height %d...", h)
|
||||||
end, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), head, true)
|
end, err = cs.GetTipsetByHeight(cctx.Context, abi.ChainEpoch(h), head, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -397,7 +397,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
|
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
|
||||||
return xerrors.Errorf("failed to start genesis tipset key: %w", err)
|
return xerrors.Errorf("failed to start genesis tipset key: %w", err)
|
||||||
}
|
}
|
||||||
start, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
|
start, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
|
||||||
} else if h := cctx.Int64("start-height"); h != 0 {
|
} else if h := cctx.Int64("start-height"); h != 0 {
|
||||||
log.Infof("getting start tipset at height %d...", h)
|
log.Infof("getting start tipset at height %d...", h)
|
||||||
// lookback from the end tipset (which falls back to head if not supplied).
|
// lookback from the end tipset (which falls back to head if not supplied).
|
||||||
@ -410,7 +410,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
|
|
||||||
if start != nil {
|
if start != nil {
|
||||||
startEpoch = start.Height()
|
startEpoch = start.Height()
|
||||||
if err := cs.ForceHeadSilent(context.Background(), start); err != nil {
|
if err := cs.ForceHeadSilent(cctx.Context, start); err != nil {
|
||||||
// if err := cs.SetHead(start); err != nil {
|
// if err := cs.SetHead(start); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -421,7 +421,7 @@ var importBenchCmd = &cli.Command{
|
|||||||
if h := ts.Height(); h%100 == 0 {
|
if h := ts.Height(); h%100 == 0 {
|
||||||
log.Infof("walking back the chain; loaded tipset at height %d...", h)
|
log.Infof("walking back the chain; loaded tipset at height %d...", h)
|
||||||
}
|
}
|
||||||
next, err := cs.LoadTipSet(context.Background(), ts.Parents())
|
next, err := cs.LoadTipSet(cctx.Context, ts.Parents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -347,7 +347,7 @@ func migratePreSealMeta(ctx context.Context, api v1api.FullNode, metadata string
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := mds.Put(context.Background(), sectorKey, b); err != nil {
|
if err := mds.Put(ctx, sectorKey, b); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -387,7 +387,7 @@ func migratePreSealMeta(ctx context.Context, api v1api.FullNode, metadata string
|
|||||||
|
|
||||||
buf := make([]byte, binary.MaxVarintLen64)
|
buf := make([]byte, binary.MaxVarintLen64)
|
||||||
size := binary.PutUvarint(buf, uint64(maxSectorID))
|
size := binary.PutUvarint(buf, uint64(maxSectorID))
|
||||||
return mds.Put(context.Background(), datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
|
return mds.Put(ctx, datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
|
||||||
}
|
}
|
||||||
|
|
||||||
func findMarketDealID(ctx context.Context, api v1api.FullNode, deal market2.DealProposal) (abi.DealID, error) {
|
func findMarketDealID(ctx context.Context, api v1api.FullNode, deal market2.DealProposal) (abi.DealID, error) {
|
||||||
@ -428,7 +428,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
|||||||
return xerrors.Errorf("peer ID from private key: %w", err)
|
return xerrors.Errorf("peer ID from private key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
mds, err := lr.Datastore(ctx, "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -441,7 +441,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cctx.Bool("genesis-miner") {
|
if cctx.Bool("genesis-miner") {
|
||||||
if err := mds.Put(context.Background(), datastore.NewKey("miner-address"), a.Bytes()); err != nil {
|
if err := mds.Put(ctx, datastore.NewKey("miner-address"), a.Bytes()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -548,7 +548,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Created new miner: %s", addr)
|
log.Infof("Created new miner: %s", addr)
|
||||||
if err := mds.Put(context.Background(), datastore.NewKey("miner-address"), addr.Bytes()); err != nil {
|
if err := mds.Put(ctx, datastore.NewKey("miner-address"), addr.Bytes()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,7 +233,7 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
|
|||||||
|
|
||||||
log.Info("Restoring metadata backup")
|
log.Info("Restoring metadata backup")
|
||||||
|
|
||||||
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
mds, err := lr.Datastore(ctx, "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -255,7 +255,7 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
|
|||||||
|
|
||||||
log.Info("Checking actor metadata")
|
log.Info("Checking actor metadata")
|
||||||
|
|
||||||
abytes, err := mds.Get(context.Background(), datastore.NewKey("miner-address"))
|
abytes, err := mds.Get(ctx, datastore.NewKey("miner-address"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting actor address from metadata datastore: %w", err)
|
return xerrors.Errorf("getting actor address from metadata datastore: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ func (sim *Simulation) storeMessages(ctx context.Context, messages []*types.Mess
|
|||||||
// fail a pre-commit...
|
// fail a pre-commit...
|
||||||
var msgCids []cid.Cid
|
var msgCids []cid.Cid
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
c, err := sim.Node.Chainstore.PutMessage(msg)
|
c, err := sim.Node.Chainstore.PutMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
|
@ -90,7 +90,7 @@ type Simulation struct {
|
|||||||
// loadConfig loads a simulation's config from the datastore. This must be called on startup and may
|
// loadConfig loads a simulation's config from the datastore. This must be called on startup and may
|
||||||
// be called to restore the config from-disk.
|
// be called to restore the config from-disk.
|
||||||
func (sim *Simulation) loadConfig() error {
|
func (sim *Simulation) loadConfig() error {
|
||||||
configBytes, err := sim.Node.MetadataDS.Get(context.Background(), sim.key("config"))
|
configBytes, err := sim.Node.MetadataDS.Get(context.TODO(), sim.key("config"))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = json.Unmarshal(configBytes, &sim.config)
|
err = json.Unmarshal(configBytes, &sim.config)
|
||||||
}
|
}
|
||||||
@ -111,7 +111,7 @@ func (sim *Simulation) saveConfig() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return sim.Node.MetadataDS.Put(context.Background(), sim.key("config"), buf)
|
return sim.Node.MetadataDS.Put(context.TODO(), sim.key("config"), buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
var simulationPrefix = datastore.NewKey("/simulation")
|
var simulationPrefix = datastore.NewKey("/simulation")
|
||||||
@ -124,7 +124,7 @@ func (sim *Simulation) key(subkey string) datastore.Key {
|
|||||||
|
|
||||||
// loadNamedTipSet the tipset with the given name (for this simulation)
|
// loadNamedTipSet the tipset with the given name (for this simulation)
|
||||||
func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
|
func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
|
||||||
tskBytes, err := sim.Node.MetadataDS.Get(context.Background(), sim.key(name))
|
tskBytes, err := sim.Node.MetadataDS.Get(context.TODO(), sim.key(name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to load tipset %s/%s: %w", sim.name, name, err)
|
return nil, xerrors.Errorf("failed to load tipset %s/%s: %w", sim.name, name, err)
|
||||||
}
|
}
|
||||||
@ -132,7 +132,7 @@ func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to parse tipste %v (%s/%s): %w", tskBytes, sim.name, name, err)
|
return nil, xerrors.Errorf("failed to parse tipste %v (%s/%s): %w", tskBytes, sim.name, name, err)
|
||||||
}
|
}
|
||||||
ts, err := sim.Node.Chainstore.LoadTipSet(context.Background(), tsk)
|
ts, err := sim.Node.Chainstore.LoadTipSet(context.TODO(), tsk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to load tipset %s (%s/%s): %w", tsk, sim.name, name, err)
|
return nil, xerrors.Errorf("failed to load tipset %s (%s/%s): %w", tsk, sim.name, name, err)
|
||||||
}
|
}
|
||||||
@ -141,7 +141,7 @@ func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
|
|||||||
|
|
||||||
// storeNamedTipSet stores the tipset at name (relative to the simulation).
|
// storeNamedTipSet stores the tipset at name (relative to the simulation).
|
||||||
func (sim *Simulation) storeNamedTipSet(name string, ts *types.TipSet) error {
|
func (sim *Simulation) storeNamedTipSet(name string, ts *types.TipSet) error {
|
||||||
if err := sim.Node.MetadataDS.Put(context.Background(), sim.key(name), ts.Key().Bytes()); err != nil {
|
if err := sim.Node.MetadataDS.Put(context.TODO(), sim.key(name), ts.Key().Bytes()); err != nil {
|
||||||
return xerrors.Errorf("failed to store tipset (%s/%s): %w", sim.name, name, err)
|
return xerrors.Errorf("failed to store tipset (%s/%s): %w", sim.name, name, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -342,7 +342,7 @@ func (sim *Simulation) Walk(
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := sim.Node.Chainstore.MessagesForTipset(job.ts)
|
msgs, err := sim.Node.Chainstore.MessagesForTipset(ctx, job.ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
dstore "github.com/ipfs/go-datastore"
|
dstore "github.com/ipfs/go-datastore"
|
||||||
@ -88,7 +87,7 @@ func restore(cctx *cli.Context, r repo.Repo) error {
|
|||||||
|
|
||||||
log.Info("Restoring metadata backup")
|
log.Info("Restoring metadata backup")
|
||||||
|
|
||||||
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
mds, err := lr.Datastore(cctx.Context, "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -111,10 +110,10 @@ func restore(cctx *cli.Context, r repo.Repo) error {
|
|||||||
log.Info("Resetting chainstore metadata")
|
log.Info("Resetting chainstore metadata")
|
||||||
|
|
||||||
chainHead := dstore.NewKey("head")
|
chainHead := dstore.NewKey("head")
|
||||||
if err := mds.Delete(context.Background(), chainHead); err != nil {
|
if err := mds.Delete(cctx.Context, chainHead); err != nil {
|
||||||
return xerrors.Errorf("clearing chain head: %w", err)
|
return xerrors.Errorf("clearing chain head: %w", err)
|
||||||
}
|
}
|
||||||
if err := store.FlushValidationCache(context.Background(), mds); err != nil {
|
if err := store.FlushValidationCache(cctx.Context, mds); err != nil {
|
||||||
return xerrors.Errorf("clearing chain validation cache: %w", err)
|
return xerrors.Errorf("clearing chain validation cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,7 +474,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
|||||||
return xerrors.Errorf("failed to open blockstore: %w", err)
|
return xerrors.Errorf("failed to open blockstore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mds, err := lr.Datastore(context.TODO(), "/metadata")
|
mds, err := lr.Datastore(ctx, "/metadata")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -499,14 +499,14 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
|||||||
bar.Units = pb.U_BYTES
|
bar.Units = pb.U_BYTES
|
||||||
|
|
||||||
bar.Start()
|
bar.Start()
|
||||||
ts, err := cst.Import(br)
|
ts, err := cst.Import(ctx, br)
|
||||||
bar.Finish()
|
bar.Finish()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("importing chain failed: %w", err)
|
return xerrors.Errorf("importing chain failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cst.FlushValidationCache(context.Background()); err != nil {
|
if err := cst.FlushValidationCache(ctx); err != nil {
|
||||||
return xerrors.Errorf("flushing validation cache failed: %w", err)
|
return xerrors.Errorf("flushing validation cache failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -515,7 +515,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cst.SetGenesis(context.Background(), gb.Blocks()[0])
|
err = cst.SetGenesis(ctx, gb.Blocks()[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -282,7 +282,7 @@ func writeStateToTempCAR(bs blockstore.Blockstore, roots ...cid.Cid) (string, er
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// ignore things we don't have, the state tree is incomplete.
|
// ignore things we don't have, the state tree is incomplete.
|
||||||
if has, err := bs.Has(context.Background(), link.Cid); err != nil {
|
if has, err := bs.Has(context.TODO(), link.Cid); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if has {
|
} else if has {
|
||||||
out = append(out, link)
|
out = append(out, link)
|
||||||
@ -317,7 +317,7 @@ func LoadBlockstore(vectorCAR schema.Base64EncodedBytes) (blockstore.Blockstore,
|
|||||||
defer r.Close() // nolint
|
defer r.Close() // nolint
|
||||||
|
|
||||||
// Load the CAR embedded in the test vector into the Blockstore.
|
// Load the CAR embedded in the test vector into the Blockstore.
|
||||||
_, err = car.LoadCar(context.Background(), bs, r)
|
_, err = car.LoadCar(context.TODO(), bs, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to load state tree car from test vector: %s", err)
|
return nil, fmt.Errorf("failed to load state tree car from test vector: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,7 @@ func (m *ChainModule) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
bmsgs, smsgs, err := m.Chain.MessagesForBlock(b)
|
bmsgs, smsgs, err := m.Chain.MessagesForBlock(ctx, b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -159,7 +159,7 @@ func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cm, err := a.Chain.MessagesForTipset(pts)
|
cm, err := a.Chain.MessagesForTipset(ctx, pts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -191,14 +191,14 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cm, err := a.Chain.MessagesForTipset(pts)
|
cm, err := a.Chain.MessagesForTipset(ctx, pts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var out []*types.MessageReceipt
|
var out []*types.MessageReceipt
|
||||||
for i := 0; i < len(cm); i++ {
|
for i := 0; i < len(cm); i++ {
|
||||||
r, err := a.Chain.GetParentReceipt(b, i)
|
r, err := a.Chain.GetParentReceipt(ctx, b, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -220,7 +220,7 @@ func (a *ChainAPI) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSe
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cm, err := a.Chain.MessagesForTipset(ts)
|
cm, err := a.Chain.MessagesForTipset(ctx, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -577,7 +577,7 @@ func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) {
|
func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) {
|
||||||
cm, err := m.Chain.GetCMessage(mc)
|
cm, err := m.Chain.GetCMessage(ctx, mc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -82,14 +82,14 @@ type GasMeta struct {
|
|||||||
Limit int64
|
Limit int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GasPriceCache) GetTSGasStats(cstore *store.ChainStore, ts *types.TipSet) ([]GasMeta, error) {
|
func (g *GasPriceCache) GetTSGasStats(ctx context.Context, cstore *store.ChainStore, ts *types.TipSet) ([]GasMeta, error) {
|
||||||
i, has := g.c.Get(ts.Key())
|
i, has := g.c.Get(ts.Key())
|
||||||
if has {
|
if has {
|
||||||
return i.([]GasMeta), nil
|
return i.([]GasMeta), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var prices []GasMeta
|
var prices []GasMeta
|
||||||
msgs, err := cstore.MessagesForTipset(ts)
|
msgs, err := cstore.MessagesForTipset(ctx, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("loading messages: %w", err)
|
return nil, xerrors.Errorf("loading messages: %w", err)
|
||||||
}
|
}
|
||||||
@ -204,7 +204,7 @@ func gasEstimateGasPremium(ctx context.Context, cstore *store.ChainStore, cache
|
|||||||
}
|
}
|
||||||
|
|
||||||
blocks += len(pts.Blocks())
|
blocks += len(pts.Blocks())
|
||||||
meta, err := cache.GetTSGasStats(cstore, pts)
|
meta, err := cache.GetTSGasStats(ctx, cstore, pts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.BigInt{}, err
|
return types.BigInt{}, err
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
|
|||||||
|
|
||||||
// different blocks in tipsets of the same height
|
// different blocks in tipsets of the same height
|
||||||
// we exclude messages that have been included in blocks in the mpool tipset
|
// we exclude messages that have been included in blocks in the mpool tipset
|
||||||
have, err := a.Mpool.MessagesForBlocks(mpts.Blocks())
|
have, err := a.Mpool.MessagesForBlocks(ctx, mpts.Blocks())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
|
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks())
|
msgs, err := a.Mpool.MessagesForBlocks(ctx, ts.Blocks())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf(": %w", err)
|
return nil, xerrors.Errorf(": %w", err)
|
||||||
}
|
}
|
||||||
|
@ -556,7 +556,7 @@ func (m *StateModule) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence
|
|||||||
|
|
||||||
var returndec interface{}
|
var returndec interface{}
|
||||||
if recpt.ExitCode == 0 && len(recpt.Return) > 0 {
|
if recpt.ExitCode == 0 && len(recpt.Return) > 0 {
|
||||||
cmsg, err := m.Chain.GetCMessage(msg)
|
cmsg, err := m.Chain.GetCMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to load message after successful receipt search: %w", err)
|
return nil, xerrors.Errorf("failed to load message after successful receipt search: %w", err)
|
||||||
}
|
}
|
||||||
@ -874,7 +874,7 @@ func (a *StateAPI) StateListMessages(ctx context.Context, match *api.MessageMatc
|
|||||||
|
|
||||||
var out []cid.Cid
|
var out []cid.Cid
|
||||||
for ts.Height() >= toheight {
|
for ts.Height() >= toheight {
|
||||||
msgs, err := a.Chain.MessagesForTipset(ts)
|
msgs, err := a.Chain.MessagesForTipset(ctx, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to get messages for tipset (%s): %w", ts.Key(), err)
|
return nil, xerrors.Errorf("failed to get messages for tipset (%s): %w", ts.Key(), err)
|
||||||
}
|
}
|
||||||
|
@ -64,12 +64,12 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: should we have some sort of fast path to adding a local block?
|
// TODO: should we have some sort of fast path to adding a local block?
|
||||||
bmsgs, err := a.Syncer.ChainStore().LoadMessagesFromCids(blk.BlsMessages)
|
bmsgs, err := a.Syncer.ChainStore().LoadMessagesFromCids(ctx, blk.BlsMessages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to load bls messages: %w", err)
|
return xerrors.Errorf("failed to load bls messages: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
smsgs, err := a.Syncer.ChainStore().LoadSignedMessagesFromCids(blk.SecpkMessages)
|
smsgs, err := a.Syncer.ChainStore().LoadSignedMessagesFromCids(ctx, blk.SecpkMessages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to load secpk message: %w", err)
|
return xerrors.Errorf("failed to load secpk message: %w", err)
|
||||||
}
|
}
|
||||||
@ -142,7 +142,7 @@ func (a *SyncAPI) SyncValidateTipset(ctx context.Context, tsk types.TipSetKey) (
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fts, err := a.Syncer.ChainStore().TryFillTipSet(ts)
|
fts, err := a.Syncer.ChainStore().TryFillTipSet(ctx, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// ChainBitswap uses a blockstore that bypasses all caches.
|
// ChainBitswap uses a blockstore that bypasses all caches.
|
||||||
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
|
func ChainBitswap(lc fx.Lifecycle, mctx helpers.MetricsCtx, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
|
||||||
// prefix protocol for chain bitswap
|
// prefix protocol for chain bitswap
|
||||||
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
|
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
|
||||||
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
|
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
|
||||||
@ -58,8 +58,8 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
|
|||||||
return blockservice.New(bs, rem)
|
return blockservice.New(bs, rem)
|
||||||
}
|
}
|
||||||
|
|
||||||
func MessagePool(lc fx.Lifecycle, us stmgr.UpgradeSchedule, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
|
func MessagePool(lc fx.Lifecycle, mctx helpers.MetricsCtx, us stmgr.UpgradeSchedule, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
|
||||||
mp, err := messagepool.New(context.Background(), mpp, ds, us, nn, j)
|
mp, err := messagepool.New(helpers.LifecycleCtx(mctx, lc), mpp, ds, us, nn, j)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
||||||
}
|
}
|
||||||
@ -73,6 +73,7 @@ func MessagePool(lc fx.Lifecycle, us stmgr.UpgradeSchedule, mpp messagepool.Prov
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ChainStore(lc fx.Lifecycle,
|
func ChainStore(lc fx.Lifecycle,
|
||||||
|
mctx helpers.MetricsCtx,
|
||||||
cbs dtypes.ChainBlockstore,
|
cbs dtypes.ChainBlockstore,
|
||||||
sbs dtypes.StateBlockstore,
|
sbs dtypes.StateBlockstore,
|
||||||
ds dtypes.MetadataDS,
|
ds dtypes.MetadataDS,
|
||||||
@ -83,7 +84,7 @@ func ChainStore(lc fx.Lifecycle,
|
|||||||
|
|
||||||
chain := store.NewChainStore(cbs, sbs, ds, weight, j)
|
chain := store.NewChainStore(cbs, sbs, ds, weight, j)
|
||||||
|
|
||||||
if err := chain.Load(context.Background()); err != nil {
|
if err := chain.Load(helpers.LifecycleCtx(mctx, lc)); err != nil {
|
||||||
log.Warnf("loading chain state from disk: %s", err)
|
log.Warnf("loading chain state from disk: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,11 +40,12 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
"github.com/filecoin-project/lotus/node/impl/full"
|
||||||
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/node/repo/imports"
|
"github.com/filecoin-project/lotus/node/repo/imports"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
|
func HandleMigrateClientFunds(lc fx.Lifecycle, mctx helpers.MetricsCtx, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
addr, err := wallet.WalletDefaultAddress(ctx)
|
addr, err := wallet.WalletDefaultAddress(ctx)
|
||||||
@ -52,7 +53,7 @@ func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
b, err := ds.Get(context.Background(), datastore.NewKey("/marketfunds/client"))
|
b, err := ds.Get(helpers.LifecycleCtx(mctx, lc), datastore.NewKey("/marketfunds/client"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if xerrors.Is(err, datastore.ErrNotFound) {
|
if xerrors.Is(err, datastore.ErrNotFound) {
|
||||||
return nil
|
return nil
|
||||||
@ -73,7 +74,7 @@ func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return ds.Delete(context.Background(), datastore.NewKey("/marketfunds/client"))
|
return ds.Delete(helpers.LifecycleCtx(mctx, lc), datastore.NewKey("/marketfunds/client"))
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -2,9 +2,10 @@ package modules
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"go.uber.org/fx"
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipld/go-car"
|
"github.com/ipld/go-car"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -12,6 +13,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ErrorGenesis() Genesis {
|
func ErrorGenesis() Genesis {
|
||||||
@ -20,17 +22,18 @@ func ErrorGenesis() Genesis {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
|
func LoadGenesis(genBytes []byte) func(fx.Lifecycle, helpers.MetricsCtx, dtypes.ChainBlockstore) Genesis {
|
||||||
return func(bs dtypes.ChainBlockstore) Genesis {
|
return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore) Genesis {
|
||||||
return func() (header *types.BlockHeader, e error) {
|
return func() (header *types.BlockHeader, e error) {
|
||||||
c, err := car.LoadCar(context.Background(), bs, bytes.NewReader(genBytes))
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
c, err := car.LoadCar(ctx, bs, bytes.NewReader(genBytes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
|
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
|
||||||
}
|
}
|
||||||
if len(c.Roots) != 1 {
|
if len(c.Roots) != 1 {
|
||||||
return nil, xerrors.New("expected genesis file to have one root")
|
return nil, xerrors.New("expected genesis file to have one root")
|
||||||
}
|
}
|
||||||
root, err := bs.Get(context.Background(), c.Roots[0])
|
root, err := bs.Get(ctx, c.Roots[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -46,8 +49,9 @@ func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
|
|||||||
|
|
||||||
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
|
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
|
||||||
|
|
||||||
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
|
func SetGenesis(lc fx.Lifecycle, mctx helpers.MetricsCtx, cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
|
||||||
genFromRepo, err := cs.GetGenesis(context.Background())
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
genFromRepo, err := cs.GetGenesis(ctx)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
|
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
|
||||||
expectedGenesis, err := g()
|
expectedGenesis, err := g()
|
||||||
@ -70,5 +74,5 @@ func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error)
|
|||||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
|
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return dtypes.AfterGenesisSet{}, cs.SetGenesis(context.Background(), genesis)
|
return dtypes.AfterGenesisSet{}, cs.SetGenesis(ctx, genesis)
|
||||||
}
|
}
|
||||||
|
@ -228,8 +228,8 @@ func BuiltinDrandConfig() dtypes.DrandSchedule {
|
|||||||
return build.DrandConfigSchedule()
|
return build.DrandConfigSchedule()
|
||||||
}
|
}
|
||||||
|
|
||||||
func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
|
func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
|
||||||
gen, err := p.Cs.GetGenesis(context.Background())
|
gen, err := p.Cs.GetGenesis(helpers.LifecycleCtx(mctx, lc))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
||||||
maddrb, err := ds.Get(context.Background(), datastore.NewKey("miner-address"))
|
maddrb, err := ds.Get(context.TODO(), datastore.NewKey("miner-address"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return address.Undef, err
|
return address.Undef, err
|
||||||
}
|
}
|
||||||
@ -300,7 +300,7 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
|
|||||||
func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.FullNode, minerAddress dtypes.MinerAddress) {
|
func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.FullNode, minerAddress dtypes.MinerAddress) {
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
b, err := ds.Get(context.Background(), datastore.NewKey("/marketfunds/provider"))
|
b, err := ds.Get(ctx, datastore.NewKey("/marketfunds/provider"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if xerrors.Is(err, datastore.ErrNotFound) {
|
if xerrors.Is(err, datastore.ErrNotFound) {
|
||||||
return nil
|
return nil
|
||||||
@ -331,7 +331,7 @@ func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return ds.Delete(context.Background(), datastore.NewKey("/marketfunds/provider"))
|
return ds.Delete(ctx, datastore.NewKey("/marketfunds/provider"))
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user