2022-09-05 23:57:11 +03:00
"use strict" ;
/ * *
* Generic long - lived socket provider .
*
* Sub - classing notes
* - a sub - class MUST call the ` _start() ` method once connected
* - a sub - class MUST override the ` _write(string) ` method
* - a sub - class MUST call ` _processMessage(string) ` for each message
2022-11-30 23:44:23 +03:00
*
* @ _subsection : api / providers / abstract - provider
2022-09-05 23:57:11 +03:00
* /
Object . defineProperty ( exports , "__esModule" , { value : true } ) ;
exports . SocketProvider = exports . SocketEventSubscriber = exports . SocketPendingSubscriber = exports . SocketBlockSubscriber = exports . SocketSubscriber = void 0 ;
const abstract _provider _js _1 = require ( "./abstract-provider.js" ) ;
2022-09-16 05:58:45 +03:00
const index _js _1 = require ( "../utils/index.js" ) ;
2022-09-05 23:57:11 +03:00
const provider _jsonrpc _js _1 = require ( "./provider-jsonrpc.js" ) ;
class SocketSubscriber {
# provider ;
# filter ;
get filter ( ) { return JSON . parse ( this . # filter ) ; }
# filterId ;
# paused ;
# emitPromise ;
constructor ( provider , filter ) {
this . # provider = provider ;
this . # filter = JSON . stringify ( filter ) ;
this . # filterId = null ;
this . # paused = null ;
this . # emitPromise = null ;
}
start ( ) {
this . # filterId = this . # provider . send ( "eth_subscribe" , this . filter ) . then ( ( filterId ) => {
;
this . # provider . _register ( filterId , this ) ;
return filterId ;
} ) ;
}
stop ( ) {
( this . # filterId ) . then ( ( filterId ) => {
this . # provider . send ( "eth_unsubscribe" , [ filterId ] ) ;
} ) ;
this . # filterId = null ;
}
// @TODO: pause should trap the current blockNumber, unsub, and on resume use getLogs
// and resume
pause ( dropWhilePaused ) {
2022-11-09 10:57:02 +03:00
( 0 , index _js _1 . assert ) ( dropWhilePaused , "preserve logs while paused not supported by SocketSubscriber yet" , "UNSUPPORTED_OPERATION" , { operation : "pause(false)" } ) ;
2022-09-05 23:57:11 +03:00
this . # paused = ! ! dropWhilePaused ;
}
resume ( ) {
this . # paused = null ;
}
_handleMessage ( message ) {
if ( this . # filterId == null ) {
return ;
}
if ( this . # paused === null ) {
let emitPromise = this . # emitPromise ;
if ( emitPromise == null ) {
emitPromise = this . _emit ( this . # provider , message ) ;
}
else {
emitPromise = emitPromise . then ( async ( ) => {
await this . _emit ( this . # provider , message ) ;
} ) ;
}
this . # emitPromise = emitPromise . then ( ( ) => {
if ( this . # emitPromise === emitPromise ) {
this . # emitPromise = null ;
}
} ) ;
}
}
async _emit ( provider , message ) {
throw new Error ( "sub-classes must implemente this; _emit" ) ;
}
}
exports . SocketSubscriber = SocketSubscriber ;
class SocketBlockSubscriber extends SocketSubscriber {
constructor ( provider ) {
super ( provider , [ "newHeads" ] ) ;
}
async _emit ( provider , message ) {
provider . emit ( "block" , parseInt ( message . number ) ) ;
}
}
exports . SocketBlockSubscriber = SocketBlockSubscriber ;
class SocketPendingSubscriber extends SocketSubscriber {
constructor ( provider ) {
super ( provider , [ "newPendingTransactions" ] ) ;
}
async _emit ( provider , message ) {
provider . emit ( "pending" , message ) ;
}
}
exports . SocketPendingSubscriber = SocketPendingSubscriber ;
class SocketEventSubscriber extends SocketSubscriber {
# logFilter ;
get logFilter ( ) { return JSON . parse ( this . # logFilter ) ; }
constructor ( provider , filter ) {
super ( provider , [ "logs" , filter ] ) ;
this . # logFilter = JSON . stringify ( filter ) ;
}
async _emit ( provider , message ) {
2022-09-27 10:45:27 +03:00
provider . emit ( this . # logFilter , provider . _wrapLog ( message , provider . _network ) ) ;
2022-09-05 23:57:11 +03:00
}
}
exports . SocketEventSubscriber = SocketEventSubscriber ;
2022-11-30 23:44:23 +03:00
/ * *
* SocketProvider ...
*
* /
2022-09-05 23:57:11 +03:00
class SocketProvider extends provider _jsonrpc _js _1 . JsonRpcApiProvider {
# callbacks ;
// Maps each filterId to its subscriber
# subs ;
// If any events come in before a subscriber has finished
// registering, queue them
# pending ;
constructor ( network ) {
super ( network , { batchMaxCount : 1 } ) ;
this . # callbacks = new Map ( ) ;
this . # subs = new Map ( ) ;
this . # pending = new Map ( ) ;
}
2022-09-27 10:45:27 +03:00
// This value is only valid after _start has been called
/ *
get _network ( ) : Network {
if ( this . # network == null ) {
throw new Error ( "this shouldn't happen" ) ;
}
return this . # network . clone ( ) ;
}
* /
2022-09-05 23:57:11 +03:00
_getSubscriber ( sub ) {
switch ( sub . type ) {
case "close" :
return new abstract _provider _js _1 . UnmanagedSubscriber ( "close" ) ;
case "block" :
return new SocketBlockSubscriber ( this ) ;
case "pending" :
return new SocketPendingSubscriber ( this ) ;
case "event" :
return new SocketEventSubscriber ( this , sub . filter ) ;
case "orphan" :
// Handled auto-matically within AbstractProvider
// when the log.removed = true
if ( sub . filter . orphan === "drop-log" ) {
return new abstract _provider _js _1 . UnmanagedSubscriber ( "drop-log" ) ;
}
}
return super . _getSubscriber ( sub ) ;
}
_register ( filterId , subscriber ) {
this . # subs . set ( filterId , subscriber ) ;
const pending = this . # pending . get ( filterId ) ;
if ( pending ) {
for ( const message of pending ) {
subscriber . _handleMessage ( message ) ;
}
this . # pending . delete ( filterId ) ;
}
}
async _send ( payload ) {
// WebSocket provider doesn't accept batches
2022-09-16 05:58:45 +03:00
( 0 , index _js _1 . assertArgument ) ( ! Array . isArray ( payload ) , "WebSocket does not support batch send" , "payload" , payload ) ;
2022-09-05 23:57:11 +03:00
// @TODO: stringify payloads here and store to prevent mutations
2022-09-30 05:57:27 +03:00
// Prepare a promise to respond to
2022-09-05 23:57:11 +03:00
const promise = new Promise ( ( resolve , reject ) => {
this . # callbacks . set ( payload . id , { payload , resolve , reject } ) ;
} ) ;
2022-09-30 05:57:27 +03:00
// Wait until the socket is connected before writing to it
await this . _waitUntilReady ( ) ;
// Write the request to the socket
2022-09-27 10:45:27 +03:00
await this . _write ( JSON . stringify ( payload ) ) ;
2022-09-05 23:57:11 +03:00
return [ await promise ] ;
}
// Sub-classes must call this once they are connected
2022-09-27 10:45:27 +03:00
/ *
async _start ( ) : Promise < void > {
if ( this . # ready ) { return ; }
2022-09-05 23:57:11 +03:00
for ( const { payload } of this . # callbacks . values ( ) ) {
await this . _write ( JSON . stringify ( payload ) ) ;
}
2022-09-27 10:45:27 +03:00
this . # ready = ( async function ( ) {
await super . _start ( ) ;
} ) ( ) ;
2022-09-05 23:57:11 +03:00
}
2022-09-27 10:45:27 +03:00
* /
2022-09-05 23:57:11 +03:00
// Sub-classes must call this for each message
async _processMessage ( message ) {
const result = ( JSON . parse ( message ) ) ;
if ( "id" in result ) {
const callback = this . # callbacks . get ( result . id ) ;
if ( callback == null ) {
console . log ( "Weird... Response for not a thing we sent" ) ;
return ;
}
this . # callbacks . delete ( result . id ) ;
2022-09-30 05:57:27 +03:00
callback . resolve ( result ) ;
/ *
if ( "error" in result ) {
const { message , code , data } = result . error ;
const error = makeError ( message || "unkonwn error" , "SERVER_ERROR" , {
request : ` ws: ${ JSON . stringify ( callback . payload ) } ` ,
info : { code , data }
} ) ;
callback . reject ( error ) ;
} else {
callback . resolve ( result . result ) ;
}
* /
2022-09-05 23:57:11 +03:00
}
else if ( result . method === "eth_subscription" ) {
const filterId = result . params . subscription ;
const subscriber = this . # subs . get ( filterId ) ;
if ( subscriber ) {
subscriber . _handleMessage ( result . params . result ) ;
}
else {
let pending = this . # pending . get ( filterId ) ;
if ( pending == null ) {
pending = [ ] ;
this . # pending . set ( filterId , pending ) ;
}
pending . push ( result . params . result ) ;
}
}
}
async _write ( message ) {
throw new Error ( "sub-classes must override this" ) ;
}
}
exports . SocketProvider = SocketProvider ;
//# sourceMappingURL=provider-socket.js.map