package main import ( "context" "log" "sync" "github.com/fatih/color" dssync "github.com/ipfs/go-datastore/sync" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/actors/adt" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" exchange "github.com/ipfs/go-ipfs-exchange-interface" offline "github.com/ipfs/go-ipfs-exchange-offline" cbor "github.com/ipfs/go-ipld-cbor" format "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-merkledag" ) // Stores is a collection of the different stores and services that are needed // to deal with the data layer of Filecoin, conveniently interlinked with one // another. type Stores struct { CBORStore cbor.IpldStore ADTStore adt.Store Datastore ds.Batching Blockstore blockstore.Blockstore BlockService blockservice.BlockService Exchange exchange.Interface DAGService format.DAGService } // NewProxyingStores is a set of Stores backed by a proxying Blockstore that // proxies Get requests for unknown CIDs to a Filecoin node, via the // ChainReadObj RPC. func NewProxyingStores(ctx context.Context, api api.FullNode) *Stores { ds := dssync.MutexWrap(ds.NewMapDatastore()) bs := &proxyingBlockstore{ ctx: ctx, api: api, Blockstore: blockstore.FromDatastore(ds), } return NewStores(ctx, ds, bs) } // NewStores creates a non-proxying set of Stores. func NewStores(ctx context.Context, ds ds.Batching, bs blockstore.Blockstore) *Stores { var ( cborstore = cbor.NewCborStore(bs) offl = offline.Exchange(bs) blkserv = blockservice.New(bs, offl) dserv = merkledag.NewDAGService(blkserv) ) return &Stores{ CBORStore: cborstore, ADTStore: adt.WrapStore(ctx, cborstore), Datastore: ds, Blockstore: bs, Exchange: offl, BlockService: blkserv, DAGService: dserv, } } // TracingBlockstore is a Blockstore trait that records CIDs that were accessed // through Get. type TracingBlockstore interface { // StartTracing starts tracing CIDs accessed through the this Blockstore. StartTracing() // FinishTracing finishes tracing accessed CIDs, and returns a map of the // CIDs that were traced. FinishTracing() map[cid.Cid]struct{} } // proxyingBlockstore is a Blockstore wrapper that fetches unknown CIDs from // a Filecoin node via JSON-RPC. type proxyingBlockstore struct { ctx context.Context api api.FullNode lk sync.Mutex tracing bool traced map[cid.Cid]struct{} blockstore.Blockstore } var _ TracingBlockstore = (*proxyingBlockstore)(nil) func (pb *proxyingBlockstore) StartTracing() { pb.lk.Lock() pb.tracing = true pb.traced = map[cid.Cid]struct{}{} pb.lk.Unlock() } func (pb *proxyingBlockstore) FinishTracing() map[cid.Cid]struct{} { pb.lk.Lock() ret := pb.traced pb.tracing = false pb.traced = map[cid.Cid]struct{}{} pb.lk.Unlock() return ret } func (pb *proxyingBlockstore) Get(cid cid.Cid) (blocks.Block, error) { pb.lk.Lock() if pb.tracing { pb.traced[cid] = struct{}{} } pb.lk.Unlock() if block, err := pb.Blockstore.Get(cid); err == nil { return block, err } log.Println(color.CyanString("fetching cid via rpc: %v", cid)) item, err := pb.api.ChainReadObj(pb.ctx, cid) if err != nil { return nil, err } block, err := blocks.NewBlockWithCid(item, cid) if err != nil { return nil, err } err = pb.Blockstore.Put(block) if err != nil { return nil, err } return block, nil } func (pb *proxyingBlockstore) Put(block blocks.Block) error { pb.lk.Lock() if pb.tracing { pb.traced[block.Cid()] = struct{}{} } pb.lk.Unlock() return pb.Blockstore.Put(block) } func (pb *proxyingBlockstore) PutMany(blocks []blocks.Block) error { pb.lk.Lock() if pb.tracing { for _, b := range blocks { pb.traced[b.Cid()] = struct{}{} } } pb.lk.Unlock() return pb.Blockstore.PutMany(blocks) }