Stall FallbackProvider backends from requests if not in-sync.
This commit is contained in:
parent
3f37d15b88
commit
fa6904fef3
@ -1,17 +1,19 @@
|
||||
"use strict";
|
||||
|
||||
import { Network } from "@ethersproject/networks";
|
||||
import { Block, BlockWithTransactions, Provider } from "@ethersproject/abstract-provider";
|
||||
import { shuffled } from "@ethersproject/random";
|
||||
import { deepCopy, defineReadOnly, shallowCopy } from "@ethersproject/properties";
|
||||
import { BigNumber } from "@ethersproject/bignumber";
|
||||
import { isHexString } from "@ethersproject/bytes";
|
||||
import { Network } from "@ethersproject/networks";
|
||||
import { deepCopy, defineReadOnly, shallowCopy } from "@ethersproject/properties";
|
||||
import { shuffled } from "@ethersproject/random";
|
||||
import { poll } from "@ethersproject/web";
|
||||
|
||||
import { BaseProvider } from "./base-provider";
|
||||
|
||||
import { Logger } from "@ethersproject/logger";
|
||||
import { version } from "./_version";
|
||||
const logger = new Logger(version);
|
||||
|
||||
import { BaseProvider } from "./base-provider";
|
||||
|
||||
function now() { return (new Date()).getTime(); }
|
||||
|
||||
// Returns to network as long as all agree, or null if any is null.
|
||||
@ -298,7 +300,22 @@ function getProcessFunc(provider: FallbackProvider, method: string, params: { [
|
||||
|
||||
}
|
||||
|
||||
function getRunner(provider: Provider, method: string, params: { [ key: string]: any }): Promise<any> {
|
||||
// If we are doing a blockTag query, we need to make sure the backend is
|
||||
// caught up to the FallbackProvider, before sending a request to it.
|
||||
async function waitForSync(provider: BaseProvider, blockNumber: number): Promise<BaseProvider> {
|
||||
if ((provider.blockNumber != null && provider.blockNumber >= blockNumber) || blockNumber === -1) {
|
||||
return provider;
|
||||
}
|
||||
|
||||
return poll(() => {
|
||||
return provider.getBlockNumber().then((b) => {
|
||||
if (b >= blockNumber) { return Provider; }
|
||||
return undefined;
|
||||
});
|
||||
}, { onceBlock: provider });
|
||||
}
|
||||
|
||||
async function getRunner(provider: BaseProvider, currentBlockNumber: number, method: string, params: { [ key: string]: any }): Promise<any> {
|
||||
switch (method) {
|
||||
case "getBlockNumber":
|
||||
case "getGasPrice":
|
||||
@ -311,19 +328,36 @@ function getRunner(provider: Provider, method: string, params: { [ key: string]:
|
||||
case "getBalance":
|
||||
case "getTransactionCount":
|
||||
case "getCode":
|
||||
if (params.blockTag && isHexString(params.blockTag)) {
|
||||
provider = await waitForSync(provider, currentBlockNumber)
|
||||
}
|
||||
return provider[method](params.address, params.blockTag || "latest");
|
||||
case "getStorageAt":
|
||||
if (params.blockTag && isHexString(params.blockTag)) {
|
||||
provider = await waitForSync(provider, currentBlockNumber)
|
||||
}
|
||||
return provider.getStorageAt(params.address, params.position, params.blockTag || "latest");
|
||||
case "getBlock":
|
||||
if (params.blockTag && isHexString(params.blockTag)) {
|
||||
provider = await waitForSync(provider, currentBlockNumber)
|
||||
}
|
||||
return provider[(params.includeTransactions ? "getBlockWithTransactions": "getBlock")](params.blockTag || params.blockHash);
|
||||
case "call":
|
||||
case "estimateGas":
|
||||
if (params.blockTag && isHexString(params.blockTag)) {
|
||||
provider = await waitForSync(provider, currentBlockNumber)
|
||||
}
|
||||
return provider[method](params.transaction);
|
||||
case "getTransaction":
|
||||
case "getTransactionReceipt":
|
||||
return provider[method](params.transactionHash);
|
||||
case "getLogs":
|
||||
return provider.getLogs(params.filter);
|
||||
case "getLogs": {
|
||||
let filter = params.filter;
|
||||
if ((filter.fromBlock && isHexString(filter.fromBlock)) || (filter.toBlock && isHexString(filter.toBlock))) {
|
||||
provider = await waitForSync(provider, currentBlockNumber)
|
||||
}
|
||||
return provider.getLogs(filter);
|
||||
}
|
||||
}
|
||||
|
||||
return logger.throwError("unknown method error", Logger.errors.UNKNOWN_ERROR, {
|
||||
@ -336,7 +370,7 @@ export class FallbackProvider extends BaseProvider {
|
||||
readonly providerConfigs: ReadonlyArray<FallbackProviderConfig>;
|
||||
readonly quorum: number;
|
||||
|
||||
// Due to teh highly asyncronous nature of the blockchain, we need
|
||||
// Due to the highly asyncronous nature of the blockchain, we need
|
||||
// to make sure we never unroll the blockNumber due to our random
|
||||
// sample of backends
|
||||
_highestBlockNumber: number;
|
||||
@ -396,7 +430,6 @@ export class FallbackProvider extends BaseProvider {
|
||||
}
|
||||
|
||||
async perform(method: string, params: { [name: string]: any }): Promise<any> {
|
||||
|
||||
// Sending transactions is special; always broadcast it to all backends
|
||||
if (method === "sendTransaction") {
|
||||
const results: Array<string | Error> = await Promise.all(this.providerConfigs.map((c) => {
|
||||
@ -417,6 +450,12 @@ export class FallbackProvider extends BaseProvider {
|
||||
throw results[0];
|
||||
}
|
||||
|
||||
// We need to make sure we are in sync with our backends, so we need
|
||||
// to know this before we can make a lot of calls
|
||||
if (this._highestBlockNumber === -1 && method !== "getBlockNumber") {
|
||||
await this.getBlockNumber();
|
||||
}
|
||||
|
||||
const processFunc = getProcessFunc(this, method, params);
|
||||
|
||||
// Shuffle the providers and then sort them by their priority; we
|
||||
@ -424,14 +463,13 @@ export class FallbackProvider extends BaseProvider {
|
||||
const configs: Array<RunningConfig> = shuffled(this.providerConfigs.map(shallowCopy));
|
||||
configs.sort((a, b) => (a.priority - b.priority));
|
||||
|
||||
const currentBlockNumber = this._highestBlockNumber;
|
||||
|
||||
let i = 0;
|
||||
let first = true;
|
||||
while (true) {
|
||||
const t0 = now();
|
||||
|
||||
// Get a list of running
|
||||
//const running = configs.filter((c) => (c.runner && !c.done));
|
||||
|
||||
// Compute the inflight weight (exclude anything past)
|
||||
let inflightWeight = configs.filter((c) => (c.runner && ((t0 - c.start) < c.stallTimeout)))
|
||||
.reduce((accum, c) => (accum + c.weight), 0);
|
||||
@ -446,7 +484,7 @@ export class FallbackProvider extends BaseProvider {
|
||||
config.staller = stall(config.stallTimeout);
|
||||
config.staller.wait(() => { config.staller = null; });
|
||||
|
||||
config.runner = getRunner(config.provider, method, params).then((result) => {
|
||||
config.runner = getRunner(<BaseProvider>(config.provider), currentBlockNumber, method, params).then((result) => {
|
||||
config.done = true;
|
||||
config.result = result;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user