From 482e1110c2e94c2dcdd74015d382cad815a7ad94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 1 Jun 2021 14:35:30 +0200 Subject: [PATCH] precommit batcher: Improve error propagation --- api/api_storage.go | 2 +- api/apistruct/struct.go | 4 +- api/test/pledge.go | 4 +- build/openrpc/miner.json.gz | Bin 8040 -> 8066 bytes cmd/lotus-storage-miner/sectors.go | 17 +++- extern/storage-sealing/precommit_batch.go | 86 ++++++++++++------- extern/storage-sealing/sealiface/batching.go | 7 ++ extern/storage-sealing/sealing.go | 2 +- extern/storage-sealing/states_sealing.go | 16 +++- node/impl/storminer.go | 2 +- storage/sealing.go | 2 +- 11 files changed, 94 insertions(+), 48 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index c2f3a3d57..e50fedc19 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -83,7 +83,7 @@ type StorageMiner interface { SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin // SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit. // Returns null if message wasn't sent - SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin + SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) //perm:admin // SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin // SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit. diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index d70c6aa0d..acdc0f9b5 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -650,7 +650,7 @@ type StorageMinerStruct struct { SectorMarkForUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"` - SectorPreCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"` + SectorPreCommitFlush func(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"` SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"` @@ -1952,7 +1952,7 @@ func (s *StorageMinerStruct) SectorMarkForUpgrade(p0 context.Context, p1 abi.Sec return s.Internal.SectorMarkForUpgrade(p0, p1) } -func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) (*cid.Cid, error) { +func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) { return s.Internal.SectorPreCommitFlush(p0) } diff --git a/api/test/pledge.go b/api/test/pledge.go index b4bf88b59..08548dc60 100644 --- a/api/test/pledge.go +++ b/api/test/pledge.go @@ -169,7 +169,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe pcb, err := miner.SectorPreCommitFlush(ctx) require.NoError(t, err) if pcb != nil { - fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) + fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb) } } @@ -319,7 +319,7 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod pcb, err := miner.SectorPreCommitFlush(ctx) require.NoError(t, err) if pcb != nil { - fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb) + fmt.Printf("PRECOMMIT BATCH: %+v\n", pcb) } cb, err := miner.SectorCommitFlush(ctx) diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index dd1b12856db99e2cd59453cf86b081faf57ded17..f60d44df02aadd18fc0904012e557bc37e6711c0 100644 GIT binary patch delta 7407 zcmV zcL&`gy`v*MC;I4zP7$@BKGH7RhL>aGm>Qi~IK;q)|vU<&VPcP##Vsl5Q=W!Ep%(V*)R_oepZ8~#RJNLMb_ zyzT|2(8a)i>aG!d`3*mQ!+-z%x8Bjcm2Cs}K_BUQM|Xe=Y|)FA%f>W&IB^a9246{lzj)6So#RBEUTkuKD>smjWD2a?fE(Kz|4Z5zGLXV2bMJh z#%;ij{<|;4yw*XNJG=Kkf^suLEC_I{ zgt{(0MZ?q-F&R;s6GlDOfxymxsSeyZVwvh?epwYGw?7!_Bi#V_f2alw z-e1jyz|<^a-0G}@4xmixCkf{0zKYp)euMjJBn^fVgeSJMqA&oU3IHD?Aj9{sH?tM8 zD8l|6U*I|6;LfI<;olX&Cb6+Foubk)O;7;k$xZ}FzEKp<8q~%Ca0+=LGFQViRvemt zV*#5%aXvGcY8NKN6wOh*Y9IxyY9x(MhiPfGlV+J}AH9lBO)1fe1I$9q46e^%IR&Gk zA&!e$QsOvX^z8!`m2GGu-kBM~z8tF6O!K1^NkD7S=TdD6hi5>VeIM$OO_|VT|xX`;O?6z5j+UKV5&G{`>9O z`1j9~f4@DOzWev&?D|&?;ydILY%{RC11@5AzUD!yQKEUohMEKDvZDcP^0vNzVb&OX z7VG5qCB&KmEQ__67$)*~4-n>kWcoNShhzMYfR= z?TtjWefQ%je3D&fFGL(k*Xom$&i^`3u`2EO z_xoAA%9$CTPSP49Q*#+#co0)h1DXe|`GF`iLwF%R6sAF91~fGkQtN6LR70LNC$0@m zZT6r6?N5;#+0lSDCMGnr7vGo2SZWBH$l%c|pxG;nB7?m>aDZ_O&9D{5+Anz=31}Z# zsU`51-Q*(d5B)U{d(Iz!2D5*F0m1OQXD*K(hG2L^XG8B6+RlH-{ePX+1F@a|%HnZy{wx zl{Ak}ZX+6Iy+avGAE^QxUiWZ`Lp)%WOgI=@2x3}3Q%}?z6UzdB6uJt{rYO9_<~xTN zOSN^HOE800#`1SZpCD+!N=t^kCeOrDT`Ia`mQ4Af1~uz^t|3P*{=;FM&rtJ0`=tGGi3EbY;Jc1-zwl`Sb{%&x+qsUIazR z9Wviq#JII;0e^}+luQeRwxa_4XA#|yE{IIUbtO<}M5-+L?~#F-GQajzaPopgQ%EqJkFmipt*J|Y}{OOojK zhJ%j&5!|PriDN5*^L9G=HL{^cf$gvo{a(MLzq@x3>y-a<3jg~D(MN}a$Bxd%0K}L3 zp^Gyq1|6N}A&&HaVXODo^zG8Xzb)ba{_>0dSn`-If2bvD1YK20CP;n_r5cHEs#v31 z)(M};(upQwW>h*Bn`M>aYaCX8u49>*CUM3oF~NehDsA`f=%o)$$xR&j`SJavCkcIo z=ozj_)CT%EB?;oYK@lGy%fS2^`OV;cMhDX%pIu1p07a4T_4a`VDE=9H{1}c1*C*h? zhM4j%a0e`g!>NZC*l;26AP4Dkkeec}lwK!mUM)8-KI*cNm-s*xVQIgAWLkDcp%4{6 z86N_x@@9c7LXGVhkPvUT4Yb<^+HC_*1Sq*q;3e{CJXlk0Nkl>`Z%aYE;CKlzhJ||z zi2a=V^xKcn^S}aTZVUKC zm(ZQSMAg`7VH7N2@d3quYloSImDG+mUfXHG592YJ&s5$*IgihD#&|BtqH@PB$s&`c zpO0Uq!&%PS`mxb}06Tl;HrlRi!@|l_ba|t2ZK?7cyOo?|vND#LT>EVH9h}Gxe~dwI z<#qEl8qMrCOA~E(owd8p+FfVuuCsR6S$Xu&jVCgYy}$-x=tXOP}~ zueDfOi)A+~mVgs~D}mr-HM5YntW@_3&P#HCkW9I8-njzbdBSFi8idgD-3r|D5Q-(P zc4A$;+yV1tO}>~3tH!mkYz)E=2VN9Ib=EB=#qDIp^5R_QIb+8<}GP6pYP1Vib?Hq%u(B%yKSEbHoE#W{iCd*zF^b|7L(Z?&-ExoYs zTZsllm8?Y7kzT?DFdil@p~N5(gEi0MoPY<5fNGbIJVBFM3>pHT=96{|ApyOUm<(Hg zzx1`T`sk<^{;y;TUoEu%Nn>kmXs8Vh?Lufs$@oYLy3)tbz>y#NoVYfiVpkeGpvZltSB~o<@o0iudh~+jhr)ZM)-ktMHYZK{2hn<8~_Bamzu9eNM0Y7A-C? z<;b-5V90M#9xnmH+@k7e&Y`$i8JH$CLyNMbId8I_D*;>B#Wi{I$(lmBYd@zw0%uRNE6gMrr z50H*8&NCNNRJd-PNWq43#)l3QuR@6nyMCo2Zp>(8<2tF`RQ`msgAOGDe^s*FJ0TCb zt8uCJi*v-HM!pdD7P^8&V<}zhV^tK2X}A|gA|7JHnch1V{~ToeC-_G%DNDk=2wg6P zmGNSsv%&p^AlN_Z=r+Q!U+YfJw8}o9%>qFEb9D2vToSCW${?|0?VEPl%P!?s4q>W> zNgMd;>8zbBcu@^}6BFvAe_m{(X7VJ~vLS0uPU(n!>Bi7@k-5HNlwI~HRKj~md2&}( zt;p+_Qi+unV9?Z7PLgglT&v-VG+c|dE!M7Ktt52o0~o@qKiw*2NM@^-Zg9_FCWcb60++{IDa~jI$FFs;G5flP*u9H?0C{70?bU zAlN!Sts;6ZiYRvxJb~DCeV>R>B}F6vX^c#-K`qp}L9G_re@QJAal0aygbS37og6c5 zlwL{3iyAMtsV)8)wsJ;Gt8QC$TSDFD<{J3N+}6xGi&5E5jzO})Hou@%r8}WYTl8(w zH;KLjNqzw{HV|H%&4`;m*jWX5;{;SOw*Xq#NN)VF$re#tL~Y?B2^W$@VP7<-#uA$6 z2rTMAMo2^k3>)H_wik%u0u#?4kCQJFBY&Rm-mlmfP4z1IP5}qy$R)Z|&AQob@fb?h zu>8;kkFUMAR$jGZJ5p3+hZ=GY6^e@&eha9vyn7M7vMHP->UUq|+03y+eLs4O@=>K&>&==QNt5E)+#Y`_yFETGYX04| zwb8?;Bg2m}=RRhWEk+H$tmgXi4m1mSR3mfE@!$C95da1y+6;2}*AV8{pkjDWBOD2k zq5*WsJyA0Cdb237wSy%0RH`J=MKOOLAb)*#??8hxGzHfA$^|}$RFy9Y#cWBV$cEjZ zh?=0u>F0@@q?WdT)Z?NA{kIzH5?9E;7d$3+Oa;~!tHfZ_7i+G)-OM$KN4@ZWm9Ko< zZGM;Cz#Z}dpMPk9hk8sNl4+8&<3tPCIV>9SFlSx51e8ra{BAuQ7%t=nrr_jn_GRcfMA_eb1{Mp z%1W|d94Uw}pdeVl(Vz_!w1I*Q0m}^(TtS=M)v+H1fky4dkYwIcR#cFG#yco{CEvH7 zyTlH&iM0!=nFMnEN1usDISec#}*t z0X8*EB7*Fhh3;S|so#-hX$Dw%P>YzINj9(;XLlQ0R%?ykpGh2L*QAOWoHib}lTefi zY{Gk2l2&;GMdB-|sd*a;-sK6=PZPChWho|0xgnBiGcmzweoU~s)WoSOh)NU~^mZXI z*b39036t#BN}mB4r}zm(b$^5@My64lagA`HJjIGLvyVAd2Cyt>YFq&c6AoeRo$b&E zjkwwufGEEuStLby-!LT_bU7(1oZWJRyxnB_~d`NNmEg zkspuJK#r==!nybyCg6?lUMWNojHHJ_uAqlGsM?+vt615z7s6@QeFv z$Q7?&EKFiz)C4LZnSU)s?4<@Qi#bt?Fizn$%JY9@+iICbih2KQe=zLy2ZuMibL!7- zbFA#5s~CG+;3@Z^>vC>DN9GTGy#Wr20t6J+;} zq{dhA2&;9|q#|Rsg1Os*nH!++uHd|uK(hMTs>BA)SHW910e{B=3z}DOt|)pw>DsA- zSqv)gTW=k#c5FcAc?sV4Ua+js+KGY3aVkk@FudC+KWh`T;MrT zl+Uoa{PSQwKz}NNg}gZPm4LQDcQ6RJGmre@{24|0cHsW3EJ#f9K>E9`zg9mxf+5Ih zM;ErrNT0YomqzdCV`AIq`iPk(;>T6+L!1#k(*K36-e1$VO9TJ5g#Y`?FM3Dk%P~Q@ zYh>ICIQx?y+u}cd=MZDbfVAK1b08DPh2@nAlkRweXn)kD#DgnHT@Iv&gN}Ypf;@cx zJ6{5izH{C6qhvbzB?&2rvkCd&PCELTLD8L9FBKi>=S`Wdn%bms%@2!eP@Ta$IvaQbuRjT=u zRkmvUJAW2v)0FD+*>SUp!Qr&2eD*j8sgz%1nzA1tL6&)kCd74 zzOl$=dMt`|4L_ds_-NigJvo^j!1=IuaDQkX!-Lrn8V9G~XmC6~oPjssMDOUIuP(#j zT41{$4m$craPK!PevbjaW+UKB_LFr_{4RTVazHD~nTRN7z4d+1RE!|jTNO`Aj5_8> zr6{hu|B+v_F*`3$-zFGX^~h`B00Yqn9*jM^1IPk1*7MVj{(hSLeFZ(}-m$)%@_&!X z|NQLn2W5kX5n-j6Vc1EP(usa6oxaV+qEayW0!sYjsg-Qc>}^1!$d=KBz`unaJ+mxg z)KMxSk>aDi1=6!#Tt7HaxErA@KSlpQewR^Hegf$7nkSIH_qA>arFgY6b``S3bR4GR z3V}5lizN7KaAo5OiLP$d&}~k2 zSN))asD9q=R9~8JLVbA-4NDPoa!)*HWP566?MD zx}}?SpT&OEK~H+OlF#3D^95zef+^_}_^2v&KGiMAU6?qSCvYXQ3x8E5O@D0y3SK*a zF7*N_l5JetGfvG9L|+o)wkD(AHWQ(#7mN1$v9&048+v&rp_iX$7;TI?LOtn4SpqGt zphH~xD+Cs;l|UV*qqYR+{}NS1bv1xnYicVSYj(+HFT|R)uc~^=0tHlfz|AE1cJz!> z3`%0WY)5#|*EWN=W`728?Y*)s)W)DTKmO?_*;j^GR$nFlnaJu_^oM-f&p!0Iz9XXD zFR@Q(w_pCO^l~;K(%0@oU6HZ+}6R3RF|Jp@DzClI}Vhx zYxBMk_ZGTif-#eWDma;-P-{o~IwiR!mME!FXj$SK6--R1QGdd^Al?w|J1^>KSCesr z`-j_o@K49dk`T7wiQmYq+R2|$b@@WeYK6@1=g;Bdpa9CSW|8_%G_kQ&mx4}aF2WSo zQPKEteqx@Ug2BPj$HA-kYEFjv$0#0z`16WlYPbRzv-S0{!@? zchaAM-oa6SW`7 zsF92akob%xsEG>Ao^fV_`wKy^f7H=!gk!(flQg3F*?$=mFdh7fEnbJ*pninkZVoPF z<{$c;xHe#HZ2}4pD6(PJNr^&8Djesyr+%jFaz_D?QwnGut;6f9sGW6e<`vb{PSC~{ z^iF0k3-pexPIG$aht5;<5}+!Oo5x(p$~AnMg!W|l;2{H8`e<-`cyfGv+B-TuJnXPn z)9=91M}Pgn$;shquYYoK(&LY#sLh~%czk#=JUTq-=;O&}Paho}cJvSYZ(nurVe(n) z4~NJ44U?637|g6lE&>=Lcsc!acrtjQMYU)O#$@JY{nKe40wf z;|872$^;BMo0%r)&qPz2Ffo5+M$2bKWvn0kk~HcWi3v7RiV()RNU?(8#-{XLoZl2X h5VGqw7hHB-S-jC7A8#K2e*gdg|Nk*s*L0(g0Ra2LTj2lz delta 7383 zcmV;|94O<0Kj=PxABzY8000000RQY=YjfK+*8VFTz8^N}$ciqGFPiBGM@~|=PNTJ) zX19rF3z3k7H3@J@$g%73e}4x6uOdN$7b%L&ZMTU?-~b@bdCmm~z@ttah+W6fM|!_I z=pN}E9pO3AM~^y1)Pnj*za(_!X;+i6b~d@tJNg}TJwz~nuW{P#9k|9&f9mKqq)TFY z`smA}j)Bc;zC#u?2*TYngA(`5G+pR)xU zZ#udQy_H4Tm-C0e)}_?I)9({ad&UylfYF2Zz;-Oi+R^8|{$O~h-*i}Y-VI<)T;f2N zB35D!EDv^c`rtrToXq|U4XWQfvHtyUgWl4E#}=yEmFTxb&tH4EWey`!&OOCRY=N*!eYf5=^Dg-HTN0fz;Oxd*DH(h z)~lICX0L4EQRuz`4r2fJ_$PlQ{^%~q|6TTv`+fcCsguuLV}j>sQJcpE840I3sGBHO zhSx%q{s9etJ?aw|j)`p}%D#h3EPaGmmetWSA6`S(Mi@}I_WT@qU}nLO-!OFF0n3^J z<2K+%|J`Te-mbrVDq?mg_ULiFOOHCD(B^LxBqk7;r|ziqh(-pmbTp^0t!V@XHZ#M0>j zq8UcEHq?aQ#Od$863fKJ#8*s@Vg0pa2!Ta}SN06LDKfwF#~Si9poK+Z!yXGG)zqqlM$skVbo(C2<)7H>cE{NmZ@ImmsK%x`-7oA(hY$BK{a6T z{%S4+re+c2R%ab_0A*4?NiawEr_mX%8%6P~L2VoWr;rySb2Ut3#i2QW z7O)u<=QD$;c40zH(HzCA22#MPM$-8GFfFZi(kxT$qgTrfIx$e5~zF)Ky0QNC&s>$`H5#0yGH)F*rgU2c#!%URiU8{#f0@s`83%3-fn zZ23Pn!e3cb8{#er>6W*)%UgMd{Dtihm!1O(s&i6ALl(&~kl7t0lgHD4m7+Rnx?72t z`~56l<;;vvCuxn5skw|VJcy~M0nLNf{6LhMA-oVD3ezAl1DYBNsdcprsv%FC6W4~O zHha*3_J_!g>}Wt66B8QRi*HM0EH#8pWbkMf(Cn2(k-^>`IKa4tX4nd2?Uy``1hfyV z)Dn2hZgLU!QGdn5p7V!)!R&8fKrsB~naiVxAs8Of+0eU%w)0XJ5L+Wj$>3bqbp2v2HJX#$)SnX_W^W_FOsQUr9w~(@; zN}9(fw-F7q-l2@8k5qvTuY0(}As(XT|R*FM=ZE z4w-K(V%%D_fIme$N~Q%u+ff1jvxsg;7euDwx)LZfB2||B_sGCZnO}PxgR&Y-IJYpo^&wQ(Ru<1Oir)CygM3b)u(7Oa1X@9}tdzB}w#q z!$C*?0PfSz#IY5@c{?5b8rjgJz;;-Pey`ur-`+cjb;|!ah5!4V=%d5IQ%7fG0OHF% z>f%g_K}Y9#h$H>q*y{Z`eX}(1uS@vPpMTb$N*>eY54A*%psOm$1j(C(> zI^h#pI?+VTj7rC1v#e5ljl;@+bu2T}B+fV`CRngmrS0Awz4W0exrrk`KE9vyB%zNG zJ;ODL+CU$tBtd*PDB=TT8JJ%qzZtyG=wKS;vkR#mpePc)-aOC%#Xn(>AHxyh`UE`K z5L5mI?tsN`IQ8%X8!iMMvbVd35a zc`F;-X^_!R$(DoclAGOo@CJC*GVm|F3ydfNl(<>h<*dLIVn63T{q_U&Jg|V7+X6n) zC3Gh+Q8jj27zGPhd_eJk+F@p4CAH&?*LGU)!+1>QGnKbc&f_zkF`i4ZsNAtjvdEu(0wJUEU~MTdF+AZY3v~tc+zQ*FKwl2Pd+_A7jv4 zdEI=CMl<`((nQ-`XYH=DcGp?E>#W^%Rv!IxgxWpl5dgSzujBvWpjcdo!Up0HV>1|hV3w*t33gkp)S zomdwycffpElP_k%s&OqW8-wt}ffvP4opp;zaXVSDyg1i+4q1!k$-npWF?VH0H%EqJ z1A7r=#AL?q`ucN#%&Zb;Q+2a-xkNH5_67!MPdP+|~?!J21rPQZgjK()(9zC)8-3>pF-=aYC0ApxzEnhaZi zKlQb;`sk<^{;y;TUoEu%L1SxeXs8Vh?Lufs$@oYLy3)tbz>!CNPFx#Mu`3N8P-Md# zi)Fu(TxFM!_*x|RT+OJvG__2vr;yj0O==@r?zv_w%B~I(6L1ARCOr*pvCW{iwLP}k zZL4mw&8V#bN}=sgPoqS0#rt;0ZM)-tw%u{NRrt!ypqSR(aXXdmxaAEW1TlAlV0U7u<1Q1A}8!wgbS)23z!P(YHn47JYXSe|;ruO#;TQAgCRR z4{4M_-WefQy%2o7-QpO03&Jf3ZwZ9Av&*{;JhytFj2_5ck-dTz1RfMS&|o@MrHnR6 zP}+_)o^G_Y$XjKw6Uv}P;ueV$NPODMT>_j!V2z1wBbvq=brgYG;I?6tn>=dS!t`C&(}8D}RVR8i}|CS9IHZ(0S^Dxe)y zK(KXuT1E6+6jAOXcmlEO`aTh%N{UDT(ioXugIcI{gIX=Lf0J4$;&w$Y2^T0EJ2__B zD7})57d2jPQ(OEqY~_rWR^7JhwuHLP%{B0kxviOX7NfGA9D`(oZGJ(kN_Rq)w&>fU zZxVe6lKcW@Y#_Win-MpCu(JyA#tEomZUMBek=*!UlP#jQh}yzM5-uc*!oFxujU_bC z5m?lLjF5;73^v3yZ7&eR1ty-4Pm?hcBY(c%y7M7vMHP->UV$2vzcRu`hM^h<)ccm)|)jqk|xEoxjp_=c6)qW)cm_^ zYomuxM}{9|&V9@#TZ|fhSD{`uOZB@LB;T%MmQ26 zMFZ%Nd!l6O^=46EYX?d0sZ>d#i(>xXLx1}A-hl>XXbPV0dXng3b8{;NHF??k8vA0<+o*QWhau zWFoQ*#KaT^G&4zWJg~Xm&5g0&X?rJaeGZr1Nh@YB%-g9OS4a|)TtBIJU4n#S9;PP? zXND8PBH0mz8FS42a6(vgOA&>iWPjH-{PfIktnyu-Kq^b!H?`|>nd`8J99Z9HUnkX^ z+nzyLNk)sE_efR?{C6(3t?%CY?im7>>$_h;o7~kg=>&mBO}UU{-s)0RkblNIDBKs{ z-JZL|4ikJCWSazXJw>5wBMc~aRR}&XmH^?7CIH(2_$!`4%_P{@ilUwCpnpajV^&7G*%>XM8Y7w(D$p#kV>~3Q#U9BDZBZ;Hz=29_()5b$-5{fc` zO*qs_(kgGDNPHzVHE%=lv^*jDVFC%QEX8CgH)=6$CQLWY57SkbnmAPjQHf%8-Y&%I zT4CBVVUk@W=`$eX6hEh@j(;%4$TVs*t`RPjr&w`j_A#f*0G0(!jVmBw!Xd1^wH^AP z5m);R5akyai##RoE2c!FE?O+08zwVB;93u@VfQ@cCpVB0x)3#lC#2D+}I_+j;^_6Cl9U$5?t?%@B#CYGzK!0O7O~tA2fs_dhFtOb#lj>eMopjslG#$kUTVOym=m=K z;}l+_JpX&Pt(IA&n1A=b^asOEe{guSI|uRXHpj~Do{F)@1)g#rx-RDibY%X}mmA=q zC_q4AUA~SrD0h`OkH&_P3fOlo`;PnB9XO)4^GE10`2n7IK8?+VUq z2_&nZtx9a*d=b2L6L2iBpm_!7ilXO}uAR-7#h~)er9MV(QGb;^&8H{5UN5%0`Qn^! zYwqYD=W`EI_T%v6_3=@!6J7+*e?M`S%-Nn4?j0&&m<4UBSY}aol}eYr|2?*gs(9bh zK1gD2M8jiHzN#gQf`V$8FuWY>rS>9HC>MB66y-B)F8_#@_mGNUAurB+C7>mJdpmb>#x<1b6^N^+R=rrGSVk5&!y2j`k2`Exjtg1iTH69 z{1C@BkMw_ItM}*h&CTWZ+~5P{dAX(en~9n|)=Nc4I{Mpt2f4{dY{Z`-S}dtP(tCPG{{y;cj-Z)(ae)y~aTW#Cxlv5&Y>byF1orgeuki$tqhl{v8XnX-akZ?6}#Tyy3K|eD*j8 zsgz%1nt!q%AmsXt&DNaps~IZ4A#7&Sl%HwD;J}6+1-7F$@@p3uxrZZ3O_WHiRN6^m zOIhyp;o?!}YXj^rGlYTr@D>|{M_5Pt40!P9FxJheb_iK&7GH=?8!!-k6fU!kUtqKf zbsyWew4*-87`|G|)!YwB%k6ysmxl11FiXFwK(D0dnZ9~u9#LMbmd6%STOkcrya$_b=DxZ7GKA|1+G)phbVT_zGc*$pthAf1`!B{4wy z(|;%lUXq6VL`pVyMZ%S$wS}W7T@h|jV9A8z-tuJwd?{Og@cDpJU{uWCic`otB^7!> zYa+Pm479noqR8sL<1tJ{jt$z$R|i3Zex%HV_l-q1(_>MzYxs$&$4B%2>B-6L0M3WK zgG2Ke9?XW&I5-7IgX8((47>&>dPo0sb$=NK*8#gs5reXxK-l}*~V$?B5Dn)VK{g3>bjoEp5`ZmG9s;5u`2N;Mx z@L=rO9Y7YCv7VoH^mo(b?RpC>u142rI=5!%nJ{PV`&pK!0pD z7L|h07f|9KPpxEojBW!OMYfD41pW>5=$T~^qmEJui4>o{Es&n|;`+gX!rcgM`6>Dv z^6Q17@)JOp*F1sr#h05b^;F=Dr#tu^E*we@dSkvye>1*HVtO{#`QhR4_382OWO#Ja zz%bIc!K7JA(Zuz)b2Vw*W^r=XaeuJYESNfpsL#)~*vB-cK~H+uf|!bPXjs67{t@~$ zkUv4Fz^m{tx=E&=C%`_X91i`A3l)!Jy(@8?XQVcc^?ZnGprYhmRNfDvhUUtbOh~Sl zC}-DhEyKM*Mezfv?ZX|)jCrn>w)DaZAD>*}Gyu3Em4?w)5oF`dZ`G1+(SNl?*R7&! z3vCg!NiQ1_sPGP9vp%#%REk$CW49AaOvhn5t`Jz0u}Ffy23Izokm%}G4b2u|RPONo zF%TvfD1XSX%PyIrav{$NwPY0i(E=_L7m5C6)oG(Eji|%?RoYA*E1?Kk{X*;hrS_t7 zINWbwE4Nziirt06h7NdDRa(i0bF-4=yHHiq^j9XJ;FSaDQZJAq*~X^nWEWZfi2?Z8H&? zda-D~A6tt;x1pD35_Lv^EKop&2i#18Z%5BK#h@g{%XWkZeQh&{Yi1DF-YeTeZ47Gj z@Z>pLRa{Sy0xcKhYeN-t*YmjV8?+{c5U8g;@(1cOfY70Pz5Iw6l(2gU#BFu z#1bVn3N1@qqk@SEHA+|)#2cc0<3&B~YBFwc|8Tnx{`+yVB!7f0c;Z(wt9J5dR9(K% zvRWat`{`4-I4FQJtXZVK6HRPv)uo`5nTs&RbyPGyoS&Gdr(ke!baM3i;IMykdT=tI zgM-sKocHD@y(0)=m;ezRX&KYYemwpm%W8pP2`TufgksnbA8r0Q27K zdEXcu!PjPViho)G#cSZ*G8u-zdUe-(#W>qV> z8{ycm^(2jGes;zLOb35ri`OAHs2}0Cn}Z9P`B9$}*MA0#txZ7T0Yx^4wqkqGbjy|4z^7PTsVMl+@|Mo=(?FMU_{{sL3|NrYf JdLTBB0RXE=QaS(t diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index c76d4a249..2476c16e8 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -1059,15 +1059,26 @@ var sectorsBatchingPendingPreCommit = &cli.Command{ ctx := lcli.ReqContext(cctx) if cctx.Bool("publish-now") { - cid, err := api.SectorPreCommitFlush(ctx) + res, err := api.SectorPreCommitFlush(ctx) if err != nil { return xerrors.Errorf("flush: %w", err) } - if cid == nil { + if res == nil { return xerrors.Errorf("no sectors to publish") } - fmt.Println("sector batch published: ", cid) + for i, re := range res { + fmt.Printf("Batch %d:\n", i) + if re.Error != "" { + fmt.Printf("\tError: %s\n", re.Error) + } else { + fmt.Printf("\tMessage: %s\n", re.Msg) + } + fmt.Printf("\tSectors:\n") + for _, sector := range re.Sectors { + fmt.Printf("\t\t%d\tOK\n", sector) + } + } return nil } diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index bce8e21d5..dd674d331 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) type PreCommitBatcherApi interface { @@ -41,10 +42,10 @@ type PreCommitBatcher struct { deadlines map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]*preCommitEntry - waiting map[abi.SectorNumber][]chan cid.Cid + waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes notify, stop, stopped chan struct{} - force chan chan *cid.Cid + force chan chan []sealiface.PreCommitBatchRes lk sync.Mutex } @@ -59,10 +60,10 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom deadlines: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]*preCommitEntry{}, - waiting: map[abi.SectorNumber][]chan cid.Cid{}, + waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{}, notify: make(chan struct{}, 1), - force: make(chan chan *cid.Cid), + force: make(chan chan []sealiface.PreCommitBatchRes), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -73,8 +74,8 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom } func (b *PreCommitBatcher) run() { - var forceRes chan *cid.Cid - var lastMsg *cid.Cid + var forceRes chan []sealiface.PreCommitBatchRes + var lastRes []sealiface.PreCommitBatchRes cfg, err := b.getConfig() if err != nil { @@ -83,10 +84,10 @@ func (b *PreCommitBatcher) run() { for { if forceRes != nil { - forceRes <- lastMsg + forceRes <- lastRes forceRes = nil } - lastMsg = nil + lastRes = nil var sendAboveMax, sendAboveMin bool select { @@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() { } var err error - lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) + lastRes, err = b.maybeStartBatch(sendAboveMax, sendAboveMin) if err != nil { log.Warnw("PreCommitBatcher processBatch error", "error", err) } @@ -150,10 +151,9 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T return time.After(wait) } -func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { +func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCommitBatchRes, error) { b.lk.Lock() defer b.lk.Unlock() - params := miner5.PreCommitSectorBatchParams{} total := len(b.todo) if total == 0 { @@ -173,7 +173,35 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { return nil, nil } + // todo support multiple batches + res, err := b.processBatch(cfg) + if err != nil && len(res) == 0 { + return nil, err + } + + for _, r := range res { + if err != nil { + r.Error = err.Error() + } + + for _, sn := range r.Sectors { + for _, ch := range b.waiting[sn] { + ch <- r // buffered + } + + delete(b.waiting, sn) + delete(b.todo, sn) + delete(b.deadlines, sn) + } + } + + return res, nil +} + +func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) { + params := miner5.PreCommitSectorBatchParams{} deposit := big.Zero() + var res sealiface.PreCommitBatchRes for _, p := range b.todo { if len(params.Sectors) >= cfg.MaxPreCommitBatch { @@ -181,54 +209,46 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { break } + res.Sectors = append(res.Sectors, p.pci.SectorNumber) params.Sectors = append(params.Sectors, *p.pci) deposit = big.Add(deposit, p.deposit) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { - return nil, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) if err != nil { - return nil, xerrors.Errorf("couldn't get miner info: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee) from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) if err != nil { - return nil, xerrors.Errorf("no good address found: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes()) if err != nil { - return nil, xerrors.Errorf("sending message failed: %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) } - log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total) + res.Msg = &mcid - for _, sector := range params.Sectors { - sn := sector.SectorNumber + log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo)) - for _, ch := range b.waiting[sn] { - ch <- mcid // buffered - } - delete(b.waiting, sn) - delete(b.todo, sn) - delete(b.deadlines, sn) - } - - return &mcid, nil + return []sealiface.PreCommitBatchRes{res}, nil } // register PreCommit, wait for batch message, return message CID -func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) { +func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) { _, curEpoch, err := b.api.ChainHead(b.mctx) if err != nil { log.Errorf("getting chain head: %s", err) - return cid.Undef, nil + return sealiface.PreCommitBatchRes{}, err } sn := s.SectorNumber @@ -240,7 +260,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos pci: in, } - sent := make(chan cid.Cid, 1) + sent := make(chan sealiface.PreCommitBatchRes, 1) b.waiting[sn] = append(b.waiting[sn], sent) select { @@ -253,12 +273,12 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos case c := <-sent: return c, nil case <-ctx.Done(): - return cid.Undef, ctx.Err() + return sealiface.PreCommitBatchRes{}, ctx.Err() } } -func (b *PreCommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) { - resCh := make(chan *cid.Cid, 1) +func (b *PreCommitBatcher) Flush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { + resCh := make(chan []sealiface.PreCommitBatchRes, 1) select { case b.force <- resCh: select { diff --git a/extern/storage-sealing/sealiface/batching.go b/extern/storage-sealing/sealiface/batching.go index e7c2cadbb..d0e6d4178 100644 --- a/extern/storage-sealing/sealiface/batching.go +++ b/extern/storage-sealing/sealiface/batching.go @@ -14,3 +14,10 @@ type CommitBatchRes struct { Msg *cid.Cid Error string // if set, means that all sectors are failed, implies Msg==nil } + +type PreCommitBatchRes struct { + Sectors []abi.SectorNumber + + Msg *cid.Cid + Error string // if set, means that all sectors are failed, implies Msg==nil +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 61360dc12..e69ce5be0 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -207,7 +207,7 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) return m.terminator.Pending(ctx) } -func (m *Sealing) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Sealing) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return m.precommiter.Flush(ctx) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 6f4c57bfd..815ad6ac0 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -355,7 +355,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error { if sector.CommD == nil || sector.CommR == nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) + return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("sector had nil commR or commD")}) } params, deposit, _, err := m.preCommitParams(ctx, sector) @@ -363,12 +363,20 @@ func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector Se return err } - mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) + res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)}) + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)}) } - return ctx.Send(SectorPreCommitBatchSent{mcid}) + if res.Error != "" { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit batch error: %s", res.Error)}) + } + + if res.Msg == nil { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("batch message was nil")}) + } + + return ctx.Send(SectorPreCommitBatchSent{*res.Msg}) } func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 9b6f65207..e10925927 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -375,7 +375,7 @@ func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.Se return sm.Miner.TerminatePending(ctx) } -func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (sm *StorageMinerAPI) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return sm.Miner.SectorPreCommitFlush(ctx) } diff --git a/storage/sealing.go b/storage/sealing.go index bd8241197..6a1195826 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -60,7 +60,7 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { return m.sealing.TerminatePending(ctx) } -func (m *Miner) SectorPreCommitFlush(ctx context.Context) (*cid.Cid, error) { +func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { return m.sealing.SectorPreCommitFlush(ctx) }