41f4f1fd83
- When chainwatch is ran it will first start a Syncer that continuously collects blocks from the ChainNotify channel and persists them to the blocks_synced table. Once the Syncer has caught the blocks_synced table up to the lotus daemons current head a Processor is started. The Processor selects a batch of contiguous blocks and extracts and stores their data. It attempts to do as much work as it can in parallel. When the blocks are done being processed their corresponding processed_at and is_processed fields in the blocks_synced table are filled out.
52 lines
1.1 KiB
Go
52 lines
1.1 KiB
Go
package util
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
cbg "github.com/whyrusleeping/cbor-gen"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
)
|
|
|
|
// TODO extract this to a common location in lotus and reuse the code
|
|
|
|
// APIIpldStore is required for AMT and HAMT access.
|
|
type APIIpldStore struct {
|
|
ctx context.Context
|
|
api api.FullNode
|
|
}
|
|
|
|
func NewAPIIpldStore(ctx context.Context, api api.FullNode) *APIIpldStore {
|
|
return &APIIpldStore{
|
|
ctx: ctx,
|
|
api: api,
|
|
}
|
|
}
|
|
|
|
func (ht *APIIpldStore) Context() context.Context {
|
|
return ht.ctx
|
|
}
|
|
|
|
func (ht *APIIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
|
|
raw, err := ht.api.ChainReadObj(ctx, c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cu, ok := out.(cbg.CBORUnmarshaler)
|
|
if ok {
|
|
if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
return fmt.Errorf("Object does not implement CBORUnmarshaler: %T", out)
|
|
}
|
|
|
|
func (ht *APIIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
|
|
return cid.Undef, fmt.Errorf("Put is not implemented on APIIpldStore")
|
|
}
|