Fetch all onboarding txs in a single query

This commit is contained in:
Prathamesh Musale 2024-08-14 20:14:54 +05:30
parent 71e292cfc2
commit 048c6f1604
2 changed files with 55 additions and 29 deletions

View File

@ -2,17 +2,17 @@
## Prerequisites ## Prerequisites
- NodeJS - `v20.16.0` - NodeJS >= `v18.17.x`
## Instructions ## Instructions
* Install dependencies and build: - Install dependencies and build:
```bash ```bash
yarn && yarn build yarn && yarn build
``` ```
* Create required env configuration: - Create required env configuration:
```bash ```bash
# Update the values as required # Update the values as required
@ -20,7 +20,7 @@
cp .env.example .env cp .env.example .env
``` ```
* Map subscribers to onboarded participants: - Map subscribers to onboarded participants:
```bash ```bash
yarn map-subscribers-to-participants --subscribers-csv <subscribers-csv-file> --output <output-csv-file> yarn map-subscribers-to-participants --subscribers-csv <subscribers-csv-file> --output <output-csv-file>

View File

@ -10,6 +10,7 @@ import * as cliProgress from 'cli-progress';
import { StargateClient } from '@cosmjs/stargate'; import { StargateClient } from '@cosmjs/stargate';
import { Registry } from '@cerc-io/registry-sdk'; import { Registry } from '@cerc-io/registry-sdk';
import { decodeTxRaw, decodePubkey } from '@cosmjs/proto-signing';
dotenv.config(); dotenv.config();
@ -23,8 +24,12 @@ async function main(): Promise<void> {
const registry = new Registry(LACONICD_GQL_ENDPOINT, LACONICD_RPC_ENDPOINT, LACONICD_CHAIN_ID); const registry = new Registry(LACONICD_GQL_ENDPOINT, LACONICD_RPC_ENDPOINT, LACONICD_CHAIN_ID);
const client = await StargateClient.connect(LACONICD_RPC_ENDPOINT); const client = await StargateClient.connect(LACONICD_RPC_ENDPOINT);
console.time('time_taken_getParticipants');
const participants = await registry.getParticipants(); const participants = await registry.getParticipants();
console.timeEnd('time_taken_getParticipants');
const subscribers = await readSubscribers(argv.subscribersCsv); const subscribers = await readSubscribers(argv.subscribersCsv);
console.log('Read subscribers, count:', subscribers.length);
await processSubscribers(client, participants, subscribers, argv.output); await processSubscribers(client, participants, subscribers, argv.output);
} }
@ -47,6 +52,47 @@ async function processSubscribers(client: StargateClient, participants: any[], s
kycMap[participant.kycId] = participant; kycMap[participant.kycId] = participant;
}); });
const onboardingTxsHeightMap: Record<string, { txHeight: number, pubkey: string }> = {};
console.time('time_taken_searchTx');
const onboardingTxs = await client.searchTx(`message.action='/cerc.onboarding.v1.MsgOnboardParticipant'`);
console.timeEnd('time_taken_searchTx');
console.log('Fetched onboardingTxs, count:', onboardingTxs.length);
console.time('time_taken_decodingTxs');
onboardingTxs.forEach(onboardingTx => {
const rawPubkey = decodeTxRaw(onboardingTx.tx).authInfo.signerInfos[0].publicKey;
if (!rawPubkey) {
console.error('pubkey not found in tx', onboardingTx.hash);
return;
}
const pubkey = decodePubkey(rawPubkey).value;
// Determine sender
const onboardParticipantEvent = onboardingTx.events.find(event => event.type === 'onboard_participant');
if (!onboardParticipantEvent) {
console.error('onboard_participant event not found in tx', onboardingTx.hash);
return;
}
const sender = onboardParticipantEvent.attributes.find(attr => attr.key === 'signer')?.value;
if (!sender) {
console.error('sender not found in onboard_participant event for tx', onboardingTx.hash)
return;
}
// Update if already exists
let latesTxHeight = onboardingTx.height;
if (onboardingTxsHeightMap[sender]) {
latesTxHeight = latesTxHeight > onboardingTxsHeightMap[sender].txHeight ? latesTxHeight : onboardingTxsHeightMap[sender].txHeight;
}
onboardingTxsHeightMap[sender] = { txHeight: latesTxHeight, pubkey };
});
console.timeEnd('time_taken_decodingTxs');
// Create a new progress bar instance // Create a new progress bar instance
const progressBar = new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic); const progressBar = new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic);
progressBar.start(subscribers.length, 0); progressBar.start(subscribers.length, 0);
@ -63,49 +109,29 @@ async function processSubscribers(client: StargateClient, participants: any[], s
const participantAddresss = participant['cosmosAddress']; const participantAddresss = participant['cosmosAddress'];
// Fetch participant's Laconic pubkey
const participantAccount = await client.getAccount(participantAddresss);
const participantPubkey = participantAccount?.pubkey;
// Skip participant if pubkey not found
// (account may have funds but hasn't done any tx till now)
if (!participantPubkey) {
continue;
}
// Skip participant if an onboarding tx not found // Skip participant if an onboarding tx not found
const onboardingTxs = await client.searchTx(`message.sender='${participantAddresss}' AND message.action='/cerc.onboarding.v1.MsgOnboardParticipant'`); if (!onboardingTxsHeightMap[participantAddresss]) {
if (onboardingTxs.length === 0) {
continue; continue;
} }
const latestOnboardingTx = onboardingTxs.reduce((prev, current) => {
return current.height > prev.height ? current : prev;
});
const onboardedSubscriber = { const onboardedSubscriber = {
subscriber_id: subscriber['subscriber_id'], subscriber_id: subscriber['subscriber_id'],
email: subscriber['email'], email: subscriber['email'],
status: subscriber['status'], status: subscriber['status'],
'premium?': subscriber['premium?'], 'premium?': subscriber['premium?'],
created_at: subscriber['created_at'], created_at: subscriber['created_at'],
cosmos_address: participantAddresss, laconic_address: participantAddresss,
nitro_address: participant['nitroAddress'], nitro_address: participant['nitroAddress'],
role: participant['role'], role: participant['role'],
hashed_subscriber_id: participant['kycId'], hashed_subscriber_id: participant['kycId'],
laconic_pubkey: participantPubkey.value, laconic_pubkey: onboardingTxsHeightMap[participantAddresss].pubkey,
onboarding_height: latestOnboardingTx.height onboarding_height: onboardingTxsHeightMap[participantAddresss].txHeight
}; };
onboardedSubscribers.push(onboardedSubscriber); onboardedSubscribers.push(onboardedSubscriber);
// Update the progress bar // Update the progress bar
progressBar.update(i + 1); progressBar.update(i + 1);
// Wait for a second every 10 entries
if (i % 10 === 0) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
} }
// Stop the progress bar // Stop the progress bar
@ -119,7 +145,7 @@ async function processSubscribers(client: StargateClient, participants: any[], s
{ id: 'status', title: 'status' }, { id: 'status', title: 'status' },
{ id: 'premium?', title: 'premium?' }, { id: 'premium?', title: 'premium?' },
{ id: 'created_at', title: 'created_at' }, { id: 'created_at', title: 'created_at' },
{ id: 'cosmos_address', title: 'cosmos_address' }, { id: 'laconic_address', title: 'laconic_address' },
{ id: 'nitro_address', title: 'nitro_address' }, { id: 'nitro_address', title: 'nitro_address' },
{ id: 'role', title: 'role' }, { id: 'role', title: 'role' },
{ id: 'hashed_subscriber_id', title: 'hashed_subscriber_id' }, { id: 'hashed_subscriber_id', title: 'hashed_subscriber_id' },