Skip to content

Commit

Permalink
Fix scan-scheduled event listening, cleanups, refactring
Browse files Browse the repository at this point in the history
Signed-off-by: Patrik Stas <patrik.stas@gmail.com>
  • Loading branch information
Patrik-Stas committed Sep 8, 2022
1 parent 45952da commit 4cac9a9
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 44 deletions.
9 changes: 9 additions & 0 deletions indyscan-daemon/app-configs/sovbuilder.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
"esUrl": "{{{ES_URL}}}",
"workerTiming": "MEDIUM"
}
},
{
"builder": "rtwExpansion",
"params": {
"indyNetworkId": "{{{INDY_NETWORK}}}",
"esUrl": "{{{ES_URL}}}",
"esIndex": "{{{ES_INDEX}}}",
"workerTiming": "MEDIUM"
}
}
]
}
Expand Down
19 changes: 18 additions & 1 deletion indyscan-daemon/src/constants.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
const SOCKETIO_EVENT = {
LEDGER_TX_SCANNED: 'ledger-tx-scanned',
LEDGER_TX_SCAN_SCHEDULED: 'ledger-tx-scan-scheduled',
SCANNED_TX_PROCESSED: 'tx-processed',
SCANNED_TX_PROCESSING_SCHEDULED: 'tx-rescan-scheduled'
}

const INTERNAL_EVENT = {
TX_RESCAN_SCHEDULED: 'tx-rescan-scheduled',
TX_NOT_AVAILABLE: 'tx-not-available',
TX_PROCESSED: 'tx-processed',
TX_RESOLUTION_ERROR: 'tx-resolution-error',
TX_INGESTION_ERROR: 'tx-ingestion-error'
}

const OPERATION_TYPES = {
LEDGER_CPY: 'ledgercpy',
EXPANSION: 'expansion'
}

module.exports = {
OPERATION_TYPES
OPERATION_TYPES,
INTERNAL_EVENT,
SOCKETIO_EVENT
}
18 changes: 10 additions & 8 deletions indyscan-daemon/src/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ const apiWorkers = require('./api/api-workers')
const express = require('express')
const bodyParser = require('body-parser')
const logger = require('../logging/logger-main')
var pretty = require('express-prettify')
const pretty = require('express-prettify')
const { createSocketioManager } = require('./wsockets')
const { logRequests, logResponses } = require('./middleware')
const { OPERATION_TYPES } = require('../constants')
const { OPERATION_TYPES, INTERNAL_EVENT, SOCKETIO_EVENT } = require('../constants')

function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLogging) {
if (enableRequestLogging) {
Expand All @@ -16,16 +16,17 @@ function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLoggi
app.use(logResponses)
}
}
function linkLedgerCpyWorkersToSockets(socketioManager, serviceWorkers) {

function linkLedgerCpyWorkersToSockets (socketioManager, serviceWorkers) {
logger.info(`Linking workers of operationType ${OPERATION_TYPES.LEDGER_CPY} with sockets.`)
const workerQuery = { operationTypes: [OPERATION_TYPES.LEDGER_CPY] }
const workers = serviceWorkers.getWorkers(workerQuery)
for (const worker of workers) {
const emitter = worker.getEventEmitter()
const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
if (operationType === OPERATION_TYPES.LEDGER_CPY) {
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-processed', 'tx-ledger-processed', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-rescan-scheduled', 'tx-ledger-rescan-scheduled', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_PROCESSED, SOCKETIO_EVENT.LEDGER_TX_SCANNED, indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_RESCAN_SCHEDULED, SOCKETIO_EVENT.LEDGER_TX_SCAN_SCHEDULED, indyNetworkId, subledger)
}
}
}
Expand All @@ -38,21 +39,22 @@ function linkExpansionWorkersToSockets (socketioManager, serviceWorkers) {
const emitter = worker.getEventEmitter()
const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
if (operationType === OPERATION_TYPES.EXPANSION) {
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-processed', 'tx-processed', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-rescan-scheduled', 'tx-rescan-scheduled', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_PROCESSED, SOCKETIO_EVENT.SCANNED_TX_PROCESSED, indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_RESCAN_SCHEDULED, SOCKETIO_EVENT.SCANNED_TX_PROCESSING_SCHEDULED, indyNetworkId, subledger)
}
}
}

function createRoomJoinReactor (serviceWorkers) {
function onRoomJoined (room, socket) {
const workerQuery = { operationTypes: ['expansion'], indyNetworkIds: [room] }
const workerQuery = { operationTypes: [OPERATION_TYPES.EXPANSION], indyNetworkIds: [room] }
const workers = serviceWorkers.getWorkers(workerQuery)
for (const worker of workers) {
const rescanScheduledPayload = worker.requestRescheduleStatus()
socket.emit('rescan-scheduled', rescanScheduledPayload)
}
}

return onRoomJoined
}

Expand Down
4 changes: 2 additions & 2 deletions indyscan-daemon/src/server/wsockets.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ function createSocketioManager (expressServer) {
const io = socketio(expressServer)

function forwardEmitterEventToWebsocket (emitter, workerId, sourceEmitterEventName, targetSocketEventName, forwardToRoom) {
logger.info(`Worker ${workerId} events of name ${sourceEmitterEventName} will be broadcasted to room ${targetSocketEventName} as event ${targetSocketEventName} `)
logger.info(`Forwarding worker events ${workerId} / ${sourceEmitterEventName} -----> sockets room ${forwardToRoom} / ${targetSocketEventName} `)

emitter.on(sourceEmitterEventName, (payload) => {
io.of('/').in(`${forwardToRoom}`).clients((error, clients) => {
if (error) {
logger.error('Problem listing clients to print info.')
}
logger.info(`Worker ${workerId} emitting sockets event ${targetSocketEventName} to room ${forwardToRoom} to ${clients.length} clients`)
logger.info(`Worker ${workerId} emitting sockets event ${forwardToRoom} / ${targetSocketEventName} (${clients.length} clients)`)
})
io.to(forwardToRoom).emit(targetSocketEventName, payload)
})
Expand Down
23 changes: 12 additions & 11 deletions indyscan-daemon/src/workers/worker-rtw.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const sleep = require('sleep-promise')
const EventEmitter = require('events')
const { createLogger } = require('../logging/logger-builder')
const { envConfig } = require('../config/env')
const { INTERNAL_EVENT } = require('../constants')

function getExpandedTimingConfig (providedTimingSetup) {
let presetData
Expand Down Expand Up @@ -157,29 +158,29 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera
subledger
}
}
logger.info(`Transaction seqno=${txMeta.seqNo} processed. Emitting 'tx-processed', 'rescan-scheduled'`)
eventEmitter.emit('tx-processed', { workerData, txData })
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
logger.info(`Transaction ${txMeta.seqNo} processed.`)
eventEmitter.emit(INTERNAL_EVENT.TX_PROCESSED, { workerData, txData })
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

// TODO: in all of this even-reacting functions, rename txMeta or reduce signature requirmenets to require just "seqNo" if possible
function txNotAvailable (queryMeta) {
txNotAvailableCount++
timerLock.addBlockTime(timeoutOnTxNoFound, jitterRatio)
const workerData = eventSharedPayload()
logger.info(`Transaction seqno=${queryMeta.seqNo} not available. Emitting 'tx-not-available', 'rescan-scheduled'`)
eventEmitter.emit('tx-not-available', { workerInfo: getWorkerInfo() })
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
logger.info(`Transaction ${queryMeta.seqNo} not available.`)
eventEmitter.emit(INTERNAL_EVENT.TX_NOT_AVAILABLE, { workerInfo: getWorkerInfo() })
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

function resolutionError (queryMeta, error) {
cycleExceptionCount++
timerLock.addBlockTime(timeoutOnLedgerResolutionError, jitterRatio)
const workerData = eventSharedPayload()
logger.warn(`Transaction seqno=${queryMeta.seqNo} resolution error ${util.inspect(error)}. ` +
logger.warn(`Transaction ${queryMeta.seqNo} resolution error ${util.inspect(error)}. ` +
'Emitting \'tx-resolution-error\', \'rescan-scheduled\'')
eventEmitter.emit('tx-resolution-error', getWorkerInfo())
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
eventEmitter.emit(INTERNAL_EVENT.TX_RESOLUTION_ERROR, getWorkerInfo())
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

function ingestionError (error, queryMeta, processedTx) {
Expand All @@ -189,8 +190,8 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera
logger.error(`Transaction ${queryMeta.seqNo} ingestion error. Couldn't ingest transaction` +
`${JSON.stringify(processedTx)} due to storage ingestion error ${util.inspect(error)} ` +
'Emitting: \'tx-ingestion-error\', \'rescan-scheduled\'.')
eventEmitter.emit('tx-ingestion-error', getWorkerInfo())
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
eventEmitter.emit(INTERNAL_EVENT.TX_INGESTION_ERROR, getWorkerInfo())
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

async function processTransaction (txData, txFormat) {
Expand Down
17 changes: 5 additions & 12 deletions indyscan-webapp/pages/home.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { CSSTransition } from 'react-transition-group'
import { assureWebsocketClient, getWebsocketClient } from '../context/socket-client'
import NetworkInfo from '../components/NetworkInfo/NetworkInfo'
import SubledgerHeader from '../components/SubledgerHeader/SubledgerHeader'
import { SOCKETIO_EVENT } from '../sockets/constants'

class HomePage extends Component {
static async getInitialProps ({ req, query }) {
Expand Down Expand Up @@ -105,7 +106,6 @@ class HomePage extends Component {

onRescanScheduled (payload) {
const { workerData: { subledger }, msTillRescan } = payload
console.log(`rescan-scheduled = ${subledger} ${msTillRescan}`)
const rescanStart = Math.round((new Date()).getTime())
const rescanDone = rescanStart + Math.round(msTillRescan)
if (subledger === 'domain') {
Expand Down Expand Up @@ -143,27 +143,20 @@ class HomePage extends Component {
this.setState({ animateFirst: false })
}


configureSocketForCurrentNetwork(networkDetails) {
if (networkDetails) {
const { id: indyNetworkId } = networkDetails
if (indyNetworkId) {
let socket = assureWebsocketClient()
console.log(`home.js configureSocketForCurrentNetwork ${indyNetworkId}`)

socket.on('connection', function (_socket) {
logger.info(`app.js WS connection established.`)
})

socket.on('switched-room-notification', (activeWsRoom) => {
console.log(`switched-room-notification: Entered room ${activeWsRoom}`)
this.setState({activeWsRoom})
socket.on('rescan-scheduled', this.onRescanScheduled.bind(this))
// socket.on('tx-processed', this.onTxProcessed.bind(this))
socket.on('tx-ledger-processed', this.onTxDiscovered.bind(this))
console.log(`Registered hooks on the socket! ${socket.hasListeners()}`)
socket.on(SOCKETIO_EVENT.LEDGER_TX_SCAN_SCHEDULED, this.onRescanScheduled.bind(this))
socket.on(SOCKETIO_EVENT.LEDGER_TX_SCANNED, this.onTxDiscovered.bind(this))
})

console.log(`Sending switch-room request for ${indyNetworkId}`)
socket.emit('switch-room', indyNetworkId)
}
Expand All @@ -186,8 +179,8 @@ class HomePage extends Component {
const socket = getWebsocketClient()
if (socket) {
console.log(`Cleaning socket listeners. Had listeners=${socket.hasListeners()}`)
socket.off('rescan-scheduled')
socket.off('tx-processed')
socket.off(SOCKETIO_EVENT.LEDGER_TX_SCANNED)
socket.off(SOCKETIO_EVENT.LEDGER_TX_SCAN_SCHEDULED)
socket.off('switched-room-notification')
}
}
Expand Down
10 changes: 10 additions & 0 deletions indyscan-webapp/sockets/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const SOCKETIO_EVENT = {
LEDGER_TX_SCANNED: 'ledger-tx-scanned',
LEDGER_TX_SCAN_SCHEDULED: 'ledger-tx-scan-scheduled',
SCANNED_TX_PROCESSED: 'tx-processed',
SCANNED_TX_PROCESSING_SCHEDULED: 'tx-rescan-scheduled'
}

module.exports = {
SOCKETIO_EVENT
}
14 changes: 4 additions & 10 deletions indyscan-webapp/txtools/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,23 +114,17 @@ const txDataDescriptiveExtractors = {
export function extractTxDataBasic (tx) {
const { seqNo } = tx.imeta
let txnId, txnTimeIso8601, typeName, from, indexedFields
if (tx?.idata?.expansion) {
typeName = tx.idata.expansion.idata.txn.typeName
txnId = tx.idata.expansion.idata.txnMetadata.txnId
const epoch = tx.idata.expansion?.idata?.txnMetadata?.txnTime
txnTimeIso8601 = epoch ? new Date(epoch).toISOString() : null
from = tx?.idata?.expansion?.idata?.txn?.metadata?.from || '-'
indexedFields = true
} else if (tx?.idata?.json || tx?.idata?.serialized) {
const deserializedOriginal = JSON.parse(tx?.idata?.json || tx?.idata?.serialized?.idata?.json)
const serializedOriginal = tx?.idata?.json || tx?.idata?.serialized?.idata?.json
if (serializedOriginal) {
const deserializedOriginal = JSON.parse(serializedOriginal)
txnId = deserializedOriginal.txnMetadata.txnId
const epoch = deserializedOriginal.txnMetadata.txnTime * 1000
txnTimeIso8601 = epoch ? new Date(epoch).toISOString() : null
typeName = txTypeToTxName(deserializedOriginal.txn.type)
from = deserializedOriginal.txn.metadata.from
indexedFields = false
} else {
throw Error("Malformed transaction format, does not contain expansion nor serialized format.")
throw Error("Expected transaction in 'serialized' or 'full.serialized' format")
}
return { txnId, seqNo, txnTimeIso8601, typeName, from, indexedFields }
}
Expand Down

0 comments on commit 4cac9a9

Please sign in to comment.