2022-09-05 23:57:11 +03:00
"use strict" ;
/**
 *  Generic long-lived socket provider.
 *
 *  Sub-classing notes
 *  - a sub-class MUST call the `_start()` method once connected
 *  - a sub-class MUST override the `_write(string)` method
 *  - a sub-class MUST call `_processMessage(string)` for each message
 *
 *  @_subsection: api/providers/abstract-provider
 */
Object . defineProperty ( exports , "__esModule" , { value : true } ) ;
exports . SocketProvider = exports . SocketEventSubscriber = exports . SocketPendingSubscriber = exports . SocketBlockSubscriber = exports . SocketSubscriber = void 0 ;
const abstract _provider _js _1 = require ( "./abstract-provider.js" ) ;
2022-09-16 05:58:45 +03:00
const index _js _1 = require ( "../utils/index.js" ) ;
2022-09-05 23:57:11 +03:00
const provider _jsonrpc _js _1 = require ( "./provider-jsonrpc.js" ) ;
2023-06-02 00:52:58 +03:00
/**
 *  A **SocketSubscriber** uses a socket transport to handle events and
 *  should use [[_emit]] to manage the events.
 */
2022-09-05 23:57:11 +03:00
class SocketSubscriber {
    #provider;

    // The subscription parameters, stored as a JSON string so each read
    // through the `filter` getter yields a fresh copy
    #filter;

    // null until start() is called; afterwards a Promise for the id
    // returned by "eth_subscribe"; reset to null by stop()
    #filterId;

    // null while running; set by pause()
    #paused;

    // Tail of the chain of in-flight _emit calls, or null when idle
    #emitPromise;

    /**
     *  The filter.
     */
    get filter() { return JSON.parse(this.#filter); }

    /**
     *  Creates a new **SocketSubscriber** attached to %%provider%% listening
     *  to %%filter%%.
     */
    constructor(provider, filter) {
        this.#provider = provider;
        this.#filter = JSON.stringify(filter);
        this.#filterId = null;
        this.#paused = null;
        this.#emitPromise = null;
    }

    /**
     *  Sends "eth_subscribe" with the filter and, once the filter id is
     *  known, registers this subscriber with the provider under that id.
     */
    start() {
        this.#filterId = this.#provider.send("eth_subscribe", this.filter).then((filterId) => {
            this.#provider._register(filterId, this);
            return filterId;
        });
    }

    /**
     *  Sends "eth_unsubscribe" for the filter id once it is known.
     *
     *  Calling this when the subscriber is not started (never started, or
     *  already stopped) is a no-op.
     */
    stop() {
        const pendingFilterId = this.#filterId;
        // Clear first so messages arriving from now on are ignored
        this.#filterId = null;
        if (pendingFilterId == null) {
            return;
        }
        pendingFilterId.then((filterId) => {
            this.#provider.send("eth_unsubscribe", [filterId]);
        });
    }

    // @TODO: pause should trap the current blockNumber, unsub, and on resume use getLogs
    //        and resume
    /**
     *  Stops dispatching incoming messages until [[resume]] is called.
     *
     *  Only dropping messages while paused is supported; a falsy
     *  %%dropWhilePaused%% fails the "UNSUPPORTED_OPERATION" assertion.
     */
    pause(dropWhilePaused) {
        (0, index_js_1.assert)(dropWhilePaused, "preserve logs while paused not supported by SocketSubscriber yet", "UNSUPPORTED_OPERATION", { operation: "pause(false)" });
        this.#paused = !!dropWhilePaused;
    }

    /**
     *  Resumes dispatching incoming messages.
     */
    resume() {
        this.#paused = null;
    }

    /**
     *  @_ignore:
     */
    _handleMessage(message) {
        // Not started (or already stopped)
        if (this.#filterId == null) {
            return;
        }
        // Paused; the message is dropped
        if (this.#paused !== null) {
            return;
        }
        const emit = () => this._emit(this.#provider, message);
        const previous = this.#emitPromise;
        // When idle, emit right away; otherwise queue behind the emits
        // still in flight so messages are delivered in arrival order
        const emitted = (previous == null) ? emit() : previous.then(emit);
        // Once this emit settles, and if nothing was queued behind it,
        // return to the idle state. This also runs when the emit failed,
        // so a failure cannot block messages that arrive afterwards; the
        // rejection itself is still propagated by the returned promise.
        const tail = emitted.finally(() => {
            if (this.#emitPromise === tail) {
                this.#emitPromise = null;
            }
        });
        this.#emitPromise = tail;
    }

    /**
     *  Sub-classes **must** override this to emit the events on the
     *  provider.
     */
    async _emit(provider, message) {
        throw new Error("sub-classes must implemente this; _emit");
    }
}
exports . SocketSubscriber = SocketSubscriber ;
2023-06-02 00:52:58 +03:00
/**
 *  A **SocketBlockSubscriber** listens for ``newHeads`` events and emits
 *  ``"block"`` events.
 */
2022-09-05 23:57:11 +03:00
class SocketBlockSubscriber extends SocketSubscriber {
    /**
     *  @_ignore:
     */
    constructor(provider) {
        const subscription = ["newHeads"];
        super(provider, subscription);
    }

    /**
     *  Emits a ``"block"`` event on %%provider%% carrying the ``number``
     *  field of %%message%% parsed as an integer.
     */
    async _emit(provider, message) {
        // No radix is given, so a "0x" prefix selects base 16
        const blockNumber = parseInt(message.number);
        provider.emit("block", blockNumber);
    }
}
exports . SocketBlockSubscriber = SocketBlockSubscriber ;
2023-06-02 00:52:58 +03:00
/**
 *  A **SocketPendingSubscriber** listens for pending transactions and emits
 *  ``"pending"`` events.
 */
2022-09-05 23:57:11 +03:00
class SocketPendingSubscriber extends SocketSubscriber {
    /**
     *  @_ignore:
     */
    constructor(provider) {
        const subscription = ["newPendingTransactions"];
        super(provider, subscription);
    }

    /**
     *  Forwards %%message%% unchanged as the payload of a ``"pending"``
     *  event on %%provider%%.
     */
    async _emit(provider, message) {
        const eventName = "pending";
        provider.emit(eventName, message);
    }
}
exports . SocketPendingSubscriber = SocketPendingSubscriber ;
2023-06-02 00:52:58 +03:00
/**
 *  A **SocketEventSubscriber** listens for event logs.
 */
2022-09-05 23:57:11 +03:00
class SocketEventSubscriber extends SocketSubscriber {
    // The log filter, stored as a JSON string so each read through the
    // `logFilter` getter yields a fresh copy
    #logFilter;

    /**
     *  The filter.
     */
    get logFilter() {
        return JSON.parse(this.#logFilter);
    }

    /**
     *  @_ignore:
     */
    constructor(provider, filter) {
        super(provider, ["logs", filter]);
        this.#logFilter = JSON.stringify(filter);
    }

    /**
     *  Wraps %%message%% with the provider's ``_wrapLog`` and emits it on
     *  %%provider%%, using a copy of the log filter as the event.
     */
    async _emit(provider, message) {
        const event = this.logFilter;
        const log = provider._wrapLog(message, provider._network);
        provider.emit(event, log);
    }
}
exports . SocketEventSubscriber = SocketEventSubscriber ;
2022-11-30 23:44:23 +03:00
/**
 *  A **SocketProvider** is backed by a long-lived connection over a
 *  socket, which can subscribe and receive real-time messages over
 *  its communication channel.
 */
2022-09-05 23:57:11 +03:00
class SocketProvider extends provider_jsonrpc_js_1.JsonRpcApiProvider {
    // Maps the id of each request awaiting a response to its
    // { payload, resolve, reject } record
    #callbacks;

    // Maps each filterId to its subscriber
    #subs;

    // If any events come in before a subscriber has finished
    // registering, queue them
    #pending;

    /**
     *  Creates a new **SocketProvider** connected to %%network%%.
     *
     *  If unspecified, the network will be discovered.
     */
    constructor(network) {
        super(network, { batchMaxCount: 1 });
        this.#callbacks = new Map();
        this.#subs = new Map();
        this.#pending = new Map();
    }

    // This value is only valid after _start has been called
    /*
    get _network(): Network {
        if (this.#network == null) {
            throw new Error("this shouldn't happen");
        }

        return this.#network.clone();
    }
    */

    /**
     *  Returns the subscriber for %%sub%%: socket-backed subscribers for
     *  "block", "pending" and "event", unmanaged subscribers for "close"
     *  and for "orphan" with a "drop-log" filter, and whatever the
     *  super-class provides for everything else.
     */
    _getSubscriber(sub) {
        switch (sub.type) {
            case "close":
                return new abstract_provider_js_1.UnmanagedSubscriber("close");
            case "block":
                return new SocketBlockSubscriber(this);
            case "pending":
                return new SocketPendingSubscriber(this);
            case "event":
                return new SocketEventSubscriber(this, sub.filter);
            case "orphan":
                // Handled auto-matically within AbstractProvider
                // when the log.removed = true
                if (sub.filter.orphan === "drop-log") {
                    return new abstract_provider_js_1.UnmanagedSubscriber("drop-log");
                }
        }
        return super._getSubscriber(sub);
    }

    /**
     *  Register a new subscriber. This is used internally by Subscribers
     *  and generally is unnecessary unless extending capabilities.
     *
     *  Any messages that arrived for %%filterId%% before registration are
     *  delivered to %%subscriber%% immediately, in arrival order.
     */
    _register(filterId, subscriber) {
        this.#subs.set(filterId, subscriber);
        const pending = this.#pending.get(filterId);
        if (pending) {
            for (const message of pending) {
                subscriber._handleMessage(message);
            }
            this.#pending.delete(filterId);
        }
    }

    /**
     *  Writes %%payload%% to the socket and resolves with a one-element
     *  array holding the response that [[_processMessage]] matched to
     *  ``payload.id``.
     *
     *  Batches (array payloads) are rejected by the argument assertion.
     *  If waiting for readiness or writing fails, the error is re-thrown
     *  and the request is no longer tracked.
     */
    async _send(payload) {
        // WebSocket provider doesn't accept batches
        (0, index_js_1.assertArgument)(!Array.isArray(payload), "WebSocket does not support batch send", "payload", payload);

        // @TODO: stringify payloads here and store to prevent mutations

        // Prepare a promise to respond to
        const promise = new Promise((resolve, reject) => {
            this.#callbacks.set(payload.id, { payload, resolve, reject });
        });

        try {
            // Wait until the socket is connected before writing to it
            await this._waitUntilReady();

            // Write the request to the socket
            await this._write(JSON.stringify(payload));
        }
        catch (error) {
            // The request was not written, so stop tracking it; otherwise
            // its record would stay in the map indefinitely
            this.#callbacks.delete(payload.id);
            throw error;
        }

        return [await promise];
    }

    // Sub-classes must call this once they are connected
    /*
    async _start(): Promise<void> {
        if (this.#ready) { return; }

        for (const { payload } of this.#callbacks.values()) {
            await this._write(JSON.stringify(payload));
        }

        this.#ready = (async function() {
            await super._start();
        })();
    }
    */

    /**
     *  Sub-classes **must** call this with messages received over their
     *  transport to be processed and dispatched.
     *
     *  A message with an ``id`` resolves the matching pending request; an
     *  ``eth_subscription`` notification is handed to its subscriber, or
     *  queued until that subscriber registers. Anything else is reported
     *  through an ``"error"`` event.
     */
    async _processMessage(message) {
        const result = JSON.parse(message);

        if (result && typeof (result) === "object" && "id" in result) {
            const callback = this.#callbacks.get(result.id);
            if (callback == null) {
                this.emit("error", (0, index_js_1.makeError)("received result for unknown id", "UNKNOWN_ERROR", {
                    reasonCode: "UNKNOWN_ID",
                    result
                }));
                return;
            }
            this.#callbacks.delete(result.id);
            callback.resolve(result);
        }
        else if (result && result.method === "eth_subscription" && result.params != null) {
            // A notification without params falls through to the
            // unexpected-message branch instead of throwing here
            const filterId = result.params.subscription;
            const subscriber = this.#subs.get(filterId);
            if (subscriber) {
                subscriber._handleMessage(result.params.result);
            }
            else {
                let pending = this.#pending.get(filterId);
                if (pending == null) {
                    pending = [];
                    this.#pending.set(filterId, pending);
                }
                pending.push(result.params.result);
            }
        }
        else {
            this.emit("error", (0, index_js_1.makeError)("received unexpected message", "UNKNOWN_ERROR", {
                reasonCode: "UNEXPECTED_MESSAGE",
                result
            }));
            return;
        }
    }

    /**
     *  Sub-classes **must** override this to send %%message%% over their
     *  transport.
     */
    async _write(message) {
        throw new Error("sub-classes must override this");
    }
}
exports . SocketProvider = SocketProvider ;
//# sourceMappingURL=provider-socket.js.map