Refactor provider model for better socket provider support.

This commit is contained in:
Richard Moore 2022-09-29 21:53:25 -04:00
parent a6faed5098
commit e14cca31ba
4 changed files with 269 additions and 173 deletions

@ -1,10 +1,11 @@
import {
defineProperties, FetchRequest, throwArgumentError
defineProperties, FetchRequest, throwArgumentError, throwError
} from "../utils/index.js";
import { showThrottleMessage } from "./community.js";
import { Network } from "./network.js";
import { JsonRpcProvider } from "./provider-jsonrpc.js";
import { WebSocketProvider } from "./provider-websocket.js";
import type { AbstractProvider } from "./abstract-provider.js";
import type { CommunityResourcable } from "./community.js";
@ -15,7 +16,7 @@ const defaultProjectId = "84842078b09946638c03157f83405213";
function getHost(name: string): string {
switch(name) {
case "homestead":
case "mainnet":
return "mainnet.infura.io";
case "ropsten":
return "ropsten.infura.io";
@ -42,12 +43,39 @@ function getHost(name: string): string {
return throwArgumentError("unsupported network", "network", name);
}
export class InfuraWebSocketProvider extends WebSocketProvider implements CommunityResourcable {
readonly projectId!: string;
readonly projectSecret!: null | string;
constructor(network?: Networkish, apiKey?: any) {
const provider = new InfuraProvider(network, apiKey);
const req = provider._getConnection();
if (req.credentials) {
throwError("INFURA WebSocket project secrets unsupported", "UNSUPPORTED_OPERATION", {
operation: "InfuraProvider.getWebSocketProvider()"
});
}
const url = req.url.replace(/^http/i, "ws").replace("/v3/", "/ws/v3/");
super(url, network);
defineProperties<InfuraWebSocketProvider>(this, {
projectId: provider.projectId,
projectSecret: provider.projectSecret
});
}
isCommunityResource(): boolean {
return (this.projectId === defaultProjectId);
}
}
export class InfuraProvider extends JsonRpcProvider implements CommunityResourcable {
readonly projectId!: string;
readonly projectSecret!: null | string;
constructor(_network: Networkish = "homestead", projectId?: null | string, projectSecret?: null | string) {
constructor(_network: Networkish = "mainnet", projectId?: null | string, projectSecret?: null | string) {
const network = Network.from(_network);
if (projectId == null) { projectId = defaultProjectId; }
if (projectSecret == null) { projectSecret = null; }
@ -65,6 +93,14 @@ export class InfuraProvider extends JsonRpcProvider implements CommunityResourca
return super._getProvider(chainId);
}
isCommunityResource(): boolean {
return (this.projectId === defaultProjectId);
}
static getWebSocketProvider(network?: Networkish, apiKey?: any): InfuraWebSocketProvider {
return new InfuraWebSocketProvider(network, apiKey);
}
static getRequest(network: Network, projectId?: null | string, projectSecret?: null | string): FetchRequest {
if (projectId == null) { projectId = defaultProjectId; }
if (projectSecret == null) { projectSecret = null; }
@ -83,7 +119,4 @@ export class InfuraProvider extends JsonRpcProvider implements CommunityResourca
return request;
}
isCommunityResource(): boolean {
return (this.projectId === defaultProjectId);
}
}

@ -355,86 +355,22 @@ export class JsonRpcApiProvider extends AbstractProvider {
#options: Required<JsonRpcApiProviderOptions>;
// The next ID to use for the JSON-RPC ID field
#nextId: number;
// Payloads are queued and triggered in batches using the drainTimer
#payloads: Array<Payload>;
#ready: boolean;
#starting: null | Promise<void>;
#drainTimer: null | NodeJS.Timer;
#notReady: null | {
promise: Promise<void>,
resolve: null | ((v: void) => void)
};
#network: null | Network;
constructor(network?: Networkish, options?: JsonRpcApiProviderOptions) {
super(network);
this.#ready = false;
this.#starting = null;
this.#nextId = 1;
this.#options = Object.assign({ }, defaultOptions, options || { });
this.#payloads = [ ];
this.#drainTimer = null;
this.#network = null;
// This could be relaxed in the future to just check equivalent networks
const staticNetwork = this._getOption("staticNetwork");
if (staticNetwork) {
if (staticNetwork !== network) {
throwArgumentError("staticNetwork MUST match network object", "options", options);
}
this.#network = staticNetwork;
}
}
/**
* Returns the value associated with the option %%key%%.
*
* Sub-classes can use this to inquire about configuration options.
*/
_getOption<K extends keyof JsonRpcApiProviderOptions>(key: K): JsonRpcApiProviderOptions[K] {
return this.#options[key];
}
get _network(): Network {
if (!this.#network) {
throwError("network is not available yet", "NETWORK_ERROR");
}
return this.#network;
}
get ready(): boolean { return this.#ready; }
async _start(): Promise<void> {
if (this.#ready) { return; }
if (this.#starting) { return this.#starting; }
this.#starting = (async () => {
// Bootstrap the network
if (this.#network == null) {
try {
this.#network = await this._detectNetwork();
} catch (error) {
console.log("JsonRpcProvider failed to startup; retry in 1s");
await stall(1000);
this.#starting = null;
}
}
this.#ready = true;
this.#starting = null;
// Start dispatching requests
this.#scheduleDrain();
})();
}
#scheduleDrain(): void {
if (this.#drainTimer || !this.ready) { return; }
if (this.#drainTimer) { return; }
// If we aren't using batching, no hard in sending it immeidately
const stallTime = (this._getOption("batchMaxCount") === 1) ? 0: this._getOption("batchStallTime");
@ -503,34 +439,54 @@ export class JsonRpcApiProvider extends AbstractProvider {
}, stallTime);
}
/**
* Requests the %%method%% with %%params%% via the JSON-RPC protocol
* over the underlying channel. This can be used to call methods
* on the backend that do not have a high-level API within the Provider
* API.
*
* This method queues requests according to the batch constraints
* in the options, assigns the request a unique ID.
*
* **Do NOT override** this method in sub-classes; instead
* override [[_send]] or force the options values in the
* call to the constructor to modify this method's behavior.
*/
send(method: string, params: Array<any> | Record<string, any>): Promise<any> {
// @TODO: cache chainId?? purge on switch_networks
constructor(network?: Networkish, options?: JsonRpcApiProviderOptions) {
super(network);
const id = this.#nextId++;
const promise = new Promise((resolve, reject) => {
this.#payloads.push({
resolve, reject,
payload: { method, params, id, jsonrpc: "2.0" }
this.#nextId = 1;
this.#options = Object.assign({ }, defaultOptions, options || { });
this.#payloads = [ ];
this.#drainTimer = null;
this.#network = null;
{
let resolve: null | ((value: void) => void) = null;
const promise = new Promise((_resolve: (value: void) => void) => {
resolve = _resolve;
});
});
this.#notReady = { promise, resolve };
}
// If there is not a pending drainTimer, set one
this.#scheduleDrain();
// This could be relaxed in the future to just check equivalent networks
const staticNetwork = this._getOption("staticNetwork");
if (staticNetwork) {
if (staticNetwork !== network) {
throwArgumentError("staticNetwork MUST match network object", "options", options);
}
this.#network = staticNetwork;
}
}
return <Promise<JsonRpcResult>>promise;
/**
* Returns the value associated with the option %%key%%.
*
* Sub-classes can use this to inquire about configuration options.
*/
_getOption<K extends keyof JsonRpcApiProviderOptions>(key: K): JsonRpcApiProviderOptions[K] {
return this.#options[key];
}
/**
* Gets the [[Network]] this provider has committed to. On each call, the network
* is detected, and if it has changed, the call will reject.
*/
get _network(): Network {
if (!this.#network) {
throwError("network is not available yet", "NETWORK_ERROR");
}
return this.#network;
}
/**
@ -544,49 +500,46 @@ export class JsonRpcApiProvider extends AbstractProvider {
});
}
/**
* Resolves to the [[Signer]] account for %%address%% managed by
* the client.
* Resolves to the non-normalized value by performing %%req%%.
*
* If the %%address%% is a number, it is used as an index in the
* the accounts from [[listAccounts]].
*
* This can only be used on clients which manage accounts (such as
* Geth with imported account or MetaMask).
*
* Throws if the account doesn't exist.
* Sub-classes may override this to modify behavior of actions,
* and should generally call ``super._perform`` as a fallback.
*/
async getSigner(address: number | string = 0): Promise<JsonRpcSigner> {
const accountsPromise = this.send("eth_accounts", [ ]);
// Account index
if (typeof(address) === "number") {
const accounts = <Array<string>>(await accountsPromise);
if (address > accounts.length) { throw new Error("no such account"); }
return new JsonRpcSigner(this, accounts[address]);
}
const { accounts } = await resolveProperties({
network: this.getNetwork(),
accounts: accountsPromise
});
// Account address
address = getAddress(address);
for (const account of accounts) {
if (getAddress(account) === account) {
return new JsonRpcSigner(this, account);
async _perform(req: PerformActionRequest): Promise<any> {
// Legacy networks do not like the type field being passed along (which
// is fair), so we delete type if it is 0 and a non-EIP-1559 network
if (req.method === "call" || req.method === "estimateGas") {
let tx = req.transaction;
if (tx && tx.type != null && getBigInt(tx.type)) {
// If there are no EIP-1559 properties, it might be non-EIP-a559
if (tx.maxFeePerGas == null && tx.maxPriorityFeePerGas == null) {
const feeData = await this.getFeeData();
if (feeData.maxFeePerGas == null && feeData.maxPriorityFeePerGas == null) {
// Network doesn't know about EIP-1559 (and hence type)
req = Object.assign({ }, req, {
transaction: Object.assign({ }, tx, { type: undefined })
});
}
}
}
}
throw new Error("invalid account");
const request = this.getRpcRequest(req);
if (request != null) {
return await this.send(request.method, request.args);
}
return super._perform(req);
}
/** Sub-classes can override this; it detects the *actual* network that
/** Sub-classes may override this; it detects the *actual* network that
* we are **currently** connected to.
*
* Keep in mind that [[send]] may only be used once [[ready]].
* Keep in mind that [[send]] may only be used once [[ready]], otherwise the
* _send primitive must be used instead.
*/
async _detectNetwork(): Promise<Network> {
const network = this._getOption("staticNetwork");
@ -604,7 +557,15 @@ export class JsonRpcApiProvider extends AbstractProvider {
};
this.emit("debug", { action: "sendRpcPayload", payload });
const result = (await this._send(payload))[0];
let result: JsonRpcResult | JsonRpcError;
try {
result = (await this._send(payload))[0];
} catch (error) {
this.emit("debug", { action: "receiveRpcError", error });
throw error;
}
this.emit("debug", { action: "receiveRpcResult", result });
if ("result" in result) {
@ -614,10 +575,51 @@ export class JsonRpcApiProvider extends AbstractProvider {
throw this.getRpcError(payload, result);
}
/**
* Sub-classes **MUST** call this. Until [[_start]] has been called, no calls
* will be passed to [[_send]] from [[send]]. If it is overridden, then
* ``super._start()`` **MUST** be called.
*
* Calling it multiple times is safe and has no effect.
*/
_start(): void {
if (this.#notReady == null || this.#notReady.resolve == null) { return; }
this.#notReady.resolve();
this.#notReady = null;
(async () => {
// Bootstrap the network
while (this.#network == null) {
try {
this.#network = await this._detectNetwork();
} catch (error) {
console.log("JsonRpcProvider failed to startup; retry in 1s");
await stall(1000);
}
}
// Start dispatching requests
this.#scheduleDrain();
})();
}
/**
* Resolves once the [[_start]] has been called. This can be used in
* sub-classes to defer sending data until the connection has been
* established.
*/
async _waitUntilReady(): Promise<void> {
if (this.#notReady == null) { return; }
return await this.#notReady.promise;
}
/**
* Return a Subscriber that will manage the %%sub%%.
*
* Sub-classes can override this to modify the behavior of
* Sub-classes may override this to modify the behavior of
* subscription management.
*/
_getSubscriber(sub: Subscription): Subscriber {
@ -637,6 +639,11 @@ export class JsonRpcApiProvider extends AbstractProvider {
return super._getSubscriber(sub);
}
/**
* Returns true only if the [[_start]] has been called.
*/
get ready(): boolean { return this.#notReady == null; }
/**
* Returns %%tx%% as a normalized JSON-RPC transaction request,
* which has all values hexlified and any numeric values converted
@ -700,7 +707,7 @@ export class JsonRpcApiProvider extends AbstractProvider {
args: [ getLowerCase(req.address), req.blockTag ]
};
case "getStorageAt":
case "getStorage":
return {
method: "eth_getStorageAt",
args: [
@ -836,38 +843,74 @@ export class JsonRpcApiProvider extends AbstractProvider {
return makeError("could not coalesce error", "UNKNOWN_ERROR", { error });
}
/**
* Resolves to the non-normalized value by performing %%req%%.
* Requests the %%method%% with %%params%% via the JSON-RPC protocol
* over the underlying channel. This can be used to call methods
* on the backend that do not have a high-level API within the Provider
* API.
*
* Sub-classes may override this to modify behavior of actions,
* and should generally call ``super._perform`` as a fallback.
* This method queues requests according to the batch constraints
* in the options, assigns the request a unique ID.
*
* **Do NOT override** this method in sub-classes; instead
* override [[_send]] or force the options values in the
* call to the constructor to modify this method's behavior.
*/
async _perform(req: PerformActionRequest): Promise<any> {
// Legacy networks do not like the type field being passed along (which
// is fair), so we delete type if it is 0 and a non-EIP-1559 network
if (req.method === "call" || req.method === "estimateGas") {
let tx = req.transaction;
if (tx && tx.type != null && getBigInt(tx.type)) {
// If there are no EIP-1559 properties, it might be non-EIP-a559
if (tx.maxFeePerGas == null && tx.maxPriorityFeePerGas == null) {
const feeData = await this.getFeeData();
if (feeData.maxFeePerGas == null && feeData.maxPriorityFeePerGas == null) {
// Network doesn't know about EIP-1559 (and hence type)
req = Object.assign({ }, req, {
transaction: Object.assign({ }, tx, { type: undefined })
});
}
}
send(method: string, params: Array<any> | Record<string, any>): Promise<any> {
// @TODO: cache chainId?? purge on switch_networks
const id = this.#nextId++;
const promise = new Promise((resolve, reject) => {
this.#payloads.push({
resolve, reject,
payload: { method, params, id, jsonrpc: "2.0" }
});
});
// If there is not a pending drainTimer, set one
this.#scheduleDrain();
return <Promise<JsonRpcResult>>promise;
}
/**
* Resolves to the [[Signer]] account for %%address%% managed by
* the client.
*
* If the %%address%% is a number, it is used as an index in the
* the accounts from [[listAccounts]].
*
* This can only be used on clients which manage accounts (such as
* Geth with imported account or MetaMask).
*
* Throws if the account doesn't exist.
*/
async getSigner(address: number | string = 0): Promise<JsonRpcSigner> {
const accountsPromise = this.send("eth_accounts", [ ]);
// Account index
if (typeof(address) === "number") {
const accounts = <Array<string>>(await accountsPromise);
if (address > accounts.length) { throw new Error("no such account"); }
return new JsonRpcSigner(this, accounts[address]);
}
const { accounts } = await resolveProperties({
network: this.getNetwork(),
accounts: accountsPromise
});
// Account address
address = getAddress(address);
for (const account of accounts) {
if (getAddress(account) === account) {
return new JsonRpcSigner(this, account);
}
}
const request = this.getRpcRequest(req);
if (request != null) {
return await this.send(request.method, request.args);
}
return super._perform(req);
throw new Error("invalid account");
}
}
@ -897,10 +940,14 @@ export class JsonRpcProvider extends JsonRpcApiProvider {
this.#pollingInterval = 4000;
}
_getConnection(): FetchRequest {
return this.#connect.clone();
}
async send(method: string, params: Array<any> | Record<string, any>): Promise<any> {
// All requests are over HTTP, so we can just start handling requests
// We do this here rather than the constructor so that we don't send any
// requests to the network until we absolutely have to.
// requests to the network (i.e. eth_chainId) until we absolutely have to.
await this._start();
return await super.send(method, params);
@ -908,7 +955,7 @@ export class JsonRpcProvider extends JsonRpcApiProvider {
async _send(payload: JsonRpcPayload | Array<JsonRpcPayload>): Promise<Array<JsonRpcResult>> {
// Configure a POST connection for the requested method
const request = this.#connect.clone();
const request = this._getConnection();
request.body = JSON.stringify(payload);
const response = await request.send();

@ -10,7 +10,7 @@
*/
import { UnmanagedSubscriber } from "./abstract-provider.js";
import { assertArgument, makeError, throwError } from "../utils/index.js";
import { assertArgument, throwError } from "../utils/index.js";
import { JsonRpcApiProvider } from "./provider-jsonrpc.js";
import type { Subscriber, Subscription } from "./abstract-provider.js";
@ -196,10 +196,16 @@ export class SocketProvider extends JsonRpcApiProvider {
assertArgument(!Array.isArray(payload), "WebSocket does not support batch send", "payload", payload);
// @TODO: stringify payloads here and store to prevent mutations
// Prepare a promise to respond to
const promise = new Promise((resolve, reject) => {
this.#callbacks.set(payload.id, { payload, resolve, reject });
});
// Wait until the socket is connected before writing to it
await this._waitUntilReady();
// Write the request to the socket
await this._write(JSON.stringify(payload));
return <Array<JsonRpcResult | JsonRpcError>>[ await promise ];
@ -232,6 +238,9 @@ export class SocketProvider extends JsonRpcApiProvider {
}
this.#callbacks.delete(result.id);
callback.resolve(result);
/*
if ("error" in result) {
const { message, code, data } = result.error;
const error = makeError(message || "unkonwn error", "SERVER_ERROR", {
@ -242,7 +251,7 @@ export class SocketProvider extends JsonRpcApiProvider {
} else {
callback.resolve(result.result);
}
*/
} else if (result.method === "eth_subscription") {
const filterId = result.params.subscription;
const subscriber = this.#subs.get(filterId);

@ -18,10 +18,11 @@ export interface WebSocketLike {
}
export class WebSocketProvider extends SocketProvider {
url!: string;
#websocket: WebSocketLike;
get websocket(): WebSocketLike { return this.#websocket; }
#websocket: null | WebSocketLike;
get websocket(): WebSocketLike {
if (this.#websocket == null) { throw new Error("websocket closed"); }
return this.#websocket;
}
constructor(url: string | WebSocketLike, network?: Networkish) {
super(network);
@ -48,4 +49,10 @@ export class WebSocketProvider extends SocketProvider {
async _write(message: string): Promise<void> {
this.websocket.send(message);
}
async destroy(): Promise<void> {
if (this.#websocket == null) { return; }
this.#websocket.close();
this.#websocket = null;
}
}