plumb more contexts

This commit is contained in:
vyzo 2021-12-17 11:42:09 +02:00
parent 62de84d5b6
commit dd327f0b22
32 changed files with 146 additions and 143 deletions

View File

@ -99,7 +99,7 @@ func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB,
}
func testMove(t *testing.T, optsF func(string) Options) {
ctx := context.TODO()
ctx := context.Background()
basePath, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)

View File

@ -44,7 +44,7 @@ func (s *Suite) RunTests(t *testing.T, prefix string) {
}
func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) {
ctx := context.TODO()
ctx := context.Background()
bs, _ := s.NewBlockstore(t)
if c, ok := bs.(io.Closer); ok {
defer func() { require.NoError(t, c.Close()) }()

View File

@ -299,7 +299,7 @@ func (t *TipSetExecutor) ExecuteTipSet(ctx context.Context, sm *stmgr.StateManag
r := rand.NewStateRand(sm.ChainStore(), ts.Cids(), sm.Beacon())
blkmsgs, err := sm.ChainStore().BlockMsgsForTipset(ts)
blkmsgs, err := sm.ChainStore().BlockMsgsForTipset(ctx, ts)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting block messages for tipset: %w", err)
}

View File

@ -171,7 +171,7 @@ func (filec *FilecoinEC) ValidateBlock(ctx context.Context, b *types.FullBlock)
}
if stateroot != h.ParentStateRoot {
msgs, err := filec.store.MessagesForTipset(baseTs)
msgs, err := filec.store.MessagesForTipset(ctx, baseTs)
if err != nil {
log.Error("failed to load messages for tipset during tipset state mismatch error: ", err)
} else {
@ -519,7 +519,7 @@ func (filec *FilecoinEC) checkBlockMessages(ctx context.Context, b *types.FullBl
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
}
c, err := store.PutMessage(tmpbs, m)
c, err := store.PutMessage(ctx, tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}
@ -553,7 +553,7 @@ func (filec *FilecoinEC) checkBlockMessages(ctx context.Context, b *types.FullBl
return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err)
}
c, err := store.PutMessage(tmpbs, m)
c, err := store.PutMessage(ctx, tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}

View File

@ -59,14 +59,14 @@ func (filec *FilecoinEC) CreateBlock(ctx context.Context, w api.Wallet, bt *api.
blsSigs = append(blsSigs, msg.Signature)
blsMessages = append(blsMessages, &msg.Message)
c, err := filec.sm.ChainStore().PutMessage(&msg.Message)
c, err := filec.sm.ChainStore().PutMessage(ctx, &msg.Message)
if err != nil {
return nil, err
}
blsMsgCids = append(blsMsgCids, c)
} else {
c, err := filec.sm.ChainStore().PutMessage(msg)
c, err := filec.sm.ChainStore().PutMessage(ctx, msg)
if err != nil {
return nil, err
}

View File

@ -172,7 +172,7 @@ func collectChainSegment(ctx context.Context, cs *store.ChainStore, req *validat
}
if req.options.IncludeMessages {
bmsgs, bmincl, smsgs, smincl, err := gatherMessages(cs, ts)
bmsgs, bmincl, smsgs, smincl, err := gatherMessages(ctx, cs, ts)
if err != nil {
return nil, xerrors.Errorf("gather messages failed: %w", err)
}
@ -197,14 +197,14 @@ func collectChainSegment(ctx context.Context, cs *store.ChainStore, req *validat
}
}
func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
func gatherMessages(ctx context.Context, cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
blsmsgmap := make(map[cid.Cid]uint64)
secpkmsgmap := make(map[cid.Cid]uint64)
var secpkincl, blsincl [][]uint64
var blscids, secpkcids []cid.Cid
for _, block := range ts.Blocks() {
bc, sc, err := cs.ReadMsgMetaCids(block.Messages)
bc, sc, err := cs.ReadMsgMetaCids(ctx, block.Messages)
if err != nil {
return nil, nil, nil, nil, err
}
@ -237,12 +237,12 @@ func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [
secpkincl = append(secpkincl, smi)
}
blsmsgs, err := cs.LoadMessagesFromCids(blscids)
blsmsgs, err := cs.LoadMessagesFromCids(ctx, blscids)
if err != nil {
return nil, nil, nil, nil, err
}
secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
secpkmsgs, err := cs.LoadSignedMessagesFromCids(ctx, secpkcids)
if err != nil {
return nil, nil, nil, nil, err
}

View File

@ -909,12 +909,12 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
mp.blsSigCache.Add(m.Cid(), m.Signature)
}
if _, err := mp.api.PutMessage(m); err != nil {
if _, err := mp.api.PutMessage(ctx, m); err != nil {
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
return err
}
if _, err := mp.api.PutMessage(&m.Message); err != nil {
if _, err := mp.api.PutMessage(ctx, &m.Message); err != nil {
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
return err
}
@ -1216,7 +1216,7 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
mp.curTs = pts
msgs, err := mp.MessagesForBlocks(ts.Blocks())
msgs, err := mp.MessagesForBlocks(ctx, ts.Blocks())
if err != nil {
log.Errorf("error retrieving messages for reverted block: %s", err)
merr = multierror.Append(merr, err)
@ -1232,7 +1232,7 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
mp.curTs = ts
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
bmsgs, smsgs, err := mp.api.MessagesForBlock(ctx, b)
if err != nil {
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
log.Errorf("error retrieving messages for block: %s", xerr)
@ -1368,7 +1368,7 @@ func (mp *MessagePool) runHeadChange(ctx context.Context, from *types.TipSet, to
var merr error
for _, ts := range revert {
msgs, err := mp.MessagesForBlocks(ts.Blocks())
msgs, err := mp.MessagesForBlocks(ctx, ts.Blocks())
if err != nil {
log.Errorf("error retrieving messages for reverted block: %s", err)
merr = multierror.Append(merr, err)
@ -1382,7 +1382,7 @@ func (mp *MessagePool) runHeadChange(ctx context.Context, from *types.TipSet, to
for _, ts := range apply {
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
bmsgs, smsgs, err := mp.api.MessagesForBlock(ctx, b)
if err != nil {
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
log.Errorf("error retrieving messages for block: %s", xerr)
@ -1407,11 +1407,11 @@ type statBucket struct {
msgs map[uint64]*types.SignedMessage
}
func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
func (mp *MessagePool) MessagesForBlocks(ctx context.Context, blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, 0)
for _, b := range blks {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
bmsgs, smsgs, err := mp.api.MessagesForBlock(ctx, b)
if err != nil {
return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
}

View File

@ -23,12 +23,12 @@ var (
type Provider interface {
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
PutMessage(m types.ChainMsg) (cid.Cid, error)
PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error)
PubSubPublish(string, []byte) error
GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error)
StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
MessagesForBlock(context.Context, *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(context.Context, *types.TipSet) ([]types.ChainMsg, error)
LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
IsLite() bool
@ -66,8 +66,8 @@ func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet)
return mpp.sm.ChainStore().GetHeaviestTipSet()
}
func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) {
return mpp.sm.ChainStore().PutMessage(m)
func (mpp *mpoolProvider) PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error) {
return mpp.sm.ChainStore().PutMessage(ctx, m)
}
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
@ -103,12 +103,12 @@ func (mpp *mpoolProvider) StateAccountKeyAtFinality(ctx context.Context, addr ad
return mpp.sm.ResolveToKeyAddressAtFinality(ctx, addr, ts)
}
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
return mpp.sm.ChainStore().MessagesForBlock(h)
func (mpp *mpoolProvider) MessagesForBlock(ctx context.Context, h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
return mpp.sm.ChainStore().MessagesForBlock(ctx, h)
}
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
return mpp.sm.ChainStore().MessagesForTipset(ts)
func (mpp *mpoolProvider) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) {
return mpp.sm.ChainStore().MessagesForTipset(ctx, ts)
}
func (mpp *mpoolProvider) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {

View File

@ -20,7 +20,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
ctx, cancel := context.WithCancel(ctx)
defer cancel()
msg, err := sm.cs.GetCMessage(mcid)
msg, err := sm.cs.GetCMessage(ctx, mcid)
if err != nil {
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
}
@ -130,7 +130,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
}
func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet, mcid cid.Cid, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
msg, err := sm.cs.GetCMessage(mcid)
msg, err := sm.cs.GetCMessage(ctx, mcid)
if err != nil {
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
}
@ -240,7 +240,7 @@ func (sm *StateManager) tipsetExecutedMessage(ctx context.Context, ts *types.Tip
return nil, cid.Undef, err
}
cm, err := sm.cs.MessagesForTipset(pts)
cm, err := sm.cs.MessagesForTipset(ctx, pts)
if err != nil {
return nil, cid.Undef, err
}
@ -267,7 +267,7 @@ func (sm *StateManager) tipsetExecutedMessage(ctx context.Context, ts *types.Tip
}
}
pr, err := sm.cs.GetParentReceipt(ts.Blocks()[0], i)
pr, err := sm.cs.GetParentReceipt(ctx, ts.Blocks()[0], i)
if err != nil {
return nil, cid.Undef, err
}

View File

@ -58,7 +58,7 @@ func (cs *ChainStore) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi
seen := make(map[cid.Cid]struct{})
for _, b := range ts.Blocks() {
msg1, msg2, err := cs.MessagesForBlock(b)
msg1, msg2, err := cs.MessagesForBlock(ctx, b)
if err != nil {
return zero, xerrors.Errorf("error getting messages for: %s: %w", b.Cid(), err)
}

View File

@ -23,25 +23,25 @@ type storable interface {
ToStorageBlock() (block.Block, error)
}
func PutMessage(bs bstore.Blockstore, m storable) (cid.Cid, error) {
func PutMessage(ctx context.Context, bs bstore.Blockstore, m storable) (cid.Cid, error) {
b, err := m.ToStorageBlock()
if err != nil {
return cid.Undef, err
}
if err := bs.Put(context.TODO(), b); err != nil {
if err := bs.Put(ctx, b); err != nil {
return cid.Undef, err
}
return b.Cid(), nil
}
func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
return PutMessage(cs.chainBlockstore, m)
func (cs *ChainStore) PutMessage(ctx context.Context, m storable) (cid.Cid, error) {
return PutMessage(ctx, cs.chainBlockstore, m)
}
func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
m, err := cs.GetMessage(c)
func (cs *ChainStore) GetCMessage(ctx context.Context, c cid.Cid) (types.ChainMsg, error) {
m, err := cs.GetMessage(ctx, c)
if err == nil {
return m, nil
}
@ -49,21 +49,21 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
log.Warnf("GetCMessage: unexpected error getting unsigned message: %s", err)
}
return cs.GetSignedMessage(c)
return cs.GetSignedMessage(ctx, c)
}
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
func (cs *ChainStore) GetMessage(ctx context.Context, c cid.Cid) (*types.Message, error) {
var msg *types.Message
err := cs.chainLocalBlockstore.View(context.TODO(), c, func(b []byte) (err error) {
err := cs.chainLocalBlockstore.View(ctx, c, func(b []byte) (err error) {
msg, err = types.DecodeMessage(b)
return err
})
return msg, err
}
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
func (cs *ChainStore) GetSignedMessage(ctx context.Context, c cid.Cid) (*types.SignedMessage, error) {
var msg *types.SignedMessage
err := cs.chainLocalBlockstore.View(context.TODO(), c, func(b []byte) (err error) {
err := cs.chainLocalBlockstore.View(ctx, c, func(b []byte) (err error) {
msg, err = types.DecodeSignedMessage(b)
return err
})
@ -103,7 +103,7 @@ type BlockMessages struct {
SecpkMessages []types.ChainMsg
}
func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, error) {
func (cs *ChainStore) BlockMsgsForTipset(ctx context.Context, ts *types.TipSet) ([]BlockMessages, error) {
// returned BlockMessages match block order in tipset
applied := make(map[address.Address]uint64)
@ -142,7 +142,7 @@ func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, err
var out []BlockMessages
for _, b := range ts.Blocks() {
bms, sms, err := cs.MessagesForBlock(b)
bms, sms, err := cs.MessagesForBlock(ctx, b)
if err != nil {
return nil, xerrors.Errorf("failed to get messages for block: %w", err)
}
@ -181,8 +181,8 @@ func (cs *ChainStore) BlockMsgsForTipset(ts *types.TipSet) ([]BlockMessages, err
return out, nil
}
func (cs *ChainStore) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
bmsgs, err := cs.BlockMsgsForTipset(ts)
func (cs *ChainStore) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) {
bmsgs, err := cs.BlockMsgsForTipset(ctx, ts)
if err != nil {
return nil, err
}
@ -206,7 +206,7 @@ type mmCids struct {
secpk []cid.Cid
}
func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
func (cs *ChainStore) ReadMsgMetaCids(ctx context.Context, mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
o, ok := cs.mmCache.Get(mmc)
if ok {
mmcids := o.(*mmCids)
@ -215,7 +215,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
cst := cbor.NewCborStore(cs.chainLocalBlockstore)
var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
if err := cst.Get(ctx, mmc, &msgmeta); err != nil {
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
}
@ -237,18 +237,18 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return blscids, secpkcids, nil
}
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
blscids, secpkcids, err := cs.ReadMsgMetaCids(b.Messages)
func (cs *ChainStore) MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
blscids, secpkcids, err := cs.ReadMsgMetaCids(ctx, b.Messages)
if err != nil {
return nil, nil, err
}
blsmsgs, err := cs.LoadMessagesFromCids(blscids)
blsmsgs, err := cs.LoadMessagesFromCids(ctx, blscids)
if err != nil {
return nil, nil, xerrors.Errorf("loading bls messages for block: %w", err)
}
secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
secpkmsgs, err := cs.LoadSignedMessagesFromCids(ctx, secpkcids)
if err != nil {
return nil, nil, xerrors.Errorf("loading secpk messages for block: %w", err)
}
@ -256,8 +256,7 @@ func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message,
return blsmsgs, secpkmsgs, nil
}
func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
ctx := context.TODO()
func (cs *ChainStore) GetParentReceipt(ctx context.Context, b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
// block headers use adt0, for now.
a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts)
if err != nil {
@ -274,10 +273,10 @@ func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.Mess
return &r, nil
}
func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, error) {
func (cs *ChainStore) LoadMessagesFromCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
msgs := make([]*types.Message, 0, len(cids))
for i, c := range cids {
m, err := cs.GetMessage(c)
m, err := cs.GetMessage(ctx, c)
if err != nil {
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err)
}
@ -288,10 +287,10 @@ func (cs *ChainStore) LoadMessagesFromCids(cids []cid.Cid) ([]*types.Message, er
return msgs, nil
}
func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
func (cs *ChainStore) LoadSignedMessagesFromCids(ctx context.Context, cids []cid.Cid) ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, 0, len(cids))
for i, c := range cids {
m, err := cs.GetSignedMessage(c)
m, err := cs.GetSignedMessage(ctx, c)
if err != nil {
return nil, xerrors.Errorf("failed to get message: (%s):%d: %w", c, i, err)
}

View File

@ -43,8 +43,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
})
}
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {
ctx := context.TODO()
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
// TODO: writing only to the state blockstore is incorrect.
// At this time, both the state and chain blockstores are backed by the
// universal store. When we physically segregate the stores, we will need

View File

@ -1108,11 +1108,11 @@ func (cs *ChainStore) ActorStore(ctx context.Context) adt.Store {
return ActorStore(ctx, cs.stateBlockstore)
}
func (cs *ChainStore) TryFillTipSet(ts *types.TipSet) (*FullTipSet, error) {
func (cs *ChainStore) TryFillTipSet(ctx context.Context, ts *types.TipSet) (*FullTipSet, error) {
var out []*types.FullBlock
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := cs.MessagesForBlock(b)
bmsgs, smsgs, err := cs.MessagesForBlock(ctx, b)
if err != nil {
// TODO: check for 'not found' errors, and only return nil if this
// is actually a 'not found' error

View File

@ -298,11 +298,11 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
// into the blockstore.
blockstore := bstore.NewMemory()
cst := cbor.NewCborStore(blockstore)
ctx := context.Background()
var bcids, scids []cid.Cid
for _, m := range fblk.BlsMessages {
c, err := store.PutMessage(blockstore, m)
c, err := store.PutMessage(ctx, blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
@ -310,7 +310,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
}
for _, m := range fblk.SecpkMessages {
c, err := store.PutMessage(blockstore, m)
c, err := store.PutMessage(ctx, blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
@ -482,7 +482,7 @@ func (syncer *Syncer) tryLoadFullTipSet(ctx context.Context, tsk types.TipSetKey
fts := &store.FullTipSet{}
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := syncer.store.MessagesForBlock(b)
bmsgs, smsgs, err := syncer.store.MessagesForBlock(ctx, b)
if err != nil {
return nil, err
}
@ -965,7 +965,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
for i := len(headers) - 1; i >= 0; {
fts, err := syncer.store.TryFillTipSet(headers[i])
fts, err := syncer.store.TryFillTipSet(ctx, headers[i])
if err != nil {
return err
}
@ -1138,7 +1138,7 @@ func persistMessages(ctx context.Context, bs bstore.Blockstore, bst *exchange.Co
for _, m := range bst.Bls {
//log.Infof("putting BLS message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
if _, err := store.PutMessage(ctx, bs, m); err != nil {
log.Errorf("failed to persist messages: %+v", err)
return xerrors.Errorf("BLS message processing failed: %w", err)
}
@ -1148,7 +1148,7 @@ func persistMessages(ctx context.Context, bs bstore.Blockstore, bst *exchange.Co
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.Type)
}
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
if _, err := store.PutMessage(ctx, bs, m); err != nil {
log.Errorf("failed to persist messages: %+v", err)
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}

View File

@ -304,7 +304,7 @@ var importBenchCmd = &cli.Command{
return fmt.Errorf("no CAR file provided for import")
}
head, err = cs.Import(carFile)
head, err = cs.Import(cctx.Context, carFile)
if err != nil {
return err
}
@ -327,7 +327,7 @@ var importBenchCmd = &cli.Command{
return xerrors.Errorf("failed to parse head tipset key: %w", err)
}
head, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
if err != nil {
return err
}
@ -336,7 +336,7 @@ var importBenchCmd = &cli.Command{
if err != nil {
return err
}
head, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cr.Header.Roots...))
head, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cr.Header.Roots...))
if err != nil {
return err
}
@ -353,7 +353,7 @@ var importBenchCmd = &cli.Command{
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
return xerrors.Errorf("failed to parse genesis tipset key: %w", err)
}
genesis, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
genesis, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
} else {
log.Warnf("getting genesis by height; this will be slow; pass in the genesis tipset through --genesis-tipset")
// fallback to the slow path of walking the chain.
@ -364,7 +364,7 @@ var importBenchCmd = &cli.Command{
return err
}
if err = cs.SetGenesis(context.Background(), genesis.Blocks()[0]); err != nil {
if err = cs.SetGenesis(cctx.Context, genesis.Blocks()[0]); err != nil {
return err
}
@ -375,10 +375,10 @@ var importBenchCmd = &cli.Command{
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
return xerrors.Errorf("failed to end genesis tipset key: %w", err)
}
end, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
end, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
} else if h := cctx.Int64("end-height"); h != 0 {
log.Infof("getting end tipset at height %d...", h)
end, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), head, true)
end, err = cs.GetTipsetByHeight(cctx.Context, abi.ChainEpoch(h), head, true)
}
if err != nil {
@ -397,7 +397,7 @@ var importBenchCmd = &cli.Command{
if cids, err = lcli.ParseTipSetString(tsk); err != nil {
return xerrors.Errorf("failed to start genesis tipset key: %w", err)
}
start, err = cs.LoadTipSet(context.Background(), types.NewTipSetKey(cids...))
start, err = cs.LoadTipSet(cctx.Context, types.NewTipSetKey(cids...))
} else if h := cctx.Int64("start-height"); h != 0 {
log.Infof("getting start tipset at height %d...", h)
// lookback from the end tipset (which falls back to head if not supplied).
@ -410,7 +410,7 @@ var importBenchCmd = &cli.Command{
if start != nil {
startEpoch = start.Height()
if err := cs.ForceHeadSilent(context.Background(), start); err != nil {
if err := cs.ForceHeadSilent(cctx.Context, start); err != nil {
// if err := cs.SetHead(start); err != nil {
return err
}
@ -421,7 +421,7 @@ var importBenchCmd = &cli.Command{
if h := ts.Height(); h%100 == 0 {
log.Infof("walking back the chain; loaded tipset at height %d...", h)
}
next, err := cs.LoadTipSet(context.Background(), ts.Parents())
next, err := cs.LoadTipSet(cctx.Context, ts.Parents())
if err != nil {
return err
}

View File

@ -347,7 +347,7 @@ func migratePreSealMeta(ctx context.Context, api v1api.FullNode, metadata string
return err
}
if err := mds.Put(context.Background(), sectorKey, b); err != nil {
if err := mds.Put(ctx, sectorKey, b); err != nil {
return err
}
@ -387,7 +387,7 @@ func migratePreSealMeta(ctx context.Context, api v1api.FullNode, metadata string
buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, uint64(maxSectorID))
return mds.Put(context.Background(), datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
return mds.Put(ctx, datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
}
func findMarketDealID(ctx context.Context, api v1api.FullNode, deal market2.DealProposal) (abi.DealID, error) {
@ -428,7 +428,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
return xerrors.Errorf("peer ID from private key: %w", err)
}
mds, err := lr.Datastore(context.TODO(), "/metadata")
mds, err := lr.Datastore(ctx, "/metadata")
if err != nil {
return err
}
@ -441,7 +441,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
}
if cctx.Bool("genesis-miner") {
if err := mds.Put(context.Background(), datastore.NewKey("miner-address"), a.Bytes()); err != nil {
if err := mds.Put(ctx, datastore.NewKey("miner-address"), a.Bytes()); err != nil {
return err
}
@ -548,7 +548,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
}
log.Infof("Created new miner: %s", addr)
if err := mds.Put(context.Background(), datastore.NewKey("miner-address"), addr.Bytes()); err != nil {
if err := mds.Put(ctx, datastore.NewKey("miner-address"), addr.Bytes()); err != nil {
return err
}

View File

@ -233,7 +233,7 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
log.Info("Restoring metadata backup")
mds, err := lr.Datastore(context.TODO(), "/metadata")
mds, err := lr.Datastore(ctx, "/metadata")
if err != nil {
return err
}
@ -255,7 +255,7 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
log.Info("Checking actor metadata")
abytes, err := mds.Get(context.Background(), datastore.NewKey("miner-address"))
abytes, err := mds.Get(ctx, datastore.NewKey("miner-address"))
if err != nil {
return xerrors.Errorf("getting actor address from metadata datastore: %w", err)
}

View File

@ -31,7 +31,7 @@ func (sim *Simulation) storeMessages(ctx context.Context, messages []*types.Mess
// fail a pre-commit...
var msgCids []cid.Cid
for _, msg := range messages {
c, err := sim.Node.Chainstore.PutMessage(msg)
c, err := sim.Node.Chainstore.PutMessage(ctx, msg)
if err != nil {
return cid.Undef, err
}

View File

@ -90,7 +90,7 @@ type Simulation struct {
// loadConfig loads a simulation's config from the datastore. This must be called on startup and may
// be called to restore the config from-disk.
func (sim *Simulation) loadConfig() error {
configBytes, err := sim.Node.MetadataDS.Get(context.Background(), sim.key("config"))
configBytes, err := sim.Node.MetadataDS.Get(context.TODO(), sim.key("config"))
if err == nil {
err = json.Unmarshal(configBytes, &sim.config)
}
@ -111,7 +111,7 @@ func (sim *Simulation) saveConfig() error {
if err != nil {
return err
}
return sim.Node.MetadataDS.Put(context.Background(), sim.key("config"), buf)
return sim.Node.MetadataDS.Put(context.TODO(), sim.key("config"), buf)
}
var simulationPrefix = datastore.NewKey("/simulation")
@ -124,7 +124,7 @@ func (sim *Simulation) key(subkey string) datastore.Key {
// loadNamedTipSet the tipset with the given name (for this simulation)
func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
tskBytes, err := sim.Node.MetadataDS.Get(context.Background(), sim.key(name))
tskBytes, err := sim.Node.MetadataDS.Get(context.TODO(), sim.key(name))
if err != nil {
return nil, xerrors.Errorf("failed to load tipset %s/%s: %w", sim.name, name, err)
}
@ -132,7 +132,7 @@ func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
if err != nil {
return nil, xerrors.Errorf("failed to parse tipste %v (%s/%s): %w", tskBytes, sim.name, name, err)
}
ts, err := sim.Node.Chainstore.LoadTipSet(context.Background(), tsk)
ts, err := sim.Node.Chainstore.LoadTipSet(context.TODO(), tsk)
if err != nil {
return nil, xerrors.Errorf("failed to load tipset %s (%s/%s): %w", tsk, sim.name, name, err)
}
@ -141,7 +141,7 @@ func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
// storeNamedTipSet stores the tipset at name (relative to the simulation).
func (sim *Simulation) storeNamedTipSet(name string, ts *types.TipSet) error {
if err := sim.Node.MetadataDS.Put(context.Background(), sim.key(name), ts.Key().Bytes()); err != nil {
if err := sim.Node.MetadataDS.Put(context.TODO(), sim.key(name), ts.Key().Bytes()); err != nil {
return xerrors.Errorf("failed to store tipset (%s/%s): %w", sim.name, name, err)
}
return nil
@ -342,7 +342,7 @@ func (sim *Simulation) Walk(
break
}
msgs, err := sim.Node.Chainstore.MessagesForTipset(job.ts)
msgs, err := sim.Node.Chainstore.MessagesForTipset(ctx, job.ts)
if err != nil {
return err
}

View File

@ -1,7 +1,6 @@
package main
import (
"context"
"os"
dstore "github.com/ipfs/go-datastore"
@ -88,7 +87,7 @@ func restore(cctx *cli.Context, r repo.Repo) error {
log.Info("Restoring metadata backup")
mds, err := lr.Datastore(context.TODO(), "/metadata")
mds, err := lr.Datastore(cctx.Context, "/metadata")
if err != nil {
return err
}
@ -111,10 +110,10 @@ func restore(cctx *cli.Context, r repo.Repo) error {
log.Info("Resetting chainstore metadata")
chainHead := dstore.NewKey("head")
if err := mds.Delete(context.Background(), chainHead); err != nil {
if err := mds.Delete(cctx.Context, chainHead); err != nil {
return xerrors.Errorf("clearing chain head: %w", err)
}
if err := store.FlushValidationCache(context.Background(), mds); err != nil {
if err := store.FlushValidationCache(cctx.Context, mds); err != nil {
return xerrors.Errorf("clearing chain validation cache: %w", err)
}

View File

@ -474,7 +474,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
return xerrors.Errorf("failed to open blockstore: %w", err)
}
mds, err := lr.Datastore(context.TODO(), "/metadata")
mds, err := lr.Datastore(ctx, "/metadata")
if err != nil {
return err
}
@ -499,14 +499,14 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
bar.Units = pb.U_BYTES
bar.Start()
ts, err := cst.Import(br)
ts, err := cst.Import(ctx, br)
bar.Finish()
if err != nil {
return xerrors.Errorf("importing chain failed: %w", err)
}
if err := cst.FlushValidationCache(context.Background()); err != nil {
if err := cst.FlushValidationCache(ctx); err != nil {
return xerrors.Errorf("flushing validation cache failed: %w", err)
}
@ -515,7 +515,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
return err
}
err = cst.SetGenesis(context.Background(), gb.Blocks()[0])
err = cst.SetGenesis(ctx, gb.Blocks()[0])
if err != nil {
return err
}

View File

@ -282,7 +282,7 @@ func writeStateToTempCAR(bs blockstore.Blockstore, roots ...cid.Cid) (string, er
continue
}
// ignore things we don't have, the state tree is incomplete.
if has, err := bs.Has(context.Background(), link.Cid); err != nil {
if has, err := bs.Has(context.TODO(), link.Cid); err != nil {
return nil, err
} else if has {
out = append(out, link)
@ -317,7 +317,7 @@ func LoadBlockstore(vectorCAR schema.Base64EncodedBytes) (blockstore.Blockstore,
defer r.Close() // nolint
// Load the CAR embedded in the test vector into the Blockstore.
_, err = car.LoadCar(context.Background(), bs, r)
_, err = car.LoadCar(context.TODO(), bs, r)
if err != nil {
return nil, fmt.Errorf("failed to load state tree car from test vector: %s", err)
}

View File

@ -116,7 +116,7 @@ func (m *ChainModule) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*
return nil, err
}
bmsgs, smsgs, err := m.Chain.MessagesForBlock(b)
bmsgs, smsgs, err := m.Chain.MessagesForBlock(ctx, b)
if err != nil {
return nil, err
}
@ -159,7 +159,7 @@ func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]
return nil, err
}
cm, err := a.Chain.MessagesForTipset(pts)
cm, err := a.Chain.MessagesForTipset(ctx, pts)
if err != nil {
return nil, err
}
@ -191,14 +191,14 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]
return nil, err
}
cm, err := a.Chain.MessagesForTipset(pts)
cm, err := a.Chain.MessagesForTipset(ctx, pts)
if err != nil {
return nil, err
}
var out []*types.MessageReceipt
for i := 0; i < len(cm); i++ {
r, err := a.Chain.GetParentReceipt(b, i)
r, err := a.Chain.GetParentReceipt(ctx, b, i)
if err != nil {
return nil, err
}
@ -220,7 +220,7 @@ func (a *ChainAPI) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSe
return nil, nil
}
cm, err := a.Chain.MessagesForTipset(ts)
cm, err := a.Chain.MessagesForTipset(ctx, ts)
if err != nil {
return nil, err
}
@ -577,7 +577,7 @@ func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject,
}
func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) {
cm, err := m.Chain.GetCMessage(mc)
cm, err := m.Chain.GetCMessage(ctx, mc)
if err != nil {
return nil, err
}

View File

@ -82,14 +82,14 @@ type GasMeta struct {
Limit int64
}
func (g *GasPriceCache) GetTSGasStats(cstore *store.ChainStore, ts *types.TipSet) ([]GasMeta, error) {
func (g *GasPriceCache) GetTSGasStats(ctx context.Context, cstore *store.ChainStore, ts *types.TipSet) ([]GasMeta, error) {
i, has := g.c.Get(ts.Key())
if has {
return i.([]GasMeta), nil
}
var prices []GasMeta
msgs, err := cstore.MessagesForTipset(ts)
msgs, err := cstore.MessagesForTipset(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("loading messages: %w", err)
}
@ -204,7 +204,7 @@ func gasEstimateGasPremium(ctx context.Context, cstore *store.ChainStore, cache
}
blocks += len(pts.Blocks())
meta, err := cache.GetTSGasStats(cstore, pts)
meta, err := cache.GetTSGasStats(ctx, cstore, pts)
if err != nil {
return types.BigInt{}, err
}

View File

@ -87,7 +87,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
// different blocks in tipsets of the same height
// we exclude messages that have been included in blocks in the mpool tipset
have, err := a.Mpool.MessagesForBlocks(mpts.Blocks())
have, err := a.Mpool.MessagesForBlocks(ctx, mpts.Blocks())
if err != nil {
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
}
@ -97,7 +97,7 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
}
}
msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks())
msgs, err := a.Mpool.MessagesForBlocks(ctx, ts.Blocks())
if err != nil {
return nil, xerrors.Errorf(": %w", err)
}

View File

@ -556,7 +556,7 @@ func (m *StateModule) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence
var returndec interface{}
if recpt.ExitCode == 0 && len(recpt.Return) > 0 {
cmsg, err := m.Chain.GetCMessage(msg)
cmsg, err := m.Chain.GetCMessage(ctx, msg)
if err != nil {
return nil, xerrors.Errorf("failed to load message after successful receipt search: %w", err)
}
@ -874,7 +874,7 @@ func (a *StateAPI) StateListMessages(ctx context.Context, match *api.MessageMatc
var out []cid.Cid
for ts.Height() >= toheight {
msgs, err := a.Chain.MessagesForTipset(ts)
msgs, err := a.Chain.MessagesForTipset(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("failed to get messages for tipset (%s): %w", ts.Key(), err)
}

View File

@ -64,12 +64,12 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
}
// TODO: should we have some sort of fast path to adding a local block?
bmsgs, err := a.Syncer.ChainStore().LoadMessagesFromCids(blk.BlsMessages)
bmsgs, err := a.Syncer.ChainStore().LoadMessagesFromCids(ctx, blk.BlsMessages)
if err != nil {
return xerrors.Errorf("failed to load bls messages: %w", err)
}
smsgs, err := a.Syncer.ChainStore().LoadSignedMessagesFromCids(blk.SecpkMessages)
smsgs, err := a.Syncer.ChainStore().LoadSignedMessagesFromCids(ctx, blk.SecpkMessages)
if err != nil {
return xerrors.Errorf("failed to load secpk message: %w", err)
}
@ -142,7 +142,7 @@ func (a *SyncAPI) SyncValidateTipset(ctx context.Context, tsk types.TipSetKey) (
return false, err
}
fts, err := a.Syncer.ChainStore().TryFillTipSet(ts)
fts, err := a.Syncer.ChainStore().TryFillTipSet(ctx, ts)
if err != nil {
return false, err
}

View File

@ -30,7 +30,7 @@ import (
)
// ChainBitswap uses a blockstore that bypasses all caches.
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
func ChainBitswap(lc fx.Lifecycle, mctx helpers.MetricsCtx, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
// prefix protocol for chain bitswap
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
@ -58,8 +58,8 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
return blockservice.New(bs, rem)
}
func MessagePool(lc fx.Lifecycle, us stmgr.UpgradeSchedule, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
mp, err := messagepool.New(context.Background(), mpp, ds, us, nn, j)
func MessagePool(lc fx.Lifecycle, mctx helpers.MetricsCtx, us stmgr.UpgradeSchedule, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
mp, err := messagepool.New(helpers.LifecycleCtx(mctx, lc), mpp, ds, us, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
}
@ -73,6 +73,7 @@ func MessagePool(lc fx.Lifecycle, us stmgr.UpgradeSchedule, mpp messagepool.Prov
}
func ChainStore(lc fx.Lifecycle,
mctx helpers.MetricsCtx,
cbs dtypes.ChainBlockstore,
sbs dtypes.StateBlockstore,
ds dtypes.MetadataDS,
@ -83,7 +84,7 @@ func ChainStore(lc fx.Lifecycle,
chain := store.NewChainStore(cbs, sbs, ds, weight, j)
if err := chain.Load(context.Background()); err != nil {
if err := chain.Load(helpers.LifecycleCtx(mctx, lc)); err != nil {
log.Warnf("loading chain state from disk: %s", err)
}

View File

@ -40,11 +40,12 @@ import (
"github.com/filecoin-project/lotus/node/impl/full"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/node/repo/imports"
)
func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
func HandleMigrateClientFunds(lc fx.Lifecycle, mctx helpers.MetricsCtx, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
addr, err := wallet.WalletDefaultAddress(ctx)
@ -52,7 +53,7 @@ func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full
if err != nil {
return nil
}
b, err := ds.Get(context.Background(), datastore.NewKey("/marketfunds/client"))
b, err := ds.Get(helpers.LifecycleCtx(mctx, lc), datastore.NewKey("/marketfunds/client"))
if err != nil {
if xerrors.Is(err, datastore.ErrNotFound) {
return nil
@ -73,7 +74,7 @@ func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full
return nil
}
return ds.Delete(context.Background(), datastore.NewKey("/marketfunds/client"))
return ds.Delete(helpers.LifecycleCtx(mctx, lc), datastore.NewKey("/marketfunds/client"))
},
})
}

View File

@ -2,9 +2,10 @@ package modules
import (
"bytes"
"context"
"os"
"go.uber.org/fx"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-car"
"golang.org/x/xerrors"
@ -12,6 +13,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
func ErrorGenesis() Genesis {
@ -20,17 +22,18 @@ func ErrorGenesis() Genesis {
}
}
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
return func(bs dtypes.ChainBlockstore) Genesis {
func LoadGenesis(genBytes []byte) func(fx.Lifecycle, helpers.MetricsCtx, dtypes.ChainBlockstore) Genesis {
return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore) Genesis {
return func() (header *types.BlockHeader, e error) {
c, err := car.LoadCar(context.Background(), bs, bytes.NewReader(genBytes))
ctx := helpers.LifecycleCtx(mctx, lc)
c, err := car.LoadCar(ctx, bs, bytes.NewReader(genBytes))
if err != nil {
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
}
if len(c.Roots) != 1 {
return nil, xerrors.New("expected genesis file to have one root")
}
root, err := bs.Get(context.Background(), c.Roots[0])
root, err := bs.Get(ctx, c.Roots[0])
if err != nil {
return nil, err
}
@ -46,8 +49,9 @@ func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
genFromRepo, err := cs.GetGenesis(context.Background())
func SetGenesis(lc fx.Lifecycle, mctx helpers.MetricsCtx, cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
genFromRepo, err := cs.GetGenesis(ctx)
if err == nil {
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
expectedGenesis, err := g()
@ -70,5 +74,5 @@ func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error)
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
}
return dtypes.AfterGenesisSet{}, cs.SetGenesis(context.Background(), genesis)
return dtypes.AfterGenesisSet{}, cs.SetGenesis(ctx, genesis)
}

View File

@ -228,8 +228,8 @@ func BuiltinDrandConfig() dtypes.DrandSchedule {
return build.DrandConfigSchedule()
}
func RandomSchedule(p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
gen, err := p.Cs.GetGenesis(context.Background())
func RandomSchedule(lc fx.Lifecycle, mctx helpers.MetricsCtx, p RandomBeaconParams, _ dtypes.AfterGenesisSet) (beacon.Schedule, error) {
gen, err := p.Cs.GetGenesis(helpers.LifecycleCtx(mctx, lc))
if err != nil {
return nil, err
}

View File

@ -78,7 +78,7 @@ var (
)
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
maddrb, err := ds.Get(context.Background(), datastore.NewKey("miner-address"))
maddrb, err := ds.Get(context.TODO(), datastore.NewKey("miner-address"))
if err != nil {
return address.Undef, err
}
@ -300,7 +300,7 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.FullNode, minerAddress dtypes.MinerAddress) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
b, err := ds.Get(context.Background(), datastore.NewKey("/marketfunds/provider"))
b, err := ds.Get(ctx, datastore.NewKey("/marketfunds/provider"))
if err != nil {
if xerrors.Is(err, datastore.ErrNotFound) {
return nil
@ -331,7 +331,7 @@ func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.
return nil
}
return ds.Delete(context.Background(), datastore.NewKey("/marketfunds/provider"))
return ds.Delete(ctx, datastore.NewKey("/marketfunds/provider"))
},
})
}