lotus/cmd/tvx/stores.go

171 lines
4.2 KiB
Go
Raw Normal View History

2020-09-27 19:10:05 +00:00
package main
import (
"context"
"log"
"sync"
"github.com/fatih/color"
chore: migrate to boxo This migrates everything except the `go-car` library: https://github.com/ipfs/boxo/issues/218#issuecomment-1529922103 I didn't migrate everything in the previous release because all the boxo code wasn't compatible with the go-ipld-prime one due to an in-flight (/ aftermath) revert of github.com/ipfs/go-block-format. go-block-format has been unmigrated since slightly less than absolutely everything depends on it, which would have required everything to be moved to boxo or everything to opt in to using boxo, both of which were deal breakers for different groups. This worked fine because lotus's codebase could live happily on the first multirepo setup; however, boost is now trying to use boxo's code with lotus's (still on multirepo) setup: https://filecoinproject.slack.com/archives/C03AQ3QAUG1/p1685022344779649 The alternative would be for boost to write shim types which just forward calls and return with the different interface definitions. Btw, the reason this is an issue in the first place is that, unlike what Go's duck typing model suggests, interfaces are not transparent (https://github.com/golang/go/issues/58112): interfaces are strongly typed but they have implicit narrowing. The issue is that if you return an interface from an interface, Go does not have a function definition in which to insert the implicit conversion, so instead the type checker complains you are not returning the right type. Stubbing types were reverted: https://github.com/ipfs/boxo/issues/218#issuecomment-1478650351 Last time I only migrated `go-bitswap` to `boxo/bitswap` because of the security issues and because we never had the interface-returning-an-interface problem (we had concrete wrappers where the implicit conversion took place).
2023-05-25 14:31:53 +00:00
"github.com/ipfs/boxo/blockservice"
exchange "github.com/ipfs/boxo/exchange"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/ipld/merkledag"
blocks "github.com/ipfs/go-block-format"
2020-09-27 19:10:05 +00:00
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
2022-06-14 15:00:51 +00:00
dssync "github.com/ipfs/go-datastore/sync"
2020-09-27 19:10:05 +00:00
cbor "github.com/ipfs/go-ipld-cbor"
format "github.com/ipfs/go-ipld-format"
2022-11-07 21:50:55 +00:00
"golang.org/x/xerrors"
2022-06-14 15:00:51 +00:00
2023-09-21 15:37:02 +00:00
"github.com/filecoin-project/lotus/api/v1api"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors/adt"
2020-09-27 19:10:05 +00:00
)
// Stores is a collection of the different stores and services that are needed
// to deal with the data layer of Filecoin, conveniently interlinked with one
// another.
//
// The field comments below describe how NewStores populates each member:
// everything except Datastore is derived from the Blockstore it is given.
type Stores struct {
// CBORStore is the IPLD CBOR store built over Blockstore.
CBORStore cbor.IpldStore
// ADTStore is CBORStore wrapped together with a context via adt.WrapStore.
ADTStore adt.Store
// Datastore is the batching datastore handed to NewStores.
Datastore ds.Batching
// Blockstore is the block store that the other members are layered on.
Blockstore blockstore.Blockstore
// BlockService combines Blockstore with Exchange.
BlockService blockservice.BlockService
// Exchange is an offline exchange over Blockstore.
Exchange exchange.Interface
// DAGService is the merkledag DAG service built over BlockService.
DAGService format.DAGService
}
// NewProxyingStores is a set of Stores backed by a proxying Blockstore that
// proxies Get requests for unknown CIDs to a Filecoin node, via the
// ChainReadObj RPC.
2023-09-21 15:37:02 +00:00
func NewProxyingStores(ctx context.Context, api v1api.FullNode) *Stores {
2020-09-27 20:06:07 +00:00
ds := dssync.MutexWrap(ds.NewMapDatastore())
2020-09-27 19:10:05 +00:00
bs := &proxyingBlockstore{
ctx: ctx,
api: api,
Blockstore: blockstore.FromDatastore(ds),
2020-09-27 19:10:05 +00:00
}
return NewStores(ctx, ds, bs)
}
// NewStores creates a non-proxying set of Stores.
//
// The CBOR store, offline exchange, block service and DAG service are all
// layered on bs; ds is stored as-is in the Datastore field. ctx is only used
// to wrap the CBOR store into the ADT store.
func NewStores(ctx context.Context, ds ds.Batching, bs blockstore.Blockstore) *Stores {
	cst := cbor.NewCborStore(bs)
	exch := offline.Exchange(bs)
	bsvc := blockservice.New(bs, exch)
	dag := merkledag.NewDAGService(bsvc)

	stores := &Stores{
		Datastore:    ds,
		Blockstore:   bs,
		CBORStore:    cst,
		ADTStore:     adt.WrapStore(ctx, cst),
		Exchange:     exch,
		BlockService: bsvc,
		DAGService:   dag,
	}
	return stores
}
// TracingBlockstore is a Blockstore trait that records the CIDs that were
// accessed while tracing is active. The proxyingBlockstore implementation in
// this file records CIDs passed to Get, Put and PutMany.
type TracingBlockstore interface {
// StartTracing starts tracing CIDs accessed through this Blockstore.
StartTracing()
// FinishTracing finishes tracing accessed CIDs, and returns a map of the
// CIDs that were traced.
FinishTracing() map[cid.Cid]struct{}
}
// proxyingBlockstore is a Blockstore wrapper that fetches unknown CIDs from
// a Filecoin node via JSON-RPC.
type proxyingBlockstore struct {
ctx context.Context
2023-09-21 15:37:02 +00:00
api v1api.FullNode
2020-09-27 19:10:05 +00:00
lk sync.Mutex
2020-09-27 19:10:05 +00:00
tracing bool
traced map[cid.Cid]struct{}
blockstore.Blockstore
}
var _ TracingBlockstore = (*proxyingBlockstore)(nil)
// StartTracing switches tracing on and discards anything recorded earlier by
// installing a fresh, empty set of traced CIDs.
func (pb *proxyingBlockstore) StartTracing() {
	pb.lk.Lock()
	defer pb.lk.Unlock()

	pb.traced = make(map[cid.Cid]struct{})
	pb.tracing = true
}
// FinishTracing switches tracing off and returns the set of CIDs recorded
// since the last StartTracing. The returned map is handed over to the caller;
// the blockstore replaces its own set with a new empty one.
func (pb *proxyingBlockstore) FinishTracing() map[cid.Cid]struct{} {
	pb.lk.Lock()
	defer pb.lk.Unlock()

	collected := pb.traced
	pb.traced, pb.tracing = make(map[cid.Cid]struct{}), false
	return collected
}
2021-12-13 13:03:54 +00:00
func (pb *proxyingBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
pb.lk.Lock()
2020-09-27 19:10:05 +00:00
if pb.tracing {
pb.traced[cid] = struct{}{}
}
pb.lk.Unlock()
2020-09-27 19:10:05 +00:00
2021-12-13 13:03:54 +00:00
if block, err := pb.Blockstore.Get(ctx, cid); err == nil {
2020-09-27 19:10:05 +00:00
return block, err
}
log.Println(color.CyanString("fetching cid via rpc: %v", cid))
item, err := pb.api.ChainReadObj(pb.ctx, cid)
if err != nil {
return nil, err
}
block, err := blocks.NewBlockWithCid(item, cid)
if err != nil {
return nil, err
}
2021-12-13 13:03:54 +00:00
err = pb.Blockstore.Put(ctx, block)
2020-09-27 19:10:05 +00:00
if err != nil {
return nil, err
}
return block, nil
}
2021-12-13 13:03:54 +00:00
func (pb *proxyingBlockstore) Put(ctx context.Context, block blocks.Block) error {
pb.lk.Lock()
if pb.tracing {
pb.traced[block.Cid()] = struct{}{}
}
pb.lk.Unlock()
2021-12-13 13:03:54 +00:00
return pb.Blockstore.Put(ctx, block)
}
2021-12-13 13:03:54 +00:00
func (pb *proxyingBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
pb.lk.Lock()
if pb.tracing {
for _, b := range blocks {
pb.traced[b.Cid()] = struct{}{}
}
}
pb.lk.Unlock()
2021-12-13 13:03:54 +00:00
return pb.Blockstore.PutMany(ctx, blocks)
}
2022-11-07 21:38:50 +00:00
// View obtains the block for c through pb.Get and passes its raw bytes to
// callback. Going through pb.Get means the access is traced and may be served
// by the node. An error from Get is wrapped and returned; otherwise the
// callback's own result is returned.
func (pb *proxyingBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
	blk, getErr := pb.Get(ctx, c)
	if getErr != nil {
		return xerrors.Errorf("failed to Get cid %s: %w", c, getErr)
	}

	data := blk.RawData()
	return callback(data)
}