2020-11-26 14:51:16 +00:00
package splitstore
2020-11-24 14:51:00 +00:00
import (
2021-03-13 10:00:28 +00:00
"bytes"
2020-11-24 14:51:00 +00:00
"context"
2020-11-24 17:41:07 +00:00
"encoding/binary"
2020-11-24 14:51:00 +00:00
"errors"
2021-07-04 17:17:07 +00:00
"sort"
2020-11-25 07:10:58 +00:00
"sync"
2020-11-29 13:10:30 +00:00
"sync/atomic"
2020-11-25 08:11:42 +00:00
"time"
2020-11-24 14:51:00 +00:00
2021-03-03 08:56:41 +00:00
"go.uber.org/multierr"
2021-02-26 13:59:36 +00:00
"golang.org/x/xerrors"
2020-11-24 14:51:00 +00:00
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
2020-11-24 17:41:07 +00:00
dstore "github.com/ipfs/go-datastore"
2020-11-26 14:51:16 +00:00
logging "github.com/ipfs/go-log/v2"
2021-03-13 10:00:28 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
2020-11-24 14:51:00 +00:00
"github.com/filecoin-project/go-state-types/abi"
2021-03-01 16:15:39 +00:00
2021-03-01 07:25:52 +00:00
bstore "github.com/filecoin-project/lotus/blockstore"
2020-11-24 17:15:38 +00:00
"github.com/filecoin-project/lotus/build"
2020-11-24 14:51:00 +00:00
"github.com/filecoin-project/lotus/chain/types"
2021-03-05 09:54:06 +00:00
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/stats"
2020-11-24 14:51:00 +00:00
)
2021-02-28 11:51:42 +00:00
var (
2021-03-02 00:47:21 +00:00
// CompactionThreshold is the number of epochs that need to have elapsed
// from the previously compacted epoch to trigger a new compaction.
//
// |················· CompactionThreshold ··················|
2021-07-04 07:06:55 +00:00
// | |
// =======‖≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡‖------------------------»
// | | chain --> ↑__ current epoch
// | archived epochs ___↑
// ↑________ CompactionBoundary
2021-03-02 00:47:21 +00:00
//
// === :: cold (already archived)
// ≡≡≡ :: to be archived in this compaction
// --- :: hot
2021-07-04 19:12:51 +00:00
CompactionThreshold = 5 * build . Finality
2021-03-02 00:47:21 +00:00
2021-03-03 09:15:26 +00:00
// CompactionBoundary is the number of epochs from the current epoch at which
2021-03-13 10:00:28 +00:00
// we will walk the chain for live objects.
2021-06-21 11:50:03 +00:00
CompactionBoundary = 4 * build . Finality
2021-03-14 10:32:05 +00:00
2021-07-04 16:21:00 +00:00
// CompactionLookback is the number of epochs from the current epoch at which
// we will consider marking an old block reference.
CompactionLookback = 2 * build . Finality
2021-03-14 10:32:05 +00:00
// SyncGapTime is the time delay from a tipset's min timestamp before we decide
// there is a sync gap
2021-06-14 18:33:53 +00:00
SyncGapTime = time . Minute
2021-02-26 14:59:03 +00:00
)
2020-11-24 17:26:28 +00:00
2021-03-01 16:41:51 +00:00
var (
2021-03-02 00:47:21 +00:00
// baseEpochKey stores the base epoch (last compaction epoch) in the
// metadata store.
baseEpochKey = dstore . NewKey ( "/splitstore/baseEpoch" )
// warmupEpochKey stores whether a hot store warmup has been performed.
// On first start, the splitstore will walk the state tree and will copy
// all active blocks into the hotstore.
2021-03-01 16:41:51 +00:00
warmupEpochKey = dstore . NewKey ( "/splitstore/warmupEpoch" )
2021-03-02 08:04:02 +00:00
2021-03-05 08:00:17 +00:00
// markSetSizeKey stores the current estimate for the mark set size.
// this is first computed at warmup and updated in every compaction
markSetSizeKey = dstore . NewKey ( "/splitstore/markSetSize" )
2021-03-02 08:04:02 +00:00
log = logging . Logger ( "splitstore" )
2021-06-22 07:10:24 +00:00
2021-07-03 09:02:36 +00:00
// used to signal end of walk
errStopWalk = errors . New ( "stop walk" )
2021-06-22 07:10:24 +00:00
// set this to true if you are debugging the splitstore to enable debug logging
enableDebugLog = false
2021-06-29 13:15:45 +00:00
// set this to true if you want to track origin stack traces in the write log
enableDebugLogWriteTraces = false
2021-03-01 16:41:51 +00:00
)
2020-11-24 17:41:07 +00:00
2021-03-02 16:59:00 +00:00
// Sizing constants.
const (
	// batchSize is the number of blocks accumulated before a batched PutMany
	// into the hotstore (e.g. in doWarmup).
	batchSize = 16384

	// defaultColdPurgeSize is the initial capacity hint for the list of cold
	// objects collected during compaction.
	defaultColdPurgeSize = 7_000_000
)
2021-03-01 18:30:15 +00:00
2021-02-27 13:20:14 +00:00
// Config holds the tunable options of a SplitStore.
type Config struct {
	// MarkSetType names the mark set implementation; it is handed to
	// OpenMarkSetEnv when the splitstore is opened.
	//
	// The only currently sane value is "mapts", but we may add an option for a
	// disk-backed markset for memory-constrained situations.
	MarkSetType string

	// DiscardColdBlocks indicates whether to skip moving cold blocks to the
	// coldstore. When the splitstore runs with a noop coldstore this option is
	// set to true: moving would be a noop that still spends time reading every
	// cold object, so cold blocks are purged directly instead.
	DiscardColdBlocks bool
}
2021-03-01 17:38:02 +00:00
// ChainAccessor is the view of the chain that the SplitStore needs. At runtime
// it will most likely be backed by a ChainStore.
type ChainAccessor interface {
	// GetTipsetByHeight looks up a tipset by epoch.
	GetTipsetByHeight(context.Context, abi.ChainEpoch, *types.TipSet, bool) (*types.TipSet, error)
	// GetHeaviestTipSet returns the current chain head; Start uses it as the
	// starting point.
	GetHeaviestTipSet() *types.TipSet
	// SubscribeHeadChanges registers a callback for head changes; Start
	// subscribes HeadChange through it.
	SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
}
2020-11-24 14:51:00 +00:00
type SplitStore struct {
2021-03-05 08:11:54 +00:00
compacting int32 // compaction (or warmp up) in progress
critsection int32 // compaction critical section
2021-07-06 12:09:04 +00:00
closing int32 // the splitstore is closing
2020-11-29 13:10:30 +00:00
2021-06-16 11:07:10 +00:00
cfg * Config
2021-07-07 06:55:25 +00:00
mx sync . Mutex
2021-07-07 08:31:27 +00:00
warmupEpoch abi . ChainEpoch // protected by mx
baseEpoch abi . ChainEpoch // protected by compaction lock
2020-11-29 13:10:30 +00:00
2021-03-02 16:59:00 +00:00
coldPurgeSize int
2021-07-04 06:53:58 +00:00
chain ChainAccessor
ds dstore . Datastore
hot bstore . Blockstore
cold bstore . Blockstore
2020-11-24 17:26:28 +00:00
2021-07-03 13:10:37 +00:00
markSetEnv MarkSetEnv
2021-03-02 00:47:21 +00:00
markSetSize int64
2021-06-17 09:32:32 +00:00
ctx context . Context
cancel func ( )
2021-06-21 12:17:00 +00:00
debug * debugLog
2021-06-25 07:07:45 +00:00
2021-07-06 12:09:04 +00:00
// transactional protection for concurrent read/writes during compaction
2021-07-04 15:37:53 +00:00
txnLk sync . RWMutex
txnActive bool
2021-07-04 16:21:00 +00:00
txnLookbackEpoch abi . ChainEpoch
2021-07-04 15:37:53 +00:00
txnProtect MarkSet
txnRefsMx sync . Mutex
txnRefs map [ cid . Cid ] struct { }
2021-07-05 13:08:08 +00:00
txnMissing map [ cid . Cid ] struct { }
2020-11-24 14:51:00 +00:00
}
2020-11-25 07:07:48 +00:00
var _ bstore . Blockstore = ( * SplitStore ) ( nil )
2020-11-24 14:51:00 +00:00
2021-03-01 17:39:00 +00:00
// Open opens an existing splistore, or creates a new splitstore. The splitstore
// is backed by the provided hot and cold stores. The returned SplitStore MUST be
// attached to the ChainStore with Start in order to trigger compaction.
func Open ( path string , ds dstore . Datastore , hot , cold bstore . Blockstore , cfg * Config ) ( * SplitStore , error ) {
2021-07-04 06:53:58 +00:00
// hot blockstore must support BlockstoreIterator
if _ , ok := hot . ( bstore . BlockstoreIterator ) ; ! ok {
return nil , xerrors . Errorf ( "hot blockstore does not support efficient iteration: %T" , hot )
2020-12-01 15:17:34 +00:00
}
2021-03-02 00:47:21 +00:00
// the markset env
2021-07-04 09:23:30 +00:00
markSetEnv , err := OpenMarkSetEnv ( path , cfg . MarkSetType )
2021-02-27 16:27:58 +00:00
if err != nil {
return nil , err
2020-12-01 15:17:34 +00:00
}
// and now we can make a SplitStore
ss := & SplitStore {
2021-07-03 13:10:37 +00:00
cfg : cfg ,
ds : ds ,
hot : hot ,
cold : cold ,
markSetEnv : markSetEnv ,
2021-02-27 13:20:14 +00:00
2021-03-02 16:59:00 +00:00
coldPurgeSize : defaultColdPurgeSize ,
}
2021-06-17 09:32:32 +00:00
ss . ctx , ss . cancel = context . WithCancel ( context . Background ( ) )
2021-06-22 07:10:24 +00:00
if enableDebugLog {
ss . debug , err = openDebugLog ( path )
if err != nil {
return nil , err
}
}
2020-12-01 15:17:34 +00:00
return ss , nil
}
2020-11-24 14:51:00 +00:00
// Blockstore interface
2021-03-01 17:39:00 +00:00
func ( s * SplitStore ) DeleteBlock ( _ cid . Cid ) error {
2020-11-24 14:51:00 +00:00
// afaict we don't seem to be using this method, so it's not implemented
2020-11-24 22:01:10 +00:00
return errors . New ( "DeleteBlock not implemented on SplitStore; don't do this Luke!" ) //nolint
2020-11-24 14:51:00 +00:00
}
2021-03-02 14:45:45 +00:00
// DeleteMany is required by the Blockstore interface but is intentionally
// unsupported; it always fails.
func (s *SplitStore) DeleteMany(_ []cid.Cid) error {
	// as far as we can tell nothing calls this, so it is left unimplemented
	return errors.New("DeleteMany not implemented on SplitStore; don't do this Luke!") //nolint
}
2021-07-07 13:34:02 +00:00
func ( s * SplitStore ) Has ( cid cid . Cid ) ( bool , error ) {
2021-06-25 07:07:45 +00:00
s . txnLk . RLock ( )
defer s . txnLk . RUnlock ( )
2021-07-07 13:34:02 +00:00
has , err := s . hot . Has ( cid )
2020-11-24 14:51:00 +00:00
2021-07-02 06:36:15 +00:00
if err != nil {
return has , err
}
if has {
2021-07-07 13:34:02 +00:00
err = s . trackTxnRef ( cid )
if err != nil {
log . Warnf ( "error tracking reference to %s: %s" , cid , err )
}
return true , nil
2020-11-24 14:51:00 +00:00
}
2021-07-07 13:34:02 +00:00
return s . cold . Has ( cid )
2020-11-24 14:51:00 +00:00
}
func ( s * SplitStore ) Get ( cid cid . Cid ) ( blocks . Block , error ) {
2021-06-25 07:07:45 +00:00
s . txnLk . RLock ( )
defer s . txnLk . RUnlock ( )
2020-11-24 14:51:00 +00:00
blk , err := s . hot . Get ( cid )
switch err {
case nil :
2021-07-04 17:49:39 +00:00
err = s . trackTxnRef ( cid )
2021-07-07 13:34:02 +00:00
if err != nil {
log . Warnf ( "error tracking reference to %s: %s" , cid , err )
}
return blk , nil
2020-11-24 14:51:00 +00:00
case bstore . ErrNotFound :
2021-07-04 06:53:58 +00:00
if s . debug != nil {
s . mx . Lock ( )
warm := s . warmupEpoch > 0
s . mx . Unlock ( )
if warm {
2021-07-06 14:13:38 +00:00
s . debug . LogReadMiss ( cid )
2021-07-04 06:53:58 +00:00
}
2021-06-22 07:56:35 +00:00
}
2021-06-21 12:17:00 +00:00
2021-03-05 09:54:06 +00:00
blk , err = s . cold . Get ( cid )
2021-03-05 10:04:40 +00:00
if err == nil {
2021-03-05 09:54:06 +00:00
stats . Record ( context . Background ( ) , metrics . SplitstoreMiss . M ( 1 ) )
2021-06-21 12:17:00 +00:00
2021-03-05 09:54:06 +00:00
}
return blk , err
2020-11-24 14:51:00 +00:00
default :
return nil , err
}
}
func ( s * SplitStore ) GetSize ( cid cid . Cid ) ( int , error ) {
2021-06-25 07:07:45 +00:00
s . txnLk . RLock ( )
defer s . txnLk . RUnlock ( )
2020-11-24 14:51:00 +00:00
size , err := s . hot . GetSize ( cid )
switch err {
case nil :
2021-07-04 17:49:39 +00:00
err = s . trackTxnRef ( cid )
2021-07-07 13:34:02 +00:00
if err != nil {
log . Warnf ( "error tracking reference to %s: %s" , cid , err )
}
return size , nil
2020-11-24 14:51:00 +00:00
case bstore . ErrNotFound :
2021-07-04 06:53:58 +00:00
if s . debug != nil {
s . mx . Lock ( )
warm := s . warmupEpoch > 0
s . mx . Unlock ( )
if warm {
2021-07-06 14:13:38 +00:00
s . debug . LogReadMiss ( cid )
2021-07-04 06:53:58 +00:00
}
2021-06-22 07:56:35 +00:00
}
2021-06-21 12:17:00 +00:00
2021-03-05 09:54:06 +00:00
size , err = s . cold . GetSize ( cid )
2021-03-05 10:04:40 +00:00
if err == nil {
2021-03-05 09:54:06 +00:00
stats . Record ( context . Background ( ) , metrics . SplitstoreMiss . M ( 1 ) )
}
return size , err
2020-11-24 14:51:00 +00:00
default :
return 0 , err
}
}
func ( s * SplitStore ) Put ( blk blocks . Block ) error {
2021-06-25 07:07:45 +00:00
s . txnLk . RLock ( )
defer s . txnLk . RUnlock ( )
2021-07-02 08:37:35 +00:00
err := s . hot . Put ( blk )
2021-07-07 13:34:02 +00:00
if err != nil {
return err
}
2021-07-06 14:13:38 +00:00
2021-07-07 13:34:02 +00:00
s . debug . LogWrite ( blk )
err = s . trackTxnRef ( blk . Cid ( ) )
if err != nil {
log . Warnf ( "error tracking reference to %s: %s" , blk . Cid ( ) , err )
2021-06-28 12:21:51 +00:00
}
2021-07-07 13:34:02 +00:00
return nil
2020-11-24 14:51:00 +00:00
}
func ( s * SplitStore ) PutMany ( blks [ ] blocks . Block ) error {
batch := make ( [ ] cid . Cid , 0 , len ( blks ) )
for _ , blk := range blks {
batch = append ( batch , blk . Cid ( ) )
}
2021-06-25 07:07:45 +00:00
s . txnLk . RLock ( )
defer s . txnLk . RUnlock ( )
2021-07-02 08:37:35 +00:00
err := s . hot . PutMany ( blks )
2021-07-07 13:34:02 +00:00
if err != nil {
return err
}
2021-07-04 06:53:58 +00:00
2021-07-07 13:34:02 +00:00
s . debug . LogWriteMany ( blks )
err = s . trackTxnRefMany ( batch )
if err != nil {
log . Warnf ( "error tracking reference to batch: %s" , err )
2021-06-28 12:21:51 +00:00
}
2021-07-07 13:34:02 +00:00
return nil
2020-11-24 14:51:00 +00:00
}
func ( s * SplitStore ) AllKeysChan ( ctx context . Context ) ( <- chan cid . Cid , error ) {
ctx , cancel := context . WithCancel ( ctx )
chHot , err := s . hot . AllKeysChan ( ctx )
if err != nil {
2020-11-24 22:01:10 +00:00
cancel ( )
2020-11-24 14:51:00 +00:00
return nil , err
}
chCold , err := s . cold . AllKeysChan ( ctx )
if err != nil {
cancel ( )
return nil , err
}
ch := make ( chan cid . Cid )
go func ( ) {
defer cancel ( )
2020-11-25 07:07:06 +00:00
defer close ( ch )
2020-11-24 14:51:00 +00:00
for _ , in := range [ ] <- chan cid . Cid { chHot , chCold } {
for cid := range in {
select {
case ch <- cid :
case <- ctx . Done ( ) :
return
}
}
}
} ( )
return ch , nil
}
// HashOnRead forwards the hash-on-read setting to the hotstore and then the
// coldstore.
func (s *SplitStore) HashOnRead(enabled bool) {
	for _, store := range []bstore.Blockstore{s.hot, s.cold} {
		store.HashOnRead(enabled)
	}
}
func ( s * SplitStore ) View ( cid cid . Cid , cb func ( [ ] byte ) error ) error {
2021-06-25 07:07:45 +00:00
s . txnLk . RLock ( )
defer s . txnLk . RUnlock ( )
2020-11-24 14:51:00 +00:00
err := s . hot . View ( cid , cb )
switch err {
2021-06-25 07:07:45 +00:00
case nil :
2021-07-04 17:49:39 +00:00
err = s . trackTxnRef ( cid )
2021-07-07 13:34:02 +00:00
if err != nil {
log . Warnf ( "error tracking reference to %s: %s" , cid , err )
}
return nil
2021-06-25 07:07:45 +00:00
2020-11-24 14:51:00 +00:00
case bstore . ErrNotFound :
2021-07-04 06:53:58 +00:00
if s . debug != nil {
s . mx . Lock ( )
warm := s . warmupEpoch > 0
s . mx . Unlock ( )
if warm {
2021-07-06 14:13:38 +00:00
s . debug . LogReadMiss ( cid )
2021-07-04 06:53:58 +00:00
}
2021-06-22 07:56:35 +00:00
}
2021-06-21 12:17:00 +00:00
2021-03-16 20:05:22 +00:00
err = s . cold . View ( cid , cb )
if err == nil {
stats . Record ( context . Background ( ) , metrics . SplitstoreMiss . M ( 1 ) )
}
return err
2020-11-24 14:51:00 +00:00
default :
return err
}
}
2020-11-24 17:15:38 +00:00
2020-11-24 17:26:28 +00:00
// State tracking
2021-03-01 17:38:02 +00:00
func ( s * SplitStore ) Start ( chain ChainAccessor ) error {
s . chain = chain
2021-07-07 06:55:25 +00:00
curTs := chain . GetHeaviestTipSet ( )
2020-11-24 17:41:07 +00:00
// load base epoch from metadata ds
// if none, then use current epoch because it's a fresh start
bs , err := s . ds . Get ( baseEpochKey )
switch err {
case nil :
2020-11-26 18:37:02 +00:00
s . baseEpoch = bytesToEpoch ( bs )
2020-11-24 17:41:07 +00:00
case dstore . ErrNotFound :
2021-07-07 06:55:25 +00:00
if curTs == nil {
2020-12-01 15:56:22 +00:00
// this can happen in some tests
break
}
2021-07-07 06:55:25 +00:00
err = s . setBaseEpoch ( curTs . Height ( ) )
2020-11-24 17:41:07 +00:00
if err != nil {
2021-03-01 16:41:51 +00:00
return xerrors . Errorf ( "error saving base epoch: %w" , err )
2020-11-24 17:41:07 +00:00
}
default :
2021-03-01 16:41:51 +00:00
return xerrors . Errorf ( "error loading base epoch: %w" , err )
}
// load warmup epoch from metadata ds
bs , err = s . ds . Get ( warmupEpochKey )
switch err {
case nil :
s . warmupEpoch = bytesToEpoch ( bs )
case dstore . ErrNotFound :
2021-07-06 12:09:04 +00:00
// the hotstore hasn't warmed up, start a concurrent warm up
2021-07-07 06:55:25 +00:00
err = s . warmup ( curTs )
2021-06-16 10:58:16 +00:00
if err != nil {
2021-06-16 17:45:33 +00:00
return xerrors . Errorf ( "error warming up: %w" , err )
2021-06-16 10:58:16 +00:00
}
2021-03-01 16:41:51 +00:00
default :
return xerrors . Errorf ( "error loading warmup epoch: %w" , err )
2020-11-24 17:41:07 +00:00
}
2021-07-06 12:09:04 +00:00
// load markSetSize from metadata ds to provide a size hint for marksets
2021-03-05 08:00:17 +00:00
bs , err = s . ds . Get ( markSetSizeKey )
switch err {
case nil :
s . markSetSize = bytesToInt64 ( bs )
case dstore . ErrNotFound :
default :
return xerrors . Errorf ( "error loading mark set size: %w" , err )
}
2021-07-04 06:53:58 +00:00
log . Infow ( "starting splitstore" , "baseEpoch" , s . baseEpoch , "warmupEpoch" , s . warmupEpoch )
2021-06-17 09:32:32 +00:00
2020-11-24 17:41:07 +00:00
// watch the chain
2021-03-01 17:38:02 +00:00
chain . SubscribeHeadChanges ( s . HeadChange )
2020-11-24 17:41:07 +00:00
return nil
2020-11-24 17:26:28 +00:00
}
2020-11-26 15:49:47 +00:00
func ( s * SplitStore ) Close ( ) error {
2021-03-05 08:11:54 +00:00
atomic . StoreInt32 ( & s . closing , 1 )
if atomic . LoadInt32 ( & s . critsection ) == 1 {
log . Warn ( "ongoing compaction in critical section; waiting for it to finish..." )
for atomic . LoadInt32 ( & s . critsection ) == 1 {
2020-11-26 15:49:47 +00:00
time . Sleep ( time . Second )
}
}
2021-06-17 09:32:32 +00:00
s . cancel ( )
2021-07-04 06:53:58 +00:00
return multierr . Combine ( s . markSetEnv . Close ( ) , s . debug . Close ( ) )
2020-11-26 15:49:47 +00:00
}
2021-03-01 17:38:02 +00:00
func ( s * SplitStore ) HeadChange ( _ , apply [ ] * types . TipSet ) error {
2021-04-29 02:55:18 +00:00
// Revert only.
if len ( apply ) == 0 {
return nil
}
2021-03-01 16:47:47 +00:00
curTs := apply [ len ( apply ) - 1 ]
epoch := curTs . Height ( )
2020-11-29 10:48:52 +00:00
2021-07-06 23:11:37 +00:00
if ! atomic . CompareAndSwapInt32 ( & s . compacting , 0 , 1 ) {
// we are currently compacting -- protect the new tipset(s)
s . protectTipSets ( apply )
2021-06-16 16:58:18 +00:00
return nil
2021-03-14 10:32:05 +00:00
}
2021-07-06 23:11:37 +00:00
timestamp := time . Unix ( int64 ( curTs . MinTimestamp ( ) ) , 0 )
if time . Since ( timestamp ) > SyncGapTime {
// don't attempt compaction before we have caught up syncing
atomic . StoreInt32 ( & s . compacting , 0 )
2020-11-29 13:10:30 +00:00
return nil
}
if epoch - s . baseEpoch > CompactionThreshold {
2021-07-06 22:39:58 +00:00
// it's time to compact -- prepare the transaction and go!
2021-07-07 06:52:31 +00:00
s . beginTxnProtect ( curTs )
2020-11-24 17:26:28 +00:00
go func ( ) {
2020-11-29 13:10:30 +00:00
defer atomic . StoreInt32 ( & s . compacting , 0 )
2021-07-06 22:39:58 +00:00
defer s . endTxnProtect ( )
2020-11-25 08:11:42 +00:00
log . Info ( "compacting splitstore" )
start := time . Now ( )
2021-06-21 11:53:56 +00:00
s . compact ( curTs )
2020-11-25 08:11:42 +00:00
log . Infow ( "compaction done" , "took" , time . Since ( start ) )
2020-11-24 17:26:28 +00:00
} ( )
2020-11-29 13:10:30 +00:00
} else {
// no compaction necessary
atomic . StoreInt32 ( & s . compacting , 0 )
2020-11-24 17:26:28 +00:00
}
return nil
}
2021-07-06 23:11:37 +00:00
// protectTipSets transactionally protects the blocks of newly applied tipsets
// when a compaction transaction is active, and does nothing otherwise. The
// tracking runs in a goroutine so the chain notifier is not blocked. That
// goroutine inherits the txnLk read lock and releases it when done.
func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
	s.txnLk.RLock()

	if s.txnActive {
		go func() {
			defer s.txnLk.RUnlock()

			var refs []cid.Cid
			for _, ts := range apply {
				refs = append(refs, ts.Cids()...)
			}

			if err := s.trackTxnRefMany(refs); err != nil {
				log.Errorf("error protecting newly applied tipsets: %s", err)
			}
		}()
		return
	}

	// no transaction in progress; nothing to protect
	s.txnLk.RUnlock()
}
2021-07-06 12:09:04 +00:00
// transactionally protect a reference to an object
2021-07-04 17:49:39 +00:00
func ( s * SplitStore ) trackTxnRef ( c cid . Cid ) error {
2021-07-04 06:53:58 +00:00
if ! s . txnActive {
2021-07-02 11:17:10 +00:00
// not compacting
2021-07-02 17:57:46 +00:00
return nil
2021-07-02 11:17:10 +00:00
}
2021-07-04 06:53:58 +00:00
if s . txnRefs != nil {
// we haven't finished marking yet, so track the reference
2021-07-04 07:10:37 +00:00
s . txnRefsMx . Lock ( )
2021-07-04 06:53:58 +00:00
s . txnRefs [ c ] = struct { } { }
2021-07-04 07:10:37 +00:00
s . txnRefsMx . Unlock ( )
2021-07-04 06:53:58 +00:00
return nil
}
// we have finished marking, protect the reference
2021-07-04 17:49:39 +00:00
return s . doTxnProtect ( c , nil )
2021-07-04 16:21:00 +00:00
}
2021-07-06 12:09:04 +00:00
// transactionally protect a batch of references
2021-07-04 16:33:49 +00:00
func ( s * SplitStore ) trackTxnRefMany ( cids [ ] cid . Cid ) error {
if ! s . txnActive {
// not compacting
return nil
}
if s . txnRefs != nil {
2021-07-06 12:09:04 +00:00
// we haven't finished marking yet, so track the references
2021-07-04 16:33:49 +00:00
s . txnRefsMx . Lock ( )
for _ , c := range cids {
s . txnRefs [ c ] = struct { } { }
}
s . txnRefsMx . Unlock ( )
return nil
}
// we have finished marking, protect the refs
batch := make ( map [ cid . Cid ] struct { } , len ( cids ) )
for _ , c := range cids {
batch [ c ] = struct { } { }
}
for _ , c := range cids {
err := s . doTxnProtect ( c , batch )
if err != nil {
return err
}
}
return nil
}
2021-07-06 12:09:04 +00:00
// transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset.
2021-07-04 16:33:49 +00:00
func ( s * SplitStore ) doTxnProtect ( root cid . Cid , batch map [ cid . Cid ] struct { } ) error {
2021-07-04 17:49:39 +00:00
// Note: cold objects are deleted heaviest first, so the consituents of an object
// cannot be deleted before the object itself.
2021-07-05 07:22:52 +00:00
err := s . walkObjectIncomplete ( root , cid . NewSet ( ) ,
2021-07-04 16:21:00 +00:00
func ( c cid . Cid ) error {
2021-07-05 08:32:52 +00:00
if isFilCommitment ( c ) {
return errStopWalk
}
2021-07-04 16:33:49 +00:00
if c != root {
_ , ok := batch [ c ]
if ok {
// it's on the same batch, stop walk
return errStopWalk
}
}
2021-07-05 17:10:47 +00:00
mark , err := s . txnProtect . Has ( c )
2021-07-04 16:21:00 +00:00
if err != nil {
return xerrors . Errorf ( "error checking mark set for %s: %w" , c , err )
}
// it's marked, nothing to do
if mark {
return errStopWalk
}
// old block reference -- see comment in doCompact about the necessity of this
isOldBlock , err := s . isOldBlockHeader ( c , s . txnLookbackEpoch )
if err != nil {
return xerrors . Errorf ( "error checking object type for %s: %w" , c , err )
}
if isOldBlock {
return errStopWalk
}
return s . txnProtect . Mark ( c )
2021-07-05 07:22:52 +00:00
} ,
func ( c cid . Cid ) error {
2021-07-05 15:16:54 +00:00
log . Warnf ( "missing object reference %s in %s" , c , root )
2021-07-05 13:08:08 +00:00
if s . txnMissing != nil {
s . txnRefsMx . Lock ( )
s . txnMissing [ c ] = struct { } { }
s . txnRefsMx . Unlock ( )
}
2021-07-05 07:22:52 +00:00
return errStopWalk
2021-07-04 16:21:00 +00:00
} )
2021-07-02 17:57:46 +00:00
2021-07-04 17:49:39 +00:00
if err != nil {
log . Warnf ( "error protecting object (cid: %s): %s" , root , err )
}
2021-07-04 15:37:53 +00:00
2021-07-04 17:49:39 +00:00
return err
2021-07-02 11:17:10 +00:00
}
2021-07-06 12:09:04 +00:00
// warmup acuiqres the compaction lock and spawns a goroutine to warm up the hotstore;
// this is necessary when we sync from a snapshot or when we enable the splitstore
// on top of an existing blockstore (which becomes the coldstore).
2021-06-16 17:45:33 +00:00
func ( s * SplitStore ) warmup ( curTs * types . TipSet ) error {
if ! atomic . CompareAndSwapInt32 ( & s . compacting , 0 , 1 ) {
return xerrors . Errorf ( "error locking compaction" )
}
go func ( ) {
defer atomic . StoreInt32 ( & s . compacting , 0 )
log . Info ( "warming up hotstore" )
start := time . Now ( )
2021-07-04 06:53:58 +00:00
err := s . doWarmup ( curTs )
2021-06-16 17:45:33 +00:00
if err != nil {
log . Errorf ( "error warming up hotstore: %s" , err )
return
}
log . Infow ( "warm up done" , "took" , time . Since ( start ) )
} ( )
return nil
}
2021-07-06 12:09:04 +00:00
// the actual warmup procedure; it waslk the chain loading all state roots at the boundary
// and headers all the way up to genesis.
// objects are written in batches so as to minimize overhead.
2021-06-16 17:45:33 +00:00
func ( s * SplitStore ) doWarmup ( curTs * types . TipSet ) error {
2021-03-01 16:41:51 +00:00
epoch := curTs . Height ( )
2021-03-01 18:11:35 +00:00
batchHot := make ( [ ] blocks . Block , 0 , batchSize )
2021-03-01 16:49:20 +00:00
count := int64 ( 0 )
2021-03-13 10:00:28 +00:00
xcount := int64 ( 0 )
missing := int64 ( 0 )
2021-07-04 06:53:58 +00:00
err := s . walkChain ( curTs , epoch , false ,
2021-07-05 08:32:52 +00:00
func ( c cid . Cid ) error {
if isFilCommitment ( c ) {
return errStopWalk
}
2021-03-01 16:49:20 +00:00
count ++
2021-07-05 08:32:52 +00:00
has , err := s . hot . Has ( c )
2021-03-01 16:41:51 +00:00
if err != nil {
return err
}
if has {
return nil
}
2021-07-05 08:32:52 +00:00
blk , err := s . cold . Get ( c )
2021-03-01 16:41:51 +00:00
if err != nil {
2021-03-13 10:00:28 +00:00
if err == bstore . ErrNotFound {
missing ++
return nil
}
2021-03-01 16:41:51 +00:00
return err
}
2021-03-13 10:00:28 +00:00
xcount ++
2021-03-01 18:11:35 +00:00
batchHot = append ( batchHot , blk )
if len ( batchHot ) == batchSize {
err = s . hot . PutMany ( batchHot )
if err != nil {
return err
}
batchHot = batchHot [ : 0 ]
2021-03-01 16:41:51 +00:00
}
2021-03-01 18:11:35 +00:00
return nil
2021-03-01 16:41:51 +00:00
} )
if err != nil {
2021-03-13 10:00:28 +00:00
return err
2021-03-01 16:41:51 +00:00
}
2021-03-01 18:11:35 +00:00
if len ( batchHot ) > 0 {
err = s . hot . PutMany ( batchHot )
if err != nil {
2021-03-13 10:00:28 +00:00
return err
2021-03-01 18:11:35 +00:00
}
}
2021-03-14 10:32:05 +00:00
log . Infow ( "warmup stats" , "visited" , count , "warm" , xcount , "missing" , missing )
2021-03-13 10:00:28 +00:00
2021-07-05 08:51:22 +00:00
s . markSetSize = count + count >> 2 // overestimate a bit
2021-03-05 08:00:17 +00:00
err = s . ds . Put ( markSetSizeKey , int64ToBytes ( s . markSetSize ) )
if err != nil {
2021-03-13 10:00:28 +00:00
log . Warnf ( "error saving mark set size: %s" , err )
2021-03-05 08:00:17 +00:00
}
2021-03-13 10:00:28 +00:00
2021-06-16 21:21:16 +00:00
// save the warmup epoch
err = s . ds . Put ( warmupEpochKey , epochToBytes ( epoch ) )
if err != nil {
return xerrors . Errorf ( "error saving warm up epoch: %w" , err )
}
2021-07-07 08:31:27 +00:00
s . mx . Lock ( )
2021-06-16 21:21:16 +00:00
s . warmupEpoch = epoch
2021-07-07 08:31:27 +00:00
s . mx . Unlock ( )
2021-06-16 21:21:16 +00:00
2021-03-13 10:00:28 +00:00
return nil
2021-03-01 16:41:51 +00:00
}
2021-07-06 12:09:04 +00:00
// --- Compaction ---
// Compaction works transactionally with the following algorithm:
// - We prepare a transaction, whereby all i/o referenced objects through the API are tracked.
// - We walk the chain and mark reachable objects, keeping 4 finalities of state roots and messages and all headers all the way to genesis.
// - Once the chain walk is complete, we begin full transaction protection with concurrent marking; we walk and mark all references created during the chain walk. On the same time, all I/O through the API concurrently marks objects as live references.
// - We collect cold objects by iterating through the hotstore and checking the mark set; if an object is not marked, then it is candidate for purge.
// - When running with a coldstore, we next copy all cold objects to the coldstore.
// - At this point we are ready to begin purging:
// - We sort cold objects heaviest first, so as to never delete the consituents of a DAG before the DAG itself (which would leave dangling references)
// - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
// - We then end the transaction and compact/gc the hotstore.
2021-06-21 11:53:56 +00:00
func ( s * SplitStore ) compact ( curTs * types . TipSet ) {
2021-03-05 09:54:06 +00:00
start := time . Now ( )
2021-07-04 06:53:58 +00:00
err := s . doCompact ( curTs )
2021-03-05 09:54:06 +00:00
took := time . Since ( start ) . Milliseconds ( )
stats . Record ( context . Background ( ) , metrics . SplitstoreCompactionTimeSeconds . M ( float64 ( took ) / 1e3 ) )
2021-03-05 08:29:49 +00:00
if err != nil {
log . Errorf ( "COMPACTION ERROR: %s" , err )
2021-02-27 13:20:14 +00:00
}
}
2021-06-21 11:53:56 +00:00
func ( s * SplitStore ) doCompact ( curTs * types . TipSet ) error {
2021-03-03 09:15:26 +00:00
currentEpoch := curTs . Height ( )
boundaryEpoch := currentEpoch - CompactionBoundary
2021-07-04 16:21:00 +00:00
lookbackEpoch := currentEpoch - CompactionLookback
2021-02-27 16:47:13 +00:00
2021-07-04 16:21:00 +00:00
log . Infow ( "running compaction" , "currentEpoch" , currentEpoch , "baseEpoch" , s . baseEpoch , "boundaryEpoch" , boundaryEpoch , "lookbackEpoch" , lookbackEpoch )
2021-02-27 13:20:14 +00:00
2021-07-03 13:10:37 +00:00
markSet , err := s . markSetEnv . Create ( "live" , s . markSetSize )
2021-02-27 16:27:58 +00:00
if err != nil {
2021-03-05 08:29:49 +00:00
return xerrors . Errorf ( "error creating mark set: %w" , err )
2021-02-27 16:27:58 +00:00
}
2021-03-14 10:32:05 +00:00
defer markSet . Close ( ) //nolint:errcheck
2021-06-25 07:07:45 +00:00
defer s . debug . Flush ( )
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-06 12:09:04 +00:00
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
2021-07-05 08:41:09 +00:00
log . Info ( "marking reachable objects" )
2021-02-27 16:27:58 +00:00
startMark := time . Now ( )
2021-03-02 00:47:21 +00:00
var count int64
2021-07-04 06:53:58 +00:00
err = s . walkChain ( curTs , boundaryEpoch , true ,
2021-07-03 13:10:37 +00:00
func ( c cid . Cid ) error {
2021-07-05 08:32:52 +00:00
if isFilCommitment ( c ) {
return errStopWalk
}
2021-02-28 19:35:18 +00:00
count ++
2021-07-03 13:10:37 +00:00
return markSet . Mark ( c )
2021-02-27 16:27:58 +00:00
} )
if err != nil {
2021-07-05 08:41:09 +00:00
return xerrors . Errorf ( "error marking: %w" , err )
2021-02-27 16:27:58 +00:00
}
2021-07-05 08:51:22 +00:00
s . markSetSize = count + count >> 2 // overestimate a bit
2021-02-28 19:35:18 +00:00
2021-03-13 10:00:28 +00:00
log . Infow ( "marking done" , "took" , time . Since ( startMark ) , "marked" , count )
2021-02-27 16:27:58 +00:00
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-06 22:39:58 +00:00
// begin transactional protection with concurrent marking and fetch references created while marking
2021-07-07 06:52:31 +00:00
txnRefs := s . beginTxnConcurrentMarking ( markSet )
2021-07-03 13:10:37 +00:00
2021-07-04 06:53:58 +00:00
// 1.1 Update markset for references created during marking
2021-07-04 15:37:53 +00:00
if len ( txnRefs ) > 0 {
2021-07-05 10:30:31 +00:00
log . Infow ( "updating mark set for live references" , "refs" , len ( txnRefs ) )
2021-07-04 15:37:53 +00:00
startMark = time . Now ( )
walked := cid . NewSet ( )
count = 0
2021-07-04 03:21:04 +00:00
2021-07-04 15:37:53 +00:00
for c := range txnRefs {
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-05 08:38:53 +00:00
if isFilCommitment ( c ) {
continue
}
2021-07-03 13:10:37 +00:00
mark , err := markSet . Has ( c )
if err != nil {
2021-07-04 06:53:58 +00:00
return xerrors . Errorf ( "error checking markset for %s: %w" , c , err )
2021-07-03 13:10:37 +00:00
}
if mark {
2021-07-04 15:37:53 +00:00
continue
2021-07-03 13:10:37 +00:00
}
2021-07-05 07:22:52 +00:00
err = s . walkObjectIncomplete ( c , walked ,
2021-07-04 15:37:53 +00:00
func ( c cid . Cid ) error {
2021-07-05 08:32:52 +00:00
if isFilCommitment ( c ) {
return errStopWalk
}
2021-07-04 15:37:53 +00:00
mark , err := markSet . Has ( c )
if err != nil {
return xerrors . Errorf ( "error checking markset for %s: %w" , c , err )
}
2021-07-03 13:10:37 +00:00
2021-07-04 15:37:53 +00:00
if mark {
return errStopWalk
}
// we also short-circuit on old blocks, as these can come from a network request
// and cause us to fail because we have purged their constituents (or they were missing from
// the beginning in case of snapshot sync, e.g. parent message receipts or old messages).
// If these blocks are on our chain, they would have been marked, but they might be
// from a fork.
//
// Ideally, we would have API options to preclude us from tracking references to such
// objects, but we don't, so we have to do this check.
2021-07-04 16:21:00 +00:00
isOldBlock , err := s . isOldBlockHeader ( c , lookbackEpoch )
2021-07-04 15:37:53 +00:00
if err != nil {
return xerrors . Errorf ( "error checking object type for %s: %w" , c , err )
}
if isOldBlock {
return errStopWalk
}
count ++
return markSet . Mark ( c )
2021-07-05 07:22:52 +00:00
} ,
2021-07-05 15:16:54 +00:00
func ( cm cid . Cid ) error {
2021-07-05 20:53:45 +00:00
log . Warnf ( "missing object reference %s in %s" , cm , c ) //nolint
2021-07-05 15:16:54 +00:00
s . txnRefsMx . Lock ( )
s . txnMissing [ cm ] = struct { } { }
s . txnRefsMx . Unlock ( )
2021-07-05 07:22:52 +00:00
return errStopWalk
2021-07-04 15:37:53 +00:00
} )
if err != nil {
2021-07-04 17:49:39 +00:00
return xerrors . Errorf ( "error walking %s for marking: %w" , c , err )
2021-07-04 09:14:29 +00:00
}
2021-07-03 13:10:37 +00:00
}
2021-07-04 09:14:29 +00:00
2021-07-05 15:16:54 +00:00
log . Infow ( "update mark set done" , "took" , time . Since ( startMark ) , "marked" , count )
2021-07-03 13:10:37 +00:00
}
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-04 06:53:58 +00:00
// 2. iterate through the hotstore to collect cold objects
log . Info ( "collecting cold objects" )
startCollect := time . Now ( )
// some stats for logging
var hotCnt , coldCnt int
2021-07-03 13:10:37 +00:00
2021-07-04 06:53:58 +00:00
cold := make ( [ ] cid . Cid , 0 , s . coldPurgeSize )
err = s . hot . ( bstore . BlockstoreIterator ) . ForEachKey ( func ( c cid . Cid ) error {
// was it marked?
2021-07-04 04:00:37 +00:00
mark , err := markSet . Has ( c )
if err != nil {
2021-07-04 06:53:58 +00:00
return xerrors . Errorf ( "error checkiing mark set for %s: %w" , c , err )
2021-07-04 04:00:37 +00:00
}
2021-07-03 13:10:37 +00:00
2021-07-04 04:00:37 +00:00
if mark {
2021-07-04 06:53:58 +00:00
hotCnt ++
return nil
2021-07-03 13:10:37 +00:00
}
2021-07-04 06:53:58 +00:00
// it's cold, mark it as candidate for move
2021-07-03 13:10:37 +00:00
cold = append ( cold , c )
2021-07-04 06:53:58 +00:00
coldCnt ++
return nil
} )
if err != nil {
2021-07-06 12:09:04 +00:00
return xerrors . Errorf ( "error collecting cold objects: %w" , err )
2021-07-03 13:10:37 +00:00
}
2021-07-05 13:08:08 +00:00
log . Infow ( "cold collection done" , "took" , time . Since ( startCollect ) )
2021-07-04 06:53:58 +00:00
if coldCnt > 0 {
s . coldPurgeSize = coldCnt + coldCnt >> 2 // overestimate a bit
}
log . Infow ( "compaction stats" , "hot" , hotCnt , "cold" , coldCnt )
2021-03-05 09:54:06 +00:00
stats . Record ( context . Background ( ) , metrics . SplitstoreCompactionHot . M ( int64 ( hotCnt ) ) )
stats . Record ( context . Background ( ) , metrics . SplitstoreCompactionCold . M ( int64 ( coldCnt ) ) )
2021-02-27 13:20:14 +00:00
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-05 13:08:08 +00:00
// now that we have collected cold objects, check for missing references from transactional i/o
2021-07-05 20:31:37 +00:00
// and disable further collection of such references (they will not be acted upon as we can't
// possibly delete objects we didn't have when we were collecting cold objects)
2021-07-05 17:10:47 +00:00
s . waitForMissingRefs ( )
2021-07-05 13:08:08 +00:00
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-04 06:53:58 +00:00
// 3. copy the cold objects to the coldstore -- if we have one
if ! s . cfg . DiscardColdBlocks {
2021-07-05 08:41:09 +00:00
log . Info ( "moving cold objects to the coldstore" )
2021-07-02 19:34:00 +00:00
startMove := time . Now ( )
err = s . moveColdBlocks ( cold )
if err != nil {
2021-07-05 08:41:09 +00:00
return xerrors . Errorf ( "error moving cold objects: %w" , err )
2021-07-02 19:34:00 +00:00
}
log . Infow ( "moving done" , "took" , time . Since ( startMove ) )
2021-03-02 09:20:39 +00:00
}
2021-07-03 05:13:26 +00:00
2021-07-04 17:17:07 +00:00
// 4. sort cold objects so that the dags with most references are deleted first
2021-07-06 12:09:04 +00:00
// this ensures that we can't refer to a dag with its constituents already deleted, i.e.
// we leave no dangling references.
2021-07-04 17:17:07 +00:00
log . Info ( "sorting cold objects" )
startSort := time . Now ( )
err = s . sortObjects ( cold )
if err != nil {
return xerrors . Errorf ( "error sorting objects: %w" , err )
}
log . Infow ( "sorting done" , "took" , time . Since ( startSort ) )
2021-07-04 18:21:53 +00:00
// Enter critical section
log . Info ( "entering critical section" )
atomic . StoreInt32 ( & s . critsection , 1 )
defer atomic . StoreInt32 ( & s . critsection , 0 )
// check to see if we are closing first; if that's the case just return
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
2021-07-04 18:21:53 +00:00
}
2021-07-04 17:17:07 +00:00
// 5. purge cold objects from the hotstore, taking protected references into account
2021-03-02 09:20:39 +00:00
log . Info ( "purging cold objects from the hotstore" )
startPurge := time . Now ( )
2021-07-06 22:39:58 +00:00
err = s . purge ( cold )
2021-03-02 09:20:39 +00:00
if err != nil {
2021-03-05 08:29:49 +00:00
return xerrors . Errorf ( "error purging cold blocks: %w" , err )
2021-03-02 09:20:39 +00:00
}
2021-07-05 08:41:09 +00:00
log . Infow ( "purging cold objects from hotstore done" , "took" , time . Since ( startPurge ) )
2021-03-02 09:20:39 +00:00
// we are done; do some housekeeping
2021-07-05 20:31:37 +00:00
s . endTxnProtect ( )
2021-03-08 17:46:21 +00:00
s . gcHotstore ( )
2021-03-08 16:12:09 +00:00
2021-07-04 07:06:55 +00:00
err = s . setBaseEpoch ( boundaryEpoch )
2021-03-02 09:20:39 +00:00
if err != nil {
2021-03-05 08:29:49 +00:00
return xerrors . Errorf ( "error saving base epoch: %w" , err )
2021-03-02 09:20:39 +00:00
}
2021-03-05 08:00:17 +00:00
err = s . ds . Put ( markSetSizeKey , int64ToBytes ( s . markSetSize ) )
if err != nil {
2021-03-05 08:29:49 +00:00
return xerrors . Errorf ( "error saving mark set size: %w" , err )
2021-03-05 08:00:17 +00:00
}
2021-03-05 08:29:49 +00:00
return nil
2021-03-02 09:20:39 +00:00
}
2021-07-07 06:52:31 +00:00
func ( s * SplitStore ) beginTxnProtect ( curTs * types . TipSet ) {
2021-07-06 22:39:58 +00:00
lookbackEpoch := curTs . Height ( ) - CompactionLookback
log . Info ( "preparing compaction transaction" )
2021-07-05 20:31:37 +00:00
s . txnLk . Lock ( )
defer s . txnLk . Unlock ( )
s . txnRefs = make ( map [ cid . Cid ] struct { } )
s . txnActive = true
s . txnLookbackEpoch = lookbackEpoch
}
2021-07-07 06:52:31 +00:00
func ( s * SplitStore ) beginTxnConcurrentMarking ( markSet MarkSet ) map [ cid . Cid ] struct { } {
2021-07-05 20:31:37 +00:00
s . txnLk . Lock ( )
defer s . txnLk . Unlock ( )
txnRefs := s . txnRefs
s . txnRefs = nil
s . txnMissing = make ( map [ cid . Cid ] struct { } )
s . txnProtect = markSet
return txnRefs
}
func ( s * SplitStore ) endTxnProtect ( ) {
s . txnLk . Lock ( )
defer s . txnLk . Unlock ( )
2021-07-05 20:51:10 +00:00
if s . txnProtect != nil {
_ = s . txnProtect . Close ( )
}
2021-07-05 20:31:37 +00:00
s . txnActive = false
s . txnProtect = nil
2021-07-05 20:56:31 +00:00
s . txnRefs = nil
2021-07-05 20:31:37 +00:00
s . txnMissing = nil
}
2021-07-04 06:53:58 +00:00
func ( s * SplitStore ) walkChain ( ts * types . TipSet , boundary abi . ChainEpoch , inclMsgs bool ,
2021-07-01 11:06:28 +00:00
f func ( cid . Cid ) error ) error {
2021-07-01 07:29:30 +00:00
visited := cid . NewSet ( )
2021-03-13 10:00:28 +00:00
walked := cid . NewSet ( )
toWalk := ts . Cids ( )
2021-07-01 11:10:57 +00:00
walkCnt := 0
scanCnt := 0
2021-03-13 10:00:28 +00:00
walkBlock := func ( c cid . Cid ) error {
2021-07-01 07:29:30 +00:00
if ! visited . Visit ( c ) {
2021-03-13 10:00:28 +00:00
return nil
}
2021-07-01 11:10:57 +00:00
walkCnt ++
2021-07-01 11:06:28 +00:00
if err := f ( c ) ; err != nil {
return err
2021-07-01 07:29:30 +00:00
}
2021-03-13 10:00:28 +00:00
var hdr types . BlockHeader
2021-07-03 09:02:36 +00:00
err := s . view ( c , func ( data [ ] byte ) error {
return hdr . UnmarshalCBOR ( bytes . NewBuffer ( data ) )
} )
if err != nil {
2021-03-13 10:00:28 +00:00
return xerrors . Errorf ( "error unmarshaling block header (cid: %s): %w" , c , err )
}
2021-07-04 06:53:58 +00:00
// we only scan the block if it is at or above the boundary
2021-07-04 09:46:45 +00:00
if hdr . Height >= boundary || hdr . Height == 0 {
2021-07-01 11:10:57 +00:00
scanCnt ++
2021-07-04 09:46:45 +00:00
if inclMsgs && hdr . Height > 0 {
2021-07-04 06:53:58 +00:00
if err := s . walkObject ( hdr . Messages , walked , f ) ; err != nil {
2021-06-16 11:07:10 +00:00
return xerrors . Errorf ( "error walking messages (cid: %s): %w" , hdr . Messages , err )
}
2021-06-14 20:21:47 +00:00
2021-07-04 06:53:58 +00:00
if err := s . walkObject ( hdr . ParentMessageReceipts , walked , f ) ; err != nil {
2021-06-16 11:07:10 +00:00
return xerrors . Errorf ( "error walking message receipts (cid: %s): %w" , hdr . ParentMessageReceipts , err )
}
2021-06-14 20:21:47 +00:00
}
2021-03-13 10:00:28 +00:00
2021-07-04 06:53:58 +00:00
if err := s . walkObject ( hdr . ParentStateRoot , walked , f ) ; err != nil {
2021-06-16 11:07:10 +00:00
return xerrors . Errorf ( "error walking state root (cid: %s): %w" , hdr . ParentStateRoot , err )
}
2021-03-13 10:00:28 +00:00
}
2021-06-16 15:26:04 +00:00
if hdr . Height > 0 {
toWalk = append ( toWalk , hdr . Parents ... )
}
2021-07-01 11:06:28 +00:00
2021-03-13 10:00:28 +00:00
return nil
}
for len ( toWalk ) > 0 {
2021-07-08 10:12:19 +00:00
// walking can take a while, so check this with every opportunity
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-03-13 10:00:28 +00:00
walking := toWalk
toWalk = nil
for _ , c := range walking {
if err := walkBlock ( c ) ; err != nil {
return xerrors . Errorf ( "error walking block (cid: %s): %w" , c , err )
}
}
}
2021-07-01 11:10:57 +00:00
log . Infow ( "chain walk done" , "walked" , walkCnt , "scanned" , scanCnt )
2021-07-01 11:06:28 +00:00
2021-03-13 10:00:28 +00:00
return nil
}
2021-07-04 15:37:53 +00:00
func ( s * SplitStore ) walkObject ( c cid . Cid , walked * cid . Set , f func ( cid . Cid ) error ) error {
if ! walked . Visit ( c ) {
return nil
2021-07-04 10:15:45 +00:00
}
2021-07-04 15:37:53 +00:00
if err := f ( c ) ; err != nil {
if err == errStopWalk {
return nil
}
return err
}
if c . Prefix ( ) . Codec != cid . DagCBOR {
2021-07-04 10:15:45 +00:00
return nil
2021-07-04 15:37:53 +00:00
}
var links [ ] cid . Cid
err := s . view ( c , func ( data [ ] byte ) error {
return cbg . ScanForLinks ( bytes . NewReader ( data ) , func ( c cid . Cid ) {
links = append ( links , c )
} )
2021-07-04 10:15:45 +00:00
} )
2021-07-04 15:37:53 +00:00
if err != nil {
return xerrors . Errorf ( "error scanning linked block (cid: %s): %w" , c , err )
}
for _ , c := range links {
err := s . walkObject ( c , walked , f )
if err != nil {
return xerrors . Errorf ( "error walking link (cid: %s): %w" , c , err )
}
}
return nil
2021-07-04 10:15:45 +00:00
}
2021-07-06 12:09:04 +00:00
// like walkObject, but the object may be potentially incomplete (references missing)
2021-07-05 07:22:52 +00:00
func ( s * SplitStore ) walkObjectIncomplete ( c cid . Cid , walked * cid . Set , f , missing func ( cid . Cid ) error ) error {
2021-03-13 10:00:28 +00:00
if ! walked . Visit ( c ) {
return nil
}
2021-07-05 08:11:08 +00:00
// occurs check -- only for DAGs
if c . Prefix ( ) . Codec == cid . DagCBOR {
2021-07-05 09:41:11 +00:00
has , err := s . has ( c )
2021-07-05 08:11:08 +00:00
if err != nil {
return xerrors . Errorf ( "error occur checking %s: %w" , c , err )
2021-07-05 07:22:52 +00:00
}
2021-07-05 08:11:08 +00:00
if ! has {
err = missing ( c )
if err == errStopWalk {
return nil
}
return err
}
2021-07-05 07:22:52 +00:00
}
if err := f ( c ) ; err != nil {
2021-07-03 09:02:36 +00:00
if err == errStopWalk {
return nil
}
2021-03-13 10:00:28 +00:00
return err
}
2021-03-24 16:23:47 +00:00
if c . Prefix ( ) . Codec != cid . DagCBOR {
2021-07-05 07:22:52 +00:00
return nil
2021-03-24 16:23:47 +00:00
}
2021-07-03 09:02:36 +00:00
var links [ ] cid . Cid
2021-07-05 08:11:08 +00:00
err := s . view ( c , func ( data [ ] byte ) error {
2021-07-03 09:02:36 +00:00
return cbg . ScanForLinks ( bytes . NewReader ( data ) , func ( c cid . Cid ) {
links = append ( links , c )
} )
} )
2021-03-13 10:00:28 +00:00
if err != nil {
2021-07-03 09:02:36 +00:00
return xerrors . Errorf ( "error scanning linked block (cid: %s): %w" , c , err )
2021-03-13 10:00:28 +00:00
}
2021-07-03 09:02:36 +00:00
for _ , c := range links {
2021-07-05 07:22:52 +00:00
err := s . walkObjectIncomplete ( c , walked , f , missing )
2021-07-01 11:06:28 +00:00
if err != nil {
2021-07-03 09:02:36 +00:00
return xerrors . Errorf ( "error walking link (cid: %s): %w" , c , err )
2021-07-01 11:06:28 +00:00
}
2021-03-13 10:00:28 +00:00
}
2021-07-05 07:22:52 +00:00
return nil
2021-03-13 10:00:28 +00:00
}
2021-07-03 09:02:36 +00:00
// internal version used by walk
func ( s * SplitStore ) view ( cid cid . Cid , cb func ( [ ] byte ) error ) error {
err := s . hot . View ( cid , cb )
2021-06-27 14:04:26 +00:00
switch err {
case bstore . ErrNotFound :
2021-07-03 09:02:36 +00:00
return s . cold . View ( cid , cb )
2021-06-27 14:04:26 +00:00
default :
2021-07-03 09:02:36 +00:00
return err
2021-06-27 14:04:26 +00:00
}
}
2021-07-05 09:41:11 +00:00
// has reports whether c is present in the hotstore or the coldstore.
//
// The coldstore is only consulted when the hotstore lookup succeeds without
// finding the object; a hotstore error is returned immediately.
func (s *SplitStore) has(c cid.Cid) (bool, error) {
	inHot, err := s.hot.Has(c)
	switch {
	case err != nil:
		return inHot, err
	case inHot:
		return true, nil
	default:
		return s.cold.Has(c)
	}
}
2021-07-08 07:13:44 +00:00
// checkClosing returns a "compaction aborted" error, after logging it, once the
// splitstore's closing flag has been set to 1. Otherwise it returns nil. Long
// running compaction steps call it so they can bail out early.
func (s *SplitStore) checkClosing() error {
	if atomic.LoadInt32(&s.closing) != 1 {
		return nil
	}

	log.Info("splitstore is closing; aborting compaction")
	return xerrors.Errorf("compaction aborted")
}
2021-07-04 15:37:53 +00:00
// isOldBlockHeader reports whether c is a block header whose height is below epoch.
//
// Only DAG-CBOR objects are inspected; anything else yields false. An object
// that does not decode as a block header also yields false. The returned error
// is whatever the store lookup (view) reports.
func (s *SplitStore) isOldBlockHeader(c cid.Cid, epoch abi.ChainEpoch) (isOldBlock bool, err error) {
	if c.Prefix().Codec == cid.DagCBOR {
		err = s.view(c, func(data []byte) error {
			var hdr types.BlockHeader
			if decodeErr := hdr.UnmarshalCBOR(bytes.NewBuffer(data)); decodeErr != nil {
				// not a block header, so it can't be an old one
				return nil
			}
			isOldBlock = hdr.Height < epoch
			return nil
		})
	}
	return isOldBlock, err
}
2021-03-02 16:59:00 +00:00
func ( s * SplitStore ) moveColdBlocks ( cold [ ] cid . Cid ) error {
2021-02-28 08:21:48 +00:00
batch := make ( [ ] blocks . Block , 0 , batchSize )
2021-07-04 06:53:58 +00:00
for _ , c := range cold {
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-04 06:53:58 +00:00
blk , err := s . hot . Get ( c )
2021-02-27 13:20:14 +00:00
if err != nil {
2021-06-28 10:35:06 +00:00
if err == bstore . ErrNotFound {
2021-07-04 06:53:58 +00:00
log . Warnf ( "hotstore missing block %s" , c )
continue
2021-02-27 13:20:14 +00:00
}
2021-07-04 06:53:58 +00:00
return xerrors . Errorf ( "error retrieving block %s from hotstore: %w" , c , err )
2021-02-27 13:20:14 +00:00
}
2021-02-28 08:21:48 +00:00
batch = append ( batch , blk )
if len ( batch ) == batchSize {
err = s . cold . PutMany ( batch )
if err != nil {
2021-03-02 09:20:39 +00:00
return xerrors . Errorf ( "error putting batch to coldstore: %w" , err )
2021-02-28 08:21:48 +00:00
}
batch = batch [ : 0 ]
}
}
if len ( batch ) > 0 {
2021-03-02 09:20:39 +00:00
err := s . cold . PutMany ( batch )
2021-02-27 13:20:14 +00:00
if err != nil {
2021-07-06 12:09:04 +00:00
return xerrors . Errorf ( "error putting batch to coldstore: %w" , err )
2021-02-27 13:20:14 +00:00
}
2021-02-28 08:21:48 +00:00
}
2021-02-27 13:20:14 +00:00
2021-03-02 09:20:39 +00:00
return nil
}
2021-07-06 12:09:04 +00:00
// sorts a slice of objects heaviest first -- it's a little expensive but worth the
// guarantee that we don't leave dangling references behind, e.g. if we die in the middle
// of a purge.
2021-07-04 17:17:07 +00:00
func ( s * SplitStore ) sortObjects ( cids [ ] cid . Cid ) error {
2021-07-06 04:26:13 +00:00
// we cache the keys to avoid making a gazillion of strings
keys := make ( map [ cid . Cid ] string )
key := func ( c cid . Cid ) string {
s , ok := keys [ c ]
if ! ok {
s = string ( c . Hash ( ) )
keys [ c ] = s
}
return s
}
// compute sorting weights as the cumulative number of DAG links
2021-07-06 06:02:44 +00:00
weights := make ( map [ string ] int )
2021-07-04 17:17:07 +00:00
for _ , c := range cids {
2021-07-08 07:13:44 +00:00
// this can take quite a while, so check for shutdown with every opportunity
if err := s . checkClosing ( ) ; err != nil {
return err
}
2021-07-06 06:02:44 +00:00
w := s . getObjectWeight ( c , weights , key )
weights [ key ( c ) ] = w
2021-07-04 17:17:07 +00:00
}
2021-07-06 04:26:13 +00:00
// sort!
2021-07-04 17:17:07 +00:00
sort . Slice ( cids , func ( i , j int ) bool {
2021-07-06 06:02:44 +00:00
wi := weights [ key ( cids [ i ] ) ]
wj := weights [ key ( cids [ j ] ) ]
2021-07-04 17:17:07 +00:00
if wi == wj {
return bytes . Compare ( cids [ i ] . Hash ( ) , cids [ j ] . Hash ( ) ) > 0
}
return wi > wj
} )
return nil
}
2021-07-06 06:02:44 +00:00
func ( s * SplitStore ) getObjectWeight ( c cid . Cid , weights map [ string ] int , key func ( cid . Cid ) string ) int {
w , ok := weights [ key ( c ) ]
if ok {
return w
2021-07-06 05:10:57 +00:00
}
2021-07-06 06:02:44 +00:00
// we treat block headers specially to avoid walking the entire chain
var hdr types . BlockHeader
err := s . view ( c , func ( data [ ] byte ) error {
return hdr . UnmarshalCBOR ( bytes . NewBuffer ( data ) )
} )
if err == nil {
w1 := s . getObjectWeight ( hdr . ParentStateRoot , weights , key )
weights [ key ( hdr . ParentStateRoot ) ] = w1
2021-07-06 05:10:57 +00:00
2021-07-06 06:02:44 +00:00
w2 := s . getObjectWeight ( hdr . Messages , weights , key )
weights [ key ( hdr . Messages ) ] = w2
return 1 + w1 + w2
2021-07-06 05:10:57 +00:00
}
var links [ ] cid . Cid
2021-07-06 06:02:44 +00:00
err = s . view ( c , func ( data [ ] byte ) error {
2021-07-06 05:10:57 +00:00
return cbg . ScanForLinks ( bytes . NewReader ( data ) , func ( c cid . Cid ) {
links = append ( links , c )
} )
} )
if err != nil {
2021-07-06 06:02:44 +00:00
return 1
2021-07-06 05:10:57 +00:00
}
2021-07-06 06:17:35 +00:00
w = 1
2021-07-06 05:10:57 +00:00
for _ , c := range links {
// these are internal refs, so dags will be dags
if c . Prefix ( ) . Codec != cid . DagCBOR {
2021-07-06 06:02:44 +00:00
w ++
2021-07-06 05:10:57 +00:00
continue
}
2021-07-06 06:02:44 +00:00
wc := s . getObjectWeight ( c , weights , key )
weights [ key ( c ) ] = wc
2021-07-06 05:10:57 +00:00
2021-07-06 06:02:44 +00:00
w += wc
2021-07-06 05:10:57 +00:00
}
2021-07-06 06:02:44 +00:00
return w
2021-07-06 05:10:57 +00:00
}
2021-03-03 18:04:29 +00:00
func ( s * SplitStore ) purgeBatch ( cids [ ] cid . Cid , deleteBatch func ( [ ] cid . Cid ) error ) error {
2021-03-02 18:20:07 +00:00
if len ( cids ) == 0 {
return nil
}
2021-07-05 07:22:52 +00:00
// we don't delete one giant batch of millions of objects, but rather do smaller batches
// so that we don't stop the world for an extended period of time
2021-03-02 18:20:07 +00:00
done := false
2021-03-03 17:36:13 +00:00
for i := 0 ; ! done ; i ++ {
2021-03-02 18:20:07 +00:00
start := i * batchSize
end := start + batchSize
if end >= len ( cids ) {
end = len ( cids )
done = true
}
2021-03-03 18:04:29 +00:00
err := deleteBatch ( cids [ start : end ] )
2021-03-02 18:20:07 +00:00
if err != nil {
2021-03-03 18:04:29 +00:00
return xerrors . Errorf ( "error deleting batch: %w" , err )
2021-03-02 18:20:07 +00:00
}
2021-02-27 13:20:14 +00:00
}
2021-02-27 21:08:23 +00:00
2021-03-02 09:20:39 +00:00
return nil
}
2021-02-27 21:08:23 +00:00
2021-07-06 22:39:58 +00:00
func ( s * SplitStore ) purge ( cids [ ] cid . Cid ) error {
2021-06-25 08:39:24 +00:00
deadCids := make ( [ ] cid . Cid , 0 , batchSize )
2021-07-01 05:38:35 +00:00
var purgeCnt , liveCnt int
2021-06-30 17:35:00 +00:00
defer func ( ) {
2021-07-03 05:13:26 +00:00
log . Infow ( "purged cold objects" , "purged" , purgeCnt , "live" , liveCnt )
2021-06-30 17:35:00 +00:00
} ( )
2021-06-25 08:39:24 +00:00
2021-06-25 07:07:45 +00:00
return s . purgeBatch ( cids ,
func ( cids [ ] cid . Cid ) error {
2021-06-25 08:39:24 +00:00
deadCids := deadCids [ : 0 ]
2021-06-25 07:07:45 +00:00
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return err
2021-07-07 18:32:05 +00:00
}
2021-06-25 07:07:45 +00:00
s . txnLk . Lock ( )
defer s . txnLk . Unlock ( )
2021-02-27 16:27:58 +00:00
2021-06-25 07:07:45 +00:00
for _ , c := range cids {
live , err := s . txnProtect . Has ( c )
if err != nil {
return xerrors . Errorf ( "error checking for liveness: %w" , err )
}
if live {
2021-07-01 05:38:35 +00:00
liveCnt ++
2021-06-25 07:07:45 +00:00
continue
}
deadCids = append ( deadCids , c )
}
2021-07-04 06:53:58 +00:00
err := s . hot . DeleteMany ( deadCids )
2021-06-25 07:07:45 +00:00
if err != nil {
return xerrors . Errorf ( "error purging cold objects: %w" , err )
}
2021-07-06 14:13:38 +00:00
s . debug . LogDelete ( deadCids )
2021-06-25 08:39:24 +00:00
purgeCnt += len ( deadCids )
2021-06-25 07:07:45 +00:00
return nil
} )
2021-02-27 13:20:14 +00:00
}
2021-07-06 12:09:04 +00:00
// I really don't like having this code, but we seem to have some occasional DAG references with
// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
// after a little bit.
2021-07-05 13:08:08 +00:00
// We need to figure out where they are coming from and eliminate that vector, but until then we
// have this gem[TM].
2021-07-06 12:09:04 +00:00
// My best guess is that they are parent message receipts or yet to be computed state roots; magik
// thinks the cause may be block validation.
2021-07-05 17:10:47 +00:00
func ( s * SplitStore ) waitForMissingRefs ( ) {
2021-07-05 15:16:54 +00:00
s . txnLk . Lock ( )
missing := s . txnMissing
s . txnMissing = nil
s . txnLk . Unlock ( )
if len ( missing ) == 0 {
return
}
2021-07-05 13:08:08 +00:00
log . Info ( "waiting for missing references" )
start := time . Now ( )
count := 0
defer func ( ) {
log . Infow ( "waiting for missing references done" , "took" , time . Since ( start ) , "marked" , count )
} ( )
2021-07-05 15:24:48 +00:00
for i := 0 ; i < 3 && len ( missing ) > 0 ; i ++ {
2021-07-08 07:13:44 +00:00
if err := s . checkClosing ( ) ; err != nil {
return
}
2021-07-05 13:08:08 +00:00
wait := time . Duration ( i ) * time . Minute
2021-07-05 15:24:48 +00:00
log . Infof ( "retrying for %d missing references in %s (attempt: %d)" , len ( missing ) , wait , i + 1 )
if wait > 0 {
time . Sleep ( wait )
}
2021-07-05 13:08:08 +00:00
towalk := missing
walked := cid . NewSet ( )
missing = make ( map [ cid . Cid ] struct { } )
for c := range towalk {
err := s . walkObjectIncomplete ( c , walked ,
func ( c cid . Cid ) error {
if isFilCommitment ( c ) {
return errStopWalk
}
2021-07-05 17:10:47 +00:00
mark , err := s . txnProtect . Has ( c )
2021-07-05 15:16:54 +00:00
if err != nil {
return xerrors . Errorf ( "error checking markset for %s: %w" , c , err )
}
2021-07-05 13:08:08 +00:00
2021-07-05 15:16:54 +00:00
if mark {
return errStopWalk
2021-07-05 13:08:08 +00:00
}
isOldBlock , err := s . isOldBlockHeader ( c , s . txnLookbackEpoch )
if err != nil {
return xerrors . Errorf ( "error checking object type for %s: %w" , c , err )
}
if isOldBlock {
return errStopWalk
}
count ++
2021-07-05 15:16:54 +00:00
return s . txnProtect . Mark ( c )
2021-07-05 13:08:08 +00:00
} ,
func ( c cid . Cid ) error {
missing [ c ] = struct { } { }
return errStopWalk
} )
if err != nil {
log . Warnf ( "error marking: %s" , err )
}
}
}
if len ( missing ) > 0 {
log . Warnf ( "still missing %d references" , len ( missing ) )
for c := range missing {
log . Warnf ( "unresolved missing reference: %s" , c )
}
}
}
2021-03-08 17:46:21 +00:00
func ( s * SplitStore ) gcHotstore ( ) {
2021-03-11 09:45:19 +00:00
if compact , ok := s . hot . ( interface { Compact ( ) error } ) ; ok {
log . Infof ( "compacting hotstore" )
startCompact := time . Now ( )
err := compact . Compact ( )
if err != nil {
log . Warnf ( "error compacting hotstore: %s" , err )
return
}
2021-03-11 11:10:44 +00:00
log . Infow ( "hotstore compaction done" , "took" , time . Since ( startCompact ) )
2021-03-11 09:45:19 +00:00
}
2021-03-08 17:46:21 +00:00
if gc , ok := s . hot . ( interface { CollectGarbage ( ) error } ) ; ok {
log . Infof ( "garbage collecting hotstore" )
startGC := time . Now ( )
err := gc . CollectGarbage ( )
if err != nil {
log . Warnf ( "error garbage collecting hotstore: %s" , err )
2021-03-11 11:10:44 +00:00
return
2021-03-08 17:46:21 +00:00
}
2021-03-11 11:10:44 +00:00
log . Infow ( "hotstore garbage collection done" , "took" , time . Since ( startGC ) )
2021-03-08 17:46:21 +00:00
}
}
2020-11-24 17:41:07 +00:00
func ( s * SplitStore ) setBaseEpoch ( epoch abi . ChainEpoch ) error {
s . baseEpoch = epoch
2020-11-26 18:37:02 +00:00
return s . ds . Put ( baseEpochKey , epochToBytes ( epoch ) )
}
func epochToBytes ( epoch abi . ChainEpoch ) [ ] byte {
2021-03-05 08:00:17 +00:00
return uint64ToBytes ( uint64 ( epoch ) )
}
// bytesToEpoch decodes a chain epoch written by epochToBytes.
func bytesToEpoch(buf []byte) abi.ChainEpoch {
	raw := bytesToUint64(buf)
	return abi.ChainEpoch(raw)
}
// int64ToBytes encodes i with uint64ToBytes, reinterpreting it as a uint64.
func int64ToBytes(i int64) []byte {
	raw := uint64(i)
	return uint64ToBytes(raw)
}
// bytesToInt64 decodes an int64 written by int64ToBytes.
func bytesToInt64(buf []byte) int64 {
	raw := bytesToUint64(buf)
	return int64(raw)
}
// uint64ToBytes encodes i as an unsigned varint (binary.PutUvarint). The returned
// slice holds exactly the encoded bytes: 1 to binary.MaxVarintLen64 of them.
func uint64ToBytes(i uint64) []byte {
	// MaxVarintLen64 is the longest encoding PutUvarint can produce for a uint64,
	// so there is no need to over-allocate beyond it.
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, i)
	return buf[:n]
}
2021-03-05 08:00:17 +00:00
// bytesToUint64 decodes an unsigned varint from the start of buf; trailing bytes
// are ignored. Decoding failures are not reported: an empty or truncated buffer,
// or a value that overflows 64 bits, yields 0.
func bytesToUint64(buf []byte) uint64 {
	if v, n := binary.Uvarint(buf); n > 0 {
		return v
	}
	return 0
}
2021-07-05 08:32:52 +00:00
// isFilCommitment reports whether c uses one of the Filecoin commitment codecs,
// cid.FilCommitmentSealed or cid.FilCommitmentUnsealed.
func isFilCommitment(c cid.Cid) bool {
	codec := c.Prefix().Codec
	return codec == cid.FilCommitmentSealed || codec == cid.FilCommitmentUnsealed
}