introduce the ability to mock time.

This commit is contained in:
Raúl Kripalani 2020-07-10 15:43:14 +01:00
parent 7c73dbfba9
commit 13de81b3b2
31 changed files with 129 additions and 99 deletions

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"os"
"strings"
"testing"
@ -35,14 +37,14 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
build.Clock.Sleep(time.Second)
mine := true
done := make(chan struct{})
go func() {
defer close(done)
for mine {
time.Sleep(blocktime)
build.Clock.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err)
}
@ -69,7 +71,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i
break
}
time.Sleep(100 * time.Millisecond)
build.Clock.Sleep(100 * time.Millisecond)
}
fmt.Printf("All sectors is fsm\n")
@ -94,7 +96,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n i
}
}
time.Sleep(100 * time.Millisecond)
build.Clock.Sleep(100 * time.Millisecond)
fmt.Printf("WaitSeal: %d\n", len(s))
}
}
@ -115,14 +117,14 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
build.Clock.Sleep(time.Second)
mine := true
done := make(chan struct{})
go func() {
defer close(done)
for mine {
time.Sleep(blocktime)
build.Clock.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err)
}
@ -150,7 +152,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
if head.Height()%100 == 0 {
fmt.Printf("@%d\n", head.Height())
}
time.Sleep(blocktime)
build.Clock.Sleep(blocktime)
}
p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)

10
build/clock.go Normal file
View File

@ -0,0 +1,10 @@
package build
import "github.com/raulk/clock"
// Clock is the global clock for the system. In standard builds,
// we use a real-time clock, which maps to the `time` package.
//
// Tests that need control of time can replace this variable with
// clock.NewMock().
var Clock = clock.New()

View File

@ -2,12 +2,13 @@ package beacon
import (
"context"
"time"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)
var log = logging.Logger("beacon")
@ -52,7 +53,7 @@ func ValidateBlockValues(b RandomBeacon, h *types.BlockHeader, prevEntry types.B
}
func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.ChainEpoch, prev types.BeaconEntry) ([]types.BeaconEntry, error) {
start := time.Now()
start := build.Clock.Now()
maxRound := beacon.MaxBeaconRoundForEpoch(round, prev)
if maxRound == prev.Round {
@ -81,7 +82,7 @@ func BeaconEntriesForBlock(ctx context.Context, beacon RandomBeacon, round abi.C
}
}
log.Debugw("fetching beacon entries", "took", time.Since(start), "numEntries", len(out))
log.Debugw("fetching beacon entries", "took", build.Clock.Since(start), "numEntries", len(out))
reverse(out)
return out, nil
}

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
@ -131,7 +132,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
}
go func() {
start := time.Now()
start := build.Clock.Now()
log.Infow("start fetching randomness", "round", round)
resp, err := db.client.Get(ctx, round)
@ -142,7 +143,7 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
br.Entry.Round = resp.Round()
br.Entry.Data = resp.Signature()
}
log.Infow("done fetching randomness", "round", round, "took", time.Since(start))
log.Infow("done fetching randomness", "round", round, "took", build.Clock.Since(start))
out <- br
close(out)
}()

View File

@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/hashicorp/golang-lru"
peer "github.com/libp2p/go-libp2p-core/peer"
@ -37,14 +38,14 @@ func (brt *blockReceiptTracker) Add(p peer.ID, ts *types.TipSet) {
if !ok {
pset := &peerSet{
peers: map[peer.ID]time.Time{
p: time.Now(),
p: build.Clock.Now(),
},
}
brt.cache.Add(ts.Key(), pset)
return
}
val.(*peerSet).peers[p] = time.Now()
val.(*peerSet).peers[p] = build.Clock.Now()
}
func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID {

View File

@ -11,6 +11,7 @@ import (
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -126,7 +127,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
}
writeDeadline := 60 * time.Second
_ = s.SetDeadline(time.Now().Add(writeDeadline))
_ = s.SetDeadline(build.Clock.Now().Add(writeDeadline))
if err := cborutil.WriteCborRPC(s, resp); err != nil {
log.Warnw("failed to write back response for handle stream", "err", err, "peer", s.Conn().RemotePeer())
return

View File

@ -21,6 +21,7 @@ import (
"golang.org/x/xerrors"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
incrt "github.com/filecoin-project/lotus/lib/increadtimeout"
@ -91,7 +92,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
// randomize the first few peers so we don't always pick the same peer
shufflePrefix(peers)
start := time.Now()
start := build.Clock.Now()
var oerr error
for _, p := range peers {
@ -117,7 +118,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
if err != nil {
return nil, xerrors.Errorf("success response from peer failed to process: %w", err)
}
bs.syncPeers.logGlobalSuccess(time.Since(start))
bs.syncPeers.logGlobalSuccess(build.Clock.Since(start))
bs.host.ConnManager().TagPeer(p, "bsync", 25)
return resp, nil
}
@ -197,7 +198,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
}
var err error
start := time.Now()
start := build.Clock.Now()
for _, p := range peers {
res, rerr := bs.sendRequestToPeer(ctx, p, req)
@ -208,7 +209,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
}
if res.Status == StatusOK {
bs.syncPeers.logGlobalSuccess(time.Since(start))
bs.syncPeers.logGlobalSuccess(build.Clock.Since(start))
return res.Chain, nil
}
@ -284,17 +285,17 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
ctx, span := trace.StartSpan(ctx, "blockSyncFetch")
defer span.End()
start := time.Now()
start := build.Clock.Now()
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
if err != nil {
bs.RemovePeer(p)
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
}
_ = s.SetWriteDeadline(time.Now().Add(5 * time.Second))
_ = s.SetWriteDeadline(build.Clock.Now().Add(5 * time.Second))
if err := cborutil.WriteCborRPC(s, req); err != nil {
_ = s.SetWriteDeadline(time.Time{})
bs.syncPeers.logFailure(p, time.Since(start))
bs.syncPeers.logFailure(p, build.Clock.Since(start))
return nil, err
}
_ = s.SetWriteDeadline(time.Time{})
@ -302,7 +303,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
var res BlockSyncResponse
r := incrt.New(s, 50<<10, 5*time.Second)
if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil {
bs.syncPeers.logFailure(p, time.Since(start))
bs.syncPeers.logFailure(p, build.Clock.Since(start))
return nil, err
}
@ -314,7 +315,7 @@ func (bs *BlockSync) fetchBlocksBlockSync(ctx context.Context, p peer.ID, req *B
)
}
bs.syncPeers.logSuccess(p, time.Since(start))
bs.syncPeers.logSuccess(p, build.Clock.Since(start))
return &res, nil
}
@ -475,7 +476,7 @@ func (bpt *bsPeerTracker) addPeer(p peer.ID) {
return
}
bpt.peers[p] = &peerStats{
firstSeen: time.Now(),
firstSeen: build.Clock.Now(),
}
}

View File

@ -99,7 +99,7 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
time.Sleep(time.Second)
build.Clock.Sleep(time.Second)
log.Info("restarting listenHeadChanges")
}
}

View File

@ -196,7 +196,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
*genm2,
},
NetworkName: "",
Timestamp: uint64(time.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
Timestamp: uint64(build.Clock.Now().Add(-500 * time.Duration(build.BlockDelaySecs) * time.Second).Unix()),
}
genb, err := genesis2.MakeGenesisBlock(context.TODO(), bs, sys, tpl)

View File

@ -23,12 +23,15 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/raulk/clock"
)
var log = logging.Logger("messagepool")
@ -66,7 +69,7 @@ type MessagePool struct {
lk sync.Mutex
closer chan struct{}
repubTk *time.Ticker
repubTk *clock.Ticker
localAddrs map[address.Address]struct{}
@ -187,7 +190,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
mp := &MessagePool{
closer: make(chan struct{}),
repubTk: time.NewTicker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0),

View File

@ -3,7 +3,6 @@ package metrics
import (
"context"
"encoding/json"
"time"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
@ -11,6 +10,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
@ -89,7 +89,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain
}
// using unix nano time makes very sure we pick a nonce higher than previous restart
nonce := uint64(time.Now().UnixNano())
nonce := uint64(build.Clock.Now().UnixNano())
for {
select {
@ -107,7 +107,7 @@ func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain
Height: n.Val.Height(),
Weight: w,
NodeName: nickname,
Time: uint64(time.Now().UnixNano() / 1000_000),
Time: uint64(build.Clock.Now().UnixNano() / 1000_000),
Nonce: nonce,
}

View File

@ -61,7 +61,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
src := msg.GetFrom()
go func() {
start := time.Now()
start := build.Clock.Now()
log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
if err != nil {
@ -75,9 +75,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
return
}
took := time.Since(start)
took := build.Clock.Since(start)
log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
if delay := time.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
if delay := build.Clock.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
}
@ -142,9 +142,9 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
// track validation time
begin := time.Now()
begin := build.Clock.Now()
defer func() {
log.Debugf("block validation time: %s", time.Since(begin))
log.Debugf("block validation time: %s", build.Clock.Since(begin))
}()
stats.Record(ctx, metrics.BlockReceived.M(1))
@ -233,7 +233,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
func (bv *BlockValidator) isChainNearSynced() bool {
ts := bv.chain.GetHeaviestTipSet()
timestamp := ts.MinTimestamp()
now := time.Now().UnixNano()
now := build.Clock.Now().UnixNano()
cutoff := uint64(now) - uint64(6*time.Hour)
return timestamp > cutoff
}

View File

@ -620,7 +620,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) (er
return nil
}
validationStart := time.Now()
validationStart := build.Clock.Now()
defer func() {
dur := time.Since(validationStart)
durMilli := dur.Seconds() * float64(1000)
@ -665,12 +665,12 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) (er
// fast checks first
now := uint64(time.Now().Unix())
now := uint64(build.Clock.Now().Unix())
if h.Timestamp > now+build.AllowableClockDriftSecs {
return xerrors.Errorf("block was from the future (now=%d, blk=%d): %w", now, h.Timestamp, ErrTemporal)
}
if h.Timestamp > now {
log.Warn("Got block from the future, but within threshold", h.Timestamp, time.Now().Unix())
log.Warn("Got block from the future, but within threshold", h.Timestamp, build.Clock.Now().Unix())
}
if h.Timestamp < baseTs.MinTimestamp()+(build.BlockDelaySecs*uint64(h.Height-baseTs.Height())) {
@ -1538,6 +1538,6 @@ func (syncer *Syncer) IsEpochBeyondCurrMax(epoch abi.ChainEpoch) bool {
return false
}
now := uint64(time.Now().Unix())
now := uint64(build.Clock.Now().Unix())
return epoch > (abi.ChainEpoch((now-g.Timestamp)/build.BlockDelaySecs) + MaxHeightDrift)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)
@ -48,7 +49,7 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) {
defer ss.lk.Unlock()
ss.Stage = v
if v == api.StageSyncComplete {
ss.End = time.Now()
ss.End = build.Clock.Now()
}
}
@ -64,7 +65,7 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
ss.Stage = api.StageHeaders
ss.Height = 0
ss.Message = ""
ss.Start = time.Now()
ss.Start = build.Clock.Now()
ss.End = time.Time{}
}
@ -87,7 +88,7 @@ func (ss *SyncerState) Error(err error) {
defer ss.lk.Unlock()
ss.Message = err.Error()
ss.Stage = api.StageSyncErrored
ss.End = time.Now()
ss.End = build.Clock.Now()
}
func (ss *SyncerState) Snapshot() SyncerState {

View File

@ -373,8 +373,7 @@ func (rt *Runtime) Send(to address.Address, method abi.MethodNum, m vmr.CBORMars
}
func (rt *Runtime) internalSend(from, to address.Address, method abi.MethodNum, value types.BigInt, params []byte) ([]byte, aerrors.ActorError) {
start := time.Now()
start := build.Clock.Now()
ctx, span := trace.StartSpan(rt.ctx, "vmc.Send")
defer span.End()
if span.IsRecordingEvents() {
@ -528,7 +527,7 @@ func (rt *Runtime) chargeGasInternal(gas GasCharge, skip int) aerrors.ActorError
var callers [10]uintptr
cout := gruntime.Callers(2+skip, callers[:])
now := time.Now()
now := build.Clock.Now()
if rt.lastGasCharge != nil {
rt.lastGasCharge.TimeTaken = now.Sub(rt.lastGasChargeTime)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/runtime"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/aerrors"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
@ -285,7 +286,7 @@ func checkMessage(msg *types.Message) error {
}
func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
start := time.Now()
start := build.Clock.Now()
ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start)
rt.finilizeGasTracing()
return &ApplyRet{
@ -302,7 +303,7 @@ func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*Ap
}
func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
start := time.Now()
start := build.Clock.Now()
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
defer span.End()
msg := cmsg.VMMessage()

View File

@ -195,7 +195,7 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
case <-ctx.Done():
fmt.Println("\nExit by user")
return nil
case <-time.After(1 * time.Second):
case <-build.Clock.After(1 * time.Second):
}
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
@ -68,7 +69,7 @@ func sendSmallFundsTxs(ctx context.Context, api api.FullNode, from address.Addre
sendSet = append(sendSet, naddr)
}
tick := time.NewTicker(time.Second / time.Duration(rate))
tick := build.Clock.Ticker(time.Second / time.Duration(rate))
for {
select {
case <-tick.C:

1
go.mod
View File

@ -103,6 +103,7 @@ require (
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.13
github.com/opentracing/opentracing-go v1.1.0
github.com/raulk/clock v1.1.0
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/syndtr/goleveldb v1.0.0

2
go.sum
View File

@ -1221,6 +1221,8 @@ github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.1.0 h1:jhMy6QXfi3y2HEzFoyuCj40z4OZIIHHPtFyCMftmvKA=
github.com/prometheus/procfs v0.1.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y=
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=

View File

@ -10,6 +10,8 @@ import (
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
)
func InitializeSystemJournal(dir string) error {
@ -103,7 +105,7 @@ func (fsj *fsJournal) rollJournalFile() error {
fsj.fi.Close()
}
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339))))
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}
@ -130,7 +132,7 @@ func (fsj *fsJournal) runLoop() {
func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
je := &JournalEntry{
System: system,
Timestamp: time.Now(),
Timestamp: build.Clock.Now(),
Val: obj,
}
select {

View File

@ -5,12 +5,12 @@ import (
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/build"
)
var log = logging.Logger("incrt")
var now = time.Now
type ReaderDeadline interface {
Read([]byte) (int, error)
SetReadDeadline(time.Time) error
@ -45,7 +45,7 @@ func (err errNoWait) Timeout() bool {
}
func (crt *incrt) Read(buf []byte) (int, error) {
start := now()
start := build.Clock.Now()
if crt.wait == 0 {
return 0, errNoWait{}
}
@ -59,7 +59,7 @@ func (crt *incrt) Read(buf []byte) (int, error) {
_ = crt.rd.SetReadDeadline(time.Time{})
if err == nil {
dur := now().Sub(start)
dur := build.Clock.Now().Sub(start)
crt.wait -= dur
crt.wait += time.Duration(n) * crt.waitPerByte
if crt.wait < 0 {

View File

@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"go.opencensus.io/stats"
@ -107,7 +108,7 @@ func (pmgr *PeerMgr) Disconnect(p peer.ID) {
}
func (pmgr *PeerMgr) Run(ctx context.Context) {
tick := time.NewTicker(time.Second * 5)
tick := build.Clock.Ticker(time.Second * 5)
for {
select {
case <-tick.C:

View File

@ -46,7 +46,7 @@ func NewMiner(api api.FullNode, epp gen.WinningPoStProver, addr address.Address)
waitFunc: func(ctx context.Context, baseTime uint64) (func(bool, error), error) {
// Wait around for half the block time in case other parents come in
deadline := baseTime + build.PropagationDelaySecs
time.Sleep(time.Until(time.Unix(int64(deadline), 0)))
build.Clock.Sleep(build.Clock.Until(time.Unix(int64(deadline), 0)))
return func(bool, error) {}, nil
},
@ -107,7 +107,7 @@ func (m *Miner) Stop(ctx context.Context) error {
func (m *Miner) niceSleep(d time.Duration) bool {
select {
case <-time.After(d):
case <-build.Clock.After(d):
return true
case <-m.stop:
return false
@ -170,14 +170,14 @@ func (m *Miner) mine(ctx context.Context) {
if b != nil {
btime := time.Unix(int64(b.Header.Timestamp), 0)
if time.Now().Before(btime) {
if !m.niceSleep(time.Until(btime)) {
if build.Clock.Now().Before(btime) {
if !m.niceSleep(build.Clock.Until(btime)) {
log.Warnf("received interrupt while waiting to broadcast block, will shutdown after block is sent out")
time.Sleep(time.Until(btime))
build.Clock.Sleep(build.Clock.Until(btime))
}
} else {
log.Warnw("mined block in the past", "block-time", btime,
"time", time.Now(), "duration", time.Since(btime))
"time", build.Clock.Now(), "duration", build.Clock.Since(btime))
}
// TODO: should do better 'anti slash' protection here
@ -201,7 +201,7 @@ func (m *Miner) mine(ctx context.Context) {
nextRound := time.Unix(int64(base.TipSet.MinTimestamp()+build.BlockDelaySecs*uint64(base.NullRounds))+int64(build.PropagationDelaySecs), 0)
select {
case <-time.After(time.Until(nextRound)):
case <-build.Clock.After(build.Clock.Until(nextRound)):
case <-m.stop:
stopping := m.stopping
m.stop = nil
@ -271,7 +271,7 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti
// 1.
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
start := time.Now()
start := build.Clock.Now()
round := base.TipSet.Height() + base.NullRounds + 1
@ -283,11 +283,11 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil
}
tMBI := time.Now()
tMBI := build.Clock.Now()
beaconPrev := mbi.PrevBeaconEntry
tDrand := time.Now()
tDrand := build.Clock.Now()
bvals := mbi.BeaconEntries
hasPower, err := m.hasPower(ctx, m.address, base.TipSet)
@ -299,9 +299,9 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil
}
tPowercheck := time.Now()
tPowercheck := build.Clock.Now()
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(time.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds)
log.Infof("Time delta between now and our mining base: %ds (nulls: %d)", uint64(build.Clock.Now().Unix())-base.TipSet.MinTimestamp(), base.NullRounds)
rbase := beaconPrev
if len(bvals) > 0 {
@ -322,7 +322,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, nil
}
tTicket := time.Now()
tTicket := build.Clock.Now()
buf := new(bytes.Buffer)
if err := m.address.MarshalCBOR(buf); err != nil {
@ -336,7 +336,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
prand := abi.PoStRandomness(rand)
tSeed := time.Now()
tSeed := build.Clock.Now()
postProof, err := m.epp.ComputeProof(ctx, mbi.Sectors, prand)
if err != nil {
@ -349,7 +349,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
tPending := time.Now()
tPending := build.Clock.Now()
// TODO: winning post proof
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
@ -357,7 +357,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
return nil, xerrors.Errorf("failed to create block: %w", err)
}
tCreateBlock := time.Now()
tCreateBlock := build.Clock.Now()
dur := tCreateBlock.Sub(start)
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
if dur > time.Second*time.Duration(build.BlockDelaySecs) {
@ -502,7 +502,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
tooLowFundMsgs := 0
tooHighNonceMsgs := 0
start := time.Now()
start := build.Clock.Now()
vmValid := time.Duration(0)
getbal := time.Duration(0)
@ -511,7 +511,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
})
for _, msg := range msgs {
vmstart := time.Now()
vmstart := build.Clock.Now()
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
@ -519,7 +519,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
continue
}
vmValid += time.Since(vmstart)
vmValid += build.Clock.Since(vmstart)
// TODO: this should be in some more general 'validate message' call
if msg.Message.GasLimit > build.BlockGasLimit {
@ -534,7 +534,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
from := msg.Message.From
getBalStart := time.Now()
getBalStart := build.Clock.Now()
if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, ts.Key())
if err != nil {
@ -545,7 +545,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
getbal += time.Since(getBalStart)
getbal += build.Clock.Since(getBalStart)
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
tooLowFundMsgs++
@ -648,7 +648,7 @@ func SelectMessages(ctx context.Context, al ActorLookup, ts *types.TipSet, msgs
log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs)
}
sm := time.Now()
sm := build.Clock.Now()
if sm.Sub(start) > time.Second {
log.Warnw("SelectMessages took a long time",
"duration", sm.Sub(start),

View File

@ -15,6 +15,7 @@ import (
protocol "github.com/libp2p/go-libp2p-core/protocol"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -67,7 +68,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
_ = s.Conn().Close()
return
}
arrived := time.Now()
arrived := build.Clock.Now()
log.Debugw("genesis from hello",
"tipset", hmsg.HeaviestTipSet,
@ -82,7 +83,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
go func() {
defer s.Close() //nolint:errcheck
sent := time.Now()
sent := build.Clock.Now()
msg := &LatencyMessage{
TArrial: arrived.UnixNano(),
TSent: sent.UnixNano(),
@ -99,7 +100,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
if len(protos) == 0 {
log.Warn("other peer hasnt completed libp2p identify, waiting a bit")
// TODO: this better
time.Sleep(time.Millisecond * 300)
build.Clock.Sleep(time.Millisecond * 300)
}
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), types.NewTipSetKey(hmsg.HeaviestTipSet...))
@ -146,7 +147,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
}
log.Debug("Sending hello message: ", hts.Cids(), hts.Height(), gen.Cid())
t0 := time.Now()
t0 := build.Clock.Now()
if err := cborutil.WriteCborRPC(s, hmsg); err != nil {
return err
}
@ -155,13 +156,13 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
defer s.Close() //nolint:errcheck
lmsg := &LatencyMessage{}
_ = s.SetReadDeadline(time.Now().Add(10 * time.Second))
_ = s.SetReadDeadline(build.Clock.Now().Add(10 * time.Second))
err := cborutil.ReadCborRPC(s, lmsg)
if err != nil {
log.Infow("reading latency message", "error", err)
}
t3 := time.Now()
t3 := build.Clock.Now()
lat := t3.Sub(t0)
// add to peer tracker
if hs.pmgr != nil {

View File

@ -7,7 +7,6 @@ import (
"io"
"io/ioutil"
"os"
"time"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
@ -20,6 +19,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/runtime"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen"
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/types"
@ -71,7 +71,7 @@ func MakeGenesis(outFile, genesisTemplate string) func(bs dtypes.ChainBlockstore
}
if template.Timestamp == 0 {
template.Timestamp = uint64(time.Now().Unix())
template.Timestamp = uint64(build.Clock.Now().Unix())
}
b, err := genesis2.MakeGenesisBlock(context.TODO(), bs, syscalls, template)

View File

@ -167,7 +167,7 @@ func NewWinningPoStProver(api api.FullNode, prover storage.Prover, verifier ffiw
var _ gen.WinningPoStProver = (*StorageWpp)(nil)
func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
start := time.Now()
start := build.Clock.Now()
cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount)
if err != nil {
@ -185,7 +185,7 @@ func (wpp *StorageWpp) ComputeProof(ctx context.Context, ssi []abi.SectorInfo, r
log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand)
start := time.Now()
start := build.Clock.Now()
proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand)
if err != nil {
return nil, err

View File

@ -430,7 +430,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
snums = append(snums, si.SectorNumber)
}
tsStart := time.Now()
tsStart := build.Clock.Now()
log.Infow("generating windowPost",
"sectors", len(ssi))

View File

@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -85,7 +86,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
if err != nil {
log.Errorf("ChainNotify error: %+v")
time.Sleep(10 * time.Second)
build.Clock.Sleep(10 * time.Second)
continue
}

View File

@ -66,7 +66,7 @@ func NewInfluxWriteQueue(ctx context.Context, influx client.Client) *InfluxWrite
for i := 0; i < maxRetries; i++ {
if err := influx.Write(batch); err != nil {
log.Warnw("Failed to write batch", "error", err)
time.Sleep(time.Second * 15)
build.Clock.Sleep(15 * time.Second)
continue
}
@ -104,7 +104,7 @@ func InfluxNewBatch() (client.BatchPoints, error) {
}
func NewPoint(name string, value interface{}) models.Point {
pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, time.Now())
pt, _ := models.NewPoint(name, models.Tags{}, map[string]interface{}{"value": value}, build.Clock.Now())
return pt
}

View File

@ -52,7 +52,7 @@ sync_complete:
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
case <-build.Clock.After(5 * time.Second):
state, err := napi.SyncState(ctx)
if err != nil {
return err
@ -97,13 +97,13 @@ sync_complete:
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
case <-build.Clock.After(5 * time.Second):
head, err := napi.ChainHead(ctx)
if err != nil {
return err
}
timestampDelta := time.Now().Unix() - int64(head.MinTimestamp())
timestampDelta := build.Clock.Now().Unix() - int64(head.MinTimestamp())
log.Infow(
"Waiting for reasonable head height",