track writeEpoch relative to current wall clock time

The issue: head change notifications are not emitted until after catching up,
which results in all writes during a catch up period being tracked at the base epoch.
This commit is contained in:
vyzo 2021-06-17 12:32:32 +03:00
parent 421f05eab9
commit bb17608ae0

View File

@ -24,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/specs-actors/v2/actors/builtin"
"go.opencensus.io/stats" "go.opencensus.io/stats"
) )
@ -122,6 +123,7 @@ type SplitStore struct {
baseEpoch abi.ChainEpoch baseEpoch abi.ChainEpoch
syncGapEpoch abi.ChainEpoch syncGapEpoch abi.ChainEpoch
warmupEpoch abi.ChainEpoch warmupEpoch abi.ChainEpoch
writeEpoch abi.ChainEpoch
coldPurgeSize int coldPurgeSize int
@ -137,6 +139,9 @@ type SplitStore struct {
env MarkSetEnv env MarkSetEnv
markSetSize int64 markSetSize int64
ctx context.Context
cancel func()
} }
var _ bstore.Blockstore = (*SplitStore)(nil) var _ bstore.Blockstore = (*SplitStore)(nil)
@ -170,6 +175,8 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
coldPurgeSize: defaultColdPurgeSize, coldPurgeSize: defaultColdPurgeSize,
} }
ss.ctx, ss.cancel = context.WithCancel(context.Background())
return ss, nil return ss, nil
} }
@ -239,7 +246,7 @@ func (s *SplitStore) Put(blk blocks.Block) error {
return s.cold.Put(blk) return s.cold.Put(blk)
} }
epoch := s.curTs.Height() epoch := s.writeEpoch
s.mx.Unlock() s.mx.Unlock()
err := s.tracker.Put(blk.Cid(), epoch) err := s.tracker.Put(blk.Cid(), epoch)
@ -258,7 +265,7 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
return s.cold.PutMany(blks) return s.cold.PutMany(blks)
} }
epoch := s.curTs.Height() epoch := s.writeEpoch
s.mx.Unlock() s.mx.Unlock()
batch := make([]cid.Cid, 0, len(blks)) batch := make([]cid.Cid, 0, len(blks))
@ -398,7 +405,11 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return xerrors.Errorf("error loading mark set size: %w", err) return xerrors.Errorf("error loading mark set size: %w", err)
} }
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch) s.updateWriteEpoch()
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch, "writeEpoch", s.writeEpoch)
go s.background()
// watch the chain // watch the chain
chain.SubscribeHeadChanges(s.HeadChange) chain.SubscribeHeadChanges(s.HeadChange)
@ -416,6 +427,7 @@ func (s *SplitStore) Close() error {
} }
} }
s.cancel()
return multierr.Combine(s.tracker.Close(), s.env.Close()) return multierr.Combine(s.tracker.Close(), s.env.Close())
} }
@ -431,6 +443,8 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
s.curTs = curTs s.curTs = curTs
s.mx.Unlock() s.mx.Unlock()
s.updateWriteEpoch()
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0) timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
if time.Since(timestamp) > SyncGapTime { if time.Since(timestamp) > SyncGapTime {
err := s.setSyncGapEpoch(epoch) err := s.setSyncGapEpoch(epoch)
@ -467,6 +481,44 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
return nil return nil
} }
func (s *SplitStore) updateWriteEpoch() {
s.mx.Lock()
defer s.mx.Unlock()
curTs := s.curTs
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
dt := time.Since(timestamp)
if dt < 0 {
writeEpoch := curTs.Height()
if writeEpoch > s.writeEpoch {
s.writeEpoch = writeEpoch
}
return
}
writeEpoch := curTs.Height() + abi.ChainEpoch(dt.Seconds())/builtin.EpochDurationSeconds
if writeEpoch > s.writeEpoch {
s.writeEpoch = writeEpoch
}
}
func (s *SplitStore) background() {
ticker := time.NewTicker(time.Duration(builtin.EpochDurationSeconds) * time.Second)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.updateWriteEpoch()
}
}
}
func (s *SplitStore) warmup(curTs *types.TipSet) error { func (s *SplitStore) warmup(curTs *types.TipSet) error {
err := s.loadGenesisState() err := s.loadGenesisState()
if err != nil { if err != nil {