diff --git a/packages/providers/src.ts/base-provider.ts b/packages/providers/src.ts/base-provider.ts index d2dd24857..d2a8594c7 100644 --- a/packages/providers/src.ts/base-provider.ts +++ b/packages/providers/src.ts/base-provider.ts @@ -687,6 +687,23 @@ export class BaseProvider extends Provider { // Add transactions if (includeTransactions) { + let blockNumber: number = null; + for (let i = 0; i < block.transactions.length; i++) { + const tx = block.transactions[i]; + if (tx.blockNumber == null) { + tx.confirmations = 0; + + } else if (tx.confirmations == null) { + if (blockNumber == null) { + blockNumber = await this._getInternalBlockNumber(100 + 2 * this.pollingInterval); + } + + // Add the confirmations using the fast block number (pessimistic) + let confirmations = (blockNumber - tx.blockNumber) + 1; + if (confirmations <= 0) { confirmations = 1; } + tx.confirmations = confirmations; + } + } return this.formatter.blockWithTransactions(block); } @@ -777,7 +794,10 @@ export class BaseProvider extends Provider { async getLogs(filter: Filter | FilterByBlockHash | Promise): Promise> { await this.ready; const params = await resolveProperties({ filter: this._getFilter(filter) }); - const logs = await this.perform("getLogs", params); + const logs: Array = await this.perform("getLogs", params); + logs.forEach((log) => { + if (log.removed == null) { log.removed = false; } + }); return Formatter.arrayOf(this.formatter.filterLog.bind(this.formatter))(logs); } diff --git a/packages/providers/src.ts/fallback-provider.ts b/packages/providers/src.ts/fallback-provider.ts index 39ae57cb9..67f725dda 100644 --- a/packages/providers/src.ts/fallback-provider.ts +++ b/packages/providers/src.ts/fallback-provider.ts @@ -1,8 +1,9 @@ "use strict"; import { Network } from "@ethersproject/networks"; +import { BlockWithTransactions, Provider } from "@ethersproject/abstract-provider"; import { shuffled } from "@ethersproject/random"; -import { deepCopy, defineReadOnly } from "@ethersproject/properties"; +import { deepCopy, defineReadOnly, shallowCopy } from "@ethersproject/properties"; import { BigNumber } from "@ethersproject/bignumber"; import { Logger } from "@ethersproject/logger"; @@ -13,318 +14,465 @@ import { BaseProvider } from "./base-provider"; function now() { return (new Date()).getTime(); } -// Returns: -// - true is all networks match -// - false if any network is null -// - throws if any 2 networks do not match -function checkNetworks(networks: Array): boolean { - let result = true; +// Returns to network as long as all agree, or null if any is null. +// Throws an error if any two networks do not match. +function checkNetworks(networks: Array): Network { + let result = null; - let check: Network = null; - networks.forEach((network) => { + for (let i = 0; i < networks.length; i++) { + const network = networks[i]; - // Null - if (network == null) { - result = false; - return; + // Null! We do not know our network; bail. + if (network == null) { return null; } + + if (result) { + // Make sure the network matches the previous networks + if (!(result.name === network.name && result.chainId === network.chainId && + ((result.ensAddress === network.ensAddress) || (result.ensAddress == null && network.ensAddress == null)))) { + + logger.throwArgumentError("provider mismatch", "networks", networks); + } + } else { + result = network; } - - // Have nothing to compre to yet - if (check == null) { - check = network; - return; - } - - // Matches! - if (check.name === network.name && - check.chainId === network.chainId && - ((check.ensAddress === network.ensAddress) || - (check.ensAddress == null && network.ensAddress == null))) { return; } - - logger.throwArgumentError("provider mismatch", "networks", networks); - }); + } return result; } -type Result = { - result?: any; - error?: Error; - weight: number; -}; +function median(values: Array): number { + values = values.slice().sort(); + const middle = Math.floor(values.length / 2); -type Runner = { - run: () => Promise; - weight: number; -}; + // Odd length; take the middle + if (values.length % 2) { + return values[middle]; + } + // Even length; take the average of the two middle + const a = values[middle - 1], b = values[middle]; + return (a + b) / 2; +} -function serialize(result: any): string { - if (Array.isArray(result)) { - return JSON.stringify(result.map((r) => serialize(r))); - } else if (result === null) { - return "null"; - } else if (typeof(result) === "object") { - let keys = Object.keys(result); +function serialize(value: any): string { + if (value === null) { + return null; + } else if (typeof(value) === "number" || typeof(value) === "boolean") { + return JSON.stringify(value); + } else if (typeof(value) === "string") { + return value; + } else if (BigNumber.isBigNumber(value)) { + return value.toString(); + } else if (Array.isArray(value)) { + return JSON.stringify(value.map((i) => serialize(i))); + } else if (typeof(value) === "object") { + const keys = Object.keys(value); keys.sort(); return "{" + keys.map((key) => { - let value = result[key]; - if (typeof(value) === "function") { - value = "function{}"; + let v = value[key]; + if (typeof(v) === "function") { + v = "[function]"; } else { - value = serialize(value); + v = serialize(v); } - return JSON.stringify(key) + "=" + serialize(value); + return JSON.stringify(key) + ":" + v; }).join(",") + "}"; } - return JSON.stringify(result); + throw new Error("unknown value type: " + typeof(value)); } +// Next request ID to use for emitting debug info let nextRid = 1; + +export interface FallbackProviderConfig { + // The Provider + provider: Provider; + + // The priority to favour this Provider; higher values are used first + priority?: number; + + // Timeout before also triggering the next provider; this does not stop + // this provider and if its result comes back before a quorum is reached + // it will be used it will be used. + // - lower values will cause more network traffic but may result in a + // faster retult. + stallTimeout?: number; + + // How much this provider contributes to the quorum; sometimes a specific + // provider may be more reliable or trustworthy than others, but usually + // this should be left as the default + weight?: number; +}; + +// Returns a promise that delays for duration +function stall(duration: number): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, duration); + if (timer.unref) { timer.unref(); } + }); +} + +interface RunningConfig extends FallbackProviderConfig { + start?: number; + done?: boolean; + runner?: Promise; + staller?: Promise; + result?: any; + error?: Error; +}; + +function exposeDebugConfig(config: RunningConfig, now?: number): any { + const result: any = { + provider: config.provider, + weight: config.weight + }; + if (config.start) { result.start = config.start; } + if (now) { result.duration = (now - config.start); } + if (config.done) { + if (config.error) { + result.error = config.error; + } else { + result.result = config.result || null; + } + } + return result; +} + +function normalizedTally(normalize: (value: any) => string, quorum: number): (configs: Array) => any { + return function(configs: Array): any { + + // Count the votes for each result + const tally: { [ key: string]: { count: number, result: any } } = { }; + configs.forEach((c) => { + const value = normalize(c.result); + if (!tally[value]) { tally[value] = { count: 0, result: c.result }; } + tally[value].count++; + }); + + // Check for a quorum on any given result + const keys = Object.keys(tally); + for (let i = 0; i < keys.length; i++) { + const check = tally[keys[i]]; + if (check.count >= quorum) { + return check.result; + } + } + + // No quroum + return undefined; + } +} +function getProcessFunc(provider: FallbackProvider, method: string, params: { [ key: string ]: any }): (configs: Array) => any { + + let normalize = serialize; + + switch (method) { + case "getBlockNumber": + // Return the median value, unless there is (median + 1) is also + // present, in which case that is probably true and the median + // is going to be stale soon. In the event of a malicious node, + // the lie will be true soon enough. + return function(configs: Array): number { + const values = configs.map((c) => c.result); + + // Get the median block number + let blockNumber = Math.ceil(median(configs.map((c) => c.result))); + + // If the next block height is present, its prolly safe to use + if (values.indexOf(blockNumber + 1) >= 0) { blockNumber++; } + + // Don't ever roll back the blockNumber + if (blockNumber >= provider._highestBlockNumber) { + provider._highestBlockNumber = blockNumber; + } + + return provider._highestBlockNumber; + }; + + case "getGasPrice": + // Return the middle (round index up) value, similar to median + // but do not average even entries and choose the higher. + // Malicious actors must compromise 50% of the nodes to lie. + return function(configs: Array): BigNumber { + const values = configs.map((c) => c.result); + values.sort(); + return values[Math.floor(values.length / 2)]; + } + + case "getEtherPrice": + // Returns the median price. Malicious actors must compromise at + // least 50% of the nodes to lie (in a meaningful way). + return function(configs: Array): number { + return median(configs.map((c) => c.result)); + } + + // No additional normalizing required; serialize is enough + case "getBalance": + case "getTransactionCount": + case "getCode": + case "getStorageAt": + case "call": + case "estimateGas": + case "getLogs": + break; + + // We drop the confirmations from transactions as it is approximate + case "getTransaction": + case "getTransactionReceipt": + normalize = function(tx: any): string { + tx = shallowCopy(tx); + tx.confirmations = -1; + return serialize(tx); + } + break; + + // We drop the confirmations from transactions as it is approximate + case "getBlock": + // We drop the confirmations from transactions as it is approximate + if (params.includeTransactions) { + normalize = function(block: BlockWithTransactions): string { + block = shallowCopy(block); + block.transactions = block.transactions.map((tx) => { + tx = shallowCopy(tx); + tx.confirmations = -1; + return tx; + }); + return serialize(block); + }; + } + break; + + default: + throw new Error("unknown method: " + method); + } + + // Return the result if and only if the expected quorum is + // satisfied and agreed upon for the final result. + return normalizedTally(normalize, provider.quorum); + +} + +function getRunner(provider: Provider, method: string, params: { [ key: string]: any }): Promise { + switch (method) { + case "getBlockNumber": + case "getGasPrice": + return provider[method](); + case "getEtherPrice": + if ((provider).getEtherPrice) { + return (provider).getEtherPrice(); + } + break; + case "getBalance": + case "getTransactionCount": + case "getCode": + return provider[method](params.address, params.blockTag || "latest"); + case "getStorageAt": + return provider.getStorageAt(params.address, params.position, params.blockTag || "latest"); + case "getBlock": + return provider[(params.includeTransactions ? "getBlockWithTransactions": "getBlock")](params.blockTag || params.blockHash); + case "call": + case "estimateGas": + return provider[method](params.transaction); + case "getTransaction": + case "getTransactionReceipt": + return provider[method](params.transactionHash); + case "getLogs": + return provider.getLogs(params.filter); + } + + return logger.throwError("unknown method error", Logger.errors.UNKNOWN_ERROR, { + method: method, + params: params + }); +} + export class FallbackProvider extends BaseProvider { - readonly providers: Array; - readonly weights: Array; + readonly providerConfigs: Array; readonly quorum: number; - constructor(providers: Array, quorum?: number, weights?: Array) { + // Due to teh highly asyncronous nature of the blockchain, we need + // to make sure we never unroll the blockNumber due to our random + // sample of backends + _highestBlockNumber: number; + + constructor(providers: Array, quorum?: number) { logger.checkNew(new.target, FallbackProvider); if (providers.length === 0) { logger.throwArgumentError("missing providers", "providers", providers); } - if (weights != null && weights.length !== providers.length) { - logger.throwArgumentError("too many weights", "weights", weights); - } else if (!weights) { - weights = providers.map((p) => 1); - } else { - weights.forEach((w) => { - if (w % 1 || w > 512 || w < 1) { - logger.throwArgumentError("invalid weight; must be integer in [1, 512]", "weights", weights); - } - }); - } + const providerConfigs: Array = providers.map((configOrProvider, index) => { + if (Provider.isProvider(configOrProvider)) { + return Object.freeze({ provider: configOrProvider, weight: 1, stallTimeout: 750, priority: 1 }); + } - let total = weights.reduce((accum, w) => (accum + w)); + const config: FallbackProviderConfig = shallowCopy(configOrProvider); + + if (config.priority == null) { config.priority = 1; } + if (config.stallTimeout == null) { config.stallTimeout = 750; } + if (config.weight == null) { config.weight = 1; } + + const weight = config.weight; + if (weight % 1 || weight > 512 || weight < 1) { + logger.throwArgumentError("invalid weight; must be integer in [1, 512]", `providers[${ index }].weight`, weight); + } + + return Object.freeze(config); + }); + + const total = providerConfigs.reduce((accum, c) => (accum + c.weight), 0); if (quorum == null) { quorum = total / 2; - } else { - if (quorum > total) { - logger.throwArgumentError("quorum will always fail; larger than total weight", "quorum", quorum); - } + } else if (quorum > total) { + logger.throwArgumentError("quorum will always fail; larger than total weight", "quorum", quorum); } - // All networks are ready, we can know the network for certain - let ready = checkNetworks(providers.map((p) => p.network)); - if (ready) { - super(providers[0].network); + const network = checkNetworks(providerConfigs.map((c) => ((c.provider)).network)); + if (network) { + super(network); } else { // The network won't be known until all child providers know - let ready = Promise.all(providers.map((p) => p.getNetwork())).then((networks) => { - if (!checkNetworks(networks)) { - logger.throwError("getNetwork returned null", Logger.errors.UNKNOWN_ERROR) - } - return networks[0]; + const ready = Promise.all(providerConfigs.map((c) => c.provider.getNetwork())).then((networks) => { + return checkNetworks(networks); }); super(ready); } // Preserve a copy, so we do not get mutated - defineReadOnly(this, "providers", Object.freeze(providers.slice())); + defineReadOnly(this, "providerConfigs", Object.freeze(providerConfigs)); defineReadOnly(this, "quorum", quorum); - defineReadOnly(this, "weights", Object.freeze(weights.slice())); + + this._highestBlockNumber = -1; } - static doPerform(provider: BaseProvider, method: string, params: { [ name: string ]: any }): Promise { - switch (method) { - case "getBlockNumber": - case "getGasPrice": - case "getEtherPrice": - return provider[method](); - case "getBalance": - case "getTransactionCount": - case "getCode": - return provider[method](params.address, params.blockTag || "latest"); - case "getStorageAt": - return provider.getStorageAt(params.address, params.position, params.blockTag || "latest"); - case "sendTransaction": - return provider.sendTransaction(params.signedTransaction).then((result) => { - return result.hash; - }); - case "getBlock": - return provider[(params.includeTransactions ? "getBlockWithTransactions": "getBlock")](params.blockTag || params.blockHash); - case "call": - case "estimateGas": - return provider[method](params.transaction); - case "getTransaction": - case "getTransactionReceipt": - return provider[method](params.transactionHash); - case "getLogs": - return provider.getLogs(params.filter); - } - return logger.throwError("unknown method error", Logger.errors.UNKNOWN_ERROR, { - method: method, - params: params - }); - } + async perform(method: string, params: { [name: string]: any }): Promise { - perform(method: string, params: { [name: string]: any }): any { - let T0 = now(); - let runners: Array = (>(shuffled(this.providers))).map((provider, i) => { - let weight = this.weights[i]; - let rid = nextRid++; - return { - run: () => { - let t0 = now(); - let start = t0 - T0; - this.emit("debug", { - action: "request", - rid: rid, - backend: { weight, start, provider }, - request: { method: method, params: deepCopy(params) }, - provider: this - }); - return FallbackProvider.doPerform(provider, method, params).then((result) => { - let duration = now() - t0; - this.emit("debug", { - action: "response", - rid: rid, - backend: { weight, start, duration, provider }, - request: { method: method, params: deepCopy(params) }, - response: deepCopy(result) - }); - return { weight: weight, result: result }; - }, (error) => { - let duration = now() - t0; - this.emit("debug", { - action: "response", - rid: rid, - backend: { weight, start, duration, provider }, - request: { method: method, params: deepCopy(params) }, - error: error - }); - return { weight: weight, error: error }; - }); - }, - weight: weight - } - }); - - // Broadcast transactions to all backends, any that succeed is good enough + // Sending transactions is special; always broadcast it to all backends if (method === "sendTransaction") { - return Promise.all(runners.map((r) => r.run())).then((results) => { + return Promise.all(this.providerConfigs.map((c) => { + return c.provider.sendTransaction(params.signedTransaction).then((result) => { + return result.hash; + }, (error) => { + return error; + }); + })).then((results) => { + // Any success is good enough (other errors are likely "already seen" errors for (let i = 0; i < results.length; i++) { - let result = results[i]; - if (result.result) { return result.result; } + const result = results[i]; + if (typeof(result) === "string") { return result; } } + + // They were all an error; pick the first error return Promise.reject(results[0].error); }); } - // Otherwise query backends (randomly) until we have a quorum agreement - // on the correct result - return new Promise((resolve, reject) => { - let firstError: Error = null; + const processFunc = getProcessFunc(this, method, params); - // How much weight is inflight - let inflightWeight = 0; + // Shuffle the providers and then sort them by their priority; we + // shallowCopy them since we will store the result in them too + const configs: Array = shuffled(this.providerConfigs.map((c) => shallowCopy(c))); + configs.sort((a, b) => (a.priority - b.priority)); - // All results, indexed by the serialized response. - let results: { [ unique: string ]: Array } = { }; + let i = 0; + while (true) { + const t0 = now(); - let next = () => { - if (runners.length === 0) { return; } + // Get a list of running + //const running = configs.filter((c) => (c.runner && !c.done)); - let runner = runners.shift(); - inflightWeight += runner.weight; + // Compute the inflight weight (exclude anything past) + let inflightWeight = configs.filter((c) => (c.runner && ((t0 - c.start) < c.stallTimeout))) + .reduce((accum, c) => (accum + c.weight), 0); - runner.run().then((result) => { - if (results === null) { return; } - inflightWeight -= runner.weight; + // Start running enough to meet quorum + while (inflightWeight < this.quorum && i < configs.length) { + const config = configs[i++]; - if (result.error) { - if (firstError == null) { firstError = result.error; } + const rid = nextRid++; - } else { - let unique = serialize(result.result); - if (results[unique] == null) { results[unique] = []; } - results[unique].push(result); + config.start = now(); + config.staller = stall(config.stallTimeout).then(() => { config.staller = null; }); - // Do any results meet our quroum? - for (let u in results) { - let weight = results[u].reduce((accum, r) => (accum + r.weight), 0); - if (weight >= this.quorum) { - let result = results[u][0].result; - this.emit("debug", "quorum", -1, { weight, result }) - resolve(result); - results = null; - return; - } - } + config.runner = getRunner(config.provider, method, params).then((result) => { + config.done = true; + config.result = result; + + if (this.listenerCount("debug")) { + this.emit("debug", { + action: "request", + rid: rid, + backend: exposeDebugConfig(config, now()), + request: { method: method, params: deepCopy(params) }, + provider: this + }); + } + + }, (error) => { + config.done = true; + config.error = error; + + if (this.listenerCount("debug")) { + this.emit("debug", { + action: "request", + rid: rid, + backend: exposeDebugConfig(config, now()), + request: { method: method, params: deepCopy(params) }, + provider: this + }); } - - // Out of options; give up - if (runners.length === 0 && inflightWeight === 0) { - - // @TODO: this might need some more thinking... Maybe only if half - // of the results contain non-error? - if (method === "getGasPrice") { - const values: Array = [ ]; - Object.keys(results).forEach((key) => { - results[key].forEach((result) => { - if (!result.result) { return; } - values.push(result.result); - }); - }); - values.sort((a, b) => { - if (a.lt(b)) { return -1; } - if (a.gt(b)) { return 1; } - return 0; - }); - let index = parseInt(String(values.length / 2)); - if (values.length % 2) { - resolve(values[index]); - return; - } - resolve(values[index - 1].add(values[index]).div(2)); - return; - } - - if (firstError === null) { - firstError = logger.makeError("failed to meet quorum", Logger.errors.SERVER_ERROR, { - results: Object.keys(results).map((u) => { - return { - method: method, - params: params, - result: u, - weight: results[u].reduce((accum, r) => (accum + r.weight), 0) - }; - }) - }); - } - reject(firstError); - return; - } - - // Queue up the next round - setTimeout(next, 0); }); - // Fire off requests until we could possibly meet quorum - if (inflightWeight < this.quorum) { - setTimeout(next, 0); - return; + //running.push(config); + + if (this.listenerCount("debug")) { + this.emit("debug", { + action: "request", + rid: rid, + backend: exposeDebugConfig(config, null), + request: { method: method, params: deepCopy(params) }, + provider: this + }); } + + inflightWeight += config.weight; } - // bootstrap firing requests - next(); + // Wait for anything meaningful to finish or stall out + const waiting: Array> = [ ]; + configs.forEach((c) => { + if (c.done || !c.runner) { return; } + waiting.push(c.runner); + if (c.staller) { waiting.push(c.staller); } + }); + + if (waiting.length) { await Promise.race(waiting); } + + // Check the quorum and process the results; the process function + // may additionally decide the quorum is not met + const results = configs.filter((c) => (c.done && c.error == null)); + if (results.length >= this.quorum) { + const result = processFunc(results); + if (result != undefined) { return result; } + } + + // All configs have run to completion; we will never get more data + if (configs.filter((c) => !c.done).length === 0) { break; } + } + + return logger.throwError("failed to meet quorum", Logger.errors.SERVER_ERROR, { + method: method, + params: params, + results: configs.map((c) => exposeDebugConfig(c)), + //errors: configs.map((c) => c.error), + provider: this }); } }