import type {
  ApolloClient,
  DocumentNode,
  FetchPolicy,
  OperationVariables,
  TypedDocumentNode,
  FetchResult,
  ErrorPolicy,
  ApolloQueryResult,
} from '@apollo/client';
import type { GraphQLErrors } from '@apollo/client/errors';
import type { Subscription } from 'zen-observable-ts';
import isEqual from 'lodash/isEqual';
import { isNotFoundGraphQLError } from './apollo-client';
import type * as Schema from '@vegaprotocol/types';

/** Optional, per-notification details describing what changed. */
interface UpdateData<Data, Delta> {
  delta?: Delta;
  isUpdate?: boolean;
  insertionData?: Data | null;
  isInsert?: boolean;
}

/** Callback invoked with the provider state on every notification. */
export interface UpdateCallback<Data, Delta> {
  (
    arg: UpdateData<Data, Delta> & {
      data: Data | null;
      error?: Error;
      loading: boolean;
      loaded: boolean;
      pageInfo: PageInfo | null;
      totalCount?: number;
    }
  ): void;
}

/** Loads another page; resolves with the inserted data or null. */
export interface Load<Data> {
  (start?: number, end?: number): Promise<Data | null>;
}

/** Re-runs the initial query, optionally restarting the subscription too. */
export interface Reload {
  (forceReset?: boolean): void;
}

type Pagination = Schema.Pagination & {
  skip?: number;
};

export interface PageInfo {
  startCursor?: string;
  endCursor?: string;
  hasNextPage?: boolean;
  hasPreviousPage?: boolean;
}

/** Registers a callback on a data provider and returns its control handles. */
export interface Subscribe<
  Data,
  Delta,
  Variables extends OperationVariables = OperationVariables
> {
  (
    callback: UpdateCallback<Data, Delta>,
    client: ApolloClient<object>,
    variables?: Variables
  ): {
    unsubscribe: () => void;
    reload: Reload;
    flush: () => void;
    load?: Load<Data>;
  };
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type Query<Result> = DocumentNode | TypedDocumentNode<Result, any>;

/** Applies a subscription delta to the current data and returns the result. */
export interface Update<
  Data,
  Delta,
  Variables extends OperationVariables = OperationVariables
> {
  (
    data: Data | null,
    delta: Delta,
    reload: Reload,
    variables?: Variables
  ): Data;
}

/** Merges a freshly loaded page into the data already held by the provider. */
export interface Append<Data> {
  (
    data: Data | null,
    insertionData: Data | null,
    insertionPageInfo: PageInfo | null,
    pagination?: Pagination,
    totalCount?: number
  ): {
    data: Data | null;
    totalCount?: number;
  };
}

interface GetData<QueryData, Data, Variables> {
  (queryData: QueryData | null, variables?: Variables): Data | null;
}

interface GetPageInfo<QueryData> {
  (queryData: QueryData): PageInfo | null;
}

interface GetTotalCount<QueryData> {
  (queryData: QueryData): number | undefined;
}

interface GetDelta<SubscriptionData, Delta, Variables> {
  (subscriptionData: SubscriptionData, variables?: Variables): Delta;
}

export type Node = { id: string };

export type Cursor = {
  cursor?: string | null;
};

export interface Edge<T extends Node> extends Cursor {
  node: T;
}

/**
 * Default `append` implementation for cursor based pagination.
 *
 * When `pagination.after` is found among the cursors of `data`, the inserted
 * page overwrites the slots that follow that cursor (shifted by
 * `pagination.skip`). If the inserted page is the last one, the list is
 * truncated to the end of that page and `totalCount` is adjusted.
 * In every other case `data` and `totalCount` are returned untouched.
 *
 * @param data data currently held by the provider
 * @param insertionData page that has just been loaded
 * @param insertionPageInfo page info of the loaded page
 * @param pagination pagination variables used to load the page
 * @param totalCount currently known total number of items
 * @returns data with the page merged in and the (possibly adjusted) totalCount
 * @throws Error when `data` or `insertionData` is not an array
 */
export function defaultAppend<Data>(
  data: Data | null,
  insertionData: Data | null,
  insertionPageInfo: PageInfo | null,
  pagination?: Pagination,
  totalCount?: number
) {
  if (data && insertionData && insertionPageInfo) {
    if (!(data instanceof Array) || !(insertionData instanceof Array)) {
      throw new Error(
        'data needs to be instance of Edge[] when using pagination'
      );
    }
    if (pagination?.after) {
      const cursors = data.map((item) => item && item.cursor);
      const startIndex = cursors.lastIndexOf(pagination.after);
      if (startIndex !== -1) {
        const start = startIndex + 1 + (pagination.skip ?? 0);
        const end = start + insertionData.length;
        let updatedData = [
          ...data.slice(0, start),
          ...insertionData,
          ...data.slice(end),
        ];
        if (!insertionPageInfo.hasNextPage && end !== (totalCount ?? 0)) {
          // adjust totalCount if last page is shorter or longer than expected
          totalCount = end;
          updatedData = updatedData.slice(0, end);
        }
        return {
          data: updatedData,
          // increase totalCount if last page is longer than expected
          totalCount: totalCount && Math.max(updatedData.length, totalCount),
        };
      }
    }
  }
  return { data, totalCount };
}

interface DataProviderParams<
  QueryData,
  Data,
  SubscriptionData,
  Delta,
  Variables extends OperationVariables = OperationVariables
> {
  query: Query<QueryData>;
  subscriptionQuery?: Query<SubscriptionData>;
  update?: Update<Data, Delta, Variables>;
  getData: GetData<QueryData, Data, Variables>;
  getDelta?: GetDelta<SubscriptionData, Delta, Variables>;
  pagination?: {
    getPageInfo: GetPageInfo<QueryData>;
    getTotalCount?: GetTotalCount<QueryData>;
    append: Append<Data>;
    first: number;
  };
  fetchPolicy?: FetchPolicy;
  resetDelay?: number;
  additionalContext?: Record<string, unknown>;
  errorPolicyGuard?: (graphqlErrors: GraphQLErrors) => boolean;
}

/**
 * Creates a single (non memoized) data provider instance.
 *
 * @param query query used for the initial fetch and for loading more pages
 * @param subscriptionQuery query that will be used for subscription
 * @param update function that will be executed on each onNext, it should update data based on delta, it can reload data provider
 * @param getData transforms received query data to format that will be stored in data provider
 * @param getDelta transforms delta data to format that will be stored in data provider
 * @param pagination pagination related functions { getPageInfo, getTotalCount, append, first }
 * @param fetchPolicy fetch policy passed to the query ('no-cache' when omitted) and to the subscription
 * @param resetDelay delay in ms between the last unsubscribe and the provider reset; reset is immediate when omitted
 * @param additionalContext add property to the context of the query, ie. 'isEnlargedTimeout'
 * @param errorPolicyGuard indicates which gql errors can be tolerated; when it returns true the query is retried with errorPolicy 'ignore'
 * @returns subscribe function
 */
function makeDataProviderInternal<
  QueryData,
  Data,
  SubscriptionData,
  Delta,
  Variables extends OperationVariables = OperationVariables
>({
  query,
  subscriptionQuery,
  update,
  getData,
  getDelta,
  pagination,
  fetchPolicy,
  resetDelay,
  additionalContext,
  errorPolicyGuard,
}: DataProviderParams<
  QueryData,
  Data,
  SubscriptionData,
  Delta,
  Variables
>): Subscribe<Data, Delta, Variables> {
  // list of callbacks passed through subscribe call
  const callbacks: UpdateCallback<Data, Delta>[] = [];
  // subscription is started before initial query, all deltas that will arrive before initial query response are put on queue
  const updateQueue: Delta[] = [];

  let resetTimer: ReturnType<typeof setTimeout> | undefined;
  let variables: Variables | undefined;
  let data: Data | null = null;
  let error: Error | undefined;
  let loading = true;
  let loaded = false;
  let client: ApolloClient<object>;
  let subscription: Subscription | undefined;
  let pageInfo: PageInfo | null = null;
  let totalCount: number | undefined;

  // notify single callback about current state, delta is passed optionally only if notify was invoked from onNext
  const notify = (
    callback: UpdateCallback<Data, Delta>,
    updateData?: UpdateData<Data, Delta>
  ) => {
    callback({
      data,
      error,
      loading,
      loaded,
      pageInfo,
      totalCount,
      ...updateData,
    });
  };

  // notify all callbacks
  const notifyAll = (updateData?: UpdateData<Data, Delta>) => {
    callbacks.forEach((callback) => notify(callback, updateData));
  };

  // runs the query; if it fails with graphql errors accepted by errorPolicyGuard it is retried with errorPolicy 'ignore'
  const call = (
    pagination?: Pagination,
    policy?: ErrorPolicy
  ): Promise<ApolloQueryResult<QueryData>> =>
    client
      .query<QueryData>({
        query,
        variables: { ...variables, ...(pagination && { pagination }) },
        fetchPolicy: fetchPolicy || 'no-cache',
        context: additionalContext,
        errorPolicy: policy || 'none',
      })
      .catch((err) => {
        if (
          err.graphQLErrors &&
          errorPolicyGuard &&
          errorPolicyGuard(err.graphQLErrors)
        ) {
          return call(pagination, 'ignore');
        } else {
          throw err;
        }
      });

  // loads a page: the next one when start is omitted, otherwise the one placed at the start index
  const load = async (start?: number, end?: number) => {
    if (!pagination) {
      // reject with a real Error (previously rejected with undefined) so the failure is diagnosable
      throw new Error('load can not be used without pagination');
    }
    const paginationVariables: Pagination = {
      first: pagination.first,
      after: pageInfo?.endCursor,
    };
    if (start !== undefined && data instanceof Array) {
      if (!start) {
        paginationVariables.after = undefined;
      } else if (data && data[start - 1]) {
        paginationVariables.after = (data[start - 1] as Cursor).cursor;
      } else {
        // no item directly before start, walk back to the nearest loaded one and skip the gap
        let skip = 1;
        while (!data[start - 1 - skip] && skip <= start) {
          skip += 1;
        }
        paginationVariables.skip = skip;
        if (skip === start) {
          paginationVariables.after = undefined;
        } else {
          paginationVariables.after = (data[start - 1 - skip] as Cursor).cursor;
        }
      }
    } else if (!pageInfo?.hasNextPage) {
      return null;
    }
    const res = await call(paginationVariables);
    const insertionData = getData(res.data, variables);
    const insertionPageInfo = pagination.getPageInfo(res.data);
    ({ data, totalCount } = pagination.append(
      data,
      insertionData,
      insertionPageInfo,
      paginationVariables,
      totalCount
    ));
    pageInfo = insertionPageInfo;
    totalCount =
      (pagination.getTotalCount && pagination.getTotalCount(res.data)) ??
      totalCount;
    notifyAll({ insertionData, isInsert: true });
    return insertionData;
  };

  // stores data, keeps totalCount in sync with the list length once totalCount is known
  const setData = (updatedData: Data | null) => {
    data = updatedData;
    if (totalCount !== undefined && data instanceof Array) {
      totalCount = data.length;
    }
  };

  const initialFetch = async () => {
    if (!client) {
      return;
    }
    const paginationVariables = pagination
      ? { first: pagination.first }
      : undefined;
    try {
      const res = await call(paginationVariables);
      data = getData(res.data, variables);
      if (data && pagination) {
        if (!(data instanceof Array)) {
          throw new Error(
            'data needs to be instance of Edge[] when using pagination'
          );
        }
        pageInfo = pagination.getPageInfo(res.data);
        if (pageInfo && !pageInfo.hasNextPage) {
          totalCount = data.length;
        } else {
          totalCount =
            pagination.getTotalCount && pagination.getTotalCount(res.data);
        }
        if (data && totalCount && data.length < totalCount) {
          // pad not yet loaded items with null; pushed one by one, spreading a
          // large array into push can exceed the engine argument limit
          while (data.length < totalCount) {
            data.push(null);
          }
        }
      }
      // if there was some updates received from subscription during initial query loading apply them on just received data
      if (update && data && updateQueue && updateQueue.length > 0) {
        while (updateQueue.length) {
          const delta = updateQueue.shift();
          if (delta) {
            // setData also keeps totalCount in sync with data length
            setData(update(data, delta, reload, variables));
          }
        }
      }
      loaded = true;
    } catch (e) {
      if (isNotFoundGraphQLError(e as Error, ['party'])) {
        data = getData(null, variables);
        loaded = true;
        return;
      }
      // if error will occur data provider stops subscription
      error = e as Error;
      if (subscription) {
        subscription.unsubscribe();
      }
      subscription = undefined;
    } finally {
      // runs also after the early return above
      loading = false;
      notifyAll();
    }
  };

  // reload function is passed to update and is returned by subscribe function
  const reload = (forceReset = false) => {
    if (loading) {
      return;
    }
    // hard reset on demand or when there is no apollo subscription yet
    if (forceReset || !subscription) {
      reset();
      initialize();
    } else {
      loading = true;
      error = undefined;
      initialFetch();
    }
  };

  const onNext = ({
    data: subscriptionData,
  }: FetchResult<SubscriptionData>) => {
    if (!subscriptionData || !getDelta || !update) {
      return;
    }
    const delta = getDelta(subscriptionData, variables);
    if (loading) {
      updateQueue.push(delta);
    } else {
      const updatedData = update(data, delta, reload, variables);
      // update returning the same reference means nothing has changed
      if (updatedData === data) {
        return;
      }
      setData(updatedData);
      notifyAll({ delta, isUpdate: true });
    }
  };

  const onError = (e: Error) => {
    error = e;
    if (subscription) {
      subscription.unsubscribe();
      subscription = undefined;
    }
    notifyAll();
  };

  const initialize = async () => {
    if (subscription) {
      // provider is revived before the delayed reset was executed
      if (resetTimer) {
        clearTimeout(resetTimer);
        resetTimer = undefined;
      }
      // state is still valid, pass it to the new subscriber which otherwise
      // would not be notified until the next delta arrives
      notifyAll();
      return;
    }
    loading = true;
    error = undefined;
    notifyAll();
    if (!client) {
      return;
    }
    if (subscriptionQuery && getDelta && update) {
      subscription = client
        .subscribe<SubscriptionData>({
          query: subscriptionQuery,
          variables,
          fetchPolicy,
        })
        .subscribe(onNext, onError);
    }
    await initialFetch();
  };

  const reset = () => {
    if (!subscription) {
      return;
    }
    subscription.unsubscribe();
    subscription = undefined;
    data = null;
    error = undefined;
    loading = false;
    loaded = false;
    // drop paging state and deltas that belong to the data discarded above
    pageInfo = null;
    totalCount = undefined;
    updateQueue.length = 0;
    notifyAll();
  };

  // remove callback from list, and unsubscribe if there is no more callbacks registered
  const unsubscribe = (callback: UpdateCallback<Data, Delta>) => {
    const index = callbacks.indexOf(callback);
    if (index === -1) {
      // already unsubscribed, splice(-1, 1) would remove someone else's callback
      return;
    }
    callbacks.splice(index, 1);
    if (callbacks.length === 0) {
      if (resetDelay) {
        resetTimer = setTimeout(reset, resetDelay);
      } else {
        reset();
      }
    }
  };

  return (callback, c, v) => {
    callbacks.push(callback);
    if (callbacks.length === 1) {
      client = c;
      variables = v;
      initialize();
    } else {
      notify(callback);
    }
    return {
      unsubscribe: () => unsubscribe(callback),
      reload,
      flush: () => notify(callback),
      load,
    };
  };
}

/**
 * Memoizes data provider instances using query variables as cache key
 * (variables are compared with lodash isEqual).
 *
 * @param fn factory creating a new provider instance
 * @returns function returning the subscribe function for given variables
 */
const memoize = <
  Data,
  Delta,
  Variables extends OperationVariables = OperationVariables
>(
  fn: () => Subscribe<Data, Delta, Variables>
) => {
  const cache: {
    subscribe: Subscribe<Data, Delta, Variables>;
    variables?: Variables;
  }[] = [];
  return (variables?: Variables) => {
    const cached = cache.find((c) => isEqual(c.variables, variables));
    if (cached) {
      return cached.subscribe;
    }
    const subscribe = fn();
    cache.push({ subscribe, variables });
    return subscribe;
  };
};

/**
 * @param query Query
 * @param subscriptionQuery Query query that will be used for subscription
 * @param update Update function that will be executed on each onNext, it should update data based on delta, it can reload data provider
 * @param getData transforms received query data to format that will be stored in data provider
 * @param getDelta transforms delta data to format that will be stored in data provider
 * @param pagination pagination related functions { getPageInfo, getTotalCount, append, first }
 * @returns Subscribe subscribe function
 * @example
 * const marketMidPriceProvider = makeDataProvider({
 *   query: gql`query MarketMidPrice($marketId: ID!) { market(id: $marketId) { data { midPrice } } }`,
 *   subscriptionQuery: gql`subscription MarketMidPriceSubscription($marketId: ID!) { marketDepthUpdate(marketId: $marketId) { market { data { midPrice } } } }`,
 *   update: (draft: Draft, delta: Delta, reload: Reload) => { draft.midPrice = delta.midPrice }
 *   getData: (data:QueryData) => data.market.data.midPrice
 *   getDelta: (delta:SubscriptionData) => delta.marketData.market
 * })
 *
 * const { unsubscribe, flush, reload } = marketMidPriceProvider(
 *   ({ data, error, loading, delta }) => { ... },
 *   apolloClient,
 *   { marketId: '1fd726454fa1220038acbf6ff9ac701d8b8bf3f2d77c93a4998544471dc58747' }
 * )
 *
 */
export function makeDataProvider<
  QueryData,
  Data,
  SubscriptionData,
  Delta,
  Variables extends OperationVariables = OperationVariables
>(
  params: DataProviderParams<
    QueryData,
    Data,
    SubscriptionData,
    Delta,
    Variables
  >
): Subscribe<Data, Delta, Variables> {
  const getInstance = memoize<Data, Delta, Variables>(() =>
    makeDataProviderInternal(params)
  );
  return (callback, client, variables) =>
    getInstance(variables)(callback, client, variables);
}

/**
 * Dependency subscribe needs to use any as Data and Delta because it's unknown what dependencies will be used.
 * As a result the parts passed to the combine functions are typed as any[].
 */
type DependencySubscribe<
  Variables extends OperationVariables = OperationVariables
> = Subscribe<any, any, Variables>; // eslint-disable-line @typescript-eslint/no-explicit-any

type DependencyUpdateCallback<
  Variables extends OperationVariables = OperationVariables
> = Parameters<DependencySubscribe<Variables>>['0'];

export type DerivedPart<
  Variables extends OperationVariables = OperationVariables
> = Parameters<DependencyUpdateCallback<Variables>>['0'];

export type CombineDerivedData<
  Data,
  Variables extends OperationVariables = OperationVariables
> = (
  data: DerivedPart<Variables>['data'][],
  variables: Variables | undefined,
  prevData: Data | null
) => Data | null;

export type CombineDerivedDelta<
  Data,
  Delta,
  Variables extends OperationVariables = OperationVariables
> = (
  data: Data,
  parts: DerivedPart<Variables>[],
  previousData: Data | null,
  variables?: Variables
) => Delta | undefined;

export type CombineInsertionData<
  Data,
  Variables extends OperationVariables = OperationVariables
> = (
  data: Data,
  parts: DerivedPart<Variables>[],
  variables?: Variables
) => Data | undefined;

/**
 * Creates a single (non memoized) derived data provider instance.
 *
 * @param dependencies providers whose states are combined, page info and load are taken from the first one
 * @param combineData builds derived data from data of all dependencies, it is called only when all of them are loaded
 * @param combineDelta builds derived delta when the dependency that triggered the notification reported an update
 * @param combineInsertionData builds derived insertion data when the dependency that triggered the notification reported an insert
 * @returns subscribe function
 */
function makeDerivedDataProviderInternal<
  Data,
  Delta,
  Variables extends OperationVariables = OperationVariables
>(
  dependencies: DependencySubscribe<Variables>[],
  combineData: CombineDerivedData<Data, Variables>,
  combineDelta?: CombineDerivedDelta<Data, Delta, Variables>,
  combineInsertionData?: CombineInsertionData<Data, Variables>
): Subscribe<Data, Delta, Variables> {
  let subscriptions: ReturnType<DependencySubscribe>[] | undefined;
  let client: ApolloClient<object>;
  const callbacks: UpdateCallback<Data, Delta>[] = [];
  let variables: Variables | undefined;
  // last state received from each dependency, indexed like dependencies
  const parts: DerivedPart<Variables>[] = [];
  let data: Data | null = null;
  let error: Error | undefined;
  let loading = true;
  let loaded = false;

  const notify = (
    callback: UpdateCallback<Data, Delta>,
    updateData?: UpdateData<Data, Delta>
  ) => {
    callback({
      data,
      error,
      loading,
      loaded,
      pageInfo: parts[0]?.pageInfo || null,
      ...updateData,
    });
  };

  // notify all callbacks
  const notifyAll = (updateData?: UpdateData<Data, Delta>) =>
    callbacks.forEach((callback) => {
      notify(callback, updateData);
    });

  // recalculates derived state after the dependency with given index has reported, notifies only when something changed
  const combine = (updatedPartIndex: number) => {
    let delta: Delta | undefined;
    let isUpdate = false;
    let isInsert = false;
    let insertionData: Data | undefined;
    let newError: Error | undefined;
    let newLoading = false;
    let newLoaded = true;
    dependencies
      .map((dependency, i) => parts[i])
      .forEach((part) => {
        newError = newError || (part && part.error);
        // dependency which has not reported yet counts as loading
        newLoading = newLoading || !part || part.loading;
        newLoaded = newLoaded && part && part.loaded;
      });
    const newData = newLoaded
      ? combineData(
          parts.map((part) => part.data),
          variables,
          data
        )
      : data;
    if (
      newLoading !== loading ||
      newError !== error ||
      newLoaded !== loaded ||
      newData !== data
    ) {
      loading = newLoading;
      error = newError;
      loaded = newLoaded;
      const previousData = data;
      data = newData;
      if (loaded) {
        const updatedPart = parts[updatedPartIndex];
        if (updatedPart.isUpdate) {
          isUpdate = true;
          if (combineDelta && data) {
            delta = combineDelta(data, parts, previousData, variables);
          }
          // flags are consumed so they are not reported again on the next combine
          delete updatedPart.isUpdate;
          delete updatedPart.delta;
        }
        if (updatedPart.isInsert) {
          // insert is reported only when it comes from the first dependency
          isInsert = updatedPartIndex === 0;
          if (updatedPart.insertionData && combineInsertionData && data) {
            insertionData = combineInsertionData(data, parts, variables);
          }
          delete updatedPart.insertionData;
          delete updatedPart.isInsert;
        }
      }
      notifyAll({
        isUpdate,
        isInsert,
        delta,
        insertionData,
      });
    }
  };

  const initialize = () => {
    if (subscriptions) {
      return;
    }
    subscriptions = dependencies.map((dependency, i) =>
      dependency(
        (updateData) => {
          parts[i] = updateData;
          combine(i);
        },
        client,
        variables
      )
    );
  };

  // remove callback from list, and unsubscribe if there is no more callbacks registered
  const unsubscribe = (callback: UpdateCallback<Data, Delta>) => {
    const index = callbacks.indexOf(callback);
    if (index === -1) {
      // already unsubscribed, splice(-1, 1) would remove someone else's callback
      return;
    }
    callbacks.splice(index, 1);
    if (callbacks.length === 0) {
      subscriptions?.forEach((subscription) => subscription.unsubscribe());
      subscriptions = undefined;
      data = null;
      error = undefined;
      loading = true;
      loaded = false;
    }
  };

  return (callback, c, v) => {
    callbacks.push(callback);
    if (callbacks.length === 1) {
      client = c;
      variables = v;
      initialize();
    } else {
      notify(callback);
    }
    return {
      unsubscribe: () => unsubscribe(callback),
      reload: (forceReset) =>
        subscriptions &&
        subscriptions.forEach((subscription) =>
          subscription.reload(forceReset)
        ),
      flush: () => notify(callback),
      load: subscriptions && subscriptions[0]?.load,
    };
  };
}

/**
 * Creates a data provider which combines states of other data providers,
 * instances are memoized using variables as cache key.
 *
 * @param dependencies providers whose states are combined
 * @param combineData builds derived data from data of all dependencies
 * @param combineDelta builds derived delta from an update of a dependency
 * @param combineInsertionData builds derived insertion data from an insert of a dependency
 * @returns subscribe function
 */
export function makeDerivedDataProvider<
  Data,
  Delta,
  Variables extends OperationVariables = OperationVariables
>(
  dependencies: DependencySubscribe<Variables>[],
  combineData: CombineDerivedData<Data, Variables>,
  combineDelta?: CombineDerivedDelta<Data, Delta, Variables>,
  combineInsertionData?: CombineInsertionData<Data, Variables>
): Subscribe<Data, Delta, Variables> {
  const getInstance = memoize<Data, Delta, Variables>(() =>
    makeDerivedDataProviderInternal(
      dependencies,
      combineData,
      combineDelta,
      combineInsertionData
    )
  );
  return (callback, client, variables) =>
    getInstance(variables)(callback, client, variables);
}