Merge pull request #2362 from filecoin-project/mock-clock

introduce the ability to mock time.
This commit is contained in:
Łukasz Magiera 2020-07-15 18:41:00 +02:00 committed by GitHub
commit 068f6a2e83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 134 additions and 101 deletions

View File

@ -4,6 +4,8 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"os" "os"
"strings" "strings"
"testing" "testing"
@ -35,14 +37,14 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect
if err := miner.NetConnect(ctx, addrinfo); err != nil { if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(time.Second) build.Clock.Sleep(time.Second)
mine := true mine := true
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
for mine { for mine {
time.Sleep(blocktime) build.Clock.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil { if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err) t.Error(err)
} }
@ -69,7 +71,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i
break break
} }
time.Sleep(100 * time.Millisecond) build.Clock.Sleep(100 * time.Millisecond)
} }
fmt.Printf("All sectors is fsm\n") fmt.Printf("All sectors is fsm\n")
@ -94,7 +96,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i
} }
} }
time.Sleep(100 * time.Millisecond) build.Clock.Sleep(100 * time.Millisecond)
fmt.Printf("WaitSeal: %d\n", len(s)) fmt.Printf("WaitSeal: %d\n", len(s))
} }
} }
@ -115,14 +117,14 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
if err := miner.NetConnect(ctx, addrinfo); err != nil { if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(time.Second) build.Clock.Sleep(time.Second)
mine := true mine := true
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
for mine { for mine {
time.Sleep(blocktime) build.Clock.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil { if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err) t.Error(err)
} }
@ -150,7 +152,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
if head.Height()%100 == 0 { if head.Height()%100 == 0 {
fmt.Printf("@%d\n", head.Height()) fmt.Printf("@%d\n", head.Height())
} }
time.Sleep(blocktime) build.Clock.Sleep(blocktime)
} }
p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK) p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)

10
build/clock.go Normal file
View File

@ -0,0 +1,10 @@
package build
import "github.com/raulk/clock"
// Clock is the global clock for the system. In standard builds,
// we use a real-time clock, which maps to the `time` package.
//
// Tests that need control of time can replace this variable with
// clock.NewMock().
var Clock = clock.New()

View File

@ -2,12 +2,13 @@ package beacon
import ( import (
"context" "context"
"time"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
) )
var log = logging.Logger("beacon") var log = logging.Logger("beacon")
@ -52,7 +53,7 @@ func ValidateBlockValues(b RandomBeacon, h *types.BlockHeader, prevEntry types.B
} }
func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.ChainEpoch, prev types.BeaconEntry) ([]types.BeaconEntry, error) { func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.ChainEpoch, prev types.BeaconEntry) ([]types.BeaconEntry, error) {
start := time.Now() start := build.Clock.Now()
maxRound := beacon.MaxBeaconRoundForEpoch(round, prev) maxRound := beacon.MaxBeaconRoundForEpoch(round, prev)
if maxRound == prev.Round { if maxRound == prev.Round {
@ -81,7 +82,7 @@ func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.C
} }
} }
log.Debugw("fetching beacon entries", "took", time.Since(start), "numEntries", len(out)) log.Debugw("fetching beacon entries", "took", build.Clock.Since(start), "numEntries", len(out))
reverse(out) reverse(out)
return out, nil return out, nil
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -131,7 +132,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
} }
go func() { go func() {
start := time.Now() start := build.Clock.Now()
log.Infow("start fetching randomness", "round", round) log.Infow("start fetching randomness", "round", round)
resp, err := db.client.Get(ctx, round) resp, err := db.client.Get(ctx, round)
@ -142,7 +143,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
br.Entry.Round = resp.Round() br.Entry.Round = resp.Round()
br.Entry.Data = resp.Signature() br.Entry.Data = resp.Signature()
} }
log.Infow("done fetching randomness", "round", round, "took", time.Since(start)) log.Infow("done fetching randomness", "round", round, "took", build.Clock.Since(start))
out <- br out <- br
close(out) close(out)
}() }()

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/hashicorp/golang-lru" "github.com/hashicorp/golang-lru"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
@ -37,14 +38,14 @@ func (brt *blockReceiptTracker) Add(p peer.ID, ts *types.TipSet) {
if !ok { if !ok {
pset := &peerSet{ pset := &peerSet{
peers: map[peer.ID]time.Time{ peers: map[peer.ID]time.Time{
p: time.Now(), p: build.Clock.Now(),
}, },
} }
brt.cache.Add(ts.Key(), pset) brt.cache.Add(ts.Key(), pset)
return return
} }
val.(*peerSet).peers[p] = time.Now() val.(*peerSet).peers[p] = build.Clock.Now()
} }
func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID { func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID {

View File

@ -126,7 +126,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
} }
writeDeadline := 60 * time.Second writeDeadline := 60 * time.Second
_ = s.SetDeadline(time.Now().Add(writeDeadline)) _ = s.SetDeadline(time.Now().Add(writeDeadline)) // always use real time for socket/stream deadlines.
if err := cborutil.WriteCborRPC(s, resp); err != nil { if err := cborutil.WriteCborRPC(s, resp); err != nil {
log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer()) log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer())
return return

View File

@ -21,6 +21,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
cborutil "github.com/filecoin-project/go-cbor-util" cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
incrt "github.com/filecoin-project/lotus/lib/increadtimeout" incrt "github.com/filecoin-project/lotus/lib/increadtimeout"
@ -91,7 +92,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
// randomize the first few peers so we don't always pick the same peer // randomize the first few peers so we don't always pick the same peer
shufflePrefix(peers) shufflePrefix(peers)
start := time.Now() start := build.Clock.Now()
var oerr error var oerr error
for _, p := range peers { for _, p := range peers {
@ -117,7 +118,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
if err != nil { if err != nil {
return nil, xerrors.Errorf("success response from peer failed to process: %w", err) return nil, xerrors.Errorf("success response from peer failed to process: %w", err)
} }
bs.syncPeers.logGlobalSuccess(time.Since(start)) bs.syncPeers.logGlobalSuccess(build.Clock.Since(start))
bs.host.ConnManager().TagPeer(p, "bsync", 25) bs.host.ConnManager().TagPeer(p, "bsync", 25)
return resp, nil return resp, nil
} }
@ -197,7 +198,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
} }
var err error var err error
start := time.Now() start := build.Clock.Now()
for _, p := range peers { for _, p := range peers {
res, rerr := bs.sendRequestToPeer(ctx, p, req) res, rerr := bs.sendRequestToPeer(ctx, p, req)
@ -208,7 +209,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
} }
if res.Status == StatusOK { if res.Status == StatusOK {
bs.syncPeers.logGlobalSuccess(time.Since(start)) bs.syncPeers.logGlobalSuccess(build.Clock.Since(start))
return res.Chain, nil return res.Chain, nil
} }
@ -284,17 +285,17 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
ctx, span := trace.StartSpan(ctx, "blockSyncFetch") ctx, span := trace.StartSpan(ctx, "blockSyncFetch")
defer span.End() defer span.End()
start := time.Now() start := build.Clock.Now()
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID) s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
if err != nil { if err != nil {
bs.RemovePeer(p) bs.RemovePeer(p)
return nil, xerrors.Errorf("failed to open stream to peer: %w", err) return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
} }
_ = s.SetWriteDeadline(time.Now().Add(5 * time.Second)) _ = s.SetWriteDeadline(time.Now().Add(5 * time.Second)) // always use real time for socket/stream deadlines.
if err := cborutil.WriteCborRPC(s, req); err != nil { if err := cborutil.WriteCborRPC(s, req); err != nil {
_ = s.SetWriteDeadline(time.Time{}) _ = s.SetWriteDeadline(time.Time{})
bs.syncPeers.logFailure(p, time.Since(start)) bs.syncPeers.logFailure(p, build.Clock.Since(start))
return nil, err return nil, err
} }
_ = s.SetWriteDeadline(time.Time{}) _ = s.SetWriteDeadline(time.Time{})
@ -302,7 +303,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
var res BlockSyncResponse var res BlockSyncResponse
r := incrt.New(s, 50<<10, 5*time.Second) r := incrt.New(s, 50<<10, 5*time.Second)
if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil { if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil {
bs.syncPeers.logFailure(p, time.Since(start)) bs.syncPeers.logFailure(p, build.Clock.Since(start))
return nil, err return nil, err
} }
@ -314,7 +315,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
) )
} }
bs.syncPeers.logSuccess(p, time.Since(start)) bs.syncPeers.logSuccess(p, build.Clock.Since(start))
return &res, nil return &res, nil
} }
@ -475,7 +476,7 @@ func (bpt *bsPeerTracker) addPeer(p peer.ID) {
return return
} }
bpt.peers[p] = &peerStats{ bpt.peers[p] = &peerStats{
firstSeen: time.Now(), firstSeen: build.Clock.Now(),
} }
} }

View File

@ -99,7 +99,7 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err()) log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return return
} }
time.Sleep(time.Second) build.Clock.Sleep(time.Second)
log.Info("restarting listenHeadChanges") log.Info("restarting listenHeadChanges")
} }
} }

View File

@ -196,7 +196,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
*genm2, *genm2,
}, },
NetworkName: "", NetworkName: "",
Timestamp: uint64(time.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()), Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
} }
genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl) genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl)

View File

@ -23,12 +23,15 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/raulk/clock"
) )
var log = logging.Logger("messagepool") var log = logging.Logger("messagepool")
@ -66,7 +69,7 @@ type MessagePool struct {
lk sync.Mutex lk sync.Mutex
closer chan struct{} closer chan struct{}
repubTk *time.Ticker repubTk *clock.Ticker
localAddrs map[address.Address]struct{} localAddrs map[address.Address]struct{}
@ -187,7 +190,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
mp := &MessagePool{ mp := &MessagePool{
closer: make(chan struct{}), closer: make(chan struct{}),
repubTk: time.NewTicker(time.Duration(build.BlockDelaySecs) * 10 * time.Second), repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}), localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet), pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0), minGasPrice: types.NewInt(0),

View File

@ -3,7 +3,6 @@ package metrics
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"time"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -11,6 +10,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx" "go.uber.org/fx"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
@ -89,7 +89,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain
} }
// using unix nano time makes very sure we pick a nonce higher than previous restart // using unix nano time makes very sure we pick a nonce higher than previous restart
nonce := uint64(time.Now().UnixNano()) nonce := uint64(build.Clock.Now().UnixNano())
for { for {
select { select {
@ -107,7 +107,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain
Height: n.Val.Height(), Height: n.Val.Height(),
Weight: w, Weight: w,
NodeName: nickname, NodeName: nickname,
Time: uint64(time.Now().UnixNano() / 1000_000), Time: uint64(build.Clock.Now().UnixNano() / 1000_000),
Nonce: nonce, Nonce: nonce,
} }

View File

@ -61,7 +61,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
src := msg.GetFrom() src := msg.GetFrom()
go func() { go func() {
start := time.Now() start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub") log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages) bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
if err != nil { if err != nil {
@ -75,9 +75,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
return return
} }
took := time.Since(start) took := build.Clock.Since(start)
log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took) log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
if delay := time.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 { if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner) log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
} }
@ -142,9 +142,9 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
// track validation time // track validation time
begin := time.Now() begin := build.Clock.Now()
defer func() { defer func() {
log.Debugf("block validation time: %s", time.Since(begin)) log.Debugf("block validation time: %s", build.Clock.Since(begin))
}() }()
stats.Record(ctx, metrics.BlockReceived.M(1)) stats.Record(ctx, metrics.BlockReceived.M(1))
@ -233,7 +233,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
func (bv *BlockValidator) isChainNearSynced() bool { func (bv *BlockValidator) isChainNearSynced() bool {
ts := bv.chain.GetHeaviestTipSet() ts := bv.chain.GetHeaviestTipSet()
timestamp := ts.MinTimestamp() timestamp := ts.MinTimestamp()
now := time.Now().UnixNano() now := build.Clock.Now().UnixNano()
cutoff := uint64(now) - uint64(6*time.Hour) cutoff := uint64(now) - uint64(6*time.Hour)
return timestamp > cutoff return timestamp > cutoff
} }

View File

@ -620,7 +620,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) (er
return nil return nil
} }
validationStart := time.Now() validationStart := build.Clock.Now()
defer func() { defer func() {
dur := time.Since(validationStart) dur := time.Since(validationStart)
durMilli := dur.Seconds() * float64(1000) durMilli := dur.Seconds() * float64(1000)
@ -665,12 +665,12 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) (er
// fast checks first // fast checks first
now := uint64(time.Now().Unix()) now := uint64(build.Clock.Now().Unix())
if h.Timestamp > now+build.AllowableClockDriftSecs { if h.Timestamp > now+build.AllowableClockDriftSecs {
return xerrors.Errorf("block was from the future (now=%d, blk=%d): %w", now, h.Timestamp, ErrTemporal) return xerrors.Errorf("block was from the future (now=%d, blk=%d): %w", now, h.Timestamp, ErrTemporal)
} }
if h.Timestamp > now { if h.Timestamp > now {
log.Warn("Got block from the future, but within threshold", h.Timestamp, time.Now().Unix()) log.Warn("Got block from the future, but within threshold", h.Timestamp, build.Clock.Now().Unix())
} }
if h.Timestamp < baseTs.MinTimestamp()+(build.BlockDelaySecs*uint64(h.Height-baseTs.Height())) { if h.Timestamp < baseTs.MinTimestamp()+(build.BlockDelaySecs*uint64(h.Height-baseTs.Height())) {
@ -1535,6 +1535,6 @@ func (syncer *Syncer) IsEpochBeyondCurrMax(epoch abi.ChainEpoch) bool {
return false return false
} }
now := uint64(time.Now().Unix()) now := uint64(build.Clock.Now().Unix())
return epoch > (abi.ChainEpoch((now-g.Timestamp)/build.BlockDelaySecs) + MaxHeightDrift) return epoch > (abi.ChainEpoch((now-g.Timestamp)/build.BlockDelaySecs) + MaxHeightDrift)
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -48,7 +49,7 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) {
defer ss.lk.Unlock() defer ss.lk.Unlock()
ss.Stage = v ss.Stage = v
if v == api.StageSyncComplete { if v == api.StageSyncComplete {
ss.End = time.Now() ss.End = build.Clock.Now()
} }
} }
@ -64,7 +65,7 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
ss.Stage = api.StageHeaders ss.Stage = api.StageHeaders
ss.Height = 0 ss.Height = 0
ss.Message = "" ss.Message = ""
ss.Start = time.Now() ss.Start = build.Clock.Now()
ss.End = time.Time{} ss.End = time.Time{}
} }
@ -87,7 +88,7 @@ func (ss *SyncerState) Error(err error) {
defer ss.lk.Unlock() defer ss.lk.Unlock()
ss.Message = err.Error() ss.Message = err.Error()
ss.Stage = api.StageSyncErrored ss.Stage = api.StageSyncErrored
ss.End = time.Now() ss.End = build.Clock.Now()
} }
func (ss *SyncerState) Snapshot() SyncerState { func (ss *SyncerState) Snapshot() SyncerState {

View File

@ -373,8 +373,7 @@ func (rt *Runtime) Send(to address.Address, method abi.MethodNum, m vmr.CBORMars
} }
func (rt *Runtime) internalSend(from, to address.Address, method abi.MethodNum, value types.BigInt, params []byte) ([]byte, aerrors.ActorError) { func (rt *Runtime) internalSend(from, to address.Address, method abi.MethodNum, value types.BigInt, params []byte) ([]byte, aerrors.ActorError) {
start := build.Clock.Now()
start := time.Now()
ctx, span := trace.StartSpan(rt.ctx, "vmc.Send") ctx, span := trace.StartSpan(rt.ctx, "vmc.Send")
defer span.End() defer span.End()
if span.IsRecordingEvents() { if span.IsRecordingEvents() {
@ -528,7 +527,7 @@ func (rt *Runtime) chargeGasInternal(gas GasCharge, skip int) aerrors.ActorError
var callers [10]uintptr var callers [10]uintptr
cout := gruntime.Callers(2+skip, callers[:]) cout := gruntime.Callers(2+skip, callers[:])
now := time.Now() now := build.Clock.Now()
if rt.lastGasCharge != nil { if rt.lastGasCharge != nil {
rt.lastGasCharge.TimeTaken = now.Sub(rt.lastGasChargeTime) rt.lastGasCharge.TimeTaken = now.Sub(rt.lastGasChargeTime)
} }

View File

@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/runtime" "github.com/filecoin-project/specs-actors/actors/runtime"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/aerrors" "github.com/filecoin-project/lotus/chain/actors/aerrors"
"github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -286,7 +287,7 @@ func checkMessage(msg *types.Message) error {
} }
func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
start := time.Now() start := build.Clock.Now()
ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start) ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start)
rt.finilizeGasTracing() rt.finilizeGasTracing()
return &ApplyRet{ return &ApplyRet{
@ -303,7 +304,7 @@ func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*Ap
} }
func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
start := time.Now() start := build.Clock.Now()
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage") ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
defer span.End() defer span.End()
msg := cmsg.VMMessage() msg := cmsg.VMMessage()

View File

@ -195,7 +195,7 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
case <-ctx.Done(): case <-ctx.Done():
fmt.Println("\nExit by user") fmt.Println("\nExit by user")
return nil return nil
case <-time.After(1 * time.Second): case <-build.Clock.After(1 * time.Second):
} }
} }
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
@ -68,7 +69,7 @@ func sendSmallFundsTxs(ctx context.Context, api api.FullNode, from address.Addre
sendSet = append(sendSet, naddr) sendSet = append(sendSet, naddr)
} }
tick := time.NewTicker(time.Second / time.Duration(rate)) tick := build.Clock.Ticker(time.Second / time.Duration(rate))
for { for {
select { select {
case <-tick.C: case <-tick.C:

1
go.mod
View File

@ -103,6 +103,7 @@ require (
github.com/multiformats/go-multibase v0.0.3 github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.13 github.com/multiformats/go-multihash v0.0.13
github.com/opentracing/opentracing-go v1.1.0 github.com/opentracing/opentracing-go v1.1.0
github.com/raulk/clock v1.1.0
github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1 github.com/stretchr/testify v1.6.1
github.com/syndtr/goleveldb v1.0.0 github.com/syndtr/goleveldb v1.0.0

2
go.sum
View File

@ -1233,6 +1233,8 @@ github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.1.0 h1:jhMy6QXfi3y2HEzFoyuCj40z4OZIIHHPtFyCMftmvKA= github.com/prometheus/procfs v0.1.0 h1:jhMy6QXfi3y2HEzFoyuCj40z4OZIIHHPtFyCMftmvKA=
github.com/prometheus/procfs v0.1.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y=
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=

View File

@ -10,6 +10,8 @@ import (
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
) )
func InitializeSystemJournal(dir string) error { func InitializeSystemJournal(dir string) error {
@ -103,7 +105,7 @@ func (fsj *fsJournal) rollJournalFile() error {
fsj.fi.Close() fsj.fi.Close()
} }
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339)))) nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339))))
if err != nil { if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err) return xerrors.Errorf("failed to open journal file: %w", err)
} }
@ -130,7 +132,7 @@ func (fsj *fsJournal) runLoop() {
func (fsj *fsJournal) AddEntry(system string, obj interface{}) { func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
je := &JournalEntry{ je := &JournalEntry{
System: system, System: system,
Timestamp: time.Now(), Timestamp: build.Clock.Now(),
Val: obj, Val: obj,
} }
select { select {

View File

@ -5,12 +5,12 @@ import (
"time" "time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/build"
) )
var log = logging.Logger("incrt") var log = logging.Logger("incrt")
var now = time.Now
type ReaderDeadline interface { type ReaderDeadline interface {
Read([]byte) (int, error) Read([]byte) (int, error)
SetReadDeadline(time.Time) error SetReadDeadline(time.Time) error
@ -45,7 +45,7 @@ func (err errNoWait) Timeout() bool {
} }
func (crt *incrt) Read(buf []byte) (int, error) { func (crt *incrt) Read(buf []byte) (int, error) {
start := now() start := build.Clock.Now()
if crt.wait == 0 { if crt.wait == 0 {
return 0, errNoWait{} return 0, errNoWait{}
} }
@ -59,7 +59,7 @@ func (crt *incrt) Read(buf []byte) (int, error) {
_ = crt.rd.SetReadDeadline(time.Time{}) _ = crt.rd.SetReadDeadline(time.Time{})
if err == nil { if err == nil {
dur := now().Sub(start) dur := build.Clock.Now().Sub(start)
crt.wait -= dur crt.wait -= dur
crt.wait += time.Duration(n) * crt.waitPerByte crt.wait += time.Duration(n) * crt.waitPerByte
if crt.wait < 0 { if crt.wait < 0 {

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"go.opencensus.io/stats" "go.opencensus.io/stats"
@ -107,7 +108,7 @@ func (pmgr *PeerMgr) Disconnect(p peer.ID) {
} }
func (pmgr *PeerMgr) Run(ctx context.Context) { func (pmgr *PeerMgr) Run(ctx context.Context) {
tick := time.NewTicker(time.Second * 5) tick := build.Clock.Ticker(time.Second * 5)
for { for {
select { select {
case <-tick.C: case <-tick.C:

View File

@ -46,7 +46,7 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address)
waitFunc: func(ctx context.Context, baseTime uint64) (func(bool, error), error) { waitFunc: func(ctx context.Context, baseTime uint64) (func(bool, error), error) {
// Wait around for half the block time in case other parents come in // Wait around for half the block time in case other parents come in
deadline := baseTime + build.PropagationDelaySecs deadline := baseTime + build.PropagationDelaySecs
time.Sleep(time.Until(time.Unix(int64(deadline), 0))) build.Clock.Sleep(build.Clock.Until(time.Unix(int64(deadline), 0)))
return func(bool, error) {}, nil return func(bool, error) {}, nil
}, },
@ -107,7 +107,7 @@ func (m *Miner) Stop(ctx context.Context) error {
func (m *Miner) niceSleep(d time.Duration) bool { func (m *Miner) niceSleep(d time.Duration) bool {
select { select {
case <-time.After(d): case <-build.Clock.After(d):
return true return true
case <-m.stop: case <-m.stop:
return false return false
@ -170,14 +170,18 @@ func (m *Miner) mine(ctx context.Context) {
if b != nil { if b != nil {
btime := time.Unix(int64(b.Header.Timestamp), 0) btime := time.Unix(int64(b.Header.Timestamp), 0)
if time.Now().Before(btime) { now := build.Clock.Now()
if !m.niceSleep(time.Until(btime)) { switch {
case btime == now:
// block timestamp is perfectly aligned with time.
case btime.After(now):
if !m.niceSleep(build.Clock.Until(btime)) {
log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out") log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
time.Sleep(time.Until(btime)) build.Clock.Sleep(build.Clock.Until(btime))
} }
} else { default:
log.Warnw("mined block in the past", "block-time", btime, log.Warnw("mined block in the past",
"time", time.Now(), "duration", time.Since(btime)) "block-time", btime, "time", build.Clock.Now(), "difference", build.Clock.Since(btime))
} }
// TODO: should do better 'anti slash' protection here // TODO: should do better 'anti slash' protection here
@ -201,7 +205,7 @@ func (m *Miner) mine(ctx context.Context) {
nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0) nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0)
select { select {
case <-time.After(time.Until(nextRound)): case <-build.Clock.After(build.Clock.Until(nextRound)):
case <-m.stop: case <-m.stop:
stopping := m.stopping stopping := m.stopping
m.stop = nil m.stop = nil
@ -271,7 +275,7 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti
// 1. // 1.
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) { func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids())) log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
start := time.Now() start := build.Clock.Now()
round := base.TipSet.Height() + base.NullRounds + 1 round := base.TipSet.Height() + base.NullRounds + 1
@ -283,11 +287,11 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil return nil, nil
} }
tMBI := time.Now() tMBI := build.Clock.Now()
beaconPrev := mbi.PrevBeaconEntry beaconPrev := mbi.PrevBeaconEntry
tDrand := time.Now() tDrand := build.Clock.Now()
bvals := mbi.BeaconEntries bvals := mbi.BeaconEntries
hasPower, err := m.hasPower(ctx, m.address, base.TipSet) hasPower, err := m.hasPower(ctx, m.address, base.TipSet)
@ -299,9 +303,9 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil return nil, nil
} }
tPowercheck := time.Now() tPowercheck := build.Clock.Now()
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds) log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(build.Clock.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds)
rbase := beaconPrev rbase := beaconPrev
if len(bvals) > 0 { if len(bvals) > 0 {
@ -322,7 +326,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil return nil, nil
} }
tTicket := time.Now() tTicket := build.Clock.Now()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if err := m.address.MarshalCBOR(buf); err != nil { if err := m.address.MarshalCBOR(buf); err != nil {
@ -336,7 +340,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
prand := abi.PoStRandomness(rand) prand := abi.PoStRandomness(rand)
tSeed := time.Now() tSeed := build.Clock.Now()
postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand) postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand)
if err != nil { if err != nil {
@ -349,7 +353,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, xerrors.Errorf("failed to get pending messages: %w", err) return nil, xerrors.Errorf("failed to get pending messages: %w", err)
} }
tPending := time.Now() tPending := build.Clock.Now()
// TODO: winning post proof // TODO: winning post proof
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending) b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
@ -357,7 +361,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, xerrors.Errorf("failed to create block: %w", err) return nil, xerrors.Errorf("failed to create block: %w", err)
} }
tCreateBlock := time.Now() tCreateBlock := build.Clock.Now()
dur := tCreateBlock.Sub(start) dur := tCreateBlock.Sub(start)
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur) log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
if dur > time.Second*time.Duration(build.BlockDelaySecs) { if dur > time.Second*time.Duration(build.BlockDelaySecs) {
@ -502,7 +506,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
tooLowFundMsgs := 0 tooLowFundMsgs := 0
tooHighNonceMsgs := 0 tooHighNonceMsgs := 0
start := time.Now() start := build.Clock.Now()
vmValid := time.Duration(0) vmValid := time.Duration(0)
getbal := time.Duration(0) getbal := time.Duration(0)
@ -511,7 +515,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
}) })
for _, msg := range msgs { for _, msg := range msgs {
vmstart := time.Now() vmstart := build.Clock.Now()
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
@ -519,7 +523,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
continue continue
} }
vmValid += time.Since(vmstart) vmValid += build.Clock.Since(vmstart)
// TODO: this should be in some more general 'validate message' call // TODO: this should be in some more general 'validate message' call
if msg.Message.GasLimit > build.BlockGasLimit { if msg.Message.GasLimit > build.BlockGasLimit {
@ -534,7 +538,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
from := msg.Message.From from := msg.Message.From
getBalStart := time.Now() getBalStart := build.Clock.Now()
if _, ok := inclNonces[from]; !ok { if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, ts.Key()) act, err := al(ctx, from, ts.Key())
if err != nil { if err != nil {
@ -545,7 +549,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
inclNonces[from] = act.Nonce inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance inclBalances[from] = act.Balance
} }
getbal += time.Since(getBalStart) getbal += build.Clock.Since(getBalStart)
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) { if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
tooLowFundMsgs++ tooLowFundMsgs++
@ -660,7 +664,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs) log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs)
} }
sm := time.Now() sm := build.Clock.Now()
if sm.Sub(start) > time.Second { if sm.Sub(start) > time.Second {
log.Warnw("SelectMessages took a long time", log.Warnw("SelectMessages took a long time",
"duration", sm.Sub(start), "duration", sm.Sub(start),

View File

@ -15,6 +15,7 @@ import (
protocol "github.com/libp2p/go-libp2p-core/protocol" protocol "github.com/libp2p/go-libp2p-core/protocol"
cborutil "github.com/filecoin-project/go-cbor-util" cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -67,7 +68,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
_ = s.Conn().Close() _ = s.Conn().Close()
return return
} }
arrived := time.Now() arrived := build.Clock.Now()
log.Debugw("genesis from hello", log.Debugw("genesis from hello",
"tipset", hmsg.HeaviestTipSet, "tipset", hmsg.HeaviestTipSet,
@ -82,7 +83,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
go func() { go func() {
defer s.Close() //nolint:errcheck defer s.Close() //nolint:errcheck
sent := time.Now() sent := build.Clock.Now()
msg := &LatencyMessage{ msg := &LatencyMessage{
TArrial: arrived.UnixNano(), TArrial: arrived.UnixNano(),
TSent: sent.UnixNano(), TSent: sent.UnixNano(),
@ -99,7 +100,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
if len(protos) == 0 { if len(protos) == 0 {
log.Warn("other peer hasnt completed libp2p identify, waiting a bit") log.Warn("other peer hasnt completed libp2p identify, waiting a bit")
// TODO: this better // TODO: this better
time.Sleep(time.Millisecond * 300) build.Clock.Sleep(time.Millisecond * 300)
} }
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), types.NewTipSetKey(hmsg.HeaviestTipSet...)) ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), types.NewTipSetKey(hmsg.HeaviestTipSet...))
@ -146,7 +147,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
} }
log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid()) log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
t0 := time.Now() t0 := build.Clock.Now()
if err := cborutil.WriteCborRPC(s, hmsg); err != nil { if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
return err return err
} }
@ -155,13 +156,13 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
defer s.Close() //nolint:errcheck defer s.Close() //nolint:errcheck
lmsg := &LatencyMessage{} lmsg := &LatencyMessage{}
_ = s.SetReadDeadline(time.Now().Add(10 * time.Second)) _ = s.SetReadDeadline(build.Clock.Now().Add(10 * time.Second))
err := cborutil.ReadCborRPC(s, lmsg) err := cborutil.ReadCborRPC(s, lmsg)
if err != nil { if err != nil {
log.Infow("reading latency message", "error", err) log.Infow("reading latency message", "error", err)
} }
t3 := time.Now() t3 := build.Clock.Now()
lat := t3.Sub(t0) lat := t3.Sub(t0)
// add to peer tracker // add to peer tracker
if hs.pmgr != nil { if hs.pmgr != nil {

View File

@ -7,7 +7,6 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"time"
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -20,6 +19,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/runtime" "github.com/filecoin-project/specs-actors/actors/runtime"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen"
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis" genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -71,7 +71,7 @@ func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore
} }
if template.Timestamp == 0 { if template.Timestamp == 0 {
template.Timestamp = uint64(time.Now().Unix()) template.Timestamp = uint64(build.Clock.Now().Unix())
} }
b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template) b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template)

View File

@ -167,7 +167,7 @@ func NewWinningPoStProver(api api.FullNode, prover storage.Prover, verifier ffiw
var _ gen.WinningPoStProver = (*StorageWpp)(nil) var _ gen.WinningPoStProver = (*StorageWpp)(nil)
func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) { func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
start := time.Now() start := build.Clock.Now()
cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount) cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount)
if err != nil { if err != nil {
@ -185,7 +185,7 @@ func (wpp *StorageWpp) ComputeProof(ctx context.Context, ssi []abi.SectorInfo, r
log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand) log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand)
start := time.Now() start := build.Clock.Now()
proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand) proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -430,7 +430,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
snums = append(snums, si.SectorNumber) snums = append(snums, si.SectorNumber)
} }
tsStart := time.Now() tsStart := build.Clock.Now()
log.Infow("generating windowPost", log.Infow("generating windowPost",
"sectors", len(ssi)) "sectors", len(ssi))

View File

@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -85,7 +86,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
if err != nil { if err != nil {
log.Errorf("ChainNotify error: %+v") log.Errorf("ChainNotify error: %+v")
time.Sleep(10 * time.Second) build.Clock.Sleep(10 * time.Second)
continue continue
} }

View File

@ -66,7 +66,7 @@ func NewInfluxWriteQueue(ctx context.Context, influx client.Client) *InfluxWrite
for i := 0; i < maxRetries; i++ { for i := 0; i < maxRetries; i++ {
if err := influx.Write(batch); err != nil { if err := influx.Write(batch); err != nil {
log.Warnw("Failed to write batch", "error", err) log.Warnw("Failed to write batch", "error", err)
time.Sleep(time.Second * 15) build.Clock.Sleep(15 * time.Second)
continue continue
} }
@ -104,7 +104,7 @@ func InfluxNewBatch() (client.BatchPoints, error) {
} }
func NewPoint(name string, value interface{}) models.Point { func NewPoint(name string, value interface{}) models.Point {
pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, time.Now()) pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, build.Clock.Now())
return pt return pt
} }

View File

@ -52,7 +52,7 @@ sync_complete:
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-time.After(5 * time.Second): case <-build.Clock.After(5 * time.Second):
state, err := napi.SyncState(ctx) state, err := napi.SyncState(ctx)
if err != nil { if err != nil {
return err return err
@ -97,13 +97,13 @@ sync_complete:
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-time.After(5 * time.Second): case <-build.Clock.After(5 * time.Second):
head, err := napi.ChainHead(ctx) head, err := napi.ChainHead(ctx)
if err != nil { if err != nil {
return err return err
} }
timestampDelta := time.Now().Unix() - int64(head.MinTimestamp()) timestampDelta := build.Clock.Now().Unix() - int64(head.MinTimestamp())
log.Infow( log.Infow(
"Waiting for reasonable head height", "Waiting for reasonable head height",