From e14cca31bab59fa76d48ae1eca5772533ef42e4e Mon Sep 17 00:00:00 2001 From: Richard Moore Date: Thu, 29 Sep 2022 21:53:25 -0400 Subject: [PATCH] Refactor provider model for better socket provider support. --- src.ts/providers/provider-infura.ts | 45 ++- src.ts/providers/provider-jsonrpc.ts | 369 ++++++++++++++----------- src.ts/providers/provider-socket.ts | 13 +- src.ts/providers/provider-websocket.ts | 15 +- 4 files changed, 269 insertions(+), 173 deletions(-) diff --git a/src.ts/providers/provider-infura.ts b/src.ts/providers/provider-infura.ts index 2c9e60495..0c274dc7c 100644 --- a/src.ts/providers/provider-infura.ts +++ b/src.ts/providers/provider-infura.ts @@ -1,10 +1,11 @@ import { - defineProperties, FetchRequest, throwArgumentError + defineProperties, FetchRequest, throwArgumentError, throwError } from "../utils/index.js"; import { showThrottleMessage } from "./community.js"; import { Network } from "./network.js"; import { JsonRpcProvider } from "./provider-jsonrpc.js"; +import { WebSocketProvider } from "./provider-websocket.js"; import type { AbstractProvider } from "./abstract-provider.js"; import type { CommunityResourcable } from "./community.js"; @@ -15,7 +16,7 @@ const defaultProjectId = "84842078b09946638c03157f83405213"; function getHost(name: string): string { switch(name) { - case "homestead": + case "mainnet": return "mainnet.infura.io"; case "ropsten": return "ropsten.infura.io"; @@ -42,12 +43,39 @@ function getHost(name: string): string { return throwArgumentError("unsupported network", "network", name); } +export class InfuraWebSocketProvider extends WebSocketProvider implements CommunityResourcable { + readonly projectId!: string; + readonly projectSecret!: null | string; + + constructor(network?: Networkish, apiKey?: any) { + const provider = new InfuraProvider(network, apiKey); + + const req = provider._getConnection(); + if (req.credentials) { + throwError("INFURA WebSocket project secrets unsupported", "UNSUPPORTED_OPERATION", { + operation: "InfuraProvider.getWebSocketProvider()" + }); + } + + const url = req.url.replace(/^http/i, "ws").replace("/v3/", "/ws/v3/"); + super(url, network); + + defineProperties(this, { + projectId: provider.projectId, + projectSecret: provider.projectSecret + }); + } + + isCommunityResource(): boolean { + return (this.projectId === defaultProjectId); + } +} export class InfuraProvider extends JsonRpcProvider implements CommunityResourcable { readonly projectId!: string; readonly projectSecret!: null | string; - constructor(_network: Networkish = "homestead", projectId?: null | string, projectSecret?: null | string) { + constructor(_network: Networkish = "mainnet", projectId?: null | string, projectSecret?: null | string) { const network = Network.from(_network); if (projectId == null) { projectId = defaultProjectId; } if (projectSecret == null) { projectSecret = null; } @@ -65,6 +93,14 @@ export class InfuraProvider extends JsonRpcProvider implements CommunityResourca return super._getProvider(chainId); } + isCommunityResource(): boolean { + return (this.projectId === defaultProjectId); + } + + static getWebSocketProvider(network?: Networkish, apiKey?: any): InfuraWebSocketProvider { + return new InfuraWebSocketProvider(network, apiKey); + } + static getRequest(network: Network, projectId?: null | string, projectSecret?: null | string): FetchRequest { if (projectId == null) { projectId = defaultProjectId; } if (projectSecret == null) { projectSecret = null; } @@ -83,7 +119,4 @@ export class InfuraProvider extends JsonRpcProvider implements CommunityResourca return request; } - isCommunityResource(): boolean { - return (this.projectId === defaultProjectId); - } } diff --git a/src.ts/providers/provider-jsonrpc.ts b/src.ts/providers/provider-jsonrpc.ts index 96558ec9e..8fc6454eb 100644 --- a/src.ts/providers/provider-jsonrpc.ts +++ b/src.ts/providers/provider-jsonrpc.ts @@ -355,86 +355,22 @@ export class JsonRpcApiProvider extends AbstractProvider { #options: Required; + // The next ID to use for the JSON-RPC ID field #nextId: number; + + // Payloads are queued and triggered in batches using the drainTimer #payloads: Array; - - #ready: boolean; - #starting: null | Promise; - #drainTimer: null | NodeJS.Timer; + #notReady: null | { + promise: Promise, + resolve: null | ((v: void) => void) + }; + #network: null | Network; - constructor(network?: Networkish, options?: JsonRpcApiProviderOptions) { - super(network); - - this.#ready = false; - this.#starting = null; - - this.#nextId = 1; - this.#options = Object.assign({ }, defaultOptions, options || { }); - - this.#payloads = [ ]; - this.#drainTimer = null; - - this.#network = null; - - // This could be relaxed in the future to just check equivalent networks - const staticNetwork = this._getOption("staticNetwork"); - if (staticNetwork) { - if (staticNetwork !== network) { - throwArgumentError("staticNetwork MUST match network object", "options", options); - } - this.#network = staticNetwork; - } - } - - /** - * Returns the value associated with the option %%key%%. - * - * Sub-classes can use this to inquire about configuration options. - */ - _getOption(key: K): JsonRpcApiProviderOptions[K] { - return this.#options[key]; - } - - get _network(): Network { - if (!this.#network) { - throwError("network is not available yet", "NETWORK_ERROR"); - } - - return this.#network; - } - - get ready(): boolean { return this.#ready; } - - async _start(): Promise { - if (this.#ready) { return; } - if (this.#starting) { return this.#starting; } - - this.#starting = (async () => { - - // Bootstrap the network - if (this.#network == null) { - try { - this.#network = await this._detectNetwork(); - } catch (error) { - console.log("JsonRpcProvider failed to startup; retry in 1s"); - await stall(1000); - this.#starting = null; - } - } - - this.#ready = true; - this.#starting = null; - - // Start dispatching requests - this.#scheduleDrain(); - })(); - } - #scheduleDrain(): void { - if (this.#drainTimer || !this.ready) { return; } + if (this.#drainTimer) { return; } // If we aren't using batching, no hard in sending it immeidately const stallTime = (this._getOption("batchMaxCount") === 1) ? 0: this._getOption("batchStallTime"); @@ -503,34 +439,54 @@ export class JsonRpcApiProvider extends AbstractProvider { }, stallTime); } - /** - * Requests the %%method%% with %%params%% via the JSON-RPC protocol - * over the underlying channel. This can be used to call methods - * on the backend that do not have a high-level API within the Provider - * API. - * - * This method queues requests according to the batch constraints - * in the options, assigns the request a unique ID. - * - * **Do NOT override** this method in sub-classes; instead - * override [[_send]] or force the options values in the - * call to the constructor to modify this method's behavior. - */ - send(method: string, params: Array | Record): Promise { - // @TODO: cache chainId?? purge on switch_networks + constructor(network?: Networkish, options?: JsonRpcApiProviderOptions) { + super(network); - const id = this.#nextId++; - const promise = new Promise((resolve, reject) => { - this.#payloads.push({ - resolve, reject, - payload: { method, params, id, jsonrpc: "2.0" } + this.#nextId = 1; + this.#options = Object.assign({ }, defaultOptions, options || { }); + + this.#payloads = [ ]; + this.#drainTimer = null; + + this.#network = null; + + { + let resolve: null | ((value: void) => void) = null; + const promise = new Promise((_resolve: (value: void) => void) => { + resolve = _resolve; }); - }); + this.#notReady = { promise, resolve }; + } - // If there is not a pending drainTimer, set one - this.#scheduleDrain(); + // This could be relaxed in the future to just check equivalent networks + const staticNetwork = this._getOption("staticNetwork"); + if (staticNetwork) { + if (staticNetwork !== network) { + throwArgumentError("staticNetwork MUST match network object", "options", options); + } + this.#network = staticNetwork; + } + } - return >promise; + /** + * Returns the value associated with the option %%key%%. + * + * Sub-classes can use this to inquire about configuration options. + */ + _getOption(key: K): JsonRpcApiProviderOptions[K] { + return this.#options[key]; + } + + /** + * Gets the [[Network]] this provider has committed to. On each call, the network + * is detected, and if it has changed, the call will reject. + */ + get _network(): Network { + if (!this.#network) { + throwError("network is not available yet", "NETWORK_ERROR"); + } + + return this.#network; } /** @@ -544,49 +500,46 @@ export class JsonRpcApiProvider extends AbstractProvider { }); } + /** - * Resolves to the [[Signer]] account for %%address%% managed by - * the client. + * Resolves to the non-normalized value by performing %%req%%. * - * If the %%address%% is a number, it is used as an index in the - * the accounts from [[listAccounts]]. - * - * This can only be used on clients which manage accounts (such as - * Geth with imported account or MetaMask). - * - * Throws if the account doesn't exist. + * Sub-classes may override this to modify behavior of actions, + * and should generally call ``super._perform`` as a fallback. */ - async getSigner(address: number | string = 0): Promise { - - const accountsPromise = this.send("eth_accounts", [ ]); - - // Account index - if (typeof(address) === "number") { - const accounts = >(await accountsPromise); - if (address > accounts.length) { throw new Error("no such account"); } - return new JsonRpcSigner(this, accounts[address]); - } - - const { accounts } = await resolveProperties({ - network: this.getNetwork(), - accounts: accountsPromise - }); - - // Account address - address = getAddress(address); - for (const account of accounts) { - if (getAddress(account) === account) { - return new JsonRpcSigner(this, account); + async _perform(req: PerformActionRequest): Promise { + // Legacy networks do not like the type field being passed along (which + // is fair), so we delete type if it is 0 and a non-EIP-1559 network + if (req.method === "call" || req.method === "estimateGas") { + let tx = req.transaction; + if (tx && tx.type != null && getBigInt(tx.type)) { + // If there are no EIP-1559 properties, it might be non-EIP-a559 + if (tx.maxFeePerGas == null && tx.maxPriorityFeePerGas == null) { + const feeData = await this.getFeeData(); + if (feeData.maxFeePerGas == null && feeData.maxPriorityFeePerGas == null) { + // Network doesn't know about EIP-1559 (and hence type) + req = Object.assign({ }, req, { + transaction: Object.assign({ }, tx, { type: undefined }) + }); + } + } } } - throw new Error("invalid account"); + const request = this.getRpcRequest(req); + + if (request != null) { + return await this.send(request.method, request.args); + } + + return super._perform(req); } - /** Sub-classes can override this; it detects the *actual* network that + /** Sub-classes may override this; it detects the *actual* network that * we are **currently** connected to. * - * Keep in mind that [[send]] may only be used once [[ready]]. + * Keep in mind that [[send]] may only be used once [[ready]], otherwise the + * _send primitive must be used instead. */ async _detectNetwork(): Promise { const network = this._getOption("staticNetwork"); @@ -604,7 +557,15 @@ export class JsonRpcApiProvider extends AbstractProvider { }; this.emit("debug", { action: "sendRpcPayload", payload }); - const result = (await this._send(payload))[0]; + + let result: JsonRpcResult | JsonRpcError; + try { + result = (await this._send(payload))[0]; + } catch (error) { + this.emit("debug", { action: "receiveRpcError", error }); + throw error; + } + this.emit("debug", { action: "receiveRpcResult", result }); if ("result" in result) { @@ -614,10 +575,51 @@ export class JsonRpcApiProvider extends AbstractProvider { throw this.getRpcError(payload, result); } + /** + * Sub-classes **MUST** call this. Until [[_start]] has been called, no calls + * will be passed to [[_send]] from [[send]]. If it is overridden, then + * ``super._start()`` **MUST** be called. + * + * Calling it multiple times is safe and has no effect. + */ + _start(): void { + if (this.#notReady == null || this.#notReady.resolve == null) { return; } + + this.#notReady.resolve(); + this.#notReady = null; + + (async () => { + + // Bootstrap the network + while (this.#network == null) { + try { + this.#network = await this._detectNetwork(); + } catch (error) { + console.log("JsonRpcProvider failed to startup; retry in 1s"); + await stall(1000); + } + } + + // Start dispatching requests + this.#scheduleDrain(); + })(); + } + + /** + * Resolves once the [[_start]] has been called. This can be used in + * sub-classes to defer sending data until the connection has been + * established. + */ + async _waitUntilReady(): Promise { + if (this.#notReady == null) { return; } + return await this.#notReady.promise; + } + + /** * Return a Subscriber that will manage the %%sub%%. * - * Sub-classes can override this to modify the behavior of + * Sub-classes may override this to modify the behavior of * subscription management. */ _getSubscriber(sub: Subscription): Subscriber { @@ -637,6 +639,11 @@ export class JsonRpcApiProvider extends AbstractProvider { return super._getSubscriber(sub); } + /** + * Returns true only if the [[_start]] has been called. + */ + get ready(): boolean { return this.#notReady == null; } + /** * Returns %%tx%% as a normalized JSON-RPC transaction request, * which has all values hexlified and any numeric values converted @@ -700,7 +707,7 @@ export class JsonRpcApiProvider extends AbstractProvider { args: [ getLowerCase(req.address), req.blockTag ] }; - case "getStorageAt": + case "getStorage": return { method: "eth_getStorageAt", args: [ @@ -836,38 +843,74 @@ export class JsonRpcApiProvider extends AbstractProvider { return makeError("could not coalesce error", "UNKNOWN_ERROR", { error }); } + /** - * Resolves to the non-normalized value by performing %%req%%. + * Requests the %%method%% with %%params%% via the JSON-RPC protocol + * over the underlying channel. This can be used to call methods + * on the backend that do not have a high-level API within the Provider + * API. * - * Sub-classes may override this to modify behavior of actions, - * and should generally call ``super._perform`` as a fallback. + * This method queues requests according to the batch constraints + * in the options, assigns the request a unique ID. + * + * **Do NOT override** this method in sub-classes; instead + * override [[_send]] or force the options values in the + * call to the constructor to modify this method's behavior. */ - async _perform(req: PerformActionRequest): Promise { - // Legacy networks do not like the type field being passed along (which - // is fair), so we delete type if it is 0 and a non-EIP-1559 network - if (req.method === "call" || req.method === "estimateGas") { - let tx = req.transaction; - if (tx && tx.type != null && getBigInt(tx.type)) { - // If there are no EIP-1559 properties, it might be non-EIP-a559 - if (tx.maxFeePerGas == null && tx.maxPriorityFeePerGas == null) { - const feeData = await this.getFeeData(); - if (feeData.maxFeePerGas == null && feeData.maxPriorityFeePerGas == null) { - // Network doesn't know about EIP-1559 (and hence type) - req = Object.assign({ }, req, { - transaction: Object.assign({ }, tx, { type: undefined }) - }); - } - } + send(method: string, params: Array | Record): Promise { + // @TODO: cache chainId?? purge on switch_networks + + const id = this.#nextId++; + const promise = new Promise((resolve, reject) => { + this.#payloads.push({ + resolve, reject, + payload: { method, params, id, jsonrpc: "2.0" } + }); + }); + + // If there is not a pending drainTimer, set one + this.#scheduleDrain(); + + return >promise; + } + + /** + * Resolves to the [[Signer]] account for %%address%% managed by + * the client. + * + * If the %%address%% is a number, it is used as an index in the + * the accounts from [[listAccounts]]. + * + * This can only be used on clients which manage accounts (such as + * Geth with imported account or MetaMask). + * + * Throws if the account doesn't exist. + */ + async getSigner(address: number | string = 0): Promise { + + const accountsPromise = this.send("eth_accounts", [ ]); + + // Account index + if (typeof(address) === "number") { + const accounts = >(await accountsPromise); + if (address > accounts.length) { throw new Error("no such account"); } + return new JsonRpcSigner(this, accounts[address]); + } + + const { accounts } = await resolveProperties({ + network: this.getNetwork(), + accounts: accountsPromise + }); + + // Account address + address = getAddress(address); + for (const account of accounts) { + if (getAddress(account) === account) { + return new JsonRpcSigner(this, account); } } - const request = this.getRpcRequest(req); - - if (request != null) { - return await this.send(request.method, request.args); - } - - return super._perform(req); + throw new Error("invalid account"); } } @@ -897,10 +940,14 @@ export class JsonRpcProvider extends JsonRpcApiProvider { this.#pollingInterval = 4000; } + _getConnection(): FetchRequest { + return this.#connect.clone(); + } + async send(method: string, params: Array | Record): Promise { // All requests are over HTTP, so we can just start handling requests // We do this here rather than the constructor so that we don't send any - // requests to the network until we absolutely have to. + // requests to the network (i.e. eth_chainId) until we absolutely have to. await this._start(); return await super.send(method, params); @@ -908,7 +955,7 @@ export class JsonRpcProvider extends JsonRpcApiProvider { async _send(payload: JsonRpcPayload | Array): Promise> { // Configure a POST connection for the requested method - const request = this.#connect.clone(); + const request = this._getConnection(); request.body = JSON.stringify(payload); const response = await request.send(); diff --git a/src.ts/providers/provider-socket.ts b/src.ts/providers/provider-socket.ts index 8220fd44d..d389b15b6 100644 --- a/src.ts/providers/provider-socket.ts +++ b/src.ts/providers/provider-socket.ts @@ -10,7 +10,7 @@ */ import { UnmanagedSubscriber } from "./abstract-provider.js"; -import { assertArgument, makeError, throwError } from "../utils/index.js"; +import { assertArgument, throwError } from "../utils/index.js"; import { JsonRpcApiProvider } from "./provider-jsonrpc.js"; import type { Subscriber, Subscription } from "./abstract-provider.js"; @@ -196,10 +196,16 @@ export class SocketProvider extends JsonRpcApiProvider { assertArgument(!Array.isArray(payload), "WebSocket does not support batch send", "payload", payload); // @TODO: stringify payloads here and store to prevent mutations + + // Prepare a promise to respond to const promise = new Promise((resolve, reject) => { this.#callbacks.set(payload.id, { payload, resolve, reject }); }); + // Wait until the socket is connected before writing to it + await this._waitUntilReady(); + + // Write the request to the socket await this._write(JSON.stringify(payload)); return >[ await promise ]; @@ -232,6 +238,9 @@ export class SocketProvider extends JsonRpcApiProvider { } this.#callbacks.delete(result.id); + callback.resolve(result); + +/* if ("error" in result) { const { message, code, data } = result.error; const error = makeError(message || "unkonwn error", "SERVER_ERROR", { @@ -242,7 +251,7 @@ export class SocketProvider extends JsonRpcApiProvider { } else { callback.resolve(result.result); } - +*/ } else if (result.method === "eth_subscription") { const filterId = result.params.subscription; const subscriber = this.#subs.get(filterId); diff --git a/src.ts/providers/provider-websocket.ts b/src.ts/providers/provider-websocket.ts index 61d29a772..cde173b2b 100644 --- a/src.ts/providers/provider-websocket.ts +++ b/src.ts/providers/provider-websocket.ts @@ -18,10 +18,11 @@ export interface WebSocketLike { } export class WebSocketProvider extends SocketProvider { - url!: string; - - #websocket: WebSocketLike; - get websocket(): WebSocketLike { return this.#websocket; } + #websocket: null | WebSocketLike; + get websocket(): WebSocketLike { + if (this.#websocket == null) { throw new Error("websocket closed"); } + return this.#websocket; + } constructor(url: string | WebSocketLike, network?: Networkish) { super(network); @@ -48,4 +49,10 @@ export class WebSocketProvider extends SocketProvider { async _write(message: string): Promise { this.websocket.send(message); } + + async destroy(): Promise { + if (this.#websocket == null) { return; } + this.#websocket.close(); + this.#websocket = null; + } }