Merge pull request #11513 from filecoin-project/lotus-provider-backports

backports: lotus-provider: fixes caught in rc1-testing
This commit is contained in:
Andrew Jackson (Ajax) 2023-12-18 07:39:54 -08:00 committed by GitHub
commit bc5a2a6b52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 95 additions and 26 deletions

View File

@ -356,7 +356,7 @@ docsgen-md-bin: api-gen actors-gen
docsgen-openrpc-bin: api-gen actors-gen
$(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd
docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker
docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker docsgen-md-provider
docsgen-md-full: docsgen-md-bin
./docgen-md "api/api_full.go" "FullNode" "api" "./api" > documentation/en/api-v1-unstable-methods.md
@ -365,6 +365,8 @@ docsgen-md-storage: docsgen-md-bin
./docgen-md "api/api_storage.go" "StorageMiner" "api" "./api" > documentation/en/api-v0-methods-miner.md
docsgen-md-worker: docsgen-md-bin
./docgen-md "api/api_worker.go" "Worker" "api" "./api" > documentation/en/api-v0-methods-worker.md
docsgen-md-provider: docsgen-md-bin
./docgen-md "api/api_lp.go" "Provider" "api" "./api" > documentation/en/api-v0-methods-provider.md
docsgen-openrpc: docsgen-openrpc-full docsgen-openrpc-storage docsgen-openrpc-worker docsgen-openrpc-gateway

View File

@ -432,6 +432,10 @@ func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []r
i = &api.GatewayStruct{}
t = reflect.TypeOf(new(struct{ api.Gateway })).Elem()
permStruct = append(permStruct, reflect.TypeOf(api.GatewayStruct{}.Internal))
case "Provider":
i = &api.LotusProviderStruct{}
t = reflect.TypeOf(new(struct{ api.LotusProvider })).Elem()
permStruct = append(permStruct, reflect.TypeOf(api.LotusProviderStruct{}.Internal))
default:
panic("unknown type")
}

View File

@ -231,12 +231,17 @@ func fromMiner(cctx *cli.Context) (err error) {
dbSettings += ` --db-name="` + smCfg.HarmonyDB.Database + `"`
}
var layerMaybe string
if name != "base" {
layerMaybe = "--layer=" + name
}
msg += `
To work with the config:
` + cliCommandColor(`lotus-provider `+dbSettings+` config help `)
msg += `
To run Lotus Provider: in its own machine or cgroup without other files, use the command:
` + cliCommandColor(`lotus-provider `+dbSettings+` run --layers="`+name+`"`)
` + cliCommandColor(`lotus-provider `+dbSettings+` run `+layerMaybe)
fmt.Println(msg)
return nil
}

View File

@ -117,7 +117,7 @@ var wdPostTaskCmd = &cli.Command{
}
fmt.Print(".")
}
log.Infof("Result:", result.String)
log.Infof("Result: %s", result.String)
return nil
},
}

View File

@ -176,7 +176,7 @@ var runCmd = &cli.Command{
}
}
log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs,
"miner_addresses", minerAddressesToStrings(maddrs),
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))
taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr)
@ -457,3 +457,11 @@ func (p *ProviderAPI) Shutdown(context.Context) error {
close(p.ShutdownChan)
return nil
}
func minerAddressesToStrings(maddrs []dtypes.MinerAddress) []string {
strs := make([]string, len(maddrs))
for i, addr := range maddrs {
strs[i] = address.Address(addr).String()
}
return strs
}

View File

@ -0,0 +1,25 @@
# Groups
* [](#)
* [Shutdown](#Shutdown)
* [Version](#Version)
##
### Shutdown
Perms: admin
Inputs: `null`
Response: `{}`
### Version
Perms: admin
Inputs: `null`
Response: `131840`

View File

@ -146,6 +146,11 @@ func New(
TaskTypeDetails: c.TypeDetails(),
TaskEngine: e,
}
if len(h.Name) > 16 {
return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name)
}
e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h
}
@ -171,7 +176,7 @@ func New(
continue // not really fatal, but not great
}
}
if !h.considerWork("recovered", []TaskID{TaskID(w.ID)}) {
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name)
}
}
@ -280,7 +285,7 @@ func (e *TaskEngine) pollerTryAllWork() {
continue
}
if len(unownedTasks) > 0 {
accepted := v.considerWork("poller", unownedTasks)
accepted := v.considerWork(workSourcePoller, unownedTasks)
if accepted {
return // accept new work slowly and in priority order
}

View File

@ -49,6 +49,11 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
}
}
const (
workSourcePoller = "poller"
workSourceRecover = "recovered"
)
// considerWork is called to attempt to start work on a task-id of this task type.
// It presumes single-threaded calling, so there should not be a multi-threaded re-entry.
// The only caller should be the one work poller thread. This does spin off other threads,
@ -87,6 +92,8 @@ top:
return false
}
// if recovering we don't need to try to claim anything because those tasks are already claimed by us
if from != workSourceRecover {
// 4. Can we claim the work for our hostname?
ct, err := h.TaskEngine.db.Exec(h.TaskEngine.ctx, "UPDATE harmony_task SET owner_id=$1 WHERE id=$2 AND owner_id IS NULL", h.TaskEngine.ownerID, *tID)
if err != nil {
@ -104,6 +111,7 @@ top:
ids = tryAgain
goto top
}
}
h.Count.Add(1)
go func() {

View File

@ -324,6 +324,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
}
var sendTaskID *harmonytask.TaskID
taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
_, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
@ -331,9 +332,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return false, xerrors.Errorf("inserting message into db: %w", err)
}
sendTaskID = &id
return true, nil
})
if sendTaskID == nil {
return cid.Undef, xerrors.Errorf("failed to add task")
}
// wait for exec
var (
pollInterval = 50 * time.Millisecond
@ -347,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
for {
var err error
var sigCidStr, sendError string
var sigCidStr, sendError *string
var sendSuccess *bool
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError)
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError)
if err != nil {
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
}
@ -366,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
continue
}
if sigCidStr == nil || sendError == nil {
// should never happen because sendSuccess is already not null here
return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen")
}
if !*sendSuccess {
sendErr = xerrors.Errorf("send error: %s", sendError)
sendErr = xerrors.Errorf("send error: %s", *sendError)
} else {
sigCid, err = cid.Parse(sigCidStr)
sigCid, err = cid.Parse(*sigCidStr)
if err != nil {
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
}

View File

@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 128,
Name: "WdPostRecoverDeclare",
Name: "WdPostRecover",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,