Fix Subscriber model when removed within emit callback.

This commit is contained in:
Richard Moore 2023-02-04 03:12:04 -05:00
parent 32b1e7827a
commit d0ed91840c
5 changed files with 43 additions and 59 deletions

@ -161,6 +161,8 @@ type Sub = {
nameMap: Map<string, string> nameMap: Map<string, string>
addressableMap: WeakMap<Addressable, string>; addressableMap: WeakMap<Addressable, string>;
listeners: Array<{ listener: Listener, once: boolean }>; listeners: Array<{ listener: Listener, once: boolean }>;
// @TODO: get rid of this, as it is (and has to be)
// tracked in subscriber
started: boolean; started: boolean;
subscriber: Subscriber; subscriber: Subscriber;
}; };

@ -27,15 +27,21 @@ export class BlockConnectionSubscriber implements Subscriber {
#provider: ConnectionRpcProvider; #provider: ConnectionRpcProvider;
#blockNumber: number; #blockNumber: number;
#running: boolean;
#filterId: null | number; #filterId: null | number;
constructor(provider: ConnectionRpcProvider) { constructor(provider: ConnectionRpcProvider) {
this.#provider = provider; this.#provider = provider;
this.#blockNumber = -2; this.#blockNumber = -2;
this.#running = false;
this.#filterId = null; this.#filterId = null;
} }
start(): void { start(): void {
if (this.#running) { return; }
this.#running = true;
this.#filterId = this.#provider._subscribe([ "newHeads" ], (result: any) => { this.#filterId = this.#provider._subscribe([ "newHeads" ], (result: any) => {
const blockNumber = getNumber(result.number); const blockNumber = getNumber(result.number);
const initial = (this.#blockNumber === -2) ? blockNumber: (this.#blockNumber + 1) const initial = (this.#blockNumber === -2) ? blockNumber: (this.#blockNumber + 1)
@ -47,6 +53,9 @@ export class BlockConnectionSubscriber implements Subscriber {
} }
stop(): void { stop(): void {
if (!this.#running) { return; }
this.#running = false;
if (this.#filterId != null) { if (this.#filterId != null) {
this.#provider._unsubscribe(this.#filterId); this.#provider._unsubscribe(this.#filterId);
this.#filterId = null; this.#filterId = null;

@ -26,6 +26,8 @@ export class FilterIdSubscriber implements Subscriber {
#filterIdPromise: null | Promise<string>; #filterIdPromise: null | Promise<string>;
#poller: (b: number) => Promise<void>; #poller: (b: number) => Promise<void>;
#running: boolean;
#network: null | Network; #network: null | Network;
#hault: boolean; #hault: boolean;
@ -36,6 +38,8 @@ export class FilterIdSubscriber implements Subscriber {
this.#filterIdPromise = null; this.#filterIdPromise = null;
this.#poller = this.#poll.bind(this); this.#poller = this.#poll.bind(this);
this.#running = false;
this.#network = null; this.#network = null;
this.#hault = false; this.#hault = false;
@ -91,9 +95,17 @@ export class FilterIdSubscriber implements Subscriber {
} }
} }
start(): void { this.#poll(-2); } start(): void {
if (this.#running) { return; }
this.#running = true;
this.#poll(-2);
}
stop(): void { stop(): void {
if (!this.#running) { return; }
this.#running = false;
this.#hault = true; this.#hault = true;
this.#teardown(); this.#teardown();
this.#provider.off("block", this.#poller); this.#provider.off("block", this.#poller);

@ -76,13 +76,13 @@ export class PollingBlockSubscriber implements Subscriber {
} }
start(): void { start(): void {
if (this.#poller) { throw new Error("subscriber already running"); } if (this.#poller) { return; }
this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval); this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
this.#poll(); this.#poll();
} }
stop(): void { stop(): void {
if (!this.#poller) { throw new Error("subscriber not running"); } if (!this.#poller) { return; }
this.#provider._clearTimeout(this.#poller); this.#provider._clearTimeout(this.#poller);
this.#poller = null; this.#poller = null;
} }
@ -105,9 +105,11 @@ export class PollingBlockSubscriber implements Subscriber {
export class OnBlockSubscriber implements Subscriber { export class OnBlockSubscriber implements Subscriber {
#provider: AbstractProvider; #provider: AbstractProvider;
#poll: (b: number) => void; #poll: (b: number) => void;
#running: boolean;
constructor(provider: AbstractProvider) { constructor(provider: AbstractProvider) {
this.#provider = provider; this.#provider = provider;
this.#running = false;
this.#poll = (blockNumber: number) => { this.#poll = (blockNumber: number) => {
this._poll(blockNumber, this.#provider); this._poll(blockNumber, this.#provider);
} }
@ -118,11 +120,17 @@ export class OnBlockSubscriber implements Subscriber {
} }
start(): void { start(): void {
if (this.#running) { return; }
this.#running = true;
this.#poll(-2); this.#poll(-2);
this.#provider.on("block", this.#poll); this.#provider.on("block", this.#poll);
} }
stop(): void { stop(): void {
if (!this.#running) { return; }
this.#running = false;
this.#provider.off("block", this.#poll); this.#provider.off("block", this.#poll);
} }
@ -178,6 +186,8 @@ export class PollingEventSubscriber implements Subscriber {
#filter: EventFilter; #filter: EventFilter;
#poller: (b: number) => void; #poller: (b: number) => void;
#running: boolean;
// The most recent block we have scanned for events. The value -2 // The most recent block we have scanned for events. The value -2
// indicates we still need to fetch an initial block number // indicates we still need to fetch an initial block number
#blockNumber: number; #blockNumber: number;
@ -186,6 +196,7 @@ export class PollingEventSubscriber implements Subscriber {
this.#provider = provider; this.#provider = provider;
this.#filter = copy(filter); this.#filter = copy(filter);
this.#poller = this.#poll.bind(this); this.#poller = this.#poll.bind(this);
this.#running = false;
this.#blockNumber = -2; this.#blockNumber = -2;
} }
@ -215,6 +226,9 @@ export class PollingEventSubscriber implements Subscriber {
} }
start(): void { start(): void {
if (this.#running) { return; }
this.#running = true;
if (this.#blockNumber === -2) { if (this.#blockNumber === -2) {
this.#provider.getBlockNumber().then((blockNumber) => { this.#provider.getBlockNumber().then((blockNumber) => {
this.#blockNumber = blockNumber; this.#blockNumber = blockNumber;
@ -224,6 +238,9 @@ export class PollingEventSubscriber implements Subscriber {
} }
stop(): void { stop(): void {
if (!this.#running) { return; }
this.#running = false;
this.#provider.off("block", this.#poller); this.#provider.off("block", this.#poller);
} }

@ -1,56 +0,0 @@
/*
import { defineProperties } from "@ethersproject/properties";
export type EventCommon = "block" | "debug" | "blockObject";
export type Event = EventCommon | string | { address?: string, topics: Array<string | Array<string>> }
export type EventLike = Event | Array<string>;
export function getTag(eventName: Event): string {
if (typeof(eventName) === "string") { return eventName; }
if (typeof(eventName) === "object") {
return (eventName.address || "*") + (eventName.topics || []).map((topic) => {
if (typeof(topic) === "string") { return topic; }
return topic.join("|");
}).join("&");
}
throw new Error("FOO");
}
export function getEvent(tag: string): Event {
}
let nextId = 1;
export class Subscriber {
readonly id!: number;
readonly tag!: string;
#paused: boolean;
#blockNumber: number;
constructor(tag: string) {
this.#paused = false;
this.#blockNumber = -1;
defineProperties<Subscriber>(this, { id: nextId++, tag });
}
get blockNumber(): number {
return this.#blockNumber;
}
_setBlockNumber(blockNumber: number): void { this.#blockNumber = blockNumber; }
setup(): void { }
teardown(): void { }
isPaused(): boolean { return this.#paused; }
pause(): void { this.#paused = true; }
resume(): void { this.#paused = false; }
resubscribeInfo(): string { return this.tag; }
resubscribe(info: string): boolean { return true; }
}
*/