Commit
Merge pull request #179 from Patrik-Stas/fix/assure-txs-listed
List transactions in UI based on basic 'serialized' tx representation
Patrik-Stas authored Sep 8, 2022
2 parents 78fbd22 + 4cac9a9 commit eb429e0
Showing 31 changed files with 380 additions and 207 deletions.
20 changes: 20 additions & 0 deletions indyscan-api/app-config/sovrin-buildernet.json
@@ -0,0 +1,20 @@
[
{
"id": "SOVRIN_BUILDERNET",
"ui": {
"priority": 1,
"display": "BuilderNet",
"display-long": "Sovrin BuilderNet",
"description": "For active development of your solution.",
"tutorial": "Get your DID and start writing on the network",
"tutorial-link": "https://selfserve.sovrin.org/",
"logo-address": "/static/sovrin.png"
},
"aliases": [
"sovbuilder"
],
"es" : {
"index": "txs-sovbuilder"
}
}
]
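
For context, a minimal sketch of how a networks config like the one above can be looked up by id or alias. Only the config shape (id, aliases, es.index) comes from the file in this commit; the loading code and the resolveNetwork helper are illustrative assumptions, not part of the change.

// Illustrative sketch, run from the indyscan-api directory; resolveNetwork is a hypothetical helper.
const fs = require('fs')

function resolveNetwork (networks, idOrAlias) {
  return networks.find(
    (network) => network.id === idOrAlias || (network.aliases || []).includes(idOrAlias)
  )
}

const networks = JSON.parse(fs.readFileSync('./app-config/sovrin-buildernet.json', 'utf8'))
const buildernet = resolveNetwork(networks, 'sovbuilder')
console.log(buildernet.es.index) // -> 'txs-sovbuilder'
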
15 changes: 15 additions & 0 deletions indyscan-api/nodemon-buildernet.json
@@ -0,0 +1,15 @@
{
"verbose": true,
"ignore": ["node_modules", ".next"],
"watch": ["src/**/*"],
"ext": "js json",
"env": {
"ES_URL": "http://localhost:9200",
"DAEMON_URL": "http://localhost:3709",
"LOG_LEVEL": "info",
"PORT" : 3708,
"LOG_HTTP_REQUESTS" : true,
"LOG_HTTP_RESPONSES" : true,
"NETWORKS_CONFIG_PATH": "./app-config/sovrin-buildernet.json"
}
}
1 change: 1 addition & 0 deletions indyscan-api/package.json
@@ -7,6 +7,7 @@
"lint": "standard",
"lint:fix": "standard --fix",
"dev": "nodemon src/index.js",
"dev:sovrin:builder": "nodemon --config nodemon-buildernet.json src/index.js",
"test:unit": "jest tests/unit",
"start": "cross-env NODE_ENV=production node src/index.js"
},
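
Together with the nodemon-buildernet.json file added above, this script should allow running the API against the BuilderNet index only, presumably with npm run dev:sovrin:builder from the indyscan-api directory (assuming Elasticsearch is reachable at the ES_URL set in that nodemon config).
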
2 changes: 1 addition & 1 deletion indyscan-api/src/service/service-storages.js
@@ -8,7 +8,7 @@ const { createStorageReadEs } = require('indyscan-storage/src')
async function createLedgerStorageManager (esUrl) {
const storages = {}

logger.info(`Connecting to ElasticSearh '${esUrl}'.`)
logger.info(`Connecting to ElasticSearch '${esUrl}'.`)
const esClient = new elasticsearch.Client({ node: esUrl })

async function addIndyNetwork (networkId, networkEsIndex) {
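
A hedged illustration of how this storage manager plausibly ties back to the new BuilderNet config: createLedgerStorageManager and addIndyNetwork appear in the hunk above, but the export shape and call-site wiring shown here are assumptions made for illustration.

// Illustrative only; whether the module exports createLedgerStorageManager like this,
// and whether the returned manager exposes addIndyNetwork, is assumed.
const { createLedgerStorageManager } = require('./service-storages')

async function wireBuilderNet () {
  const ledgerStorageManager = await createLedgerStorageManager('http://localhost:9200')
  // Values taken from app-config/sovrin-buildernet.json: network id -> es.index
  await ledgerStorageManager.addIndyNetwork('SOVRIN_BUILDERNET', 'txs-sovbuilder')
}
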
2 changes: 1 addition & 1 deletion indyscan-daemon/app-configs/sovbuilder.json
@@ -12,7 +12,7 @@
"genesisPath": "{{{cfgdir}}}/genesis/{{{INDY_NETWORK}}}.txn",
"esIndex": "{{{ES_INDEX}}}",
"esUrl": "{{{ES_URL}}}",
"workerTiming": "SLOW"
"workerTiming": "MEDIUM"
}
},
{
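
The workerTiming value is a named preset that gets resolved at worker build time by getExpandedTimingConfig/resolvePreset in indyscan-daemon/src/workers/worker-rtw.js (see the hunks further down). Moving BuilderNet from SLOW to MEDIUM should make the scan loop re-poll the ledger more often, though the concrete timeout values behind each preset are not shown in this diff.
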
1 change: 1 addition & 0 deletions indyscan-daemon/package.json
@@ -14,6 +14,7 @@
"lint:fix": "standard --fix",
"dev": "cross-env NODE_ENV=development nodemon src/index.js",
"dev:sovrin:staging:builder": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovstaging.json,app-configs/sovbuilder.json nodemon src/index.js",
"dev:sovrin:builder": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovbuilder.json nodemon src/index.js",
"dev:sovrin:staging": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovstaging.json nodemon src/index.js",
"dev:sovrin:sovmain": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovmain.json nodemon src/index.js",
"dev:sovrin": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovmain.json,app-configs/sovstaging.json,app-configs/sovbuilder.json nodemon src/index.js",
25 changes: 25 additions & 0 deletions indyscan-daemon/src/constants.js
@@ -0,0 +1,25 @@
const SOCKETIO_EVENT = {
LEDGER_TX_SCANNED: 'ledger-tx-scanned',
LEDGER_TX_SCAN_SCHEDULED: 'ledger-tx-scan-scheduled',
SCANNED_TX_PROCESSED: 'tx-processed',
SCANNED_TX_PROCESSING_SCHEDULED: 'tx-rescan-scheduled'
}

const INTERNAL_EVENT = {
TX_RESCAN_SCHEDULED: 'tx-rescan-scheduled',
TX_NOT_AVAILABLE: 'tx-not-available',
TX_PROCESSED: 'tx-processed',
TX_RESOLUTION_ERROR: 'tx-resolution-error',
TX_INGESTION_ERROR: 'tx-ingestion-error'
}

const OPERATION_TYPES = {
LEDGER_CPY: 'ledgercpy',
EXPANSION: 'expansion'
}

module.exports = {
OPERATION_TYPES,
INTERNAL_EVENT,
SOCKETIO_EVENT
}
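
To make the split concrete: SOCKETIO_EVENT names are what clients see over socket.io, INTERNAL_EVENT names are what workers emit in-process, and OPERATION_TYPES distinguishes the two worker kinds. A hypothetical UI-side consumer (not part of this commit) might subscribe like this; the daemon URL comes from the nodemon config above, while the 'switch-room' join message and the room name are assumptions.

// Hypothetical socket.io client; only the event names mirror constants.js above.
const io = require('socket.io-client')

const LEDGER_TX_SCANNED = 'ledger-tx-scanned' // SOCKETIO_EVENT.LEDGER_TX_SCANNED
const SCANNED_TX_PROCESSED = 'tx-processed' // SOCKETIO_EVENT.SCANNED_TX_PROCESSED

const socket = io('http://localhost:3709') // DAEMON_URL from nodemon-buildernet.json
socket.emit('switch-room', 'SOVRIN_BUILDERNET') // join mechanism and room name are assumed

socket.on(LEDGER_TX_SCANNED, ({ workerData, txData }) => {
  console.log('ledger copy of tx arrived', txData)
})
socket.on(SCANNED_TX_PROCESSED, ({ workerData, txData }) => {
  console.log('expanded tx arrived', txData)
})
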
12 changes: 6 additions & 6 deletions indyscan-daemon/src/index.js
@@ -27,8 +27,8 @@ const { createNetOpRtwSerialization } = require('./worker-templates/rtw-ledger-t
// }
// })

async function buildWorkers (builder, builderParams) {
logger.info(`Going to build workers by ${builder} from ${JSON.stringify(builderParams)}`)
async function buildWorker (builder, builderParams) {
logger.info(`Going to build worker by ${builder} from ${JSON.stringify(builderParams, null, 2)}`)
if (builder === 'rtwSerialization') {
return createNetOpRtwSerialization(builderParams)
} else if (builder === 'rtwExpansion') {
@@ -50,14 +50,14 @@ async function run () {
await sleep(2000)
logger.info(`Will bootstrap app from following operations definitions ${JSON.stringify(workerConfigPaths, null, 2)}`)

for (const workerConfigPath of workerConfigPaths) {
for (const workerConfigPath of workerConfigPaths) { // per each worker config file, render the file
const workersConfig = fs.readFileSync(workerConfigPath)
const { workersBuildersTemplate, env } = JSON.parse(workersConfig)
env.cfgdir = path.dirname(workerConfigPath)
const workerBuilders = JSON.parse(Mustache.render(JSON.stringify(workersBuildersTemplate), env))
for (const workerBuilder of workerBuilders) {
const workerBuilders = JSON.parse(Mustache.render(JSON.stringify(workersBuildersTemplate), env)) // render template
for (const workerBuilder of workerBuilders) { // one file can define multiple workers
const { builder, params } = workerBuilder
const { workers, sources, targets, transformers, iterators } = await buildWorkers(builder, params)
const { workers, sources, targets, transformers, iterators } = await buildWorker(builder, params)
allWorkers.push(workers)
allSources.push(sources)
allTargets.push(targets)
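
For readers new to the bootstrap flow: each file listed in WORKER_CONFIGS contains a workersBuildersTemplate (a Mustache-templated array of { builder, params } entries) plus an env object; run() stringifies the template, renders the {{{...}}} placeholders from env (with cfgdir injected), and passes each entry to buildWorker. A stripped-down sketch of that rendering step, reusing the placeholder names from the sovbuilder.json hunk above; the env values are illustrative assumptions.

// Sketch of the template rendering done in run(); values are illustrative.
const Mustache = require('mustache')

const workersBuildersTemplate = [
  {
    builder: 'rtwSerialization', // one of the builder names handled by buildWorker()
    params: {
      genesisPath: '{{{cfgdir}}}/genesis/{{{INDY_NETWORK}}}.txn',
      esIndex: '{{{ES_INDEX}}}',
      esUrl: '{{{ES_URL}}}',
      workerTiming: 'MEDIUM'
    }
  }
]
const env = {
  cfgdir: 'app-configs',
  INDY_NETWORK: 'SOVRIN_BUILDERNET', // assumed value
  ES_INDEX: 'txs-sovbuilder',
  ES_URL: 'http://localhost:9200'
}
const workerBuilders = JSON.parse(Mustache.render(JSON.stringify(workersBuildersTemplate), env))
console.log(workerBuilders[0].params.esUrl) // -> 'http://localhost:9200'
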
8 changes: 4 additions & 4 deletions indyscan-daemon/src/logging/logger-builder.js
@@ -2,19 +2,19 @@ const winston = require('winston')
const mkdirp = require('mkdirp')
const Elasticsearch = require('winston-elasticsearch')
const { format } = require('winston')
const { timestamp, printf } = format
const { timestamp, printf, label } = format

const myFormat = printf(({ level, message, timestamp, metadaemon }) => {
return `${timestamp} [${metadaemon && metadaemon.workerId ? metadaemon.workerId : '--'}] ${level}: ${message}`
const myFormat = printf(({ label, level, message, timestamp, metadaemon }) => {
return `${timestamp} [${label}] ${level}: ${message}`
})

function createLogger (loggerName, consoleLogsLevel, enableLogFiles) {
winston.loggers.add(loggerName, {
transports: [
new winston.transports.Console({
level: consoleLogsLevel,
label: loggerName,
format: winston.format.combine(
label({ label: loggerName }),
timestamp(),
myFormat,
winston.format.colorize({ all: true })
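
With label() added to the format chain, each worker's console lines now carry the logger name rather than depending on metadaemon.workerId being present, giving output roughly of the form (timestamp format and the example workerId are assumed): 2022-09-08T10:00:00.000Z [SOVRIN_BUILDERNET.domain.ledgercpy] info: Transaction 42 processed.
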
34 changes: 25 additions & 9 deletions indyscan-daemon/src/server/server.js
@@ -3,9 +3,10 @@ const apiWorkers = require('./api/api-workers')
const express = require('express')
const bodyParser = require('body-parser')
const logger = require('../logging/logger-main')
var pretty = require('express-prettify')
const pretty = require('express-prettify')
const { createSocketioManager } = require('./wsockets')
const { logRequests, logResponses } = require('./middleware')
const { OPERATION_TYPES, INTERNAL_EVENT, SOCKETIO_EVENT } = require('../constants')

function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLogging) {
if (enableRequestLogging) {
@@ -16,37 +16,52 @@ function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLoggi
}
}

function linkLedgerCpyWorkersToSockets (socketioManager, serviceWorkers) {
logger.info(`Linking workers of operationType ${OPERATION_TYPES.LEDGER_CPY} with sockets.`)
const workerQuery = { operationTypes: [OPERATION_TYPES.LEDGER_CPY] }
const workers = serviceWorkers.getWorkers(workerQuery)
for (const worker of workers) {
const emitter = worker.getEventEmitter()
const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
if (operationType === OPERATION_TYPES.LEDGER_CPY) {
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_PROCESSED, SOCKETIO_EVENT.LEDGER_TX_SCANNED, indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_RESCAN_SCHEDULED, SOCKETIO_EVENT.LEDGER_TX_SCAN_SCHEDULED, indyNetworkId, subledger)
}
}
}

function linkExpansionWorkersToSockets (socketioManager, serviceWorkers) {
logger.info('Linking workers with sockets.')
const workerQuery = { operationTypes: ['expansion'] }
logger.info(`Linking workers of operationType ${OPERATION_TYPES.EXPANSION} with sockets.`)
const workerQuery = { operationTypes: [OPERATION_TYPES.EXPANSION] }
const workers = serviceWorkers.getWorkers(workerQuery)
for (const worker of workers) {
const emitter = worker.getEventEmitter()
const { subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
logger.info(`Setting up event->ws forward for ${operationType}/${indyNetworkId}/${subledger} `)
if (operationType === 'expansion') {
socketioManager.forwardEmitterEventToWebsocket(emitter, 'tx-processed', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, 'rescan-scheduled', indyNetworkId, subledger)
const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
if (operationType === OPERATION_TYPES.EXPANSION) {
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_PROCESSED, SOCKETIO_EVENT.SCANNED_TX_PROCESSED, indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, INTERNAL_EVENT.TX_RESCAN_SCHEDULED, SOCKETIO_EVENT.SCANNED_TX_PROCESSING_SCHEDULED, indyNetworkId, subledger)
}
}
}

function createRoomJoinReactor (serviceWorkers) {
function onRoomJoined (room, socket) {
const workerQuery = { operationTypes: ['expansion'], indyNetworkIds: [room] }
const workerQuery = { operationTypes: [OPERATION_TYPES.EXPANSION], indyNetworkIds: [room] }
const workers = serviceWorkers.getWorkers(workerQuery)
for (const worker of workers) {
const rescanScheduledPayload = worker.requestRescheduleStatus()
socket.emit('rescan-scheduled', rescanScheduledPayload)
}
}

return onRoomJoined
}

function setupWebsockets (expressServer, serviceWorkers) {
const socketioManager = createSocketioManager(expressServer)
socketioManager.setupBasicSocketioListeners(createRoomJoinReactor(serviceWorkers))
linkExpansionWorkersToSockets(socketioManager, serviceWorkers)
linkLedgerCpyWorkersToSockets(socketioManager, serviceWorkers)
}

function startServer (serviceWorkers) {
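
In short, the server now maps the two worker kinds to distinct public socket events: ledgercpy workers forward INTERNAL_EVENT.TX_PROCESSED as 'ledger-tx-scanned' and TX_RESCAN_SCHEDULED as 'ledger-tx-scan-scheduled', while expansion workers forward the same internal events as 'tx-processed' and 'tx-rescan-scheduled'. Both streams are broadcast into the socket.io room named after the worker's indyNetworkId, which is presumably what lets the UI list transactions as soon as the basic 'serialized' ledger copy arrives, before expansion completes.
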
10 changes: 5 additions & 5 deletions indyscan-daemon/src/server/wsockets.js
@@ -6,17 +6,17 @@ function createSocketioManager (expressServer) {
logger.info('Creating socketio manager')
const io = socketio(expressServer)

function forwardEmitterEventToWebsocket (emitter, eventName, forwardToRoom, subledger) {
logger.info(`Linking worker emitter to sockets for indyNetworkId=${forwardToRoom} subledger=${subledger}, `)
function forwardEmitterEventToWebsocket (emitter, workerId, sourceEmitterEventName, targetSocketEventName, forwardToRoom) {
logger.info(`Forwarding worker events ${workerId} / ${sourceEmitterEventName} -----> sockets room ${forwardToRoom} / ${targetSocketEventName} `)

emitter.on(eventName, (payload) => {
emitter.on(sourceEmitterEventName, (payload) => {
io.of('/').in(`${forwardToRoom}`).clients((error, clients) => {
if (error) {
logger.error('Problem listing clients to print info.')
}
logger.info(`Broadcasting into room ${forwardToRoom}: "${eventName}" to ids=${JSON.stringify(clients)}`)
logger.info(`Worker ${workerId} emitting sockets event ${forwardToRoom} / ${targetSocketEventName} (${clients.length} clients)`)
})
io.to(forwardToRoom).emit(eventName, payload)
io.to(forwardToRoom).emit(targetSocketEventName, payload)
})
}

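
Note that the refactored helper now takes explicit source and target event names plus the workerId (used only for logging), and still broadcasts into the room given by forwardToRoom, i.e. the indyNetworkId. The trailing subledger argument that server.js still passes is no longer part of this signature and is simply ignored.
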
3 changes: 2 additions & 1 deletion indyscan-daemon/src/worker-templates/rtw-db-expansion.js
@@ -5,9 +5,10 @@ const { createTargetElasticsearch } = require('../targets/target-elasticsearch')
const { createWorkerRtw } = require('../workers/worker-rtw')
const { createIteratorGuided } = require('../iterators/iterator-guided')
const { createSourceElasticsearch } = require('../sources/source-elasticsearch')
const { OPERATION_TYPES } = require('../constants')

async function createNetOpRtwExpansion ({ indyNetworkId, esUrl, esIndex, workerTiming }) {
const operationType = 'expansion'
const operationType = OPERATION_TYPES.EXPANSION

const sourceEs = await createSourceElasticsearch({
indyNetworkId,
@@ -4,9 +4,11 @@ const { createIteratorGuided } = require('../iterators/iterator-guided')
const { createTargetElasticsearch } = require('../targets/target-elasticsearch')
const { createSourceElasticsearch } = require('../sources/source-elasticsearch')
const { createSourceLedger } = require('../sources/source-ledger')
const { OPERATION_TYPES } = require('../constants')


async function createNetOpRtwSerialization ({ indyNetworkId, genesisPath, esUrl, esIndex, workerTiming }) {
const operationType = 'ledgercpy'
const operationType = OPERATION_TYPES.LEDGER_CPY
const sourceLedger = await createSourceLedger({
name: indyNetworkId,
genesisPath
43 changes: 25 additions & 18 deletions indyscan-daemon/src/workers/worker-rtw.js
@@ -7,15 +7,19 @@ const sleep = require('sleep-promise')
const EventEmitter = require('events')
const { createLogger } = require('../logging/logger-builder')
const { envConfig } = require('../config/env')
const { INTERNAL_EVENT } = require('../constants')

function getExpandedTimingConfig (providedTimingSetup) {
let presetData
if (!providedTimingSetup || (typeof providedTimingSetup !== 'string')) {
if (!providedTimingSetup) {
presetData = getDefaultPreset()
} else {
} else if (typeof providedTimingSetup === 'string') {
presetData = resolvePreset(providedTimingSetup) || getDefaultPreset()
} else if (typeof providedTimingSetup === 'object') {
const defaultPreset = getDefaultPreset()
presetData = { ... defaultPreset, ... providedTimingSetup }
}
return Object.assign(presetData, providedTimingSetup)
return presetData
}

function validateTimingConfig (timingConfig) {
@@ -38,10 +38,10 @@
}

async function createWorkerRtw ({ indyNetworkId, subledger, operationType, iterator, iteratorTxFormat, transformer, target, timing }) {
console.log(`BUILDING WORKER for ${indyNetworkId}... timit= ${timing}`)
const eventEmitter = new EventEmitter()
const workerId = `${indyNetworkId}.${subledger}.${operationType}`
const logger = createLogger(workerId, envConfig.LOG_LEVEL, envConfig.ENABLE_LOGFILES)
logger.info(`Building RTW worker ${workerId} for network: ${indyNetworkId}`)
const eventEmitter = new EventEmitter()
const loggerMetadata = {
metadaemon: {
workerId,
@@ -81,7 +85,7 @@
throw Error(errMsg)
}
timing = getExpandedTimingConfig(timing)
logger.info(`Worker ${workerId} using timing ${JSON.stringify(timing)}`)
logger.info(`Effective timing configuration ${JSON.stringify(timing, null, 2)}`)
validateTimingConfig(timing)
const { timeoutOnSuccess, timeoutOnTxIngestionError, timeoutOnLedgerResolutionError, timeoutOnTxNoFound, jitterRatio } = timing

@@ -154,29 +158,29 @@
subledger
}
}
logger.info(`Transaction seqno=${txMeta.seqNo} processed. Emitting 'tx-processed', 'rescan-scheduled'`)
eventEmitter.emit('tx-processed', { workerData, txData })
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
logger.info(`Transaction ${txMeta.seqNo} processed.`)
eventEmitter.emit(INTERNAL_EVENT.TX_PROCESSED, { workerData, txData })
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

// TODO: in all of these event-reacting functions, rename txMeta or reduce signature requirements to require just "seqNo" if possible
function txNotAvailable (queryMeta) {
txNotAvailableCount++
timerLock.addBlockTime(timeoutOnTxNoFound, jitterRatio)
const workerData = eventSharedPayload()
logger.info(`Transaction seqno=${queryMeta.seqNo} not available. Emitting 'tx-not-available', 'rescan-scheduled'`)
eventEmitter.emit('tx-not-available', { workerInfo: getWorkerInfo() })
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
logger.info(`Transaction ${queryMeta.seqNo} not available.`)
eventEmitter.emit(INTERNAL_EVENT.TX_NOT_AVAILABLE, { workerInfo: getWorkerInfo() })
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

function resolutionError (queryMeta, error) {
cycleExceptionCount++
timerLock.addBlockTime(timeoutOnLedgerResolutionError, jitterRatio)
const workerData = eventSharedPayload()
logger.warn(`Transaction seqno=${queryMeta.seqNo} resolution error ${util.inspect(error)}. ` +
logger.warn(`Transaction ${queryMeta.seqNo} resolution error ${util.inspect(error)}. ` +
'Emitting \'tx-resolution-error\', \'rescan-scheduled\'')
eventEmitter.emit('tx-resolution-error', getWorkerInfo())
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
eventEmitter.emit(INTERNAL_EVENT.TX_RESOLUTION_ERROR, getWorkerInfo())
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

function ingestionError (error, queryMeta, processedTx) {
Expand All @@ -186,8 +190,8 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera
logger.error(`Transaction ${queryMeta.seqNo} ingestion error. Couldn't ingest transaction` +
`${JSON.stringify(processedTx)} due to storage ingestion error ${util.inspect(error)} ` +
'Emitting: \'tx-ingestion-error\', \'rescan-scheduled\'.')
eventEmitter.emit('tx-ingestion-error', getWorkerInfo())
eventEmitter.emit('rescan-scheduled', { workerData, msTillRescan: timerLock.getMsTillUnlock() })
eventEmitter.emit(INTERNAL_EVENT.TX_INGESTION_ERROR, getWorkerInfo())
eventEmitter.emit(INTERNAL_EVENT.TX_RESCAN_SCHEDULED, { workerData, msTillRescan: timerLock.getMsTillUnlock() })
}

async function processTransaction (txData, txFormat) {
@@ -379,4 +383,7 @@
}
}

module.exports.createWorkerRtw = createWorkerRtw
module.exports = {
createWorkerRtw,
getExpandedTimingConfig
}
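
A small sketch of the reworked timing resolution, which is now also exported (presumably so it can be unit-tested in isolation). The behaviour for the three input shapes follows the branches in the hunk above; the concrete numbers inside the presets are not part of this diff, so the values below are invented for illustration.

// Illustrative only; require path is relative to the indyscan-daemon package root.
const { getExpandedTimingConfig } = require('./src/workers/worker-rtw')

const t1 = getExpandedTimingConfig(undefined) // -> full default preset
const t2 = getExpandedTimingConfig('MEDIUM') // -> named preset, falling back to the default if unknown
const t3 = getExpandedTimingConfig({ timeoutOnSuccess: 5000 }) // -> default preset with timeoutOnSuccess overridden
console.log(t1, t2, t3)
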
