159 lines
4.2 KiB
Go
159 lines
4.2 KiB
Go
|
package lf3
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"time"
|
||
|
|
||
|
"github.com/ipfs/go-datastore"
|
||
|
"github.com/ipfs/go-datastore/namespace"
|
||
|
logging "github.com/ipfs/go-log/v2"
|
||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||
|
"github.com/libp2p/go-libp2p/core/host"
|
||
|
"go.uber.org/fx"
|
||
|
"golang.org/x/xerrors"
|
||
|
|
||
|
"github.com/filecoin-project/go-f3"
|
||
|
"github.com/filecoin-project/go-f3/blssig"
|
||
|
"github.com/filecoin-project/go-f3/certs"
|
||
|
"github.com/filecoin-project/go-f3/gpbft"
|
||
|
|
||
|
"github.com/filecoin-project/lotus/api"
|
||
|
"github.com/filecoin-project/lotus/build"
|
||
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||
|
"github.com/filecoin-project/lotus/chain/store"
|
||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||
|
)
|
||
|
|
||
|
type F3 struct {
|
||
|
inner *f3.F3
|
||
|
|
||
|
signer gpbft.Signer
|
||
|
}
|
||
|
|
||
|
type F3Params struct {
|
||
|
fx.In
|
||
|
|
||
|
NetworkName dtypes.NetworkName
|
||
|
PubSub *pubsub.PubSub
|
||
|
Host host.Host
|
||
|
ChainStore *store.ChainStore
|
||
|
StateManager *stmgr.StateManager
|
||
|
Datastore dtypes.MetadataDS
|
||
|
Wallet api.Wallet
|
||
|
}
|
||
|
|
||
|
var log = logging.Logger("f3")
|
||
|
|
||
|
func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) {
|
||
|
manifest := f3.LocalnetManifest()
|
||
|
manifest.NetworkName = gpbft.NetworkName(params.NetworkName)
|
||
|
manifest.ECDelay = 2 * time.Duration(build.BlockDelaySecs) * time.Second
|
||
|
manifest.ECPeriod = manifest.ECDelay
|
||
|
manifest.BootstrapEpoch = int64(build.F3BootstrapEpoch)
|
||
|
manifest.ECFinality = int64(build.Finality)
|
||
|
|
||
|
ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3"))
|
||
|
ec := &ecWrapper{
|
||
|
ChainStore: params.ChainStore,
|
||
|
StateManager: params.StateManager,
|
||
|
Manifest: manifest,
|
||
|
}
|
||
|
verif := blssig.VerifierWithKeyOnG1()
|
||
|
|
||
|
module, err := f3.New(mctx, manifest, ds,
|
||
|
params.Host, params.PubSub, verif, ec, log, nil)
|
||
|
|
||
|
if err != nil {
|
||
|
return nil, xerrors.Errorf("creating F3: %w", err)
|
||
|
}
|
||
|
|
||
|
fff := &F3{
|
||
|
inner: module,
|
||
|
signer: &signer{params.Wallet},
|
||
|
}
|
||
|
|
||
|
lCtx, cancel := context.WithCancel(mctx)
|
||
|
lc.Append(fx.StartStopHook(
|
||
|
func() {
|
||
|
go func() {
|
||
|
err := fff.inner.Run(lCtx)
|
||
|
if err != nil {
|
||
|
log.Errorf("running f3: %+v", err)
|
||
|
}
|
||
|
}()
|
||
|
}, cancel))
|
||
|
|
||
|
return fff, nil
|
||
|
}
|
||
|
|
||
|
// Participate runs the participation loop for givine minerID
|
||
|
// It is blocking
|
||
|
func (fff *F3) Participate(ctx context.Context, minerIDAddress uint64, errCh chan<- string) {
|
||
|
defer close(errCh)
|
||
|
|
||
|
for ctx.Err() == nil {
|
||
|
|
||
|
// create channel for some buffer so we don't get dropped under high load
|
||
|
msgCh := make(chan *gpbft.MessageBuilder, 4)
|
||
|
// SubscribeForMessagesToSign will close the channel if it fills up
|
||
|
// so using the closer is not necessary, we can just drop it on the floor
|
||
|
_ = fff.inner.SubscribeForMessagesToSign(msgCh)
|
||
|
|
||
|
participateOnce := func(mb *gpbft.MessageBuilder) error {
|
||
|
signatureBuilder, err := mb.PrepareSigningInputs(gpbft.ActorID(minerIDAddress))
|
||
|
if errors.Is(err, gpbft.ErrNoPower) {
|
||
|
// we don't have any power in F3, continue
|
||
|
log.Debug("no power to participate in F3: %+v", err)
|
||
|
return nil
|
||
|
}
|
||
|
if err != nil {
|
||
|
log.Errorf("preparing signing inputs: %+v", err)
|
||
|
return err
|
||
|
}
|
||
|
// if worker keys were stored not in the node, the signatureBuilder can be send there
|
||
|
// the sign can be called where the keys are stored and then
|
||
|
// {signatureBuilder, payloadSig, vrfSig} can be sent back to lotus for broadcast
|
||
|
payloadSig, vrfSig, err := signatureBuilder.Sign(fff.signer)
|
||
|
if err != nil {
|
||
|
log.Errorf("signing message: %+v", err)
|
||
|
return err
|
||
|
}
|
||
|
log.Infof("miner with id %d is sending message in F3", minerIDAddress)
|
||
|
fff.inner.Broadcast(ctx, signatureBuilder, payloadSig, vrfSig)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
inner:
|
||
|
for ctx.Err() == nil {
|
||
|
select {
|
||
|
case mb, ok := <-msgCh:
|
||
|
if !ok {
|
||
|
// the broadcast bus kicked us out
|
||
|
log.Warnf("lost message bus subscription, retrying")
|
||
|
break inner
|
||
|
}
|
||
|
|
||
|
err := participateOnce(mb)
|
||
|
if err != nil {
|
||
|
errCh <- err.Error()
|
||
|
} else {
|
||
|
errCh <- ""
|
||
|
}
|
||
|
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (fff *F3) GetCert(ctx context.Context, instance uint64) (*certs.FinalityCertificate, error) {
|
||
|
return fff.inner.GetCert(ctx, instance)
|
||
|
}
|
||
|
|
||
|
func (fff *F3) GetLatestCert(ctx context.Context) (*certs.FinalityCertificate, error) {
|
||
|
return fff.inner.GetLatestCert(ctx)
|
||
|
}
|