diff --git a/src.ts/providers/plugin-fallback.ts b/src.ts/providers/plugin-fallback.ts new file mode 100644 index 000000000..76a929cb0 --- /dev/null +++ b/src.ts/providers/plugin-fallback.ts @@ -0,0 +1,35 @@ + +import { AbstractProviderPlugin } from "./abstract-provider.js"; +import { defineProperties } from "../utils/index.js"; + +import type { AbstractProvider, PerformActionRequest } from "./abstract-provider.js"; + + +export const PluginIdFallbackProvider = "org.ethers.plugins.QualifiedPlugin"; + +export class CheckQualifiedPlugin implements AbstractProviderPlugin { + declare name: string; + + constructor() { + defineProperties(this, { name: PluginIdFallbackProvider }); + } + + connect(provider: AbstractProvider): CheckQualifiedPlugin { + return this; + } + + // Retruns true if this value should be considered qualified for + // inclusion in the quorum. + isQualified(action: PerformActionRequest, result: any): boolean { + return true; + } +} + +export class PossiblyPrunedTransactionPlugin extends CheckQualifiedPlugin { + isQualified(action: PerformActionRequest, result: any): boolean { + if (action.method === "getTransaction" || action.method === "getTransactionReceipt") { + if (result == null) { return false; } + } + return super.isQualified(action, result); + } +} diff --git a/src.ts/providers/provider-fallback.ts b/src.ts/providers/provider-fallback.ts index 0b85f8b23..e9f23e2fe 100644 --- a/src.ts/providers/provider-fallback.ts +++ b/src.ts/providers/provider-fallback.ts @@ -1,19 +1,14 @@ import { - getBigInt, getNumber, hexlify, assert, assertArgument + getBigInt, getNumber, assert, assertArgument } from "../utils/index.js"; import { AbstractProvider } from "./abstract-provider.js"; -import { - formatBlock, formatBlockWithTransactions, formatLog, formatTransactionReceipt, - formatTransactionResponse -} from "./format.js"; import { Network } from "./network.js" import type { PerformActionRequest } from "./abstract-provider.js"; import type { Networkish } from "./network.js" -//const BN_0 = BigInt("0"); const BN_1 = BigInt("1"); const BN_2 = BigInt("2"); @@ -132,93 +127,114 @@ export type FallbackProviderOptions = { eventWorkers: number; }; -type RunningState = { +type RunnerResult = { result: any } | { error: Error }; + +type RunnerState = { config: Config; staller: null | Promise; didBump: boolean; perform: null | Promise; - done: boolean; - result: { result: any } | { error: Error } + result: null | RunnerResult; } -// Normalizes a result to a string that can be used to compare against -// other results using normal string equality -function normalize(provider: AbstractProvider, value: any, req: PerformActionRequest): string { - switch (req.method) { - case "chainId": - return getBigInt(value).toString(); - case "getBlockNumber": - return getNumber(value).toString(); - case "getGasPrice": - return getBigInt(value).toString(); - case "getBalance": - return getBigInt(value).toString(); - case "getTransactionCount": - return getNumber(value).toString(); - case "getCode": - return hexlify(value); - case "getStorage": - return hexlify(value); - case "getBlock": - if (req.includeTransactions) { - return stringify(formatBlockWithTransactions(value)); - } - return stringify(formatBlock(value)); - case "getTransaction": - return stringify(formatTransactionResponse(value)); - case "getTransactionReceipt": - return stringify(formatTransactionReceipt(value)); - case "call": - return hexlify(value); - case "estimateGas": - return getBigInt(value).toString(); - case "getLogs": - return stringify(value.map((v: any) => formatLog(v))); +function _normalize(value: any): string { + if (value == null) { return "null"; } + + if (Array.isArray(value)) { + return "[" + (value.map(_normalize)).join(",") + "]"; } - assert(false, "unsupported method", "UNSUPPORTED_OPERATION", { - operation: `_perform(${ stringify(req.method) })` }); -} - -type TallyResult = { - result: any; - normal: string; - weight: number; -}; - -// This strategy picks the highest wieght result, as long as the weight is -// equal to or greater than quorum -function checkQuorum(quorum: number, results: Array): any { - const tally: Map = new Map(); - for (const { result, normal, weight } of results) { - const t = tally.get(normal) || { result, weight: 0 }; - t.weight += weight; - tally.set(normal, t); + if (typeof(value) === "object" && typeof(value.toJSON) === "function") { + return _normalize(value.toJSON()); } - let bestWeight = 0; - let bestResult = undefined; - - for (const { weight, result } of tally.values()) { - if (weight >= quorum && weight > bestWeight) { - bestWeight = weight; - bestResult = result; + switch (typeof(value)) { + case "boolean": case "symbol": + return value.toString(); + case "bigint": case "number": + return BigInt(value).toString(); + case "string": + return JSON.stringify(value); + case "object": { + const keys = Object.keys(value); + keys.sort(); + return "{" + keys.map((k) => `${ JSON.stringify(k) }:${ _normalize(value[k]) }`).join(",") + "}"; } } - return bestResult; + console.log("Could not serialize", value); + throw new Error("Hmm..."); } -/* -function getMean(results: Array): bigint { - const total = results.reduce((a, r) => (a + BigInt(r.result)), BN_0); - return total / BigInt(results.length); -} -*/ +function normalizeResult(value: RunnerResult): { tag: string, value: any } { + + if ("error" in value) { + const error = value.error; + return { tag: _normalize(error), value: error }; + } + + const result = value.result; + return { tag: _normalize(result), value: result }; +} + +type TallyResult = { + tag: string; + value: any; + weight: number; +}; + +// This strategy picks the highest weight result, as long as the weight is +// equal to or greater than quorum +function checkQuorum(quorum: number, results: Array): any | Error { + const tally: Map = new Map(); + for (const { value, tag, weight } of results) { + const t = tally.get(tag) || { value, weight: 0 }; + t.weight += weight; + tally.set(tag, t); + } + + let best: null | { value: any, weight: number } = null; + + for (const r of tally.values()) { + if (r.weight >= quorum && (!best || r.weight > best.weight)) { + best = r; + } + } + + if (best) { return best.value; } + + return undefined; +} + +function getMedian(quorum: number, results: Array): undefined | bigint | Error { + let resultWeight = 0; + + const errorMap: Map = new Map(); + let bestError: null | { weight: number, value: Error } = null; + + const values: Array = [ ]; + for (const { value, tag, weight } of results) { + if (value instanceof Error) { + const e = errorMap.get(tag) || { value, weight: 0 }; + e.weight += weight; + errorMap.set(tag, e); + + if (bestError == null || e.weight > bestError.weight) { bestError = e; } + } else { + values.push(BigInt(value)); + resultWeight += weight; + } + } + + if (resultWeight < quorum) { + // We have quorum for an error + if (bestError && bestError.weight >= quorum) { return bestError.value; } + + // We do not have quorum for a result + return undefined; + } -function getMedian(results: Array): bigint { // Get the sorted values - const values = results.map((r) => BigInt(r.result)); values.sort((a, b) => ((a < b) ? -1: (b > a) ? 1: 0)); const mid = values.length / 2; @@ -230,8 +246,22 @@ function getMedian(results: Array): bigint { return (values[mid - 1] + values[mid] + BN_1) / BN_2; } +function getAnyResult(quorum: number, results: Array): undefined | any | Error { + // If any value or error meets quorum, that is our preferred result + const result = checkQuorum(quorum, results); + if (result !== undefined) { return result; } + + // Otherwise, do we have any result? + for (const r of results) { + if (r.value) { return r.value; } + } + + // Nope! + return undefined; +} + function getFuzzyMode(quorum: number, results: Array): undefined | number { - if (quorum === 1) { return getNumber(getMedian(results), "%internal"); } + if (quorum === 1) { return getNumber(getMedian(quorum, results), "%internal"); } const tally: Map = new Map(); const add = (result: number, weight: number) => { @@ -240,8 +270,8 @@ function getFuzzyMode(quorum: number, results: Array): undefined | tally.set(result, t); }; - for (const { weight, result } of results) { - const r = getNumber(result); + for (const { weight, value } of results) { + const r = getNumber(value); add(r - 1, weight); add(r, weight); add(r + 1, weight); @@ -264,7 +294,6 @@ function getFuzzyMode(quorum: number, results: Array): undefined | } export class FallbackProvider extends AbstractProvider { - //readonly providerConfigs!: ReadonlyArray>>; readonly quorum: number; readonly eventQuorum: number; @@ -296,9 +325,14 @@ export class FallbackProvider extends AbstractProvider { "quorum exceed provider wieght", "quorum", this.quorum); } - // @TOOD: Copy these and only return public values get providerConfigs(): Array { - return this.#configs.slice(); + return this.#configs.map((c) => { + const result: any = Object.assign({ }, c); + for (const key in result) { + if (key[0] === "_") { delete result[key]; } + } + return result; + }); } async _detectNetwork(): Promise { @@ -310,8 +344,55 @@ export class FallbackProvider extends AbstractProvider { // throw new Error("@TODO"); //} - // Grab the next (random) config that is not already part of configs - #getNextConfig(configs: Array): null | Config { + async _translatePerform(provider: AbstractProvider, req: PerformActionRequest): Promise { + switch (req.method) { + case "broadcastTransaction": + return await provider.broadcastTransaction(req.signedTransaction); + case "call": + return await provider.call(Object.assign({ }, req.transaction, { blockTag: req.blockTag })); + case "chainId": + return (await provider.getNetwork()).chainId; + case "estimateGas": + return await provider.estimateGas(req.transaction); + case "getBalance": + return await provider.getBalance(req.address, req.blockTag); + case "getBlock": { + const block = ("blockHash" in req) ? req.blockHash: req.blockTag; + if (req.includeTransactions) { + return await provider.getBlockWithTransactions(block); + } + return await provider.getBlock(block); + } + case "getBlockNumber": + return await provider.getBlockNumber(); + case "getCode": + return await provider.getCode(req.address, req.blockTag); + case "getGasPrice": + return (await provider.getFeeData()).gasPrice; + case "getLogs": + return await provider.getLogs(req.filter); + case "getStorage": + return await provider.getStorage(req.address, req.position, req.blockTag); + case "getTransaction": + return await provider.getTransaction(req.hash); + case "getTransactionCount": + return await provider.getTransactionCount(req.address, req.blockTag); + case "getTransactionReceipt": + return await provider.getTransactionReceipt(req.hash); + case "getTransactionResult": + return await provider.getTransactionResult(req.hash); + } + } + + // Grab the next (random) config that is not already part of + // the running set + #getNextConfig(running: Set): null | Config { + // @TODO: Maybe do a check here to favour (heavily) providers that + // do not require waitForSync and disfavour providers that + // seem down-ish or are behaving slowly + + const configs = Array.from(running).map((r) => r.config) + // Shuffle the states, sorted by priority const allConfigs = this.#configs.slice(); shuffle(allConfigs); @@ -325,32 +406,31 @@ export class FallbackProvider extends AbstractProvider { } // Adds a new runner (if available) to running. - #addRunner(running: Set, req: PerformActionRequest): null | RunningState { - const config = this.#getNextConfig(Array.from(running).map((r) => r.config)); - if (config == null) { - return null; - } + #addRunner(running: Set, req: PerformActionRequest): null | RunnerState { + const config = this.#getNextConfig(running); - const result: any = { }; + // No runners available + if (config == null) { return null; } - const runner: RunningState = { - config, result, didBump: false, done: false, + // Create a new runner + const runner: RunnerState = { + config, result: null, didBump: false, perform: null, staller: null }; const now = getTime(); + // Start performing this operation runner.perform = (async () => { try { config.requests++; - result.result = await config.provider._perform(req); - } catch (error) { + const result = await this._translatePerform(config.provider, req); + runner.result = { result }; + } catch (error: any) { config.errorResponses++; - result.error = error; + runner.result = { error }; } - if (runner.done) { config.lateResponses++; } - const dt = (getTime() - now); config._totalTime += dt; @@ -359,6 +439,8 @@ export class FallbackProvider extends AbstractProvider { runner.perform = null; })(); + // Start a staller; when this times out, it's time to force + // kicking off another runner because we are taking too long runner.staller = (async () => { await stall(config.stallTimeout); runner.staller = null; @@ -404,17 +486,13 @@ export class FallbackProvider extends AbstractProvider { } - async #checkQuorum(running: Set, req: PerformActionRequest): Promise { + async #checkQuorum(running: Set, req: PerformActionRequest): Promise { // Get all the result objects const results: Array = [ ]; for (const runner of running) { - if ("result" in runner.result) { - const result = runner.result.result; - results.push({ - result, - normal: normalize(runner.config.provider, result, req), - weight: runner.config.weight - }); + if (runner.result != null) { + const { tag, value } = normalizeResult(runner.result); + results.push({ tag, value, weight: runner.config.weight }); } } @@ -427,14 +505,15 @@ export class FallbackProvider extends AbstractProvider { case "getBlockNumber": { // We need to get the bootstrap block height if (this.#height === -2) { - const height = Math.ceil(getNumber(getMedian(this.#configs.map((c) => ({ - result: c.blockNumber, - normal: getNumber(c.blockNumber).toString(), + this.#height = Math.ceil(getNumber(getMedian(this.quorum, this.#configs.map((c) => ({ + value: c.blockNumber, + tag: getNumber(c.blockNumber).toString(), weight: c.weight - }))), "%internal")); - this.#height = height; + }))))); } + // Find the mode across all the providers, allowing for + // a little drift between block heights const mode = getFuzzyMode(this.quorum, results); if (mode === undefined) { return undefined; } if (mode > this.#height) { this.#height = mode; } @@ -443,16 +522,17 @@ export class FallbackProvider extends AbstractProvider { case "getGasPrice": case "estimateGas": - return getMedian(results); + return getMedian(this.quorum, results); case "getBlock": - // Pending blocks are mempool dependant and already - // quite untrustworthy + // Pending blocks are in the mempool and already + // quite untrustworthy; just grab anything if ("blockTag" in req && req.blockTag === "pending") { - return results[0].result; + return getAnyResult(this.quorum, results); } return checkQuorum(this.quorum, results); + case "call": case "chainId": case "getBalance": case "getTransactionCount": @@ -463,10 +543,6 @@ export class FallbackProvider extends AbstractProvider { case "getLogs": return checkQuorum(this.quorum, results); - case "call": - // @TODO: Check errors - return checkQuorum(this.quorum, results); - case "broadcastTransaction": throw new Error("TODO"); } @@ -476,25 +552,16 @@ export class FallbackProvider extends AbstractProvider { }); } - async #waitForQuorum(running: Set, req: PerformActionRequest): Promise { + async #waitForQuorum(running: Set, req: PerformActionRequest): Promise { if (running.size === 0) { throw new Error("no runners?!"); } // Any promises that are interesting to watch for; an expired stall // or a successful perform const interesting: Array> = [ ]; - //const results: Array = [ ]; - //const errors: Array = [ ]; let newRunners = 0; for (const runner of running) { -// @TODO: use runner.perfom != null - /* - if ("result" in runner.result) { - results.push(runner.result.result); - } else if ("error" in runner.result) { - errors.push(runner.result.error); - } -*/ + // No responses, yet; keep an eye on it if (runner.perform) { interesting.push(runner.perform); @@ -514,17 +581,7 @@ export class FallbackProvider extends AbstractProvider { newRunners++; } - // Check for quorum - /* - console.log({ results, errors } ); - if (results.length >= this.quorum) { - return results[0]; - } - - if (errors.length >= this.quorum) { - return errors[0]; - } - */ + // Check if we have reached quorum on a result (or error) const value = await this.#checkQuorum(running, req); if (value !== undefined) { if (value instanceof Error) { throw value; } @@ -537,29 +594,61 @@ export class FallbackProvider extends AbstractProvider { this.#addRunner(running, req) } - if (interesting.length === 0) { - throw new Error("quorum not met"); -// return logger.throwError("failed to meet quorum", "", { -// }); - } + // All providers have returned, and we have no result - // Wait for someone to either complete its perform or trigger a stall + assert(interesting.length > 0, "quorum not met", "SERVER_ERROR", { + request: "%sub-requests", + info: { request: req, results: Array.from(running).map((r) => stringify(r.result)) } + }); + + // Wait for someone to either complete its perform or stall out await Promise.race(interesting); + // This is recursive, but at worst case the depth is 2x the + // number of providers (each has a perform and a staller) return await this.#waitForQuorum(running, req); } async _perform(req: PerformActionRequest): Promise { + // Broadcasting a transaction is rare (ish) and already incurs + // a cost on the user, so spamming is safe-ish. Just send it to + // every backend. + if (req.method === "broadcastTransaction") { + const results = await Promise.all(this.#configs.map(async ({ provider, weight }) => { + try { + const result = await provider._perform(req); + return Object.assign(normalizeResult({ result }), { weight }); + } catch (error: any) { + return Object.assign(normalizeResult({ error }), { weight }); + } + })); + + const result = getAnyResult(this.quorum, results); + assert(result !== undefined, "problem multi-broadcasting", "SERVER_ERROR", { + request: "%sub-requests", + info: { request: req, results: results.map(stringify) } + }) + return result; + } + await this.#initialSync(); - // Bootstrap enough to meet quorum - const running: Set = new Set(); + // Bootstrap enough runners to meet quorum + const running: Set = new Set(); for (let i = 0; i < this.quorum; i++) { this.#addRunner(running, req); } - const result = this.#waitForQuorum(running, req); - for (const runner of running) { runner.done = true; } + const result = await this.#waitForQuorum(running, req); + + // Track requests sent to a provider that are still + // outstanding after quorum has been otherwise found + for (const runner of running) { + if (runner.perform && runner.result == null) { + runner.config.lateResponses++; + } + } + return result; } }