Manage FallbackProvider stalling without unref (#815).

This commit is contained in:
Richard Moore 2020-05-03 16:15:19 -04:00
parent 20a3d9b98d
commit 7b1a7c7f31
No known key found for this signature in database
GPG Key ID: 665176BE8E9DC651
2 changed files with 60 additions and 26 deletions

@ -100,7 +100,7 @@ export interface FallbackProviderConfig {
// Timeout before also triggering the next provider; this does not stop
// this provider and if its result comes back before a quorum is reached
// it will be used it will be used.
// it will be incorporated into the vote
// - lower values will cause more network traffic but may result in a
// faster retult.
stallTimeout?: number;
@ -111,19 +111,46 @@ export interface FallbackProviderConfig {
weight?: number;
};
// Returns a promise that delays for duration
function stall(duration: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(resolve, duration);
if (timer.unref) { timer.unref(); }
});
// A Staller is used to provide a delay to give a Provider a chance to response
// before asking the next Provider to try.
type Staller = {
wait: (func: () => void) => Promise<void>
getPromise: () => Promise<void>,
cancel: () => void
};
function stall(duration: number): Staller {
let cancel: () => void = null;
let timer: NodeJS.Timer = null;
let promise = <Promise<void>>(new Promise((resolve) => {
cancel = function() {
if (timer) {
clearTimeout(timer);
timer = null;
}
resolve();
}
timer = setTimeout(cancel, duration);
}));
const wait = (func: () => void) => {
promise = promise.then(func);
return promise;
}
function getPromise(): Promise<void> {
return promise;
}
return { cancel, getPromise, wait };
}
interface RunningConfig extends FallbackProviderConfig {
start?: number;
done?: boolean;
runner?: Promise<any>;
staller?: Promise<void>;
staller?: Staller;
result?: any;
error?: Error;
};
@ -373,29 +400,29 @@ export class FallbackProvider extends BaseProvider {
// Sending transactions is special; always broadcast it to all backends
if (method === "sendTransaction") {
return Promise.all(this.providerConfigs.map((c) => {
const results: Array<string | Error> = await Promise.all(this.providerConfigs.map((c) => {
return c.provider.sendTransaction(params.signedTransaction).then((result) => {
return result.hash;
}, (error) => {
return error;
});
})).then((results) => {
// Any success is good enough (other errors are likely "already seen" errors
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (typeof(result) === "string") { return result; }
}
}));
// They were all an error; pick the first error
return Promise.reject(results[0]);
});
// Any success is good enough (other errors are likely "already seen" errors
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (typeof(result) === "string") { return result; }
}
// They were all an error; pick the first error
throw results[0];
}
const processFunc = getProcessFunc(this, method, params);
// Shuffle the providers and then sort them by their priority; we
// shallowCopy them since we will store the result in them too
const configs: Array<RunningConfig> = shuffled(this.providerConfigs.map((c) => shallowCopy(c)));
const configs: Array<RunningConfig> = shuffled(this.providerConfigs.map(shallowCopy));
configs.sort((a, b) => (a.priority - b.priority));
let i = 0;
@ -417,7 +444,8 @@ export class FallbackProvider extends BaseProvider {
const rid = nextRid++;
config.start = now();
config.staller = stall(config.stallTimeout).then(() => { config.staller = null; });
config.staller = stall(config.stallTimeout);
config.staller.wait(() => { config.staller = null; });
config.runner = getRunner(config.provider, method, params).then((result) => {
config.done = true;
@ -448,8 +476,6 @@ export class FallbackProvider extends BaseProvider {
}
});
//running.push(config);
if (this.listenerCount("debug")) {
this.emit("debug", {
action: "request",
@ -468,7 +494,7 @@ export class FallbackProvider extends BaseProvider {
configs.forEach((c) => {
if (c.done || !c.runner) { return; }
waiting.push(c.runner);
if (c.staller) { waiting.push(c.staller); }
if (c.staller) { waiting.push(c.staller.getPromise()); }
});
if (waiting.length) { await Promise.race(waiting); }
@ -478,8 +504,12 @@ export class FallbackProvider extends BaseProvider {
const results = configs.filter((c) => (c.done && c.error == null));
if (results.length >= this.quorum) {
const result = processFunc(results);
if (result !== undefined) { return result; }
if (!first) { await stall(100); }
if (result !== undefined) {
// Shut down any stallers
configs.filter(c => c.staller).forEach(c => c.staller.cancel());
return result;
}
if (!first) { await stall(100).getPromise(); }
first = false;
}
@ -487,6 +517,9 @@ export class FallbackProvider extends BaseProvider {
if (configs.filter((c) => !c.done).length === 0) { break; }
}
// Shut down any stallers; shouldn't be any
configs.filter(c => c.staller).forEach(c => c.staller.cancel());
return logger.throwError("failed to meet quorum", Logger.errors.SERVER_ERROR, {
method: method,
params: params,

@ -6,7 +6,8 @@ import { ethers } from "ethers";
import contractData from "./test-contract.json";
const provider = new ethers.providers.InfuraProvider('rinkeby');
//const provider = new ethers.providers.InfuraProvider('rinkeby');
const provider = ethers.getDefaultProvider("rinkeby");
const TIMEOUT_PERIOD = 120000;