lotus/cmd/tvx/stores.go
2020-09-28 23:14:37 +01:00

143 lines
3.5 KiB
Go

package main
import (
"context"
"log"
"sync"
"github.com/fatih/color"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/specs-actors/actors/util/adt"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
offline "github.com/ipfs/go-ipfs-exchange-offline"
cbor "github.com/ipfs/go-ipld-cbor"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
)
// Stores is a collection of the different stores and services that are needed
// to deal with the data layer of Filecoin, conveniently interlinked with one
// another. The field notes below describe the wiring performed by NewStores;
// a Stores value assembled by hand may be wired differently.
type Stores struct {
// CBORStore is the IPLD CBOR store built over Blockstore.
CBORStore cbor.IpldStore
// ADTStore is CBORStore wrapped together with a context for use with the
// specs-actors adt package.
ADTStore adt.Store
// Datastore is the datastore handed to NewStores; it is stored as-is.
Datastore ds.Batching
// Blockstore is the blockstore that every other service reads through.
Blockstore blockstore.Blockstore
// BlockService combines Blockstore with Exchange.
BlockService blockservice.BlockService
// Exchange is an offline exchange over Blockstore.
Exchange exchange.Interface
// DAGService is the merkledag service built over BlockService.
DAGService format.DAGService
}
// NewProxyingStores returns a set of Stores whose Blockstore is a
// proxyingBlockstore layered over a fresh, mutex-wrapped map datastore. Get
// requests that the local blockstore cannot satisfy are forwarded to the
// supplied Filecoin node through the ChainReadObj RPC, using ctx for those
// calls.
func NewProxyingStores(ctx context.Context, api api.FullNode) *Stores {
	// Named dstore rather than ds so the imported ds package is not shadowed.
	dstore := dssync.MutexWrap(ds.NewMapDatastore())

	return NewStores(ctx, dstore, &proxyingBlockstore{
		ctx:        ctx,
		api:        api,
		Blockstore: blockstore.NewBlockstore(dstore),
	})
}
// NewStores assembles a set of Stores on top of the supplied datastore and
// blockstore. The CBOR store, offline exchange, block service and DAG service
// are all derived from bs; ds is only recorded in the result. NewStores adds
// no proxying of its own: any proxying behaviour comes from the bs it is
// given.
func NewStores(ctx context.Context, ds ds.Batching, bs blockstore.Blockstore) *Stores {
	ipldStore := cbor.NewCborStore(bs)
	exch := offline.Exchange(bs)
	blockSvc := blockservice.New(bs, exch)
	dagSvc := merkledag.NewDAGService(blockSvc)

	stores := &Stores{
		CBORStore:    ipldStore,
		ADTStore:     adt.WrapStore(ctx, ipldStore),
		Datastore:    ds,
		Blockstore:   bs,
		BlockService: blockSvc,
		Exchange:     exch,
		DAGService:   dagSvc,
	}
	return stores
}
// TracingBlockstore is a Blockstore trait that records the CIDs that were
// accessed through Get while tracing is active.
type TracingBlockstore interface {
// StartTracing starts tracing CIDs accessed through this Blockstore.
StartTracing()
// FinishTracing finishes tracing accessed CIDs, and returns the set of
// CIDs that were traced, represented as a map keyed by CID.
FinishTracing() map[cid.Cid]struct{}
}
// proxyingBlockstore is a Blockstore wrapper that fetches unknown CIDs from
// a Filecoin node via JSON-RPC. It also implements TracingBlockstore, so the
// CIDs requested through Get can be recorded.
type proxyingBlockstore struct {
// ctx is the context passed to every ChainReadObj call made by Get.
ctx context.Context
// api is the Filecoin node that unknown CIDs are fetched from.
api api.FullNode
// lk guards tracing and traced.
lk sync.RWMutex
// tracing reports whether Get should currently record requested CIDs.
tracing bool
// traced is the set of CIDs recorded since tracing was last started.
traced map[cid.Cid]struct{}
// Blockstore is the embedded local store: Get consults it first and writes
// blocks fetched from the node into it. All other Blockstore methods are
// served by it directly through embedding.
blockstore.Blockstore
}
// Compile-time check that *proxyingBlockstore implements TracingBlockstore.
var _ TracingBlockstore = (*proxyingBlockstore)(nil)
// StartTracing enables recording of the CIDs requested through Get. Any set
// recorded by an earlier tracing session is replaced with a fresh, empty one.
func (pb *proxyingBlockstore) StartTracing() {
	pb.lk.Lock()
	defer pb.lk.Unlock()

	pb.traced = make(map[cid.Cid]struct{})
	pb.tracing = true
}
// FinishTracing disables recording and hands back the set of CIDs collected
// so far. The blockstore swaps in a fresh empty set, so the returned map is
// no longer referenced by the blockstore. If tracing was never started, the
// returned map is nil.
func (pb *proxyingBlockstore) FinishTracing() map[cid.Cid]struct{} {
	pb.lk.Lock()
	defer pb.lk.Unlock()

	collected := pb.traced
	pb.traced = make(map[cid.Cid]struct{})
	pb.tracing = false
	return collected
}
// Get returns the block identified by cid. The embedded local Blockstore is
// consulted first; if that lookup returns any error, the object is fetched
// from the Filecoin node through the ChainReadObj RPC, written into the local
// Blockstore, and returned. Errors from the RPC, from block construction, or
// from the local Put are returned to the caller unchanged.
//
// While tracing is enabled, every requested CID is recorded before the lookup
// is attempted, so CIDs whose lookup ultimately fails are traced as well.
func (pb *proxyingBlockstore) Get(cid cid.Cid) (blocks.Block, error) {
	// Recording a CID writes to the traced map, so the exclusive lock is
	// required here. A read lock admits several holders at once, and
	// concurrent map writes are a data race that can crash the program.
	pb.lk.Lock()
	if pb.tracing {
		pb.traced[cid] = struct{}{}
	}
	pb.lk.Unlock()

	// Fast path: the block is already available locally.
	if block, err := pb.Blockstore.Get(cid); err == nil {
		return block, nil
	}

	// Any local failure falls through to the node.
	log.Println(color.CyanString("fetching cid via rpc: %v", cid))
	item, err := pb.api.ChainReadObj(pb.ctx, cid)
	if err != nil {
		return nil, err
	}

	block, err := blocks.NewBlockWithCid(item, cid)
	if err != nil {
		return nil, err
	}

	// Cache the fetched block so later requests are served locally.
	if err := pb.Blockstore.Put(block); err != nil {
		return nil, err
	}
	return block, nil
}