From 05aeeab58153674eb3fe4e7ad5a282fd0a581679 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 3 Aug 2022 06:35:04 -0500 Subject: [PATCH] Account selective snapshot (#46) * snapshotter ignores nodes that are not along a path derived from a list of account addresses, if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicitly enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 --- .gitignore | 1 + README.md | 12 + cmd/stateSnapshot.go | 7 +- fixture/.gitignore | 7 + fixture/chain2data/000002.ldb | Bin 0 -> 918 bytes fixture/chain2data/000004.ldb | Bin 0 -> 35872 bytes fixture/chain2data/ancient/bodies.0000.cdat | 0 fixture/chain2data/ancient/bodies.cidx | Bin 0 -> 6 bytes fixture/chain2data/ancient/bodies.meta | 1 + fixture/chain2data/ancient/diffs.0000.rdat | 0 fixture/chain2data/ancient/diffs.meta | 1 + fixture/chain2data/ancient/diffs.ridx | Bin 0 -> 6 bytes fixture/chain2data/ancient/hashes.0000.rdat | 0 fixture/chain2data/ancient/hashes.meta | 1 + fixture/chain2data/ancient/hashes.ridx | Bin 0 -> 6 bytes fixture/chain2data/ancient/headers.0000.cdat | 0 fixture/chain2data/ancient/headers.cidx | Bin 0 -> 6 bytes fixture/chain2data/ancient/headers.meta | 1 + fixture/chain2data/ancient/receipts.0000.cdat | 0 fixture/chain2data/ancient/receipts.cidx | Bin 0 -> 6 bytes fixture/chain2data/ancient/receipts.meta | 1 + fixture/chaindata.go | 26 +- fixture/chaindata/.gitignore | 6 - fixture/service.go | 279 +++++++++- go.mod | 42 +- go.sum | 350 ++++++++++--- pkg/prom/prom.go | 23 + pkg/snapshot/config.go | 33 +- pkg/snapshot/env.go | 3 + pkg/snapshot/service.go | 356 +++++++++---- pkg/snapshot/service_test.go | 476 +++++++++++++++++- pkg/snapshot/tracker.go | 98 +++- pkg/snapshot/util.go | 35 ++ 33 files changed, 1486 insertions(+), 273 deletions(-) create mode 100644 fixture/.gitignore create mode 100644 fixture/chain2data/000002.ldb create mode 100644 fixture/chain2data/000004.ldb create mode 100644 fixture/chain2data/ancient/bodies.0000.cdat create mode 100644 fixture/chain2data/ancient/bodies.cidx create mode 100644 fixture/chain2data/ancient/bodies.meta create mode 100644 fixture/chain2data/ancient/diffs.0000.rdat create mode 100644
fixture/chain2data/ancient/diffs.meta create mode 100644 fixture/chain2data/ancient/diffs.ridx create mode 100644 fixture/chain2data/ancient/hashes.0000.rdat create mode 100644 fixture/chain2data/ancient/hashes.meta create mode 100644 fixture/chain2data/ancient/hashes.ridx create mode 100644 fixture/chain2data/ancient/headers.0000.cdat create mode 100644 fixture/chain2data/ancient/headers.cidx create mode 100644 fixture/chain2data/ancient/headers.meta create mode 100644 fixture/chain2data/ancient/receipts.0000.cdat create mode 100644 fixture/chain2data/ancient/receipts.cidx create mode 100644 fixture/chain2data/ancient/receipts.meta delete mode 100644 fixture/chaindata/.gitignore diff --git a/.gitignore b/.gitignore index aaebaa2..f8df831 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ ipld-eth-state-snapshot mocks/ .vscode +output_dir*/ diff --git a/README.md b/README.md index b740451..b354d55 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Config format: workers = 4 # degree of concurrency, the state trie is subdivided into sections that are traversed and processed concurrently blockHeight = -1 # blockheight to perform the snapshot at (-1 indicates to use the latest blockheight found in leveldb) recoveryFile = "recovery_file" # specifies a file to output recovery information on error or premature closure + accounts = [] # list of accounts (addresses) to take the snapshot for # SNAPSHOT_ACCOUNTS [leveldb] # path to geth leveldb @@ -72,6 +73,17 @@ Config format: ./ipld-eth-state-snapshot stateSnapshot --config={path to toml config file} ``` + * Account selective snapshot: To restrict the snapshot to a list of accounts (addresses), provide the addresses in the config parameter `snapshot.accounts` or the env variable `SNAPSHOT_ACCOUNTS`. Only nodes related to the provided addresses will be indexed.
+ + Example: + + ```toml + [snapshot] + accounts = [ + "0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a" + ] + ``` + * For in-place snapshot in the database: ```bash diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go index 72a51fe..8765407 100644 --- a/cmd/stateSnapshot.go +++ b/cmd/stateSnapshot.go @@ -69,13 +69,12 @@ func stateSnapshot() { logWithCommand.Fatal(err) } workers := viper.GetUint(snapshot.SNAPSHOT_WORKERS_TOML) - if height < 0 { - if err := snapshotService.CreateLatestSnapshot(workers); err != nil { + if err := snapshotService.CreateLatestSnapshot(workers, config.Service.AllowedAccounts); err != nil { logWithCommand.Fatal(err) } } else { - params := snapshot.SnapshotParams{Workers: workers, Height: uint64(height)} + params := snapshot.SnapshotParams{Workers: workers, Height: uint64(height), WatchedAddresses: config.Service.AllowedAccounts} if err := snapshotService.CreateSnapshot(params); err != nil { logWithCommand.Fatal(err) } @@ -93,6 +92,7 @@ func init() { stateSnapshotCmd.PersistentFlags().String(snapshot.SNAPSHOT_RECOVERY_FILE_CLI, "", "file to recover from a previous iteration") stateSnapshotCmd.PersistentFlags().String(snapshot.SNAPSHOT_MODE_CLI, "postgres", "output mode for snapshot ('file' or 'postgres')") stateSnapshotCmd.PersistentFlags().String(snapshot.FILE_OUTPUT_DIR_CLI, "", "directory for writing output to while operating in 'file' mode") + stateSnapshotCmd.PersistentFlags().StringArray(snapshot.SNAPSHOT_ACCOUNTS_CLI, nil, "list of account addresses to limit snapshot to") viper.BindPFlag(snapshot.LVL_DB_PATH_TOML, stateSnapshotCmd.PersistentFlags().Lookup(snapshot.LVL_DB_PATH_CLI)) viper.BindPFlag(snapshot.ANCIENT_DB_PATH_TOML, stateSnapshotCmd.PersistentFlags().Lookup(snapshot.ANCIENT_DB_PATH_CLI)) @@ -101,4 +101,5 @@ func init() { viper.BindPFlag(snapshot.SNAPSHOT_RECOVERY_FILE_TOML, stateSnapshotCmd.PersistentFlags().Lookup(snapshot.SNAPSHOT_RECOVERY_FILE_CLI)) viper.BindPFlag(snapshot.SNAPSHOT_MODE_TOML, stateSnapshotCmd.PersistentFlags().Lookup(snapshot.SNAPSHOT_MODE_CLI)) viper.BindPFlag(snapshot.FILE_OUTPUT_DIR_TOML, stateSnapshotCmd.PersistentFlags().Lookup(snapshot.FILE_OUTPUT_DIR_CLI)) + viper.BindPFlag(snapshot.SNAPSHOT_ACCOUNTS_TOML, stateSnapshotCmd.PersistentFlags().Lookup(snapshot.SNAPSHOT_ACCOUNTS_CLI)) } diff --git a/fixture/.gitignore b/fixture/.gitignore new file mode 100644 index 0000000..7c4ab7e --- /dev/null +++ b/fixture/.gitignore @@ -0,0 +1,7 @@ +*/*.log +*/CURRENT* +*/LOCK +*/LOG +*/MANIFEST-* +*/ancient/FLOCK +*/ancient/*.meta diff --git a/fixture/chain2data/000002.ldb b/fixture/chain2data/000002.ldb new file mode 100644 index 0000000000000000000000000000000000000000..c7bb1f94f98117e2689a22dd14a344f424d036f8 Binary files /dev/null and b/fixture/chain2data/000002.ldb differ diff --git a/fixture/chain2data/000004.ldb b/fixture/chain2data/000004.ldb new file mode 100644 Binary files /dev/null and b/fixture/chain2data/000004.ldb differ
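The pkg/snapshot/service.go changes that follow filter trie nodes against `seekingPaths` derived from the watched account addresses (see `validPath` below and the commit note "Encode watched address path bytes to hex for comparison"). As a hedged sketch of that derivation, not the patch's actual code (the standalone `main` and the variable names are illustrative): a state trie leaf for an account sits at the path given by the keccak256 hash of its address, expanded into hex nibbles by a helper like `keybytesToHex` from pkg/snapshot/util.go.

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// copy of keybytesToHex from pkg/snapshot/util.go
// (itself taken from go-ethereum's trie/encoding.go)
func keybytesToHex(str []byte) []byte {
	l := len(str)*2 + 1
	var nibbles = make([]byte, l)
	for i, b := range str {
		nibbles[i*2] = b / 16
		nibbles[i*2+1] = b % 16
	}
	nibbles[l-1] = 16
	return nibbles
}

func main() {
	// one of the watched addresses used in the README example and tests
	addr := common.HexToAddress("0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a")
	// a state trie leaf for an account sits at the path keccak256(address)
	leafKey := crypto.Keccak256(addr.Bytes())
	// expand the 32-byte key into 64 hex nibbles, dropping the terminator nibble,
	// to get the full leaf path the snapshotter seeks toward
	seekingPath := keybytesToHex(leafKey)[:64]
	fmt.Printf("%x\n", seekingPath)
}
```

With these nibble paths in hand, any intermediate node path can be prefix-matched against them to decide whether its subtrie can contain a watched account.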
diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go // extract header from lvldb and publish to PG-IPFS // hold onto the headerID so that we can link the state nodes to this header log.Infof("Creating snapshot at height %d", params.Height) @@ -105,8 +113,10 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { } headerID := header.Hash().String() + + ctx, cancelCtx := context.WithCancel(context.Background()) s.tracker = newTracker(s.recoveryFile, int(params.Workers)) - s.tracker.captureSignal() + s.tracker.captureSignal(cancelCtx) var iters []trie.NodeIterator // attempt to restore from recovery file if it exists @@ -124,7 +134,8 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { len(iters), params.Workers, ) } - } else { // nothing to restore + } else { + // nothing to restore log.Debugf("no iterators to restore") if params.Workers > 1 { iters = iter.SubtrieIterators(tree, params.Workers) @@ -132,7 +143,8 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { iters = []trie.NodeIterator{tree.NodeIterator(nil)} } for i, it := range iters { - iters[i] = s.tracker.tracked(it) + // recovered path is nil for fresh iterators + iters[i] = s.tracker.tracked(it, nil) } } @@ -143,22 +155,25 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { } }() - if len(iters) > 0 { - return s.createSnapshotAsync(iters, headerID, new(big.Int).SetUint64(params.Height)) - } else { - return s.createSnapshot(iters[0], headerID, new(big.Int).SetUint64(params.Height)) + switch { + case len(iters) > 1: + return s.createSnapshotAsync(ctx, iters, headerID, new(big.Int).SetUint64(params.Height), paths) + case len(iters) == 1: + return s.createSnapshot(ctx, iters[0], headerID, new(big.Int).SetUint64(params.Height), paths) + default: + return nil } } // Create snapshot up to head (ignores height param) -func (s *Service) CreateLatestSnapshot(workers uint) error { +func (s *Service)
CreateLatestSnapshot(workers uint, watchedAddresses map[common.Address]struct{}) error { log.Info("Creating snapshot at head") hash := rawdb.ReadHeadHeaderHash(s.ethDB) height := rawdb.ReadHeaderNumber(s.ethDB, hash) if height == nil { return fmt.Errorf("unable to read header height for header hash %s", hash.String()) } - return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers}) + return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers, WatchedAddresses: watchedAddresses}) } type nodeResult struct { @@ -166,7 +181,7 @@ type nodeResult struct { elements []interface{} } -func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) { +func resolveNode(nodePath []byte, it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, error) { // "leaf" nodes are actually "value" nodes, whose parents are the actual leaves if it.Leaf() { return nil, nil @@ -175,8 +190,10 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro return nil, nil } - path := make([]byte, len(it.Path())) - copy(path, it.Path()) + // use full node path + // (it.Path() will give a partial path in case of subtrie iterators) + path := make([]byte, len(nodePath)) + copy(path, nodePath) n, err := trieDB.Node(it.Hash()) if err != nil { return nil, err @@ -199,7 +216,19 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro }, nil } -func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *big.Int) error { +// validPath checks if a path is a prefix of any one of the paths in the given list +func validPath(currentPath []byte, seekingPaths [][]byte) bool { + for _, seekingPath := range seekingPaths { + if bytes.HasPrefix(seekingPath, currentPath) { + return true + } + } + return false +} + +// createSnapshot performs traversal using the given iterator and indexes the nodes +// optionally filtering them according to a list of paths +func (s *Service) createSnapshot(ctx context.Context, it trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error { tx, err := s.ipfsPublisher.BeginTx() if err != nil { return err @@ -207,99 +236,234 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height * defer func() { err = CommitOrRollback(tx, err) if err != nil { - logrus.Errorf("CommitOrRollback failed: %s", err) + log.Errorf("CommitOrRollback failed: %s", err) } }() - for it.Next(true) { - res, err := resolveNode(it, s.stateDB.TrieDB()) - if err != nil { - return err - } - if res == nil { - continue - } + // path (from recovery dump) to be seeked on recovery + // nil in case of a fresh iterator + var recoveredPath []byte - tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize) - if err != nil { - return err - } + // latest path seeked from the concurrent iterator + // (updated after a node is processed) + // nil in case of a fresh iterator; initially holds the recovered path in case of a recovered iterator + var seekedPath *[]byte - switch res.node.NodeType { - case Leaf: - // if the node is a leaf, decode the account and publish the associated storage trie - // nodes if there are any - var account types.StateAccount - if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { - return fmt.Errorf( - "error decoding account for leaf node at path %x\nerror: %v", res.node.Path, err) + // end path for the concurrent iterator + var endPath []byte + + if iter, ok := it.(*trackedIter); ok { + seekedPath = &iter.seekedPath + recoveredPath =
append(recoveredPath, *seekedPath...) + endPath = iter.endPath + } else { + return errors.New("untracked iterator") + } + + return s.createSubTrieSnapshot(ctx, tx, nil, it, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths) +} + +// createSubTrieSnapshot processes nodes at the next level of a trie using the given subtrie iterator +// continually updating seekedPath with path of the latest processed node +func (s *Service) createSubTrieSnapshot(ctx context.Context, tx Tx, prefixPath []byte, subTrieIt trie.NodeIterator, recoveredPath []byte, seekedPath *[]byte, endPath []byte, headerID string, height *big.Int, seekingPaths [][]byte) error { + prom.IncActiveIterCount() + defer prom.DecActiveIterCount() + + // descend in the first loop iteration to reach first child node + descend := true + for { + select { + case <-ctx.Done(): + return errors.New("ctx cancelled") + default: + if ok := subTrieIt.Next(descend); !ok { + return subTrieIt.Error() } - partialPath := trie.CompactToHex(res.elements[0].([]byte)) - valueNodePath := append(res.node.Path, partialPath...) - encodedPath := trie.HexToCompact(valueNodePath) - leafKey := encodedPath[1:] - res.node.Key = common.BytesToHash(leafKey) - err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx) + + // to avoid descending further + descend = false + + // move on to next node if current path is empty + // occurs when reaching root node or just before reaching the first child of a subtrie in case of some concurrent iterators + if bytes.Equal(subTrieIt.Path(), []byte{}) { + // if node path is empty and prefix is nil, it's the root node + if prefixPath == nil { + // create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie + if err := s.createNodeSnapshot(tx, subTrieIt.Path(), subTrieIt, headerID, height); err != nil { + return err + } + updateSeekedPath(seekedPath, subTrieIt.Path()) + } + + if ok := subTrieIt.Next(true); !ok { + // return if no further nodes available + return subTrieIt.Error() + } + } + + // create the full node path as it.Path() doesn't include the path before subtrie root + nodePath := append(prefixPath, subTrieIt.Path()...) 
+ + // check iterator upper bound before processing the node + // required to avoid processing duplicate nodes: + // if a node is considered more than once, + // its whole subtrie is re-processed, giving a large number of duplicate nodes + if !checkUpperPathBound(nodePath, endPath) { + // fmt.Println("failed checkUpperPathBound", nodePath, endPath) + // explicitly stop the iterator in tracker if upper bound check fails + // required since it won't be marked as stopped if further nodes are still available + if trackedSubtrieIt, ok := subTrieIt.(*trackedIter); ok { + s.tracker.stopIter(trackedSubtrieIt) + } + return subTrieIt.Error() + } + + // skip the current node if it's before recovered path and not along the recovered path + // nodes at the same level that are before recovered path are ignored to avoid duplicate nodes + // however, nodes along the recovered path are re-considered for redundancy + if bytes.Compare(recoveredPath, nodePath) > 0 && + // a node is along the recovered path if its path is shorter or equal in length + // and is part of the recovered path + !(len(nodePath) <= len(recoveredPath) && bytes.Equal(recoveredPath[:len(nodePath)], nodePath)) { + continue + } + + // ignore node if it is not along paths of interest + if s.watchingAddresses && !validPath(nodePath, seekingPaths) { + // consider this node as processed since it is getting ignored + // and update the seeked path + updateSeekedPath(seekedPath, nodePath) + // move on to the next node + continue + } + + // if the node is along paths of interest + // create snapshot of node, if it is a leaf this will also create snapshot of entire storage trie + if err := s.createNodeSnapshot(tx, nodePath, subTrieIt, headerID, height); err != nil { + return err + } + // update seeked path after node has been processed + updateSeekedPath(seekedPath, nodePath) + + // create an iterator to traverse and process the next level of this subTrie + nextSubTrieIt, err := s.createSubTrieIt(nodePath, subTrieIt.Hash(), recoveredPath) if err != nil { return err } - - // publish any non-nil code referenced by codehash - if !bytes.Equal(account.CodeHash, emptyCodeHash) { - codeHash := common.BytesToHash(account.CodeHash) - codeBytes := rawdb.ReadCode(s.ethDB, codeHash) - if len(codeBytes) == 0 { - log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey())) - return errors.New("missing code") - } - - if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil { - return err - } - } - - if tx, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil { - return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err) - } - case Extension, Branch: - res.node.Key = common.BytesToHash([]byte{}) - if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil { + // pass on the seekedPath of the tracked concurrent iterator to be updated + if err := s.createSubTrieSnapshot(ctx, tx, nodePath, nextSubTrieIt, recoveredPath, seekedPath, endPath, headerID, height, seekingPaths); err != nil { return err } - default: - return errors.New("unexpected node type") - } - } +} + +// createSubTrieIt creates an iterator to traverse the subtrie of node with the given hash +// the subtrie iterator is initialized at a node from the recovered path at the corresponding level (if available) +func (s *Service) createSubTrieIt(prefixPath []byte, hash common.Hash, recoveredPath []byte) (trie.NodeIterator, error) { + // skip directly to the node from the recovered
path at the corresponding level + // applicable if: + // node path is behind recovered path + // and recovered path includes the prefix path + var startPath []byte + if bytes.Compare(recoveredPath, prefixPath) > 0 && + len(recoveredPath) > len(prefixPath) && + bytes.Equal(recoveredPath[:len(prefixPath)], prefixPath) { + startPath = append(startPath, recoveredPath[len(prefixPath):len(prefixPath)+1]...) + // force the lower bound path to an even length + // (required by HexToKeyBytes()) + if len(startPath)&0b1 == 1 { + // decrement first to avoid skipped nodes + decrementPath(startPath) + startPath = append(startPath, 0) + } + } + + // create subTrie iterator with the given hash + subTrie, err := s.stateDB.OpenTrie(hash) + if err != nil { + return nil, err + } + + return subTrie.NodeIterator(iter.HexToKeyBytes(startPath)), nil +} + +// createNodeSnapshot indexes the current node +// entire storage trie is also indexed (if available) +func (s *Service) createNodeSnapshot(tx Tx, path []byte, it trie.NodeIterator, headerID string, height *big.Int) error { + res, err := resolveNode(path, it, s.stateDB.TrieDB()) + if err != nil { + return err + } + if res == nil { + return nil + } + + tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize) + if err != nil { + return err + } + + switch res.node.NodeType { + case Leaf: + // if the node is a leaf, decode the account and publish the associated storage trie + // nodes if there are any + var account types.StateAccount + if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { + return fmt.Errorf( + "error decoding account for leaf node at path %x\nerror: %v", res.node.Path, err) + } + partialPath := trie.CompactToHex(res.elements[0].([]byte)) + valueNodePath := append(res.node.Path, partialPath...)
+ encodedPath := trie.HexToCompact(valueNodePath) + leafKey := encodedPath[1:] + res.node.Key = common.BytesToHash(leafKey) + if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil { + return err + } + + // publish any non-nil code referenced by codehash + if !bytes.Equal(account.CodeHash, emptyCodeHash) { + codeHash := common.BytesToHash(account.CodeHash) + codeBytes := rawdb.ReadCode(s.ethDB, codeHash) + if len(codeBytes) == 0 { + log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey())) + return errors.New("missing code") + } + + if err = s.ipfsPublisher.PublishCode(height, codeHash, codeBytes, tx); err != nil { + return err + } + } + + if _, err = s.storageSnapshot(account.Root, headerID, height, res.node.Path, tx); err != nil { + return fmt.Errorf("failed building storage snapshot for account %+v\r\nerror: %w", account, err) + } + case Extension, Branch: + res.node.Key = common.BytesToHash([]byte{}) + if err := s.ipfsPublisher.PublishStateNode(&res.node, headerID, height, tx); err != nil { + return err + } + default: + return errors.New("unexpected node type") + } return it.Error() } // Full-trie concurrent snapshot -func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string, height *big.Int) error { - errors := make(chan error) - var wg sync.WaitGroup +func (s *Service) createSnapshotAsync(ctx context.Context, iters []trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error { + // use errgroup with a context to stop all concurrent iterators if one runs into an error + // each concurrent iterator completes processing its current node before stopping + g, ctx := errgroup.WithContext(ctx) for _, it := range iters { - wg.Add(1) - go func(it trie.NodeIterator) { - defer wg.Done() - if err := s.createSnapshot(it, headerID, height); err != nil { - errors <- err - } + func(it trie.NodeIterator) { + g.Go(func() error { + return s.createSnapshot(ctx, it, headerID, height, seekingPaths) + }) }(it) } - done := make(chan struct{}) - go func() { - wg.Wait() - done <- struct{}{} - }() - - var err error - select { - case err = <-errors: - case <-done: - close(errors) - } - return err + return g.Wait() } func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.Int, statePath []byte, tx Tx) (Tx, error) { @@ -314,7 +478,7 @@ func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.I it := sTrie.NodeIterator(make([]byte, 0)) for it.Next(true) { - res, err := resolveNode(it, s.stateDB.TrieDB()) + res, err := resolveNode(it.Path(), it, s.stateDB.TrieDB()) if err != nil { return nil, err } diff --git a/pkg/snapshot/service_test.go b/pkg/snapshot/service_test.go index a021ec1..16ad2df 100644 --- a/pkg/snapshot/service_test.go +++ b/pkg/snapshot/service_test.go @@ -2,12 +2,17 @@ package snapshot import ( "errors" + "fmt" "math/big" + "math/rand" "os" "path/filepath" + "sync" + "sync/atomic" "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/golang/mock/gomock" fixt "github.com/vulcanize/ipld-eth-state-snapshot/fixture" @@ -16,6 +21,16 @@ import ( "github.com/vulcanize/ipld-eth-state-snapshot/test" ) +var ( + stateNodeNotIndexedErr = "state node not indexed for path %v" + storageNodeNotIndexedErr = "storage node not indexed for state path %v, storage path %v" + + unexpectedStateNodeErr = "got unexpected state node for path %v" + unexpectedStorageNodeErr = "got unexpected storage node for state path %v, storage path %v" + + extraNodesIndexedErr =
"number of nodes indexed (%v) is more than expected (max %v)" +) + func testConfig(leveldbpath, ancientdbpath string) *Config { return &Config{ Eth: &EthConfig{ @@ -39,23 +54,40 @@ func makeMocks(t *testing.T) (*mock.MockPublisher, *mock.MockTx) { func TestCreateSnapshot(t *testing.T) { runCase := func(t *testing.T, workers int) { + // map: expected state path -> struct{}{} + expectedStateNodePaths := sync.Map{} + for _, path := range fixt.Block1_StateNodePaths { + expectedStateNodePaths.Store(string(path), struct{}{}) + } + pub, tx := makeMocks(t) pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header)) pub.EXPECT().BeginTx().Return(tx, nil). Times(workers) pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil). AnyTimes() - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - // Use MinTimes as duplicate nodes are expected at boundaries - MinTimes(len(fixt.Block1_StateNodePaths)) + tx.EXPECT().Commit(). + Times(workers) + pub.EXPECT().PublishStateNode( + gomock.Any(), + gomock.Eq(fixt.Block1_Header.Hash().String()), + gomock.Eq(fixt.Block1_Header.Number), + gomock.Eq(tx)). + DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + if _, ok := expectedStateNodePaths.Load(string(node.Path)); ok { + expectedStateNodePaths.Delete(string(node.Path)) + } else { + t.Fatalf(unexpectedStateNodeErr, node.Path) + } + return nil + }). + Times(len(fixt.Block1_StateNodePaths)) // TODO: fixtures for storage node // pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any()) - tx.EXPECT().Commit(). - Times(workers) - - config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath) + chainDataPath, ancientDataPath := fixt.GetChainDataPath("chaindata") + config := testConfig(chainDataPath, ancientDataPath) edb, err := NewLevelDB(config.Eth) if err != nil { t.Fatal(err) @@ -73,30 +105,221 @@ func TestCreateSnapshot(t *testing.T) { if err != nil { t.Fatal(err) } + + // Check if all expected state nodes are indexed + expectedStateNodePaths.Range(func(key, value any) bool { + t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + return true + }) } - testCases := []int{1, 4, 16, 32} + testCases := []int{1, 4, 8, 16, 32} for _, tc := range testCases { t.Run("case", func(t *testing.T) { runCase(t, tc) }) } } -func failingPublishStateNode(_ *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { - return errors.New("failingPublishStateNode") +type indexedNode struct { + value snapt.Node + isIndexed bool +} + +type storageNodeKey struct { + statePath string + storagePath string +} + +func TestAccountSelectiveSnapshot(t *testing.T) { + snapShotHeight := uint64(32) + watchedAddresses := map[common.Address]struct{}{ + common.HexToAddress("0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a"): {}, + common.HexToAddress("0x0616F59D291a898e796a1FAD044C5926ed2103eC"): {}, + } + + expectedStateNodeIndexes := []int{0, 1, 2, 6} + + statePath33 := []byte{3, 3} + expectedStorageNodeIndexes33 := []int{0, 1, 2, 3, 4, 6, 8} + + statePath12 := []byte{12} + expectedStorageNodeIndexes12 := []int{12, 14, 16} + + runCase := func(t *testing.T, workers int) { + expectedStateNodes := sync.Map{} + + for _, expectedStateNodeIndex := range expectedStateNodeIndexes { + path := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].Path + expectedStateNodes.Store(string(path), indexedNode{ + value: fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex], + isIndexed: false, + }) + } + + expectedStorageNodes := sync.Map{} + + for _, 
expectedStorageNodeIndex := range expectedStorageNodeIndexes33 { + path := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Path + key := storageNodeKey{ + statePath: string(statePath33), + storagePath: string(path), + } + value := indexedNode{ + value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Node, + isIndexed: false, + } + expectedStorageNodes.Store(key, value) + } + + for _, expectedStorageNodeIndex := range expectedStorageNodeIndexes12 { + path := fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Path + key := storageNodeKey{ + statePath: string(statePath12), + storagePath: string(path), + } + value := indexedNode{ + value: fixt.Chain2_Block32_StorageNodes[expectedStorageNodeIndex].Node, + isIndexed: false, + } + expectedStorageNodes.Store(key, value) + } + + pub, tx := makeMocks(t) + pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header)) + pub.EXPECT().BeginTx().Return(tx, nil). + Times(workers) + pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil). + AnyTimes() + tx.EXPECT().Commit(). + Times(workers) + pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)). + AnyTimes() + pub.EXPECT().PublishStateNode( + gomock.Any(), + gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), + gomock.Eq(fixt.Chain2_Block32_Header.Number), + gomock.Eq(tx)). + Do(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + key := string(node.Path) + // Check published nodes + if expectedStateNode, ok := expectedStateNodes.Load(key); ok { + expectedVal := expectedStateNode.(indexedNode).value + test.ExpectEqual(t, expectedVal, *node) + + // Mark expected node as indexed + expectedStateNodes.Store(key, indexedNode{ + value: expectedVal, + isIndexed: true, + }) + } else { + t.Fatalf(unexpectedStateNodeErr, node.Path) + } + return nil + }). + AnyTimes() + pub.EXPECT().PublishStorageNode( + gomock.Any(), + gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), + gomock.Eq(new(big.Int).SetUint64(snapShotHeight)), + gomock.Any(), + gomock.Eq(tx)). + Do(func(node *snapt.Node, _ string, _ *big.Int, statePath []byte, _ snapt.Tx) error { + key := storageNodeKey{ + statePath: string(statePath), + storagePath: string(node.Path), + } + // Check published nodes + if expectedStorageNode, ok := expectedStorageNodes.Load(key); ok { + expectedVal := expectedStorageNode.(indexedNode).value + test.ExpectEqual(t, expectedVal, *node) + + // Mark expected node as indexed + expectedStorageNodes.Store(key, indexedNode{ + value: expectedVal, + isIndexed: true, + }) + } else { + t.Fatalf(unexpectedStorageNodeErr, statePath, node.Path) + } + return nil + }). 
+ AnyTimes() + + chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data") + config := testConfig(chainDataPath, ancientDataPath) + edb, err := NewLevelDB(config.Eth) + if err != nil { + t.Fatal(err) + } + defer edb.Close() + + recovery := filepath.Join(t.TempDir(), "recover.csv") + service, err := NewSnapshotService(edb, pub, recovery) + if err != nil { + t.Fatal(err) + } + + params := SnapshotParams{Height: snapShotHeight, Workers: uint(workers), WatchedAddresses: watchedAddresses} + err = service.CreateSnapshot(params) + if err != nil { + t.Fatal(err) + } + + expectedStateNodes.Range(func(key, value any) bool { + if !value.(indexedNode).isIndexed { + t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + return false + } + return true + }) + expectedStorageNodes.Range(func(key, value any) bool { + if !value.(indexedNode).isIndexed { + t.Fatalf(storageNodeNotIndexedErr, []byte(key.(storageNodeKey).statePath), []byte(key.(storageNodeKey).storagePath)) + return false + } + return true + }) + } + + testCases := []int{1, 4, 8, 16, 32} + for _, tc := range testCases { + t.Run("case", func(t *testing.T) { runCase(t, tc) }) + } } func TestRecovery(t *testing.T) { - runCase := func(t *testing.T, workers int) { - pub, tx := makeMocks(t) - pub.EXPECT().PublishHeader(gomock.Any()).AnyTimes() - pub.EXPECT().BeginTx().Return(tx, nil).AnyTimes() - pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Times(workers). - DoAndReturn(failingPublishStateNode) - tx.EXPECT().Commit().AnyTimes() + maxPathLength := 4 + runCase := func(t *testing.T, workers int, interruptAt int32) { + // map: expected state path -> number of times it got published + expectedStateNodePaths := sync.Map{} + for _, path := range fixt.Block1_StateNodePaths { + expectedStateNodePaths.Store(string(path), 0) + } + var indexedStateNodesCount int32 - config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath) + pub, tx := makeMocks(t) + pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header)) + pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers) + pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() + tx.EXPECT().Commit().MaxTimes(workers) + pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + // Start throwing an error after a certain number of state nodes have been indexed + if indexedStateNodesCount >= interruptAt { + return errors.New("failingPublishStateNode") + } else { + if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { + expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + atomic.AddInt32(&indexedStateNodesCount, 1) + } else { + t.Fatalf(unexpectedStateNodeErr, node.Path) + } + } + return nil + }). 
+ MaxTimes(int(interruptAt) + workers) + + chainDataPath, ancientDataPath := fixt.GetChainDataPath("chaindata") + config := testConfig(chainDataPath, ancientDataPath) edb, err := NewLevelDB(config.Eth) if err != nil { t.Fatal(err) @@ -119,15 +342,35 @@ func TestRecovery(t *testing.T) { t.Fatal("cannot stat recovery file:", err) } - // Wait for earlier snapshot process to complete - time.Sleep(5 * time.Second) + // Create new mocks for recovery + recoveryPub, tx := makeMocks(t) + recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header)) + recoveryPub.EXPECT().BeginTx().Return(tx, nil).AnyTimes() + recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() + tx.EXPECT().Commit().AnyTimes() + recoveryPub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { + expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + atomic.AddInt32(&indexedStateNodesCount, 1) + } else { + t.Fatalf(unexpectedStateNodeErr, node.Path) + } + return nil + }). + AnyTimes() - pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - err = service.CreateSnapshot(params) + // Create a new snapshot service for recovery + recoveryService, err := NewSnapshotService(edb, recoveryPub, recovery) + if err != nil { + t.Fatal(err) + } + err = recoveryService.CreateSnapshot(params) if err != nil { t.Fatal(err) } + // Check if recovery file has been deleted _, err = os.Stat(recovery) if err == nil { t.Fatal("recovery file still present") @@ -136,11 +379,192 @@ func TestRecovery(t *testing.T) { t.Fatal(err) } } + + // Check if all state nodes are indexed after recovery + expectedStateNodePaths.Range(func(key, value any) bool { + if value.(int) == 0 { + t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + } + return true + }) + + // nodes along the recovery path get reindexed + maxStateNodesCount := len(fixt.Block1_StateNodePaths) + workers*maxPathLength + if indexedStateNodesCount > int32(maxStateNodesCount) { + t.Fatalf(extraNodesIndexedErr, indexedStateNodesCount, maxStateNodesCount) + } + } + + testCases := []int{1, 2, 4, 8, 16, 32} + numInterrupts := 3 + interrupts := make([]int32, numInterrupts) + for i := 0; i < numInterrupts; i++ { + rand.Seed(time.Now().UnixNano()) + interrupts[i] = rand.Int31n(int32(len(fixt.Block1_StateNodePaths))) } - testCases := []int{1, 4, 32} for _, tc := range testCases { - t.Run("case", func(t *testing.T) { runCase(t, tc) }) + for _, interrupt := range interrupts { + t.Run(fmt.Sprint("case", tc, interrupt), func(t *testing.T) { runCase(t, tc, interrupt) }) + } + } +} + +func TestAccountSelectiveRecovery(t *testing.T) { + maxPathLength := 2 + snapShotHeight := uint64(32) + watchedAddresses := map[common.Address]struct{}{ + common.HexToAddress("0x825a6eec09e44Cb0fa19b84353ad0f7858d7F61a"): {}, + common.HexToAddress("0x0616F59D291a898e796a1FAD044C5926ed2103eC"): {}, } + expectedStateNodeIndexes := []int{0, 1, 2, 6} + + runCase := func(t *testing.T, workers int, interruptAt int32) { + // map: expected state path -> number of times it got published + expectedStateNodePaths := sync.Map{} + for _, expectedStateNodeIndex := range expectedStateNodeIndexes { + path := fixt.Chain2_Block32_StateNodes[expectedStateNodeIndex].Path + expectedStateNodePaths.Store(string(path), 0) + } + var indexedStateNodesCount int32 + + pub, tx := 
makeMocks(t) + pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header)) + pub.EXPECT().BeginTx().Return(tx, nil).Times(workers) + pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() + tx.EXPECT().Commit().Times(workers) + pub.EXPECT().PublishStateNode( + gomock.Any(), + gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), + gomock.Eq(fixt.Chain2_Block32_Header.Number), + gomock.Eq(tx)). + DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + // Start throwing an error after a certain number of state nodes have been indexed + if indexedStateNodesCount >= interruptAt { + return errors.New("failingPublishStateNode") + } else { + if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { + expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + atomic.AddInt32(&indexedStateNodesCount, 1) + } else { + t.Fatalf(unexpectedStateNodeErr, node.Path) + } + } + return nil + }). + MaxTimes(int(interruptAt) + workers) + pub.EXPECT().PublishStorageNode( + gomock.Any(), + gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), + gomock.Eq(new(big.Int).SetUint64(snapShotHeight)), + gomock.Any(), + gomock.Eq(tx)). + AnyTimes() + pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)). + AnyTimes() + + chainDataPath, ancientDataPath := fixt.GetChainDataPath("chain2data") + config := testConfig(chainDataPath, ancientDataPath) + edb, err := NewLevelDB(config.Eth) + if err != nil { + t.Fatal(err) + } + defer edb.Close() + + recovery := filepath.Join(t.TempDir(), "recover.csv") + service, err := NewSnapshotService(edb, pub, recovery) + if err != nil { + t.Fatal(err) + } + + params := SnapshotParams{Height: snapShotHeight, Workers: uint(workers), WatchedAddresses: watchedAddresses} + err = service.CreateSnapshot(params) + if err == nil { + t.Fatal("expected an error") + } + + if _, err = os.Stat(recovery); err != nil { + t.Fatal("cannot stat recovery file:", err) + } + + // Create new mocks for recovery + recoveryPub, tx := makeMocks(t) + recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header)) + recoveryPub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers) + recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes() + tx.EXPECT().Commit().MaxTimes(workers) + recoveryPub.EXPECT().PublishStateNode( + gomock.Any(), + gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), + gomock.Eq(fixt.Chain2_Block32_Header.Number), + gomock.Eq(tx)). + DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error { + if prevCount, ok := expectedStateNodePaths.Load(string(node.Path)); ok { + expectedStateNodePaths.Store(string(node.Path), prevCount.(int)+1) + atomic.AddInt32(&indexedStateNodesCount, 1) + } else { + t.Fatalf(unexpectedStateNodeErr, node.Path) + } + return nil + }). + AnyTimes() + recoveryPub.EXPECT().PublishStorageNode( + gomock.Any(), + gomock.Eq(fixt.Chain2_Block32_Header.Hash().String()), + gomock.Eq(new(big.Int).SetUint64(snapShotHeight)), + gomock.Any(), + gomock.Eq(tx)). + AnyTimes() + recoveryPub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)). 
+ AnyTimes() + + // Create a new snapshot service for recovery + recoveryService, err := NewSnapshotService(edb, recoveryPub, recovery) + if err != nil { + t.Fatal(err) + } + err = recoveryService.CreateSnapshot(params) + if err != nil { + t.Fatal(err) + } + + // Check if recovery file has been deleted + _, err = os.Stat(recovery) + if err == nil { + t.Fatal("recovery file still present") + } else { + if !os.IsNotExist(err) { + t.Fatal(err) + } + } + + // Check if all expected state nodes are indexed after recovery + expectedStateNodePaths.Range(func(key, value any) bool { + if value.(int) == 0 { + t.Fatalf(stateNodeNotIndexedErr, []byte(key.(string))) + } + return true + }) + + // nodes along the recovery path get reindexed + maxStateNodesCount := len(expectedStateNodeIndexes) + workers*maxPathLength + if indexedStateNodesCount > int32(maxStateNodesCount) { + t.Fatalf(extraNodesIndexedErr, indexedStateNodesCount, maxStateNodesCount) + } + } + + testCases := []int{1, 2, 4, 8, 16, 32} + numInterrupts := 2 + interrupts := make([]int32, numInterrupts) + for i := 0; i < numInterrupts; i++ { + rand.Seed(time.Now().UnixNano()) + interrupts[i] = rand.Int31n(int32(len(expectedStateNodeIndexes))) + } + + for _, tc := range testCases { + for _, interrupt := range interrupts { + t.Run(fmt.Sprint("case", tc, interrupt), func(t *testing.T) { runCase(t, tc, interrupt) }) + } + } } diff --git a/pkg/snapshot/tracker.go b/pkg/snapshot/tracker.go index 7192eec..caa2239 100644 --- a/pkg/snapshot/tracker.go +++ b/pkg/snapshot/tracker.go @@ -1,6 +1,8 @@ package snapshot import ( + "bytes" + "context" "encoding/csv" "fmt" "os" @@ -17,10 +19,14 @@ import ( type trackedIter struct { trie.NodeIterator tracker *iteratorTracker + + seekedPath []byte // latest path seeked from the tracked iterator + endPath []byte // endPath for the tracked iterator } func (it *trackedIter) Next(descend bool) bool { ret := it.NodeIterator.Next(descend) + if !ret { if it.tracker.running { it.tracker.stopChan <- it @@ -51,45 +57,79 @@ func newTracker(file string, buf int) iteratorTracker { } } -func (tr *iteratorTracker) captureSignal() { +func (tr *iteratorTracker) captureSignal(cancelCtx context.CancelFunc) { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigChan log.Errorf("Signal received (%v), stopping", sig) - tr.haltAndDump() - os.Exit(1) + // cancel context on receiving a signal + // on ctx cancellation, all the iterators complete processing of their current node before stopping + cancelCtx() }() } // Wraps an iterator in a trackedIter. This should not be called once halts are possible. -func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) { - ret = &trackedIter{it, tr} +func (tr *iteratorTracker) tracked(it trie.NodeIterator, recoveredPath []byte) (ret *trackedIter) { + // create seeked path of max capacity (65) + iterSeekedPath := make([]byte, 0, 65) + // initially populate seeked path with the recovered path + // to be used in trie traversal + if recoveredPath != nil { + iterSeekedPath = append(iterSeekedPath, recoveredPath...)
+ } + + // if the iterator being tracked is a PrefixBoundIterator, capture its end path + // to be used in trie traversal + var endPath []byte + if boundedIter, ok := it.(*iter.PrefixBoundIterator); ok { + endPath = boundedIter.EndPath + } + + ret = &trackedIter{it, tr, iterSeekedPath, endPath} tr.startChan <- ret return } +// explicitly stops an iterator +func (tr *iteratorTracker) stopIter(it *trackedIter) { + tr.stopChan <- it +} // dumps iterator path and bounds to a text file so it can be restored later func (tr *iteratorTracker) dump() error { log.Debug("Dumping recovery state to: ", tr.recoveryFile) var rows [][]string - for it, _ := range tr.started { + for it := range tr.started { + var startPath []byte var endPath []byte if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok { + // if the iterator being tracked is a PrefixBoundIterator, + // initialize start and end paths with its bounds + startPath = impl.StartPath endPath = impl.EndPath } + + // if seeked path and iterator path are non-empty, use the iterator's path as the start path + if !bytes.Equal(it.seekedPath, []byte{}) && !bytes.Equal(it.Path(), []byte{}) { + startPath = it.Path() + } + rows = append(rows, []string{ - fmt.Sprintf("%x", it.Path()), + fmt.Sprintf("%x", startPath), fmt.Sprintf("%x", endPath), + fmt.Sprintf("%x", it.seekedPath), }) } + file, err := os.Create(tr.recoveryFile) if err != nil { return err } defer file.Close() out := csv.NewWriter(file) + return out.WriteAll(rows) } @@ -106,30 +146,45 @@ func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) log.Debug("Restoring recovery state from: ", tr.recoveryFile) defer file.Close() in := csv.NewReader(file) - in.FieldsPerRecord = 2 + in.FieldsPerRecord = 3 rows, err := in.ReadAll() if err != nil { return nil, err } + var ret []trie.NodeIterator for _, row := range rows { // pick up where each interval left off - var paths [2][]byte - for i, val := range row { - if len(val) != 0 { - if _, err = fmt.Sscanf(val, "%x", &paths[i]); err != nil { - return nil, err - } + var startPath []byte + var endPath []byte + var recoveredPath []byte + + if len(row[0]) != 0 { + if _, err = fmt.Sscanf(row[0], "%x", &startPath); err != nil { + return nil, err + } + } + if len(row[1]) != 0 { + if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil { + return nil, err + } + } + if len(row[2]) != 0 { + if _, err = fmt.Sscanf(row[2], "%x", &recoveredPath); err != nil { + return nil, err } } - // Force the lower bound path to an even length - if len(paths[0])&0b1 == 1 { - decrementPath(paths[0]) // decrement first to avoid skipped nodes - paths[0] = append(paths[0], 0) + // force the lower bound path to an even length + // (required by HexToKeyBytes()) + if len(startPath)&0b1 == 1 { + // decrement first to avoid skipped nodes + decrementPath(startPath) + startPath = append(startPath, 0) } - it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(paths[0])), paths[1]) - ret = append(ret, tr.tracked(it)) + + it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath) + ret = append(ret, tr.tracked(it, recoveredPath)) } return ret, nil } @@ -137,7 +192,7 @@ func (tr *iteratorTracker) haltAndDump() error { tr.running = false - // drain any pending events + // drain any pending iterators close(tr.startChan) for start := range tr.startChan { tr.started[start] = struct{}{} @@ -159,5 +214,6 @@ func (tr *iteratorTracker)
haltAndDump() error { } return err } + return tr.dump() } diff --git a/pkg/snapshot/util.go b/pkg/snapshot/util.go index e9db47d..1f2357c 100644 --- a/pkg/snapshot/util.go +++ b/pkg/snapshot/util.go @@ -1,6 +1,7 @@ package snapshot import ( + "bytes" "context" "fmt" @@ -51,3 +52,37 @@ func decrementPath(path []byte) bool { } return true } + +// https://github.com/ethereum/go-ethereum/blob/master/trie/encoding.go#L97 +func keybytesToHex(str []byte) []byte { + l := len(str)*2 + 1 + var nibbles = make([]byte, l) + for i, b := range str { + nibbles[i*2] = b / 16 + nibbles[i*2+1] = b % 16 + } + nibbles[l-1] = 16 + return nibbles +} + +func updateSeekedPath(seekedPath *[]byte, nodePath []byte) { + // assumes len(nodePath) <= max len(*seekedPath) + *seekedPath = (*seekedPath)[:len(nodePath)] + copy(*seekedPath, nodePath) +} + +// checks that the provided node path is before the end path +func checkUpperPathBound(nodePath, endPath []byte) bool { + // every path is before a nil endPath + if endPath == nil { + return true + } + + if len(endPath)%2 == 0 { + // in case of an even-length endPath, + // apply an open interval filter since the node at endPath will be covered by the next iterator + return bytes.Compare(nodePath, endPath) < 0 + } + + return bytes.Compare(nodePath, endPath) <= 0 }
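To make the interval convention above concrete, here is a minimal, self-contained sketch (the sample paths are invented for illustration) of how `checkUpperPathBound` treats a nil bound, an odd-length bound (closed interval), and an even-length bound (open interval, since the node at the boundary belongs to the next iterator):

```go
package main

import (
	"bytes"
	"fmt"
)

// copy of checkUpperPathBound from pkg/snapshot/util.go
func checkUpperPathBound(nodePath, endPath []byte) bool {
	// every path is before a nil endPath
	if endPath == nil {
		return true
	}
	if len(endPath)%2 == 0 {
		// even-length endPath: open interval, the boundary node is left to the next iterator
		return bytes.Compare(nodePath, endPath) < 0
	}
	// odd-length endPath: closed interval, the boundary node is included
	return bytes.Compare(nodePath, endPath) <= 0
}

func main() {
	fmt.Println(checkUpperPathBound([]byte{0xf, 0xf}, nil))              // true: nil bound admits everything
	fmt.Println(checkUpperPathBound([]byte{0x7}, []byte{0x7}))           // true: odd-length bound is inclusive
	fmt.Println(checkUpperPathBound([]byte{0x8, 0x0}, []byte{0x8, 0x0})) // false: even-length bound is exclusive
	fmt.Println(checkUpperPathBound([]byte{0x7, 0xf}, []byte{0x8, 0x0})) // true: strictly below the bound
}
```

This exclusive/inclusive split is what lets adjacent concurrent subtrie iterators partition the trie without indexing the boundary node twice.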