wip accessed-cids retention.

This commit is contained in:
Raúl Kripalani 2020-09-27 12:43:28 +01:00
parent 41ab87e904
commit 400af0d2fa
3 changed files with 92 additions and 28 deletions

View File

@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
@ -51,7 +52,7 @@ var extractMsgCmd = &cli.Command{
&cli.StringFlag{
Name: "state-retain",
Usage: "state retention policy; values: 'accessed-cids' (default), 'accessed-actors'",
Value: "accessed-actors",
Value: "accessed-cids",
Destination: &extractMsgFlags.retain,
},
},
@ -170,9 +171,11 @@ func runExtractMsg(c *cli.Context) error {
var (
preroot cid.Cid
postroot cid.Cid
carWriter func(w io.Writer) error
)
if extractMsgFlags.retain == "accessed-actors" {
switch retention := extractMsgFlags.retain; retention {
case "accessed-actors":
log.Printf("calculating accessed actors...")
// get actors accessed by message.
retain, err := g.GetAccessedActors(ctx, api, mcid)
@ -192,6 +195,30 @@ func runExtractMsg(c *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to execute message: %w", err)
}
carWriter = func(w io.Writer) error {
return g.WriteCAR(w, preroot, postroot)
}
case "accessed-cids":
log.Printf("using state retention: %s", retention)
tbs, ok := pst.Blockstore.(state.TracingBlockstore)
if !ok {
return fmt.Errorf("requested 'accessed-cids' state retention, but no tracing blockstore was present")
}
tbs.StartTracing()
preroot = execTs.ParentState()
_, postroot, err = driver.ExecuteMessage(pst.Blockstore, preroot, execTs.Height(), msg)
if err != nil {
return fmt.Errorf("failed to execute message: %w", err)
}
accessed := tbs.FinishTracing()
carWriter = func(w io.Writer) error {
return g.WriteCARIncluding(w, accessed, preroot, postroot)
}
default:
return fmt.Errorf("unknown state retention option: %s", retention)
}
msgBytes, err := msg.Serialize()
@ -199,17 +226,11 @@ func runExtractMsg(c *cli.Context) error {
return err
}
// don't fetch additional content that wasn't accessed yet during car spidering / generation.
type onlineblockstore interface {
SetOnline(bool)
}
if ob, ok := pst.Blockstore.(onlineblockstore); ok {
ob.SetOnline(false)
}
out := new(bytes.Buffer)
gw := gzip.NewWriter(out)
if err := g.WriteCAR(gw, preroot, postroot); err != nil {
var (
out = new(bytes.Buffer)
gw = gzip.NewWriter(out)
)
if err := carWriter(gw); err != nil {
return err
}
if err = gw.Flush(); err != nil {

View File

@ -58,18 +58,50 @@ type proxyingBlockstore struct {
ctx context.Context
api api.FullNode
online bool
lock sync.RWMutex
lk sync.RWMutex
tracing bool
traced map[cid.Cid]struct{}
blockstore.Blockstore
}
type TracingBlockstore interface {
StartTracing()
FinishTracing() map[cid.Cid]struct{}
}
var _ TracingBlockstore = (*proxyingBlockstore)(nil)
// StartTracing starts tracing the CIDs that are effectively fetched during the
// processing of a message.
func (pb *proxyingBlockstore) StartTracing() {
pb.lk.Lock()
pb.tracing = true
pb.traced = map[cid.Cid]struct{}{}
pb.lk.Unlock()
}
// FinishTracing finishes tracing accessed CIDs, and returns a map of the
// CIDs that were traced.
func (pb *proxyingBlockstore) FinishTracing() map[cid.Cid]struct{} {
pb.lk.Lock()
ret := pb.traced
pb.tracing = false
pb.traced = map[cid.Cid]struct{}{}
pb.lk.Unlock()
return ret
}
func (pb *proxyingBlockstore) Get(cid cid.Cid) (blocks.Block, error) {
pb.lock.RLock()
if block, err := pb.Blockstore.Get(cid); err == nil || !pb.online {
pb.lock.RUnlock()
pb.lk.RLock()
if pb.tracing {
pb.traced[cid] = struct{}{}
}
pb.lk.RUnlock()
if block, err := pb.Blockstore.Get(cid); err == nil {
return block, err
}
pb.lock.RUnlock()
log.Println(color.CyanString("fetching cid via rpc: %v", cid))
item, err := pb.api.ChainReadObj(pb.ctx, cid)
@ -81,8 +113,6 @@ func (pb *proxyingBlockstore) Get(cid cid.Cid) (blocks.Block, error) {
return nil, err
}
pb.lock.Lock()
defer pb.lock.Unlock()
err = pb.Blockstore.Put(block)
if err != nil {
return nil, err
@ -91,10 +121,6 @@ func (pb *proxyingBlockstore) Get(cid cid.Cid) (blocks.Block, error) {
return block, nil
}
func (pb *proxyingBlockstore) SetOnline(online bool) {
pb.online = online
}
// NewProxyingStores is a Stores that proxies get requests for unknown CIDs
// to a Filecoin node, via the ChainReadObj RPC.
func NewProxyingStores(ctx context.Context, api api.FullNode) *Stores {
@ -103,7 +129,6 @@ func NewProxyingStores(ctx context.Context, api api.FullNode) *Stores {
bs := &proxyingBlockstore{
ctx: ctx,
api: api,
online: true,
Blockstore: blockstore.NewBlockstore(ds),
}

View File

@ -153,6 +153,24 @@ func (sg *Surgeon) WriteCAR(w io.Writer, roots ...cid.Cid) error {
return car.WriteCarWithWalker(sg.ctx, sg.stores.DAGService, roots, w, carWalkFn)
}
// WriteCARIncluding writes a CAR including only the CIDs that are listed in
// the include set. This leads to an intentially sparse tree with dangling links.
func (sg *Surgeon) WriteCARIncluding(w io.Writer, include map[cid.Cid]struct{}, roots ...cid.Cid) error {
carWalkFn := func(nd format.Node) (out []*format.Link, err error) {
for _, link := range nd.Links() {
if _, ok := include[link.Cid]; !ok {
continue
}
if link.Cid.Prefix().Codec == cid.FilCommitmentSealed || link.Cid.Prefix().Codec == cid.FilCommitmentUnsealed {
continue
}
out = append(out, link)
}
return out, nil
}
return car.WriteCarWithWalker(sg.ctx, sg.stores.DAGService, roots, w, carWalkFn)
}
// transplantActors plucks the state from the supplied actors at the given
// tipset, and places it into the supplied state map.
func (sg *Surgeon) transplantActors(src *state.StateTree, pluck []address.Address) (*state.StateTree, error) {