More robust FallbackProvider on clean exits.

This commit is contained in:
Richard Moore 2020-05-04 22:48:21 -04:00
parent 657a0394f5
commit 8eeda23e98
No known key found for this signature in database
GPG Key ID: 665176BE8E9DC651

@ -148,9 +148,11 @@ function stall(duration: number): Staller {
return { cancel, getPromise, wait }; return { cancel, getPromise, wait };
} }
// @TODO: Make this an object with staller and cancel built-in
interface RunningConfig extends FallbackProviderConfig { interface RunningConfig extends FallbackProviderConfig {
start?: number; start?: number;
done?: boolean; done?: boolean;
cancelled?: boolean;
runner?: Promise<any>; runner?: Promise<any>;
staller?: Staller; staller?: Staller;
result?: any; result?: any;
@ -302,20 +304,33 @@ function getProcessFunc(provider: FallbackProvider, method: string, params: { [
// If we are doing a blockTag query, we need to make sure the backend is // If we are doing a blockTag query, we need to make sure the backend is
// caught up to the FallbackProvider, before sending a request to it. // caught up to the FallbackProvider, before sending a request to it.
async function waitForSync(provider: BaseProvider, blockNumber: number): Promise<BaseProvider> { async function waitForSync(config: RunningConfig, blockNumber: number): Promise<BaseProvider> {
const provider = <BaseProvider>(config.provider);
if ((provider.blockNumber != null && provider.blockNumber >= blockNumber) || blockNumber === -1) { if ((provider.blockNumber != null && provider.blockNumber >= blockNumber) || blockNumber === -1) {
return provider; return provider;
} }
return poll(() => { return poll(() => {
return provider.getBlockNumber().then((b) => { return new Promise((resolve, reject) => {
if (b >= blockNumber) { return Provider; } setTimeout(function() {
return undefined;
// We are synced
if (provider.blockNumber >= blockNumber) { return resolve(Provider); }
// We're done; just quit
if (config.cancelled) { return resolve(null); }
// Try again, next block
return resolve(undefined);
}, 0);
}); });
}, { onceBlock: provider }); }, { oncePoll: provider });
} }
async function getRunner(provider: BaseProvider, currentBlockNumber: number, method: string, params: { [ key: string]: any }): Promise<any> { async function getRunner(config: RunningConfig, currentBlockNumber: number, method: string, params: { [ key: string]: any }): Promise<any> {
let provider = config.provider;
switch (method) { switch (method) {
case "getBlockNumber": case "getBlockNumber":
case "getGasPrice": case "getGasPrice":
@ -329,23 +344,23 @@ async function getRunner(provider: BaseProvider, currentBlockNumber: number, met
case "getTransactionCount": case "getTransactionCount":
case "getCode": case "getCode":
if (params.blockTag && isHexString(params.blockTag)) { if (params.blockTag && isHexString(params.blockTag)) {
provider = await waitForSync(provider, currentBlockNumber) provider = await waitForSync(config, currentBlockNumber)
} }
return provider[method](params.address, params.blockTag || "latest"); return provider[method](params.address, params.blockTag || "latest");
case "getStorageAt": case "getStorageAt":
if (params.blockTag && isHexString(params.blockTag)) { if (params.blockTag && isHexString(params.blockTag)) {
provider = await waitForSync(provider, currentBlockNumber) provider = await waitForSync(config, currentBlockNumber)
} }
return provider.getStorageAt(params.address, params.position, params.blockTag || "latest"); return provider.getStorageAt(params.address, params.position, params.blockTag || "latest");
case "getBlock": case "getBlock":
if (params.blockTag && isHexString(params.blockTag)) { if (params.blockTag && isHexString(params.blockTag)) {
provider = await waitForSync(provider, currentBlockNumber) provider = await waitForSync(config, currentBlockNumber)
} }
return provider[(params.includeTransactions ? "getBlockWithTransactions": "getBlock")](params.blockTag || params.blockHash); return provider[(params.includeTransactions ? "getBlockWithTransactions": "getBlock")](params.blockTag || params.blockHash);
case "call": case "call":
case "estimateGas": case "estimateGas":
if (params.blockTag && isHexString(params.blockTag)) { if (params.blockTag && isHexString(params.blockTag)) {
provider = await waitForSync(provider, currentBlockNumber) provider = await waitForSync(config, currentBlockNumber)
} }
return provider[method](params.transaction); return provider[method](params.transaction);
case "getTransaction": case "getTransaction":
@ -354,7 +369,7 @@ async function getRunner(provider: BaseProvider, currentBlockNumber: number, met
case "getLogs": { case "getLogs": {
let filter = params.filter; let filter = params.filter;
if ((filter.fromBlock && isHexString(filter.fromBlock)) || (filter.toBlock && isHexString(filter.toBlock))) { if ((filter.fromBlock && isHexString(filter.fromBlock)) || (filter.toBlock && isHexString(filter.toBlock))) {
provider = await waitForSync(provider, currentBlockNumber) provider = await waitForSync(config, currentBlockNumber)
} }
return provider.getLogs(filter); return provider.getLogs(filter);
} }
@ -484,7 +499,7 @@ export class FallbackProvider extends BaseProvider {
config.staller = stall(config.stallTimeout); config.staller = stall(config.stallTimeout);
config.staller.wait(() => { config.staller = null; }); config.staller.wait(() => { config.staller = null; });
config.runner = getRunner(<BaseProvider>(config.provider), currentBlockNumber, method, params).then((result) => { config.runner = getRunner(config, currentBlockNumber, method, params).then((result) => {
config.done = true; config.done = true;
config.result = result; config.result = result;
@ -543,7 +558,10 @@ export class FallbackProvider extends BaseProvider {
const result = processFunc(results); const result = processFunc(results);
if (result !== undefined) { if (result !== undefined) {
// Shut down any stallers // Shut down any stallers
configs.filter(c => c.staller).forEach(c => c.staller.cancel()); configs.forEach(c => {
if (c.staller) { c.staller.cancel(); }
c.cancelled = true;
});
return result; return result;
} }
if (!first) { await stall(100).getPromise(); } if (!first) { await stall(100).getPromise(); }
@ -555,7 +573,10 @@ export class FallbackProvider extends BaseProvider {
} }
// Shut down any stallers; shouldn't be any // Shut down any stallers; shouldn't be any
configs.filter(c => c.staller).forEach(c => c.staller.cancel()); configs.forEach(c => {
if (c.staller) { c.staller.cancel(); }
c.cancelled = true;
});
return logger.throwError("failed to meet quorum", Logger.errors.SERVER_ERROR, { return logger.throwError("failed to meet quorum", Logger.errors.SERVER_ERROR, {
method: method, method: method,