From 4cac9a9c85148a36ded366f9a762d166cbf72c24 Mon Sep 17 00:00:00 2001 From: Patrik Stas Date: Thu, 8 Sep 2022 18:09:45 +0200 Subject: [PATCH] Fix scan-scheduled event listening, cleanups, refactring Signed-off-by: Patrik Stas --- indyscan-daemon/app-configs/sovbuilder.json | 9 ++++++++ indyscan-daemon/src/constants.js | 19 ++++++++++++++++- indyscan-daemon/src/server/server.js | 18 +++++++++------- indyscan-daemon/src/server/wsockets.js | 4 ++-- indyscan-daemon/src/workers/worker-rtw.js | 23 +++++++++++---------- indyscan-webapp/pages/home.js | 17 +++++---------- indyscan-webapp/sockets/constants.js | 10 +++++++++ indyscan-webapp/txtools/index.js | 14 ++++--------- 8 files changed, 70 insertions(+), 44 deletions(-) create mode 100644 indyscan-webapp/sockets/constants.js diff --git a/indyscan-daemon/app-configs/sovbuilder.json b/indyscan-daemon/app-configs/sovbuilder.json index eb18a2b9..db4a40ed 100644 --- a/indyscan-daemon/app-configs/sovbuilder.json +++ b/indyscan-daemon/app-configs/sovbuilder.json @@ -14,6 +14,15 @@ "esUrl": "{{{ES_URL}}}", "workerTiming": "MEDIUM" } + }, + { + "builder": "rtwExpansion", + "params": { + "indyNetworkId": "{{{INDY_NETWORK}}}", + "esUrl": "{{{ES_URL}}}", + "esIndex": "{{{ES_INDEX}}}", + "workerTiming": "MEDIUM" + } } ] } diff --git a/indyscan-daemon/src/constants.js b/indyscan-daemon/src/constants.js index 4dcca267..f8e5e520 100644 --- a/indyscan-daemon/src/constants.js +++ b/indyscan-daemon/src/constants.js @@ -1,8 +1,25 @@ +const SOCKETIO_EVENT = { + LEDGER_TX_SCANNED: 'ledger-tx-scanned', + LEDGER_TX_SCAN_SCHEDULED: 'ledger-tx-scan-scheduled', + SCANNED_TX_PROCESSED: 'tx-processed', + SCANNED_TX_PROCESSING_SCHEDULED: 'tx-rescan-scheduled' +} + +const INTERNAL_EVENT = { + TX_RESCAN_SCHEDULED: 'tx-rescan-scheduled', + TX_NOT_AVAILABLE: 'tx-not-available', + TX_PROCESSED: 'tx-processed', + TX_RESOLUTION_ERROR: 'tx-resolution-error', + TX_INGESTION_ERROR: 'tx-ingestion-error' +} + const OPERATION_TYPES = { LEDGER_CPY: 'ledgercpy', EXPANSION: 'expansion' } module.exports = { - OPERATION_TYPES + OPERATION_TYPES, + INTERNAL_EVENT, + SOCKETIO_EVENT } diff --git a/indyscan-daemon/src/server/server.js b/indyscan-daemon/src/server/server.js index db25320c..2f3e4658 100644 --- a/indyscan-daemon/src/server/server.js +++ b/indyscan-daemon/src/server/server.js @@ -3,10 +3,10 @@ const apiWorkers = require('./api/api-workers') const express = require('express') const bodyParser = require('body-parser') const logger = require('../logging/logger-main') -var pretty = require('express-prettify') +const pretty = require('express-prettify') const { createSocketioManager } = require('./wsockets') const { logRequests, logResponses } = require('./middleware') -const { OPERATION_TYPES } = require('../constants') +const { OPERATION_TYPES, INTERNAL_EVENT, SOCKETIO_EVENT } = require('../constants') function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLogging) { if (enableRequestLogging) { @@ -16,7 +16,8 @@ function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLoggi app.use(logResponses) } } -function linkLedgerCpyWorkersToSockets(socketioManager, serviceWorkers) { + +function linkLedgerCpyWorkersToSockets (socketioManager, serviceWorkers) { logger.info(`Linking workers of operationType ${OPERATION_TYPES.LEDGER_CPY} with sockets.`) const workerQuery = { operationTypes: [OPERATION_TYPES.LEDGER_CPY] } const workers = serviceWorkers.getWorkers(workerQuery) @@ -24,8 +25,8 @@ function linkLedgerCpyWorkersToSockets(socketioManager, serviceWorkers) { const emitter = worker.getEventEmitter() const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo() if (operationType === OPERATION_TYPES.LEDGER_CPY) { - socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-processed', 'tx-ledger-processed', indyNetworkId, subledger) - socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-rescan-scheduled', 'tx-ledger-rescan-scheduled', indyNetworkId, subledger) + socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_PROCESSED, SOCKETIO_EVENT.LEDGER_TX_SCANNED, indyNetworkId, subledger) + socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_RESCAN_SCHEDULED, SOCKETIO_EVENT.LEDGER_TX_SCAN_SCHEDULED, indyNetworkId, subledger) } } } @@ -38,21 +39,22 @@ function linkExpansionWorkersToSockets (socketioManager, serviceWorkers) { const emitter = worker.getEventEmitter() const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo() if (operationType === OPERATION_TYPES.EXPANSION) { - socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-processed', 'tx-processed', indyNetworkId, subledger) - socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-rescan-scheduled', 'tx-rescan-scheduled', indyNetworkId, subledger) + socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_PROCESSED, SOCKETIO_EVENT.SCANNED_TX_PROCESSED, indyNetworkId, subledger) + socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_RESCAN_SCHEDULED, SOCKETIO_EVENT.SCANNED_TX_PROCESSING_SCHEDULED, indyNetworkId, subledger) } } } function createRoomJoinReactor (serviceWorkers) { function onRoomJoined (room, socket) { - const workerQuery = { operationTypes: ['expansion'], indyNetworkIds: [room] } + const workerQuery = { operationTypes: [OPERATION_TYPES.EXPANSION], indyNetworkIds: [room] } const workers = serviceWorkers.getWorkers(workerQuery) for (const worker of workers) { const rescanScheduledPayload = worker.requestRescheduleStatus() socket.emit('rescan-scheduled', rescanScheduledPayload) } } + return onRoomJoined } diff --git a/indyscan-daemon/src/server/wsockets.js b/indyscan-daemon/src/server/wsockets.js index 71b2b9ca..fa27c9e0 100644 --- a/indyscan-daemon/src/server/wsockets.js +++ b/indyscan-daemon/src/server/wsockets.js @@ -7,14 +7,14 @@ function createSocketioManager (expressServer) { const io = socketio(expressServer) function forwardEmitterEventToWebsocket (emitter, workerId, sourceEmitterEventName, targetSocketEventName, forwardToRoom) { - logger.info(`Worker ${workerId} events of name ${sourceEmitterEventName} will be broadcasted to room ${targetSocketEventName} as event ${targetSocketEventName} `) + logger.info(`Forwarding worker events ${workerId} / ${sourceEmitterEventName} -----> sockets room ${forwardToRoom} / ${targetSocketEventName} `) emitter.on(sourceEmitterEventName, (payload) => { io.of('/').in(`${forwardToRoom}`).clients((error, clients) => { if (error) { logger.error('Problem listing clients to print info.') } - logger.info(`Worker ${workerId} emitting sockets event ${targetSocketEventName} to room ${forwardToRoom} to ${clients.length} clients`) + logger.info(`Worker ${workerId} emitting sockets event ${forwardToRoom} / ${targetSocketEventName} (${clients.length} clients)`) }) io.to(forwardToRoom).emit(targetSocketEventName, payload) }) diff --git a/indyscan-daemon/src/workers/worker-rtw.js b/indyscan-daemon/src/workers/worker-rtw.js index 98759ce3..4ed620fa 100644 --- a/indyscan-daemon/src/workers/worker-rtw.js +++ b/indyscan-daemon/src/workers/worker-rtw.js @@ -7,6 +7,7 @@ const sleep = require('sleep-promise') const EventEmitter = require('events') const { createLogger } = require('../logging/logger-builder') const { envConfig } = require('../config/env') +const { INTERNAL_EVENT } = require('../constants') function getExpandedTimingConfig (providedTimingSetup) { let presetData @@ -157,9 +158,9 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera subledger } } - logger.info(`Transaction seqno=${txMeta.seqNo} processed. Emitting 'tx-processed', 'rescan-scheduled'`) - eventEmitter.emit('tx-processed', { workerData, txData }) - eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() }) + logger.info(`Transaction ${txMeta.seqNo} processed.`) + eventEmitter.emit(INTERNAL_EVENT.TX_PROCESSED, { workerData, txData }) + eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() }) } // TODO: in all of this even-reacting functions, rename txMeta or reduce signature requirmenets to require just "seqNo" if possible @@ -167,19 +168,19 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera txNotAvailableCount++ timerLock.addBlockTime(timeoutOnTxNoFound, jitterRatio) const workerData = eventSharedPayload() - logger.info(`Transaction seqno=${queryMeta.seqNo} not available. Emitting 'tx-not-available', 'rescan-scheduled'`) - eventEmitter.emit('tx-not-available', { workerInfo: getWorkerInfo() }) - eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() }) + logger.info(`Transaction ${queryMeta.seqNo} not available.`) + eventEmitter.emit(INTERNAL_EVENT.TX_NOT_AVAILABLE, { workerInfo: getWorkerInfo() }) + eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() }) } function resolutionError (queryMeta, error) { cycleExceptionCount++ timerLock.addBlockTime(timeoutOnLedgerResolutionError, jitterRatio) const workerData = eventSharedPayload() - logger.warn(`Transaction seqno=${queryMeta.seqNo} resolution error ${util.inspect(error)}. ` + + logger.warn(`Transaction ${queryMeta.seqNo} resolution error ${util.inspect(error)}. ` + 'Emitting \'tx-resolution-error\', \'rescan-scheduled\'') - eventEmitter.emit('tx-resolution-error', getWorkerInfo()) - eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() }) + eventEmitter.emit(INTERNAL_EVENT.TX_RESOLUTION_ERROR, getWorkerInfo()) + eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() }) } function ingestionError (error, queryMeta, processedTx) { @@ -189,8 +190,8 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera logger.error(`Transaction ${queryMeta.seqNo} ingestion error. Couldn't ingest transaction` + `${JSON.stringify(processedTx)} due to storage ingestion error ${util.inspect(error)} ` + 'Emitting: \'tx-ingestion-error\', \'rescan-scheduled\'.') - eventEmitter.emit('tx-ingestion-error', getWorkerInfo()) - eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() }) + eventEmitter.emit(INTERNAL_EVENT.TX_INGESTION_ERROR, getWorkerInfo()) + eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() }) } async function processTransaction (txData, txFormat) { diff --git a/indyscan-webapp/pages/home.js b/indyscan-webapp/pages/home.js index aaaa4f24..c2f631ab 100644 --- a/indyscan-webapp/pages/home.js +++ b/indyscan-webapp/pages/home.js @@ -12,6 +12,7 @@ import { CSSTransition } from 'react-transition-group' import { assureWebsocketClient, getWebsocketClient } from '../context/socket-client' import NetworkInfo from '../components/NetworkInfo/NetworkInfo' import SubledgerHeader from '../components/SubledgerHeader/SubledgerHeader' +import { SOCKETIO_EVENT } from '../sockets/constants' class HomePage extends Component { static async getInitialProps ({ req, query }) { @@ -105,7 +106,6 @@ class HomePage extends Component { onRescanScheduled (payload) { const { workerData: { subledger }, msTillRescan } = payload - console.log(`rescan-scheduled = ${subledger} ${msTillRescan}`) const rescanStart = Math.round((new Date()).getTime()) const rescanDone = rescanStart + Math.round(msTillRescan) if (subledger === 'domain') { @@ -143,27 +143,20 @@ class HomePage extends Component { this.setState({ animateFirst: false }) } - configureSocketForCurrentNetwork(networkDetails) { if (networkDetails) { const { id: indyNetworkId } = networkDetails if (indyNetworkId) { let socket = assureWebsocketClient() - console.log(`home.js configureSocketForCurrentNetwork ${indyNetworkId}`) - socket.on('connection', function (_socket) { logger.info(`app.js WS connection established.`) }) - socket.on('switched-room-notification', (activeWsRoom) => { console.log(`switched-room-notification: Entered room ${activeWsRoom}`) this.setState({activeWsRoom}) - socket.on('rescan-scheduled', this.onRescanScheduled.bind(this)) - // socket.on('tx-processed', this.onTxProcessed.bind(this)) - socket.on('tx-ledger-processed', this.onTxDiscovered.bind(this)) - console.log(`Registered hooks on the socket! ${socket.hasListeners()}`) + socket.on(SOCKETIO_EVENT.LEDGER_TX_SCAN_SCHEDULED, this.onRescanScheduled.bind(this)) + socket.on(SOCKETIO_EVENT.LEDGER_TX_SCANNED, this.onTxDiscovered.bind(this)) }) - console.log(`Sending switch-room request for ${indyNetworkId}`) socket.emit('switch-room', indyNetworkId) } @@ -186,8 +179,8 @@ class HomePage extends Component { const socket = getWebsocketClient() if (socket) { console.log(`Cleaning socket listeners. Had listeners=${socket.hasListeners()}`) - socket.off('rescan-scheduled') - socket.off('tx-processed') + socket.off(SOCKETIO_EVENT.LEDGER_TX_SCANNED) + socket.off(SOCKETIO_EVENT.LEDGER_TX_SCAN_SCHEDULED) socket.off('switched-room-notification') } } diff --git a/indyscan-webapp/sockets/constants.js b/indyscan-webapp/sockets/constants.js new file mode 100644 index 00000000..bdbeccbf --- /dev/null +++ b/indyscan-webapp/sockets/constants.js @@ -0,0 +1,10 @@ +const SOCKETIO_EVENT = { + LEDGER_TX_SCANNED: 'ledger-tx-scanned', + LEDGER_TX_SCAN_SCHEDULED: 'ledger-tx-scan-scheduled', + SCANNED_TX_PROCESSED: 'tx-processed', + SCANNED_TX_PROCESSING_SCHEDULED: 'tx-rescan-scheduled' +} + +module.exports = { + SOCKETIO_EVENT +} diff --git a/indyscan-webapp/txtools/index.js b/indyscan-webapp/txtools/index.js index aec12480..27bd361e 100644 --- a/indyscan-webapp/txtools/index.js +++ b/indyscan-webapp/txtools/index.js @@ -114,15 +114,9 @@ const txDataDescriptiveExtractors = { export function extractTxDataBasic (tx) { const { seqNo } = tx.imeta let txnId, txnTimeIso8601, typeName, from, indexedFields - if (tx?.idata?.expansion) { - typeName = tx.idata.expansion.idata.txn.typeName - txnId = tx.idata.expansion.idata.txnMetadata.txnId - const epoch = tx.idata.expansion?.idata?.txnMetadata?.txnTime - txnTimeIso8601 = epoch ? new Date(epoch).toISOString() : null - from = tx?.idata?.expansion?.idata?.txn?.metadata?.from || '-' - indexedFields = true - } else if (tx?.idata?.json || tx?.idata?.serialized) { - const deserializedOriginal = JSON.parse(tx?.idata?.json || tx?.idata?.serialized?.idata?.json) + const serializedOriginal = tx?.idata?.json || tx?.idata?.serialized?.idata?.json + if (serializedOriginal) { + const deserializedOriginal = JSON.parse(serializedOriginal) txnId = deserializedOriginal.txnMetadata.txnId const epoch = deserializedOriginal.txnMetadata.txnTime * 1000 txnTimeIso8601 = epoch ? new Date(epoch).toISOString() : null @@ -130,7 +124,7 @@ export function extractTxDataBasic (tx) { from = deserializedOriginal.txn.metadata.from indexedFields = false } else { - throw Error("Malformed transaction format, does not contain expansion nor serialized format.") + throw Error("Expected transaction in 'serialized' or 'full.serialized' format") } return { txnId, seqNo, txnTimeIso8601, typeName, from, indexedFields } }