Merge remote-tracking branch 'origin/master' into revert-10852-sbansal/revert-10848

This commit is contained in:
Łukasz Magiera 2023-05-30 15:53:23 +02:00
commit 2636815311
39 changed files with 1146 additions and 191 deletions

View File

@ -1114,13 +1114,17 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if err := walkBlock(c); err != nil {
return xerrors.Errorf("error walking block (cid: %s): %w", c, err)
}
if err := s.checkYield(); err != nil {
return xerrors.Errorf("check yield: %w", err)
}
}
return nil
})
}
if err := g.Wait(); err != nil {
return err
return xerrors.Errorf("walkBlock workers errored: %w", err)
}
}
@ -1153,8 +1157,8 @@ func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid
}
// check this before recursing
if err := s.checkYield(); err != nil {
return 0, err
if err := s.checkClosing(); err != nil {
return 0, xerrors.Errorf("check closing: %w", err)
}
var links []cid.Cid
@ -1222,8 +1226,8 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m
}
// check this before recursing
if err := s.checkYield(); err != nil {
return sz, err
if err := s.checkClosing(); err != nil {
return sz, xerrors.Errorf("check closing: %w", err)
}
var links []cid.Cid

View File

@ -49,16 +49,11 @@ var UpgradeHyperdriveHeight = abi.ChainEpoch(-16)
var UpgradeChocolateHeight = abi.ChainEpoch(-17)
var UpgradeOhSnapHeight = abi.ChainEpoch(-18)
var UpgradeSkyrHeight = abi.ChainEpoch(-19)
var UpgradeSharkHeight = abi.ChainEpoch(-20)
var UpgradeHyggeHeight = abi.ChainEpoch(-21)
var UpgradeLightningHeight = abi.ChainEpoch(-22)
const UpgradeSharkHeight = abi.ChainEpoch(-20)
const UpgradeHyggeHeight = abi.ChainEpoch(100)
// ??????????
const UpgradeLightningHeight = 200
// ??????????????????
const UpgradeThunderHeight = 300
const UpgradeThunderHeight = 50
var DrandSchedule = map[abi.ChainEpoch]DrandEnum{
0: DrandMainnet,

View File

@ -35,6 +35,7 @@ func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Ad
mp.lk.RLock()
mset, ok, err := mp.getPendingMset(ctx, from)
if err != nil {
mp.lk.RUnlock()
return nil, xerrors.Errorf("errored while getting pending mset: %w", err)
}
if ok {
@ -70,6 +71,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
msgMap[m.From] = mmap
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
mp.lk.RUnlock()
return nil, xerrors.Errorf("errored while getting pending mset: %w", err)
}
if ok {
@ -153,6 +155,7 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
mp.lk.RLock()
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
mp.lk.RUnlock()
return nil, xerrors.Errorf("errored while getting pending mset: %w", err)
}
if ok && !interned {

View File

@ -72,7 +72,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
base, trace, err := sm.ExecutionTrace(ctx, ts)
if err != nil {
return cid.Undef, nil, err
return cid.Undef, nil, xerrors.Errorf("failed to compute base state: %w", err)
}
for i := ts.Height(); i < height; i++ {
@ -116,6 +116,21 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
if ret.ExitCode != 0 {
log.Infof("compute state apply message %d failed (exit: %d): %s", i, ret.ExitCode, ret.ActorErr)
}
ir := &api.InvocResult{
MsgCid: msg.Cid(),
Msg: msg,
MsgRct: &ret.MessageReceipt,
ExecutionTrace: ret.ExecutionTrace,
Duration: ret.Duration,
}
if ret.ActorErr != nil {
ir.Error = ret.ActorErr.Error()
}
if ret.GasCosts != nil {
ir.GasCost = MakeMsgGasCost(msg, ret)
}
trace = append(trace, ir)
}
root, err := vmi.Flush(ctx)

View File

@ -2,18 +2,21 @@ package store
import (
"context"
"hash/maphash"
"os"
"strconv"
"sync"
"github.com/puzpuzpuz/xsync/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/shardedmutex"
)
var DefaultChainIndexCacheSize = 32 << 15
// DefaultChainIndexCacheSize no longer sets the maximum size, just the initial size of the map.
var DefaultChainIndexCacheSize = 1 << 15
func init() {
if s := os.Getenv("LOTUS_CHAIN_INDEX_CACHE"); s != "" {
@ -27,8 +30,9 @@ func init() {
}
type ChainIndex struct {
indexCacheLk sync.Mutex
indexCache map[types.TipSetKey]*lbEntry
indexCache *xsync.MapOf[types.TipSetKey, *lbEntry]
fillCacheLock shardedmutex.ShardedMutexFor[types.TipSetKey]
loadTipSet loadTipSetFunc
@ -36,11 +40,16 @@ type ChainIndex struct {
}
type loadTipSetFunc func(context.Context, types.TipSetKey) (*types.TipSet, error)
func maphashTSK(s maphash.Seed, tsk types.TipSetKey) uint64 {
return maphash.Bytes(s, tsk.Bytes())
}
func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
return &ChainIndex{
indexCache: make(map[types.TipSetKey]*lbEntry, DefaultChainIndexCacheSize),
loadTipSet: lts,
skipLength: 20,
indexCache: xsync.NewTypedMapOfPresized[types.TipSetKey, *lbEntry](maphashTSK, DefaultChainIndexCacheSize),
fillCacheLock: shardedmutex.NewFor(maphashTSK, 32),
loadTipSet: lts,
skipLength: 20,
}
}
@ -59,17 +68,23 @@ func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet,
return nil, xerrors.Errorf("failed to round down: %w", err)
}
ci.indexCacheLk.Lock()
defer ci.indexCacheLk.Unlock()
cur := rounded.Key()
for {
lbe, ok := ci.indexCache[cur]
lbe, ok := ci.indexCache.Load(cur) // check the cache
if !ok {
fc, err := ci.fillCache(ctx, cur)
if err != nil {
return nil, xerrors.Errorf("failed to fill cache: %w", err)
lk := ci.fillCacheLock.GetLock(cur)
lk.Lock() // if entry is missing, take the lock
lbe, ok = ci.indexCache.Load(cur) // check if someone else added it while we waited for lock
if !ok {
fc, err := ci.fillCache(ctx, cur)
if err != nil {
lk.Unlock()
return nil, xerrors.Errorf("failed to fill cache: %w", err)
}
lbe = fc
ci.indexCache.Store(cur, lbe)
}
lbe = fc
lk.Unlock()
}
if to == lbe.targetHeight {
@ -137,7 +152,6 @@ func (ci *ChainIndex) fillCache(ctx context.Context, tsk types.TipSetKey) (*lbEn
targetHeight: skipTarget.Height(),
target: skipTarget.Key(),
}
ci.indexCache[tsk] = lbe
return lbe, nil
}

View File

@ -1161,6 +1161,10 @@ func (cs *ChainStore) TryFillTipSet(ctx context.Context, ts *types.TipSet) (*Ful
// selects the tipset before the null round if true, and the tipset following
// the null round if false.
func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h abi.ChainEpoch, ts *types.TipSet, prev bool) (*types.TipSet, error) {
if h < 0 {
return nil, xerrors.Errorf("height %d is negative", h)
}
if ts == nil {
ts = cs.GetHeaviestTipSet()
}

View File

@ -12,7 +12,7 @@ import (
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipni/storetheindex/announce/message"
"github.com/ipni/go-libipni/announce/message"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
@ -358,6 +358,8 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
fallthrough
case xerrors.Is(err, messagepool.ErrNonceGap):
fallthrough
case xerrors.Is(err, messagepool.ErrGasFeeCapTooLow):
fallthrough
case xerrors.Is(err, messagepool.ErrNonceTooLow):
return pubsub.ValidationIgnore
default:

View File

@ -9,7 +9,7 @@ import (
"github.com/golang/mock/gomock"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipni/storetheindex/announce/message"
"github.com/ipni/go-libipni/announce/message"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"

View File

@ -2,6 +2,7 @@ package types
import (
"bytes"
"fmt"
"github.com/ipfs/go-cid"
)
@ -14,10 +15,13 @@ type BlockMsg struct {
func DecodeBlockMsg(b []byte) (*BlockMsg, error) {
var bm BlockMsg
if err := bm.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
data := bytes.NewReader(b)
if err := bm.UnmarshalCBOR(data); err != nil {
return nil, err
}
if l := data.Len(); l != 0 {
return nil, fmt.Errorf("extraneous data in BlockMsg CBOR encoding: got %d unexpected bytes", l)
}
return &bm, nil
}

View File

@ -0,0 +1,40 @@
package types
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDecodeBlockMsg(t *testing.T) {
type args struct {
b []byte
}
tests := []struct {
name string
data []byte
want *BlockMsg
wantErr bool
}{
{"decode empty BlockMsg with extra data at the end", []byte{0x83, 0xf6, 0x80, 0x80, 0x20}, nil, true},
{"decode valid empty BlockMsg", []byte{0x83, 0xf6, 0x80, 0x80}, new(BlockMsg), false},
{"decode invalid cbor", []byte{0x83, 0xf6, 0x80}, nil, true},
}
for _, tt := range tests {
data := tt.data
want := tt.want
wantErr := tt.wantErr
t.Run(tt.name, func(t *testing.T) {
got, err := DecodeBlockMsg(data)
if wantErr {
assert.Errorf(t, err, "DecodeBlockMsg(%x)", data)
return
}
assert.NoErrorf(t, err, "DecodeBlockMsg(%x)", data)
assert.Equalf(t, want, got, "DecodeBlockMsg(%x)", data)
serialized, err := got.Serialize()
assert.NoErrorf(t, err, "DecodeBlockMsg(%x)", data)
assert.Equalf(t, serialized, data, "DecodeBlockMsg(%x)", data)
})
}
}

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal/alerting"
)
var infoCmd = &cli.Command{
@ -62,6 +63,21 @@ func infoCmdAct(cctx *cli.Context) error {
fmt.Printf(" [epoch %s]\n", color.MagentaString(("%d"), status.SyncStatus.Epoch))
fmt.Printf("Peers to: [publish messages %d] [publish blocks %d]\n", status.PeerStatus.PeersToPublishMsgs, status.PeerStatus.PeersToPublishBlocks)
alerts, err := fullapi.LogAlerts(ctx)
if err != nil {
fmt.Printf("ERROR: getting alerts: %s\n", err)
}
activeAlerts := make([]alerting.Alert, 0)
for _, alert := range alerts {
if alert.Active {
activeAlerts = append(activeAlerts, alert)
}
}
if len(activeAlerts) > 0 {
fmt.Printf("%s (check %s)\n", color.RedString("⚠ %d Active alerts", len(activeAlerts)), color.YellowString("lotus log alerts"))
}
//Chain health calculated as percentage: amount of blocks in last finality / very healthy amount of blocks in a finality (900 epochs * 5 blocks per tipset)
health := (100 * (900 * status.ChainStatus.BlocksPerTipsetLastFinality) / (900 * 5))
switch {

View File

@ -1528,6 +1528,9 @@ func printMsg(ctx context.Context, api v0api.FullNode, msg cid.Cid, mw *lapi.Msg
if err := printReceiptReturn(ctx, api, m, mw.Receipt); err != nil {
return err
}
if mw.Receipt.EventsRoot != nil {
fmt.Printf("Events Root: %s\n", mw.Receipt.EventsRoot)
}
return nil
}

View File

@ -7,6 +7,7 @@ import (
"net"
"net/http"
"os"
"strings"
"time"
rice "github.com/GeertJohan/go.rice"
@ -15,10 +16,14 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
verifregtypes9 "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/verifreg"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
lcli "github.com/filecoin-project/lotus/cli"
)
@ -70,6 +75,11 @@ var runCmd = &cli.Command{
EnvVars: []string{"LOTUS_FOUNTAIN_AMOUNT"},
Value: "50",
},
&cli.Uint64Flag{
Name: "data-cap",
EnvVars: []string{"LOTUS_DATACAP_AMOUNT"},
Value: verifregtypes9.MinVerifiedDealSize.Uint64(),
},
&cli.Float64Flag{
Name: "captcha-threshold",
Value: 0.5,
@ -108,6 +118,7 @@ var runCmd = &cli.Command{
ctx: ctx,
api: nodeApi,
from: from,
allowance: types.NewInt(cctx.Uint64("data-cap")),
sendPerRequest: sendPerRequest,
limiter: NewLimiter(LimiterConfig{
TotalRate: 500 * time.Millisecond,
@ -124,6 +135,8 @@ var runCmd = &cli.Command{
http.Handle("/", http.FileServer(box.HTTPBox()))
http.HandleFunc("/funds.html", prepFundsHtml(box))
http.Handle("/send", h)
http.HandleFunc("/datacap.html", prepDataCapHtml(box))
http.Handle("/datacap", h)
fmt.Printf("Open http://%s\n", cctx.String("front"))
go func() {
@ -156,12 +169,24 @@ func prepFundsHtml(box *rice.Box) http.HandlerFunc {
}
}
func prepDataCapHtml(box *rice.Box) http.HandlerFunc {
tmpl := template.Must(template.New("datacaps").Parse(box.MustString("datacap.html")))
return func(w http.ResponseWriter, r *http.Request) {
err := tmpl.Execute(w, os.Getenv("RECAPTCHA_SITE_KEY"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
}
}
type handler struct {
ctx context.Context
api v0api.FullNode
from address.Address
sendPerRequest types.FIL
allowance types.BigInt
limiter *Limiter
recapThreshold float64
@ -187,24 +212,41 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadGateway)
return
}
if !capResp.Success || capResp.Score < h.recapThreshold {
log.Infow("spam", "capResp", capResp)
http.Error(w, "spam protection", http.StatusUnprocessableEntity)
return
}
to, err := address.NewFromString(r.FormValue("address"))
if err != nil {
addressInput := r.FormValue("address")
var filecoinAddress address.Address
var decodeError error
if strings.HasPrefix(addressInput, "0x") {
ethAddress, err := ethtypes.ParseEthAddress(addressInput)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
filecoinAddress, decodeError = ethAddress.ToFilecoinAddress()
} else {
filecoinAddress, decodeError = address.NewFromString(addressInput)
}
if decodeError != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if to == address.Undef {
if filecoinAddress == address.Undef {
http.Error(w, "empty address", http.StatusBadRequest)
return
}
// Limit based on wallet address
limiter := h.limiter.GetWalletLimiter(to.String())
limiter := h.limiter.GetWalletLimiter(filecoinAddress.String())
if !limiter.Allow() {
http.Error(w, http.StatusText(http.StatusTooManyRequests)+": wallet limit", http.StatusTooManyRequests)
return
@ -227,11 +269,37 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
smsg, err := h.api.MpoolPushMessage(h.ctx, &types.Message{
Value: types.BigInt(h.sendPerRequest),
From: h.from,
To: to,
}, nil)
var smsg *types.SignedMessage
if r.RequestURI == "/send" {
smsg, err = h.api.MpoolPushMessage(
h.ctx, &types.Message{
Value: types.BigInt(h.sendPerRequest),
From: h.from,
To: filecoinAddress,
}, nil)
} else if r.RequestURI == "/datacap" {
var params []byte
params, err = actors.SerializeParams(
&verifregtypes9.AddVerifiedClientParams{
Address: filecoinAddress,
Allowance: h.allowance,
})
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
smsg, err = h.api.MpoolPushMessage(
h.ctx, &types.Message{
Params: params,
From: h.from,
To: verifreg.Address,
Method: verifreg.Methods.AddVerifiedClient,
}, nil)
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return

View File

@ -0,0 +1,41 @@
<!DOCTYPE html>
<html>
<head>
<title>Grant DataCap - Lotus Fountain</title>
<link rel="stylesheet" type="text/css" href="main.css">
<script src="https://www.google.com/recaptcha/api.js"></script>
<script>
function onSubmit(token) {
document.getElementById("datacap-form").submit();
}
</script>
</head>
<body>
<div class="Index">
<div class="Index-nodes">
<div class="Index-node">
<h3>Grant datacap</h3>
<p>Please input your address to receive a data cap on the Calibration Testnet.</p>
</div>
<div class="Index-node">
<form action='/datacap' method='post' id='datacap-form'>
<span>Enter destination address:</span>
<input type='text' name='address' style="width: 300px" placeholder="t0/1/2/3/4 or 0xETH">
<button class="g-recaptcha"
data-sitekey="{{ . }}"
data-callback='onSubmit'
data-action='submit'>Grant Datacap</button>
</form>
</div>
</div>
<div class="Index-footer">
<div>
<a href="index.html">[Back]</a>
<span style="float: right">Not dispensing real Filecoin tokens</span>
</div>
</div>
</div>
</body>
</html>

View File

@ -15,12 +15,13 @@
<div class="Index">
<div class="Index-nodes">
<div class="Index-node">
SENDING FUNDS
<h3>Send funds</h3>
<p>Please input your address to receive test FIL (tFIL) on the Calibration Testnet. This faucet dispenses 100 tFIL.</p>
</div>
<div class="Index-node">
<form action='/send' method='post' id='funds-form'>
<span>Enter destination address:</span>
<input type='text' name='address' style="width: 300px">
<input type='text' name='address' style="width: 300px" placeholder="Enter t0/1/2/3/4 or 0xETH">
<button class="g-recaptcha"
data-sitekey="{{ . }}"
data-callback='onSubmit'

View File

@ -13,6 +13,12 @@
<div class="Index-node">
<a href="funds.html">Send Funds</a>
</div>
<div class="Index-node">
LOTUS DEVNET GRANT DATACAP
</div>
<div class="Index-node">
<a href="datacap.html">Grant DataCap</a>
</div>
</div>
<div class="Index-footer">
<div>

View File

@ -66,7 +66,7 @@ button {
}
button:hover {
background-color: #555;
background-color: #4c7eff;
}
a:link {

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"os"
"sort"
"strconv"
@ -2056,7 +2057,6 @@ func yesno(b bool) string {
return color.RedString("NO")
}
// TODO simulate this call if --really-do-it is not used
var sectorsCompactPartitionsCmd = &cli.Command{
Name: "compact-partitions",
Usage: "removes dead sectors from partitions and reduces the number of partitions used if possible",
@ -2082,12 +2082,7 @@ var sectorsCompactPartitionsCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Bool("really-do-it") {
fmt.Println("Pass --really-do-it to actually execute this action")
return nil
}
api, acloser, err := lcli.GetFullNodeAPI(cctx)
fullNodeAPI, acloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
@ -2100,7 +2095,7 @@ var sectorsCompactPartitionsCmd = &cli.Command{
return err
}
minfo, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
minfo, err := fullNodeAPI.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return err
}
@ -2116,46 +2111,118 @@ var sectorsCompactPartitionsCmd = &cli.Command{
}
fmt.Printf("compacting %d paritions\n", len(parts))
var makeMsgForPartitions func(partitionsBf bitfield.BitField) ([]*types.Message, error)
makeMsgForPartitions = func(partitionsBf bitfield.BitField) ([]*types.Message, error) {
params := miner.CompactPartitionsParams{
Deadline: deadline,
Partitions: partitionsBf,
}
sp, aerr := actors.SerializeParams(&params)
if aerr != nil {
return nil, xerrors.Errorf("serializing params: %w", err)
}
msg := &types.Message{
From: minfo.Worker,
To: maddr,
Method: builtin.MethodsMiner.CompactPartitions,
Value: big.Zero(),
Params: sp,
}
estimatedMsg, err := fullNodeAPI.GasEstimateMessageGas(ctx, msg, nil, types.EmptyTSK)
if err != nil && xerrors.Is(err, &api.ErrOutOfGas{}) {
// the message is too big -- split into 2
partitionsSlice, err := partitionsBf.All(math.MaxUint64)
if err != nil {
return nil, err
}
partitions1 := bitfield.New()
for i := 0; i < len(partitionsSlice)/2; i++ {
partitions1.Set(uint64(i))
}
msgs1, err := makeMsgForPartitions(partitions1)
if err != nil {
return nil, err
}
// time for the second half
partitions2 := bitfield.New()
for i := len(partitionsSlice) / 2; i < len(partitionsSlice); i++ {
partitions2.Set(uint64(i))
}
msgs2, err := makeMsgForPartitions(partitions2)
if err != nil {
return nil, err
}
return append(msgs1, msgs2...), nil
} else if err != nil {
return nil, err
}
return []*types.Message{estimatedMsg}, nil
}
partitions := bitfield.New()
for _, partition := range parts {
partitions.Set(uint64(partition))
}
params := miner.CompactPartitionsParams{
Deadline: deadline,
Partitions: partitions,
}
sp, err := actors.SerializeParams(&params)
msgs, err := makeMsgForPartitions(partitions)
if err != nil {
return xerrors.Errorf("serializing params: %w", err)
return xerrors.Errorf("failed to make messages: %w", err)
}
smsg, err := api.MpoolPushMessage(ctx, &types.Message{
From: minfo.Worker,
To: maddr,
Method: builtin.MethodsMiner.CompactPartitions,
Value: big.Zero(),
Params: sp,
}, nil)
if err != nil {
return xerrors.Errorf("mpool push: %w", err)
// Actually send the messages if really-do-it provided, simulate otherwise
if cctx.Bool("really-do-it") {
smsgs, err := fullNodeAPI.MpoolBatchPushMessage(ctx, msgs, nil)
if err != nil {
return xerrors.Errorf("mpool push: %w", err)
}
if len(smsgs) == 1 {
fmt.Printf("Requested compact partitions in message %s\n", smsgs[0].Cid())
} else {
fmt.Printf("Requested compact partitions in %d messages\n\n", len(smsgs))
for _, v := range smsgs {
fmt.Println(v.Cid())
}
}
for _, v := range smsgs {
wait, err := fullNodeAPI.StateWaitMsg(ctx, v.Cid(), 2)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode.IsError() {
fmt.Println(cctx.App.Writer, "compact partitions msg %s failed!", v.Cid())
return err
}
}
return nil
}
fmt.Printf("Requested compact partitions in message %s\n", smsg.Cid())
for i, v := range msgs {
fmt.Printf("total of %d CompactPartitions msgs would be sent\n", len(msgs))
wait, err := api.StateWaitMsg(ctx, smsg.Cid(), 0)
if err != nil {
return err
}
estMsg, err := fullNodeAPI.GasEstimateMessageGas(ctx, v, nil, types.EmptyTSK)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode.IsError() {
fmt.Println(cctx.App.Writer, "compact partitions failed!")
return err
fmt.Printf("msg %d would cost up to %s\n", i+1, types.FIL(estMsg.RequiredFunds()))
}
return nil
},
}

View File

@ -225,6 +225,7 @@ var storageRedeclareCmd = &cli.Command{
&cli.BoolFlag{
Name: "drop-missing",
Usage: "Drop index entries with missing files",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
@ -235,14 +236,19 @@ var storageRedeclareCmd = &cli.Command{
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.NArg() != 1 {
return lcli.IncorrectNumArgs(cctx)
// check if no argument and no --id or --all flag is provided
if cctx.NArg() == 0 && !cctx.IsSet("id") && !cctx.Bool("all") {
return xerrors.Errorf("You must specify a storage path, or --id, or --all")
}
if cctx.IsSet("id") && cctx.Bool("all") {
return xerrors.Errorf("--id and --all can't be passed at the same time")
}
if cctx.Bool("all") && cctx.NArg() > 0 {
return xerrors.Errorf("No additional arguments are expected when --all is set")
}
if cctx.IsSet("id") {
id := storiface.ID(cctx.String("id"))
return minerApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing"))
@ -252,7 +258,28 @@ var storageRedeclareCmd = &cli.Command{
return minerApi.StorageRedeclareLocal(ctx, nil, cctx.Bool("drop-missing"))
}
return xerrors.Errorf("either --all or --id must be specified")
// As no --id or --all flag is set, we can assume the argument is a path.
path := cctx.Args().First()
metaFilePath := filepath.Join(path, "sectorstore.json")
var meta storiface.LocalStorageMeta
metaFile, err := os.Open(metaFilePath)
if err != nil {
return xerrors.Errorf("Failed to open file: %w", err)
}
defer func() {
if closeErr := metaFile.Close(); closeErr != nil {
log.Error("Failed to close the file: %v", closeErr)
}
}()
err = json.NewDecoder(metaFile).Decode(&meta)
if err != nil {
return xerrors.Errorf("Failed to decode file: %w", err)
}
id := meta.ID
return minerApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing"))
},
}

View File

@ -1,8 +1,10 @@
package main
import (
"context"
"fmt"
"os"
"os/signal"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
@ -118,7 +120,20 @@ func main() {
},
}
if err := app.Run(os.Args); err != nil {
// terminate early on ctrl+c
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-c
cancel()
fmt.Println("Received interrupt, shutting down... Press CTRL+C again to force shutdown")
<-c
fmt.Println("Forcing stop")
os.Exit(1)
}()
if err := app.RunContext(ctx, os.Args); err != nil {
log.Errorf("%+v", err)
os.Exit(1)
return

View File

@ -474,7 +474,7 @@ var verifRegRemoveVerifiedClientDataCapCmd = &cli.Command{
st, err := multisig.Load(store, vrkState)
if err != nil {
return err
return fmt.Errorf("load vrk failed: %w ", err)
}
signers, err := st.Signers()
@ -508,14 +508,13 @@ var verifRegRemoveVerifiedClientDataCapCmd = &cli.Command{
return err
}
sm, _, err := srv.PublishMessage(ctx, proto, false)
sm, err := lcli.InteractiveSend(ctx, cctx, srv, proto)
if err != nil {
return err
}
msgCid := sm.Cid()
fmt.Printf("message sent, now waiting on cid: %s\n", msgCid)
fmt.Println("sending msg: ", msgCid)
mwait, err := api.StateWaitMsg(ctx, msgCid, uint64(cctx.Int("confidence")), build.Finality, true)
if err != nil {

View File

@ -178,6 +178,7 @@ var storageRedeclareCmd = &cli.Command{
&cli.BoolFlag{
Name: "drop-missing",
Usage: "Drop index entries with missing files",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
@ -188,10 +189,19 @@ var storageRedeclareCmd = &cli.Command{
defer closer()
ctx := lcli.ReqContext(cctx)
// check if no argument and no --id or --all flag is provided
if cctx.NArg() == 0 && !cctx.IsSet("id") && !cctx.Bool("all") {
return xerrors.Errorf("You must specify a storage path, or --id, or --all")
}
if cctx.IsSet("id") && cctx.Bool("all") {
return xerrors.Errorf("--id and --all can't be passed at the same time")
}
if cctx.Bool("all") && cctx.NArg() > 0 {
return xerrors.Errorf("No additional arguments are expected when --all is set")
}
if cctx.IsSet("id") {
id := storiface.ID(cctx.String("id"))
return nodeApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing"))
@ -201,6 +211,27 @@ var storageRedeclareCmd = &cli.Command{
return nodeApi.StorageRedeclareLocal(ctx, nil, cctx.Bool("drop-missing"))
}
return xerrors.Errorf("either --all or --id must be specified")
// As no --id or --all flag is set, we can assume the argument is a path.
path := cctx.Args().First()
metaFilePath := filepath.Join(path, "sectorstore.json")
var meta storiface.LocalStorageMeta
metaFile, err := os.Open(metaFilePath)
if err != nil {
return xerrors.Errorf("Failed to open file: %w", err)
}
defer func() {
if closeErr := metaFile.Close(); closeErr != nil {
log.Error("Failed to close the file: %v", closeErr)
}
}()
err = json.NewDecoder(metaFile).Decode(&meta)
if err != nil {
return xerrors.Errorf("Failed to decode file: %w", err)
}
id := meta.ID
return nodeApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing"))
},
}

View File

@ -1285,7 +1285,7 @@ USAGE:
OPTIONS:
--all redeclare all storage paths (default: false)
--drop-missing Drop index entries with missing files (default: false)
--drop-missing Drop index entries with missing files (default: true)
--id value storage path ID
```

View File

@ -148,7 +148,7 @@ USAGE:
OPTIONS:
--all redeclare all storage paths (default: false)
--drop-missing Drop index entries with missing files (default: false)
--drop-missing Drop index entries with missing files (default: true)
--id value storage path ID
```

View File

@ -2,8 +2,8 @@
The gas balancing process targets to set gas costs of syscalls to be in line with
10 gas per nanosecond on reference hardware.
The process can be either performed for all syscalls based on existing messages and chain or targeted
at single syscall.
The process can be either performed for all syscalls based on existing messages and chains or targeted
at a single syscall.
#### Reference hardware
@ -12,14 +12,14 @@ may be subject to change.
### Complete gas balancing
Complete gas balancing is performed using `lotus-bench` the process is based on importing a chain export
Complete gas balancing is performed using a `lotus-bench` the process is based on importing a chain export
and collecting gas traces which are later aggregated.
Before building `lotus-bench` make sure `EnableDetailedTracing` in `chain/vm/runtime.go` is set to `true`.
The process can be started using `./lotus-bench import` with `--car` flag set to the location of
CAR chain export. `--start-epoch` and `--end-epoch` can be used to to limit the range of epochs to run
the benchmark. Note that state tree of `start-epoch` needs to be in the CAR file or has to be previously computed
CAR chain export. `--start-epoch` and `--end-epoch` can be used to limit the range of epochs to run
the benchmark. Note that the state tree of `start-epoch` needs to be in the CAR file or has to be previously computed
to work.
The output will be a `bench.json` file containing information about every syscall invoked
@ -29,7 +29,7 @@ spare space.
After the bench run is complete the `bench.json` file can be analyzed with `./lotus-bench import analyze bench.json`.
It will compute means, standard deviations and co-variances (when applicable) of syscall runtimes.
The output is in nanoseconds, so the gas values for syscalls should be 10x that. In cases where co-variance of
The output is in nanoseconds, so the gas values for syscalls should be 10x that. In cases where the co-variance of
execution time to some parameter is evaluated, the strength of the correlation should be taken into account.
#### Special cases
@ -40,15 +40,15 @@ during block execution (when gas traces are formed) objects are only written to
### Targeted gas balancing
In some cases complete gas balancing is infeasible, either new syscall gets introduced or
In some cases complete gas balancing is infeasible, either a new syscall gets introduced or
complete balancing is too time consuming.
In these cases the recommended way to estimate gas for given syscall is to perform an `in-vivo` benchmark.
In these cases, the recommended way to estimate gas for a given syscall is to perform an `in-vivo` benchmark.
In the past `in-vitro` as in standalone benchmarks were found to be highly inaccurate when compared to results
of real execution.
A in-vivo benchmark can be performed by running an example of such syscall during block execution.
The best place to hook-in such benchmark is message execution loop in
`chain/stmgr/stmgr.go` in `ApplyBlocks()`. Depending of time required to complete the syscall it might be
An in-vivo benchmark can be performed by running an example of such a syscall during block execution.
The best place to hook-in such a benchmark is the message execution loop in
`chain/stmgr/stmgr.go` in `ApplyBlocks()`. Depending on the time required to complete the syscall it might be
advisable to run the execution only once every few messages.

10
go.mod
View File

@ -38,7 +38,7 @@ require (
github.com/filecoin-project/go-cbor-util v0.0.1
github.com/filecoin-project/go-commp-utils v0.1.3
github.com/filecoin-project/go-crypto v0.0.1
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc6
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.28.2-0.20230530134621-3f0a6701a8fe
@ -87,7 +87,7 @@ require (
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ds-measure v0.2.0
github.com/ipfs/go-fs-lock v0.0.7
github.com/ipfs/go-graphsync v0.14.5
github.com/ipfs/go-graphsync v0.14.6
github.com/ipfs/go-ipfs-blockstore v1.3.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
@ -111,8 +111,8 @@ require (
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipld/go-ipld-selector-text-lite v0.0.1
github.com/ipni/go-libipni v0.0.8
github.com/ipni/index-provider v0.12.0
github.com/ipni/storetheindex v0.5.10
github.com/kelseyhightower/envconfig v1.4.0
github.com/koalacxr/quantile v0.0.1
github.com/libp2p/go-buffer-pool v0.1.0
@ -139,6 +139,7 @@ require (
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
github.com/polydawn/refmt v0.89.0
github.com/prometheus/client_golang v1.14.0
github.com/puzpuzpuz/xsync/v2 v2.4.0
github.com/raulk/clock v1.1.0
github.com/raulk/go-watchdog v1.3.0
github.com/stretchr/testify v1.8.2
@ -249,7 +250,6 @@ require (
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/go-verifcid v0.0.2 // indirect
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect
github.com/ipni/go-libipni v0.0.7 // indirect
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect
@ -286,7 +286,7 @@ require (
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multicodec v0.8.1 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/nikkolasg/hexjson v0.1.0 // indirect
github.com/nkovacs/streamquote v1.0.0 // indirect

20
go.sum
View File

@ -312,8 +312,8 @@ github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o=
github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc6 h1:EsbXTWsBKT764qtX4MMJBNXMHoEa+g5Xg01azMqxXX0=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc6/go.mod h1:cX1acvFVWC5EXnnmFPWEFXbO7nLUdSZa+nqgi1QpTpw=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 h1:v+zJS5B6pA3ptWZS4t8tbt1Hz9qENnN4nVr1w99aSWc=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7/go.mod h1:V3Y4KbttaCwyg1gwkP7iai8CbQx4mZUGjd3h9GZWLKE=
github.com/filecoin-project/go-ds-versioning v0.1.2 h1:to4pTadv3IeV1wvgbCbN6Vqd+fu+7tveXgv/rCEZy6w=
github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
@ -738,8 +738,8 @@ github.com/ipfs/go-filestore v1.2.0/go.mod h1:HLJrCxRXquTeEEpde4lTLMaE/MYJZD7WHL
github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM=
github.com/ipfs/go-fs-lock v0.0.7 h1:6BR3dajORFrFTkb5EpCUFIAypsoxpGpDSVUdFwzgL9U=
github.com/ipfs/go-fs-lock v0.0.7/go.mod h1:Js8ka+FNYmgQRLrRXzU3CB/+Csr1BwrRilEcvYrHhhc=
github.com/ipfs/go-graphsync v0.14.5 h1:SKQog4ZABe+yy7OtTsrMoSQfFmEPPi0qP5sl6bFN8xM=
github.com/ipfs/go-graphsync v0.14.5/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo=
github.com/ipfs/go-graphsync v0.14.6 h1:NPxvuUy4Z08Mg8dwpBzwgbv/PGLIufSJ1sle6iAX8yo=
github.com/ipfs/go-graphsync v0.14.6/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.2.1/go.mod h1:jGesd8EtCM3/zPgx+qr0/feTXGUeRai6adgwC+Q+JvE=
@ -885,12 +885,10 @@ github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo=
github.com/ipld/go-ipld-selector-text-lite v0.0.1 h1:lNqFsQpBHc3p5xHob2KvEg/iM5dIFn6iw4L/Hh+kS1Y=
github.com/ipld/go-ipld-selector-text-lite v0.0.1/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM=
github.com/ipni/go-libipni v0.0.7 h1:L0AnFQagedfJU5mJ7kz0H8P1452brJveOQeS6p3MmbA=
github.com/ipni/go-libipni v0.0.7/go.mod h1:TlGZaGMGIVpeb6fiwttfY1JgaMnH+HDVPzxgRJJPaQY=
github.com/ipni/go-libipni v0.0.8 h1:0wLfZRSBG84swmZwmaLKul/iB/FlBkkl9ZcR1ub+Z+w=
github.com/ipni/go-libipni v0.0.8/go.mod h1:paYP9U4N3/vOzGCuN9kU972vtvw9JUcQjOKyiCFGwRk=
github.com/ipni/index-provider v0.12.0 h1:R3F6dxxKNv4XkE4GJZNLOG0bDEbBQ/S5iztXwSD8jhQ=
github.com/ipni/index-provider v0.12.0/go.mod h1:GhyrADJp7n06fqoc1djzkvL4buZYHzV8SoWrlxEo5F4=
github.com/ipni/storetheindex v0.5.10 h1:r97jIZsXPuwQvePJQuStu2a/kn+Zn8X4MAdA0rU2Pu4=
github.com/ipni/storetheindex v0.5.10/go.mod h1:SJKFCnSx4X/4ekQuZvq8pVU/7tmxkEv632Qmgu3m2bQ=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
@ -1327,8 +1325,8 @@ github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6o
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=
github.com/multiformats/go-multicodec v0.6.0/go.mod h1:GUC8upxSBE4oG+q3kWZRw/+6yC1BqO550bjhWsJbZlw=
github.com/multiformats/go-multicodec v0.8.1 h1:ycepHwavHafh3grIbR1jIXnKCsFm0fqsfEOsJ8NtKE8=
github.com/multiformats/go-multicodec v0.8.1/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k=
github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg=
github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
@ -1480,6 +1478,8 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/statsd_exporter v0.21.0 h1:hA05Q5RFeIjgwKIYEdFd59xu5Wwaznf33yKI+pyX6T8=
github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ=
github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag=
github.com/puzpuzpuz/xsync/v2 v2.4.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-19 v0.3.2 h1:tFxjCFcTQzK+oMxG6Zcvp4Dq8dx4yD3dDiIiyc86Z5U=

View File

@ -236,7 +236,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
break
}
require.NotEqual(bm.t, i, nloops-1, "block never managed to sync to node")
require.NotEqual(bm.t, i, nloops-1, "block at height %d never managed to sync to node, which is at height %d", target, ts.Height())
time.Sleep(time.Millisecond * 10)
}
@ -348,7 +348,7 @@ func (bm *BlockMiner) MineUntilBlock(ctx context.Context, fn *TestFullNode, cb f
break
}
require.NotEqual(bm.t, i, nloops-1, "block never managed to sync to node")
require.NotEqual(bm.t, i, nloops-1, "block at height %d never managed to sync to node, which is at height %d", epoch, ts.Height())
time.Sleep(time.Millisecond * 10)
}

View File

@ -39,7 +39,9 @@ func TestSectorImportAfterPC2(t *testing.T) {
////////
// Start a miner node
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
// We use two miners so that in case the actively tested miner misses PoSt, we still have a blockchain
client, miner, _, ens := kit.EnsembleOneTwo(t, kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
ctx := context.Background()

View File

@ -1,6 +1,7 @@
package itests
import (
"bytes"
"context"
"strings"
"sync/atomic"
@ -13,13 +14,17 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
miner11 "github.com/filecoin-project/go-state-types/builtin/v11/miner"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
@ -500,3 +505,228 @@ func TestWorkerName(t *testing.T) {
require.True(t, found)
}
// Tests that V1_1 proofs on post workers with faults
func TestWindowPostV1P1NV20WorkerFault(t *testing.T) {
kit.QuietMiningLogs()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
blocktime := 2 * time.Millisecond
sectors := 2 * 48 * 2
var badsector uint64 = 100000
client, miner, _, ens := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.GenesisNetworkVersion(network.Version20),
kit.ConstructorOpts(
node.Override(new(config.ProvingConfig), func() config.ProvingConfig {
c := config.DefaultStorageMiner()
c.Proving.DisableBuiltinWindowPoSt = true
return c.Proving
}),
node.Override(new(*wdpost.WindowPoStScheduler), modules.WindowPostScheduler(
config.DefaultStorageMiner().Fees,
config.ProvingConfig{
DisableBuiltinWindowPoSt: true,
DisableBuiltinWinningPoSt: false,
DisableWDPoStPreChecks: false,
},
)),
node.Override(new(paths.Store), func(store *paths.Remote) paths.Store {
return &badWorkerStorage{
Store: store,
badsector: &badsector,
notBadCount: 1,
}
})),
kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}),
kit.WithWorkerStorage(func(store paths.Store) paths.Store {
return &badWorkerStorage{
Store: store,
badsector: &badsector,
}
}))
bm := ens.InterconnectAll().BeginMining(blocktime)[0]
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
// wait for sectors to be committed
require.Eventually(t, func() bool {
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
parts, err := client.StateMinerPartitions(ctx, maddr, di.Index, types.EmptyTSK)
require.NoError(t, err)
return len(parts) > 1
}, 30*time.Second, 100*time.Millisecond)
// Wait until just before a deadline opens
{
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
di = di.NextNotElapsed()
t.Log("Running one proving period")
waitUntil := di.Open + di.WPoStChallengeWindow - di.WPoStChallengeLookback - 1
client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Log("Waiting for post message")
bm.Stop()
}
// Remove one sector in the next deadline (so it's skipped)
{
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
parts, err := client.StateMinerPartitions(ctx, maddr, di.Index+1, types.EmptyTSK)
require.NoError(t, err)
require.Greater(t, len(parts), 0)
secs := parts[0].AllSectors
n, err := secs.Count()
require.NoError(t, err)
require.Equal(t, uint64(2), n)
// Drop the sector in first partition
sid, err := secs.First()
require.NoError(t, err)
t.Logf("Drop sector %d; dl %d part %d", sid, di.Index, 0)
atomic.StoreUint64(&badsector, sid)
require.NoError(t, err)
}
bm.MineBlocksMustPost(ctx, 2*time.Millisecond)
mi, err := client.StateMinerInfo(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
wact, err := client.StateGetActor(ctx, mi.Worker, types.EmptyTSK)
require.NoError(t, err)
en := wact.Nonce
// wait for a new message to be sent from worker address, it will be a PoSt
waitForProof:
for {
//stm: @CHAIN_STATE_GET_ACTOR_001
wact, err := client.StateGetActor(ctx, mi.Worker, types.EmptyTSK)
require.NoError(t, err)
if wact.Nonce > en {
break waitForProof
}
build.Clock.Sleep(blocktime)
}
slm, err := client.StateListMessages(ctx, &api.MessageMatch{To: maddr}, types.EmptyTSK, 0)
require.NoError(t, err)
pmr, err := client.StateSearchMsg(ctx, types.EmptyTSK, slm[0], -1, false)
require.NoError(t, err)
nv, err := client.StateNetworkVersion(ctx, pmr.TipSet)
require.NoError(t, err)
require.Equal(t, network.Version20, nv)
require.True(t, pmr.Receipt.ExitCode.IsSuccess())
slmsg, err := client.ChainGetMessage(ctx, slm[0])
require.NoError(t, err)
var params miner11.SubmitWindowedPoStParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewBuffer(slmsg.Params)))
require.Equal(t, abi.RegisteredPoStProof_StackedDrgWindow2KiBV1_1, params.Proofs[0].PoStProof)
require.Len(t, params.Partitions, 2)
sc0, err := params.Partitions[0].Skipped.Count()
require.NoError(t, err)
require.Equal(t, uint64(1), sc0)
sc1, err := params.Partitions[1].Skipped.Count()
require.NoError(t, err)
require.Equal(t, uint64(0), sc1)
}
// Tests that V1_1 proofs on post worker
func TestWindowPostV1P1NV20Worker(t *testing.T) {
kit.QuietMiningLogs()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
blocktime := 2 * time.Millisecond
client, miner, _, ens := kit.EnsembleWorker(t,
kit.GenesisNetworkVersion(network.Version20),
kit.ConstructorOpts(
node.Override(new(config.ProvingConfig), func() config.ProvingConfig {
c := config.DefaultStorageMiner()
c.Proving.DisableBuiltinWindowPoSt = true
return c.Proving
}),
node.Override(new(*wdpost.WindowPoStScheduler), modules.WindowPostScheduler(
config.DefaultStorageMiner().Fees,
config.ProvingConfig{
DisableBuiltinWindowPoSt: true,
DisableBuiltinWinningPoSt: false,
DisableWDPoStPreChecks: false,
},
))),
kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))
ens.InterconnectAll().BeginMining(blocktime)
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
mi, err := client.StateMinerInfo(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
wact, err := client.StateGetActor(ctx, mi.Worker, types.EmptyTSK)
require.NoError(t, err)
en := wact.Nonce
// wait for a new message to be sent from worker address, it will be a PoSt
waitForProof:
for {
//stm: @CHAIN_STATE_GET_ACTOR_001
wact, err := client.StateGetActor(ctx, mi.Worker, types.EmptyTSK)
require.NoError(t, err)
if wact.Nonce > en {
break waitForProof
}
build.Clock.Sleep(blocktime)
}
slm, err := client.StateListMessages(ctx, &api.MessageMatch{To: maddr}, types.EmptyTSK, 0)
require.NoError(t, err)
pmr, err := client.StateSearchMsg(ctx, types.EmptyTSK, slm[0], -1, false)
require.NoError(t, err)
nv, err := client.StateNetworkVersion(ctx, pmr.TipSet)
require.NoError(t, err)
require.Equal(t, network.Version20, nv)
require.True(t, pmr.Receipt.ExitCode.IsSuccess())
slmsg, err := client.ChainGetMessage(ctx, slm[0])
require.NoError(t, err)
var params miner11.SubmitWindowedPoStParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewBuffer(slmsg.Params)))
require.Equal(t, abi.RegisteredPoStProof_StackedDrgWindow2KiBV1_1, params.Proofs[0].PoStProof)
}

View File

@ -0,0 +1,75 @@
package shardedmutex
import (
"hash/maphash"
"sync"
)
const cacheline = 64
// padding a mutex to a cacheline improves performance as the cachelines are not contested
// name old time/op new time/op delta
// Locks-8 74.6ns ± 7% 12.3ns ± 2% -83.54% (p=0.000 n=20+18)
type paddedMutex struct {
mt sync.Mutex
pad [cacheline - 8]uint8
}
type ShardedMutex struct {
shards []paddedMutex
}
// New creates a new ShardedMutex with N shards
func New(nShards int) ShardedMutex {
if nShards < 1 {
panic("n_shards cannot be less than 1")
}
return ShardedMutex{
shards: make([]paddedMutex, nShards),
}
}
func (sm ShardedMutex) Shards() int {
return len(sm.shards)
}
func (sm ShardedMutex) Lock(shard int) {
sm.shards[shard].mt.Lock()
}
func (sm ShardedMutex) Unlock(shard int) {
sm.shards[shard].mt.Unlock()
}
func (sm ShardedMutex) GetLock(shard int) sync.Locker {
return &sm.shards[shard].mt
}
type ShardedMutexFor[K any] struct {
inner ShardedMutex
hasher func(maphash.Seed, K) uint64
seed maphash.Seed
}
func NewFor[K any](hasher func(maphash.Seed, K) uint64, nShards int) ShardedMutexFor[K] {
return ShardedMutexFor[K]{
inner: New(nShards),
hasher: hasher,
seed: maphash.MakeSeed(),
}
}
func (sm ShardedMutexFor[K]) shardFor(key K) int {
return int(sm.hasher(sm.seed, key) % uint64(len(sm.inner.shards)))
}
func (sm ShardedMutexFor[K]) Lock(key K) {
sm.inner.Lock(sm.shardFor(key))
}
func (sm ShardedMutexFor[K]) Unlock(key K) {
sm.inner.Unlock(sm.shardFor(key))
}
func (sm ShardedMutexFor[K]) GetLock(key K) sync.Locker {
return sm.inner.GetLock(sm.shardFor(key))
}

View File

@ -0,0 +1,159 @@
package shardedmutex
import (
"fmt"
"hash/maphash"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestLockingDifferentShardsDoesNotBlock(t *testing.T) {
shards := 16
sm := New(shards)
done := make(chan struct{})
go func() {
select {
case <-done:
return
case <-time.After(5 * time.Second):
panic("test locked up")
}
}()
for i := 0; i < shards; i++ {
sm.Lock(i)
}
close(done)
}
func TestLockingSameShardsBlocks(t *testing.T) {
shards := 16
sm := New(shards)
wg := sync.WaitGroup{}
wg.Add(shards)
ch := make(chan int, shards)
for i := 0; i < shards; i++ {
go func(i int) {
if i != 15 {
sm.Lock(i)
}
wg.Done()
wg.Wait()
sm.Lock((15 + i) % shards)
ch <- i
sm.Unlock(i)
}(i)
}
wg.Wait()
for i := 0; i < 2*shards; i++ {
runtime.Gosched()
}
for i := 0; i < shards; i++ {
if a := <-ch; a != i {
t.Errorf("got %d instead of %d", a, i)
}
}
}
func TestShardedByString(t *testing.T) {
shards := 16
sm := NewFor(maphash.String, shards)
wg1 := sync.WaitGroup{}
wg1.Add(shards * 20)
wg2 := sync.WaitGroup{}
wg2.Add(shards * 20)
active := atomic.Int32{}
max := atomic.Int32{}
for i := 0; i < shards*20; i++ {
go func(i int) {
wg1.Done()
wg1.Wait()
sm.Lock(fmt.Sprintf("goroutine %d", i))
activeNew := active.Add(1)
for {
curMax := max.Load()
if curMax >= activeNew {
break
}
if max.CompareAndSwap(curMax, activeNew) {
break
}
}
for j := 0; j < 100; j++ {
runtime.Gosched()
}
active.Add(-1)
sm.Unlock(fmt.Sprintf("goroutine %d", i))
wg2.Done()
}(i)
}
wg2.Wait()
if max.Load() != 16 {
t.Fatal("max load not achieved", max.Load())
}
}
func BenchmarkShardedMutex(b *testing.B) {
shards := 16
sm := New(shards)
done := atomic.Int32{}
go func() {
for {
sm.Lock(0)
sm.Unlock(0)
if done.Load() != 0 {
return
}
}
}()
for i := 0; i < 100; i++ {
runtime.Gosched()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
sm.Lock(1)
sm.Unlock(1)
}
done.Add(1)
}
func BenchmarkShardedMutexOf(b *testing.B) {
shards := 16
sm := NewFor(maphash.String, shards)
str1 := "string1"
str2 := "string2"
done := atomic.Int32{}
go func() {
for {
sm.Lock(str1)
sm.Unlock(str1)
if done.Load() != 0 {
return
}
}
}()
for i := 0; i < 100; i++ {
runtime.Gosched()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
sm.Lock(str2)
sm.Unlock(str2)
}
done.Add(1)
}

View File

@ -89,6 +89,7 @@ const (
// health checks
CheckFDLimit
LegacyMarketsEOL
// libp2p
PstoreAddSelfKeysKey

View File

@ -147,6 +147,10 @@ func ConfigStorageMiner(c interface{}) Option {
),
If(cfg.Subsystems.EnableMarkets,
// Alert that legacy-markets is being deprecated
Override(LegacyMarketsEOL, modules.LegacyMarketsEOL),
// Markets
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForStoragePerClient, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),

View File

@ -1209,7 +1209,7 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtype
return nil, xerrors.Errorf("must not specify block hash and from/to block")
}
// TODO: derive a tipset hash from eth hash - might need to push this down into the EventFilterManager
tipsetCid = filterSpec.BlockHash.ToCid()
} else {
if filterSpec.FromBlock == nil || *filterSpec.FromBlock == "latest" {
ts := e.Chain.GetHeaviestTipSet()

View File

@ -32,6 +32,16 @@ func CheckFdLimit(min uint64) func(al *alerting.Alerting) {
}
}
func LegacyMarketsEOL(al *alerting.Alerting) {
// Add alert if lotus-miner legacy markets subsystem is still in use
alert := al.AddAlertType("system", "EOL")
// Alert with a message to migrate to Boost or similar markets subsystems
al.Raise(alert, map[string]string{
"message": "The lotus-miner legacy markets subsystem is deprecated and will be removed in a future release. Please migrate to [Boost](https://boost.filecoin.io) or similar markets subsystems.",
})
}
// TODO: More things:
// * Space in repo dirs (taking into account mounts)
// * Miner

View File

@ -394,6 +394,20 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
transports = append(transports, jsonTransport)
}
tps := make([]string, 0) // range of topics that will be submited to the traces
addTopicToList := func(topicList []string, newTopic string) []string {
// check if the topic is already in the list
for _, tp := range topicList {
if tp == newTopic {
return topicList
}
}
// add it otherwise
return append(topicList, newTopic)
}
// tracer
if in.Cfg.ElasticSearchTracer != "" {
elasticSearchTransport, err := tracer.NewElasticSearchTransport(
in.Cfg.ElasticSearchTracer,
@ -403,7 +417,9 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
return nil, err
}
transports = append(transports, elasticSearchTransport)
tps = addTopicToList(tps, build.BlocksTopic(in.Nn))
}
lt := tracer.NewLotusTracer(transports, in.Host.ID(), in.Cfg.TracerSourceAuth)
// tracer
@ -412,28 +428,25 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(a)
if err != nil {
return nil, err
}
tr, err := pubsub.NewRemoteTracer(context.TODO(), in.Host, *pi)
if err != nil {
return nil, err
}
pst := newPeerScoreTracker(lt, in.Sk)
trw := newTracerWrapper(tr, lt, build.BlocksTopic(in.Nn))
trw := newTracerWrapper(tr, lt, tps...)
options = append(options, pubsub.WithEventTracer(trw))
options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second))
} else {
// still instantiate a tracer for collecting metrics
trw := newTracerWrapper(nil, lt)
options = append(options, pubsub.WithEventTracer(trw))
pst := newPeerScoreTracker(lt, in.Sk)
trw := newTracerWrapper(nil, lt, tps...)
options = append(options, pubsub.WithEventTracer(trw))
options = append(options, pubsub.WithPeerScoreInspect(pst.UpdatePeerScore, 10*time.Second))
}
@ -457,7 +470,6 @@ func newTracerWrapper(
topicsMap[topic] = struct{}{}
}
}
return &tracerWrapper{lp2pTracer: lp2pTracer, lotusTracer: lotusTracer, topics: topicsMap}
}
@ -486,71 +498,164 @@ func (trw *tracerWrapper) Trace(evt *pubsub_pb.TraceEvent) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_DELIVER_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDeliverMessage.M(1))
if trw.traceMessage(evt.GetDeliverMessage().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_REJECT_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubRejectMessage.M(1))
if trw.traceMessage(evt.GetRejectMessage().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1))
if trw.traceMessage(evt.GetDuplicateMessage().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_JOIN:
if trw.traceMessage(evt.GetJoin().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_LEAVE:
if trw.traceMessage(evt.GetLeave().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_GRAFT:
if trw.traceMessage(evt.GetGraft().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE:
stats.Record(context.TODO(), metrics.PubsubDuplicateMessage.M(1))
case pubsub_pb.TraceEvent_JOIN:
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_LEAVE:
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_GRAFT:
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_PRUNE:
if trw.traceMessage(evt.GetPrune().GetTopic()) {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_ADD_PEER:
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_REMOVE_PEER:
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
case pubsub_pb.TraceEvent_RECV_RPC:
stats.Record(context.TODO(), metrics.PubsubRecvRPC.M(1))
// only track the RPC Calls from IWANT / IHAVE / BLOCK topic
controlRPC := evt.GetRecvRPC().GetMeta().GetControl()
ihave := controlRPC.GetIhave()
iwant := controlRPC.GetIwant()
msgsRPC := evt.GetRecvRPC().GetMeta().GetMessages()
// check if any of the messages we are sending belong to a trackable topic
var validTopic bool = false
for _, topic := range msgsRPC {
if trw.traceMessage(topic.GetTopic()) {
validTopic = true
break
}
}
// track if the Iwant / Ihave messages are from a valid Topic
var validIhave bool = false
for _, msgs := range ihave {
if trw.traceMessage(msgs.GetTopic()) {
validIhave = true
break
}
}
// check if we have any of iwant msgs (it doesn't classify per topic - just msg.ID)
validIwant := len(iwant) > 0
// trace the event if any of the flags was triggered
if validIhave || validIwant || validTopic {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_SEND_RPC:
stats.Record(context.TODO(), metrics.PubsubSendRPC.M(1))
// only track the RPC Calls from IWANT / IHAVE / BLOCK topic
controlRPC := evt.GetSendRPC().GetMeta().GetControl()
ihave := controlRPC.GetIhave()
iwant := controlRPC.GetIwant()
msgsRPC := evt.GetSendRPC().GetMeta().GetMessages()
// check if any of the messages we are sending belong to a trackable topic
var validTopic bool = false
for _, topic := range msgsRPC {
if trw.traceMessage(topic.GetTopic()) {
validTopic = true
break
}
}
// track if the Iwant / Ihave messages are from a valid Topic
var validIhave bool = false
for _, msgs := range ihave {
if trw.traceMessage(msgs.GetTopic()) {
validIhave = true
break
}
}
// check if there was any of the Iwant msgs
validIwant := len(iwant) > 0
// trace the msgs if any of the flags was triggered
if validIhave || validIwant || validTopic {
if trw.lp2pTracer != nil {
trw.lp2pTracer.Trace(evt)
}
if trw.lotusTracer != nil {
trw.lotusTracer.Trace(evt)
}
}
case pubsub_pb.TraceEvent_DROP_RPC:
stats.Record(context.TODO(), metrics.PubsubDropRPC.M(1))
}

View File

@ -64,7 +64,7 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
// If announcements to the network are enabled, then set options for datatransfer publisher.
if cfg.Enable {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has
// engine would create its own instance of pubsub down the line in dagsync, which has
// no validators by default.
t, err := ps.Join(topicName)
if err != nil {

View File

@ -1,18 +1,23 @@
package tracer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"
)
const (
ElasticSearchDefaultIndex = "lotus-pubsub"
flushInterval = 10 * time.Second
flushBytes = 1024 * 1024 // MB
esWorkers = 2 // TODO: hardcoded
)
func NewElasticSearchTransport(connectionString string, elasticsearchIndex string) (TracerTransport, error) {
@ -28,12 +33,12 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin
Addresses: []string{
conUrl.Scheme + "://" + conUrl.Host,
},
Username: username,
Password: password,
Username: username,
Password: password,
Transport: &http.Transport{},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, err
}
@ -45,14 +50,31 @@ func NewElasticSearchTransport(connectionString string, elasticsearchIndex strin
esIndex = ElasticSearchDefaultIndex
}
// Create the BulkIndexer to batch ES trace submission
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: esIndex,
Client: es,
NumWorkers: esWorkers,
FlushBytes: int(flushBytes),
FlushInterval: flushInterval,
OnError: func(ctx context.Context, err error) {
log.Errorf("Error persisting queries %s", err.Error())
},
})
if err != nil {
return nil, err
}
return &elasticSearchTransport{
cl: es,
bi: bi,
esIndex: esIndex,
}, nil
}
type elasticSearchTransport struct {
cl *elasticsearch.Client
bi esutil.BulkIndexer
esIndex string
}
@ -72,26 +94,18 @@ func (est *elasticSearchTransport) Transport(evt TracerTransportEvent) error {
return fmt.Errorf("error while marshaling event: %s", err)
}
req := esapi.IndexRequest{
Index: est.esIndex,
Body: strings.NewReader(string(jsonEvt)),
Refresh: "true",
}
// Perform the request with the client.
res, err := req.Do(context.Background(), est.cl)
if err != nil {
return err
}
err = res.Body.Close()
if err != nil {
return err
}
if res.IsError() {
return fmt.Errorf("[%s] Error indexing document ID=%s", res.Status(), req.DocumentID)
}
return nil
return est.bi.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
Body: bytes.NewReader(jsonEvt),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
log.Errorf("unable to submit trace - %s", err)
} else {
log.Errorf("unable to submit trace %s: %s", res.Error.Type, res.Error.Reason)
}
},
},
)
}