fix some bugs and address some comments

This commit is contained in:
Shrenuj Bansal 2022-10-16 22:52:22 -04:00
parent 17a77220c2
commit 139f8773de
8 changed files with 33 additions and 59 deletions

View File

@ -32,10 +32,6 @@ type MsgSigner interface {
NextNonce(ctx context.Context, addr address.Address) (uint64, error) NextNonce(ctx context.Context, addr address.Address) (uint64, error)
SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error
dstoreKey(addr address.Address) datastore.Key dstoreKey(addr address.Address) datastore.Key
//IsLeader(ctx context.Context) bool
//Leader(ctx context.Context) (peer.ID, error)
//RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error)
//GetRaftState(ctx context.Context) (consensus.State, error)
} }
// MessageSigner keeps track of nonces per address, and increments the nonce // MessageSigner keeps track of nonces per address, and increments the nonce
@ -47,8 +43,6 @@ type MessageSigner struct {
ds datastore.Batching ds datastore.Batching
} }
//var _ full.MsgSigner = &MessageSigner{}
func NewMessageSigner(wallet api.Wallet, mpool messagepool.MpoolNonceAPI, ds dtypes.MetadataDS) *MessageSigner { func NewMessageSigner(wallet api.Wallet, mpool messagepool.MpoolNonceAPI, ds dtypes.MetadataDS) *MessageSigner {
ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/")) ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/"))
return &MessageSigner{ return &MessageSigner{
@ -91,6 +85,7 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, sp
Message: *msg, Message: *msg,
Signature: *sig, Signature: *sig,
} }
err = cb(smsg) err = cb(smsg)
if err != nil { if err != nil {
return nil, err return nil, err
@ -193,19 +188,3 @@ func (ms *MessageSigner) SaveNonce(ctx context.Context, addr address.Address, no
func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key { func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()}) return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})
} }
//func (ms *MessageSigner) IsLeader(ctx context.Context) bool {
// return true
//}
//
//func (ms *MessageSigner) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
// return false, xerrors.Errorf("single node shouldn't have any redirects")
//}
//
//func (ms *MessageSigner) GetRaftState(ctx context.Context) (consensus.State, error) {
// return nil, xerrors.Errorf("this is a non raft consensus message signer")
//}
//
//func (ms *MessageSigner) Leader(ctx context.Context) (peer.ID, error) {
// return "", xerrors.Errorf("no leaders in non raft message signer")
//}

View File

@ -4,6 +4,10 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/retry"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -13,17 +17,12 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/lib/retry"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -124,7 +123,7 @@ func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) {
func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
ainfos, err := GetAPIInfoMulti(ctx, t) ainfos, err := GetAPIInfoMulti(ctx, t)
if err != nil { if err != nil || len(ainfos) == 0 {
return APIInfo{}, err return APIInfo{}, err
} }
@ -145,7 +144,7 @@ func GetRawAPIMulti(ctx *cli.Context, t repo.RepoType, version string) ([]HttpHe
var httpHeads []HttpHead var httpHeads []HttpHead
ainfos, err := GetAPIInfoMulti(ctx, t) ainfos, err := GetAPIInfoMulti(ctx, t)
if err != nil { if err != nil || len(ainfos) == 0 {
return httpHeads, xerrors.Errorf("could not get API info for %s: %w", t.Type(), err) return httpHeads, xerrors.Errorf("could not get API info for %s: %w", t.Type(), err)
} }
@ -157,11 +156,6 @@ func GetRawAPIMulti(ctx *cli.Context, t repo.RepoType, version string) ([]HttpHe
httpHeads = append(httpHeads, HttpHead{addr: addr, header: ainfo.AuthHeader()}) httpHeads = append(httpHeads, HttpHead{addr: addr, header: ainfo.AuthHeader()})
} }
//addr, err := ainfo.DialArgs(version)
//if err != nil {
// return "", nil, xerrors.Errorf("could not get DialArgs: %w", err)
//}
if IsVeryVerbose { if IsVeryVerbose {
_, _ = fmt.Fprintf(ctx.App.Writer, "using raw API %s endpoint: %s\n", version, httpHeads[0].addr) _, _ = fmt.Fprintf(ctx.App.Writer, "using raw API %s endpoint: %s\n", version, httpHeads[0].addr)
} }
@ -239,11 +233,8 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
outs := api.GetInternalStructs(outstr) outs := api.GetInternalStructs(outstr)
var rins []reflect.Value var rins []reflect.Value
//peertoNode := make(map[peer.ID]reflect.Value)
for _, in := range ins { for _, in := range ins {
rin := reflect.ValueOf(in) rins = append(rins, reflect.ValueOf(in))
rins = append(rins, rin)
//peertoNode[ins] = rin
} }
for _, out := range outs { for _, out := range outs {
@ -280,12 +271,6 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
total := len(rins) total := len(rins)
result, err := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) { result, err := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) {
//ctx := args[0].Interface().(context.Context)
//
//rin := peertoNode[ins[0].Leader(ctx)]
//fn := rin.MethodByName(field.Name)
//
//return fn.Call(args)
curr = (curr + 1) % total curr = (curr + 1) % total
result := fns[curr].Call(args) result := fns[curr].Call(args)
@ -350,12 +335,19 @@ func GetFullNodeAPIV1New(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser
for _, head := range heads { for _, head := range heads {
v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header) v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header)
if err != nil { if err != nil {
return nil, nil, err log.Warnf("Not able to establish connection to node with addr: ", head.addr)
continue
} }
fullNodes = append(fullNodes, v1api) fullNodes = append(fullNodes, v1api)
closers = append(closers, closer) closers = append(closers, closer)
} }
// When running in cluster mode and trying to establish connections to multiple nodes, fail
// if less than 2 lotus nodes are actually running
if len(heads) > 1 && len(fullNodes) < 2 {
return nil, nil, xerrors.Errorf("Not able to establish connection to more than a single node")
}
finalCloser := func() { finalCloser := func() {
for _, c := range closers { for _, c := range closers {
c() c()

View File

@ -46,8 +46,6 @@ func ParseApiInfoMulti(s string) []APIInfo {
for _, addr := range allAddrs { for _, addr := range allAddrs {
sp := strings.SplitN(addr, ":", 2) sp := strings.SplitN(addr, ":", 2)
//tok = []byte(sp[0])
//s = sp[1]
apiInfos = append(apiInfos, APIInfo{ apiInfos = append(apiInfos, APIInfo{
Addr: sp[1], Addr: sp[1],
Token: []byte(sp[0]), Token: []byte(sp[0]),
@ -55,10 +53,6 @@ func ParseApiInfoMulti(s string) []APIInfo {
} }
} }
//return APIInfo{
// Addr: s,
// Token: tok,
//}
return apiInfos return apiInfos
} }

2
go.mod
View File

@ -149,7 +149,6 @@ require (
go.opentelemetry.io/otel/bridge/opencensus v0.25.0 go.opentelemetry.io/otel/bridge/opencensus v0.25.0
go.opentelemetry.io/otel/exporters/jaeger v1.2.0 go.opentelemetry.io/otel/exporters/jaeger v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/otel/sdk v1.2.0
go.uber.org/atomic v1.10.0
go.uber.org/fx v1.15.0 go.uber.org/fx v1.15.0
go.uber.org/multierr v1.8.0 go.uber.org/multierr v1.8.0
go.uber.org/zap v1.22.0 go.uber.org/zap v1.22.0
@ -329,6 +328,7 @@ require (
go.opentelemetry.io/otel/metric v0.25.0 // indirect go.opentelemetry.io/otel/metric v0.25.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.12.0 // indirect go.uber.org/dig v1.12.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect

View File

@ -3,10 +3,10 @@
package consensus package consensus
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/addrutil"
"sort" "sort"
"time" "time"
@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/addrutil"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
//ds "github.com/ipfs/go-datastore" //ds "github.com/ipfs/go-datastore"
@ -73,10 +74,17 @@ func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) {
s := state.(*RaftState) s := state.(*RaftState)
s.NonceMap[c.Addr] = c.Nonce s.NonceMap[c.Addr] = c.Nonce
if c.SignedMsg != nil { if c.SignedMsg != nil {
tmp := *c.SignedMsg
s.MsgUuids[c.Uuid] = &tmp
_, err := s.Mpool.Push(context.TODO(), c.SignedMsg, false) // Deep copy to tmp
var buffer bytes.Buffer
c.SignedMsg.MarshalCBOR(&buffer)
tmp, err := types.DecodeSignedMessage(buffer.Bytes())
if err != nil {
return nil, err
}
s.MsgUuids[c.Uuid] = tmp
_, err = s.Mpool.Push(context.TODO(), tmp, false)
// Since this is only meant to keep messages in sync, ignore any error which // Since this is only meant to keep messages in sync, ignore any error which
// shows the message already exists in the mpool // shows the message already exists in the mpool
if err != nil && !api.ErrorIsIn(err, []error{messagepool.ErrExistingNonce}) { if err != nil && !api.ErrorIsIn(err, []error{messagepool.ErrExistingNonce}) {

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/addrutil"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
@ -18,6 +17,7 @@ import (
peer "github.com/libp2p/go-libp2p/core/peer" peer "github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/filecoin-project/lotus/lib/addrutil"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )

View File

@ -6,7 +6,6 @@ import (
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
cliutil "github.com/filecoin-project/lotus/cli/util"
"os" "os"
"sync" "sync"
"time" "time"
@ -31,6 +30,7 @@ import (
"github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/gen/slashfilter"
lrand "github.com/filecoin-project/lotus/chain/rand" lrand "github.com/filecoin-project/lotus/chain/rand"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
) )

View File

@ -2,6 +2,7 @@ package full
import ( import (
"context" "context"
"os"
"sync/atomic" "sync/atomic"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -56,7 +57,7 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
return xerrors.Errorf("loading parent block: %w", err) return xerrors.Errorf("loading parent block: %w", err)
} }
if a.SlashFilter != nil { if a.SlashFilter != nil && os.Getenv("LOTUS_NO_SLASHFILTER") != "_yes_i_know_i_can_and_probably_will_lose_all_my_fil_and_power_" {
if err := a.SlashFilter.MinedBlock(ctx, blk.Header, parent.Height); err != nil { if err := a.SlashFilter.MinedBlock(ctx, blk.Header, parent.Height); err != nil {
log.Errorf("<!!> SLASH FILTER ERROR: %s", err) log.Errorf("<!!> SLASH FILTER ERROR: %s", err)
return xerrors.Errorf("<!!> SLASH FILTER ERROR: %w", err) return xerrors.Errorf("<!!> SLASH FILTER ERROR: %w", err)