Skip to content

Commit

Permalink
Drive UI updates by ledgercpy workers events
Browse files Browse the repository at this point in the history
Signed-off-by: Patrik Stas <patrik.stas@gmail.com>
  • Loading branch information
Patrik-Stas committed Sep 7, 2022
1 parent d0dfea6 commit 45952da
Show file tree
Hide file tree
Showing 19 changed files with 178 additions and 57 deletions.
20 changes: 20 additions & 0 deletions indyscan-api/app-config/sovrin-buildernet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"id": "SOVRIN_BUILDERNET",
"ui": {
"priority": 1,
"display": "BuilderNet",
"display-long": "Sovrin BuilderNet",
"description": "For active development of your solution.",
"tutorial": "Get your DID and start writing on the network",
"tutorial-link": "https://selfserve.sovrin.org/",
"logo-address": "/static/sovrin.png"
},
"aliases": [
"sovbuilder"
],
"es" : {
"index": "txs-sovbuilder"
}
}
]
15 changes: 15 additions & 0 deletions indyscan-api/nodemon-buildernet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"verbose": true,
"ignore": ["node_modules", ".next"],
"watch": ["src/**/*"],
"ext": "js json",
"env": {
"ES_URL": "http://localhost:9200",
"DAEMON_URL": "http://localhost:3709",
"LOG_LEVEL": "info",
"PORT" : 3708,
"LOG_HTTP_REQUESTS" : true,
"LOG_HTTP_RESPONSES" : true,
"NETWORKS_CONFIG_PATH": "./app-config/sovrin-buildernet.json"
}
}
1 change: 1 addition & 0 deletions indyscan-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"lint": "standard",
"lint:fix": "standard --fix",
"dev": "nodemon src/index.js",
"dev:sovrin:builder": "nodemon --config nodemon-buildernet.json src/index.js",
"test:unit": "jest tests/unit",
"start": "cross-env NODE_ENV=production node src/index.js"
},
Expand Down
2 changes: 1 addition & 1 deletion indyscan-api/src/service/service-storages.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { createStorageReadEs } = require('indyscan-storage/src')
async function createLedgerStorageManager (esUrl) {
const storages = {}

logger.info(`Connecting to ElasticSearh '${esUrl}'.`)
logger.info(`Connecting to ElasticSearch '${esUrl}'.`)
const esClient = new elasticsearch.Client({ node: esUrl })

async function addIndyNetwork (networkId, networkEsIndex) {
Expand Down
9 changes: 0 additions & 9 deletions indyscan-daemon/app-configs/sovbuilder.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,6 @@
"genesisPath": "{{{cfgdir}}}/genesis/{{{INDY_NETWORK}}}.txn",
"esIndex": "{{{ES_INDEX}}}",
"esUrl": "{{{ES_URL}}}",
"workerTiming": "SLOW"
}
},
{
"builder": "rtwExpansion",
"params": {
"indyNetworkId": "{{{INDY_NETWORK}}}",
"esUrl": "{{{ES_URL}}}",
"esIndex": "{{{ES_INDEX}}}",
"workerTiming": "MEDIUM"
}
}
Expand Down
1 change: 1 addition & 0 deletions indyscan-daemon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"lint:fix": "standard --fix",
"dev": "cross-env NODE_ENV=development nodemon src/index.js",
"dev:sovrin:staging:builder": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovstaging.json,app-configs/sovbuilder.json nodemon src/index.js",
"dev:sovrin:builder": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovbuilder.json nodemon src/index.js",
"dev:sovrin:staging": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovstaging.json nodemon src/index.js",
"dev:sovrin:sovmain": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovmain.json nodemon src/index.js",
"dev:sovrin": "cross-env NODE_ENV=development WORKER_CONFIGS=app-configs/sovmain.json,app-configs/sovstaging.json,app-configs/sovbuilder.json nodemon src/index.js",
Expand Down
8 changes: 8 additions & 0 deletions indyscan-daemon/src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const OPERATION_TYPES = {
LEDGER_CPY: 'ledgercpy',
EXPANSION: 'expansion'
}

module.exports = {
OPERATION_TYPES
}
12 changes: 6 additions & 6 deletions indyscan-daemon/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const { createNetOpRtwSerialization } = require('./worker-templates/rtw-ledger-t
// }
// })

async function buildWorkers (builder, builderParams) {
logger.info(`Going to build workers by ${builder} from ${JSON.stringify(builderParams)}`)
async function buildWorker (builder, builderParams) {
logger.info(`Going to build worker by ${builder} from ${JSON.stringify(builderParams, null, 2)}`)
if (builder === 'rtwSerialization') {
return createNetOpRtwSerialization(builderParams)
} else if (builder === 'rtwExpansion') {
Expand All @@ -50,14 +50,14 @@ async function run () {
await sleep(2000)
logger.info(`Will bootstrap app from following operations definitions ${JSON.stringify(workerConfigPaths, null, 2)}`)

for (const workerConfigPath of workerConfigPaths) {
for (const workerConfigPath of workerConfigPaths) { // per each worker config file, render the file
const workersConfig = fs.readFileSync(workerConfigPath)
const { workersBuildersTemplate, env } = JSON.parse(workersConfig)
env.cfgdir = path.dirname(workerConfigPath)
const workerBuilders = JSON.parse(Mustache.render(JSON.stringify(workersBuildersTemplate), env))
for (const workerBuilder of workerBuilders) {
const workerBuilders = JSON.parse(Mustache.render(JSON.stringify(workersBuildersTemplate), env)) // render template
for (const workerBuilder of workerBuilders) { // one file can define multiple workers
const { builder, params } = workerBuilder
const { workers, sources, targets, transformers, iterators } = await buildWorkers(builder, params)
const { workers, sources, targets, transformers, iterators } = await buildWorker(builder, params)
allWorkers.push(workers)
allSources.push(sources)
allTargets.push(targets)
Expand Down
8 changes: 4 additions & 4 deletions indyscan-daemon/src/logging/logger-builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ const winston = require('winston')
const mkdirp = require('mkdirp')
const Elasticsearch = require('winston-elasticsearch')
const { format } = require('winston')
const { timestamp, printf } = format
const { timestamp, printf, label } = format

const myFormat = printf(({ level, message, timestamp, metadaemon }) => {
return `${timestamp} [${metadaemon && metadaemon.workerId ? metadaemon.workerId : '--'}] ${level}: ${message}`
const myFormat = printf(({ label, level, message, timestamp, metadaemon }) => {
return `${timestamp} [${label}] ${level}: ${message}`
})

function createLogger (loggerName, consoleLogsLevel, enableLogFiles) {
winston.loggers.add(loggerName, {
transports: [
new winston.transports.Console({
level: consoleLogsLevel,
label: loggerName,
format: winston.format.combine(
label({ label: loggerName }),
timestamp(),
myFormat,
winston.format.colorize({ all: true })
Expand Down
28 changes: 21 additions & 7 deletions indyscan-daemon/src/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const logger = require('../logging/logger-main')
var pretty = require('express-prettify')
const { createSocketioManager } = require('./wsockets')
const { logRequests, logResponses } = require('./middleware')
const { OPERATION_TYPES } = require('../constants')

function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLogging) {
if (enableRequestLogging) {
Expand All @@ -15,18 +16,30 @@ function setupLoggingMiddlleware (app, enableRequestLogging, enableResponseLoggi
app.use(logResponses)
}
}
function linkLedgerCpyWorkersToSockets(socketioManager, serviceWorkers) {
logger.info(`Linking workers of operationType ${OPERATION_TYPES.LEDGER_CPY} with sockets.`)
const workerQuery = { operationTypes: [OPERATION_TYPES.LEDGER_CPY] }
const workers = serviceWorkers.getWorkers(workerQuery)
for (const worker of workers) {
const emitter = worker.getEventEmitter()
const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
if (operationType === OPERATION_TYPES.LEDGER_CPY) {
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-processed', 'tx-ledger-processed', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-rescan-scheduled', 'tx-ledger-rescan-scheduled', indyNetworkId, subledger)
}
}
}

function linkExpansionWorkersToSockets (socketioManager, serviceWorkers) {
logger.info('Linking workers with sockets.')
const workerQuery = { operationTypes: ['expansion'] }
logger.info(`Linking workers of operationType ${OPERATION_TYPES.EXPANSION} with sockets.`)
const workerQuery = { operationTypes: [OPERATION_TYPES.EXPANSION] }
const workers = serviceWorkers.getWorkers(workerQuery)
for (const worker of workers) {
const emitter = worker.getEventEmitter()
const { subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
logger.info(`Setting up event->ws forward for ${operationType}/${indyNetworkId}/${subledger} `)
if (operationType === 'expansion') {
socketioManager.forwardEmitterEventToWebsocket(emitter, 'tx-processed', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, 'rescan-scheduled', indyNetworkId, subledger)
const { workerId, subledger, operationType, indyNetworkId } = worker.getWorkerInfo()
if (operationType === OPERATION_TYPES.EXPANSION) {
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-processed', 'tx-processed', indyNetworkId, subledger)
socketioManager.forwardEmitterEventToWebsocket(emitter, workerId, 'tx-rescan-scheduled', 'tx-rescan-scheduled', indyNetworkId, subledger)
}
}
}
Expand All @@ -47,6 +60,7 @@ function setupWebsockets (expressServer, serviceWorkers) {
const socketioManager = createSocketioManager(expressServer)
socketioManager.setupBasicSocketioListeners(createRoomJoinReactor(serviceWorkers))
linkExpansionWorkersToSockets(socketioManager, serviceWorkers)
linkLedgerCpyWorkersToSockets(socketioManager, serviceWorkers)
}

function startServer (serviceWorkers) {
Expand Down
10 changes: 5 additions & 5 deletions indyscan-daemon/src/server/wsockets.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ function createSocketioManager (expressServer) {
logger.info('Creating socketio manager')
const io = socketio(expressServer)

function forwardEmitterEventToWebsocket (emitter, eventName, forwardToRoom, subledger) {
logger.info(`Linking worker emitter to sockets for indyNetworkId=${forwardToRoom} subledger=${subledger}, `)
function forwardEmitterEventToWebsocket (emitter, workerId, sourceEmitterEventName, targetSocketEventName, forwardToRoom) {
logger.info(`Worker ${workerId} events of name ${sourceEmitterEventName} will be broadcasted to room ${targetSocketEventName} as event ${targetSocketEventName} `)

emitter.on(eventName, (payload) => {
emitter.on(sourceEmitterEventName, (payload) => {
io.of('/').in(`${forwardToRoom}`).clients((error, clients) => {
if (error) {
logger.error('Problem listing clients to print info.')
}
logger.info(`Broadcasting into room ${forwardToRoom}: "${eventName}" to ids=${JSON.stringify(clients)}`)
logger.info(`Worker ${workerId} emitting sockets event ${targetSocketEventName} to room ${forwardToRoom} to ${clients.length} clients`)
})
io.to(forwardToRoom).emit(eventName, payload)
io.to(forwardToRoom).emit(targetSocketEventName, payload)
})
}

Expand Down
3 changes: 2 additions & 1 deletion indyscan-daemon/src/worker-templates/rtw-db-expansion.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ const { createTargetElasticsearch } = require('../targets/target-elasticsearch')
const { createWorkerRtw } = require('../workers/worker-rtw')
const { createIteratorGuided } = require('../iterators/iterator-guided')
const { createSourceElasticsearch } = require('../sources/source-elasticsearch')
const { OPERATION_TYPES } = require('../constants')

async function createNetOpRtwExpansion ({ indyNetworkId, esUrl, esIndex, workerTiming }) {
const operationType = 'expansion'
const operationType = OPERATION_TYPES.EXPANSION

const sourceEs = await createSourceElasticsearch({
indyNetworkId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ const { createIteratorGuided } = require('../iterators/iterator-guided')
const { createTargetElasticsearch } = require('../targets/target-elasticsearch')
const { createSourceElasticsearch } = require('../sources/source-elasticsearch')
const { createSourceLedger } = require('../sources/source-ledger')
const { OPERATION_TYPES } = require('../constants')


async function createNetOpRtwSerialization ({ indyNetworkId, genesisPath, esUrl, esIndex, workerTiming }) {
const operationType = 'ledgercpy'
const operationType = OPERATION_TYPES.LEDGER_CPY
const sourceLedger = await createSourceLedger({
name: indyNetworkId,
genesisPath
Expand Down
20 changes: 13 additions & 7 deletions indyscan-daemon/src/workers/worker-rtw.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ const { envConfig } = require('../config/env')

function getExpandedTimingConfig (providedTimingSetup) {
let presetData
if (!providedTimingSetup || (typeof providedTimingSetup !== 'string')) {
if (!providedTimingSetup) {
presetData = getDefaultPreset()
} else {
} else if (typeof providedTimingSetup === 'string') {
presetData = resolvePreset(providedTimingSetup) || getDefaultPreset()
} else if (typeof providedTimingSetup === 'object') {
const defaultPreset = getDefaultPreset()
presetData = { ... defaultPreset, ... providedTimingSetup }
}
return Object.assign(presetData, providedTimingSetup)
return presetData
}

function validateTimingConfig (timingConfig) {
Expand All @@ -38,10 +41,10 @@ function validateTimingConfig (timingConfig) {
}

async function createWorkerRtw ({ indyNetworkId, subledger, operationType, iterator, iteratorTxFormat, transformer, target, timing }) {
console.log(`BUILDING WORKER for ${indyNetworkId}... timit= ${timing}`)
const eventEmitter = new EventEmitter()
const workerId = `${indyNetworkId}.${subledger}.${operationType}`
const logger = createLogger(workerId, envConfig.LOG_LEVEL, envConfig.ENABLE_LOGFILES)
logger.info(`Building RTW worker ${workerId} for network: ${indyNetworkId}`)
const eventEmitter = new EventEmitter()
const loggerMetadata = {
metadaemon: {
workerId,
Expand Down Expand Up @@ -81,7 +84,7 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera
throw Error(errMsg)
}
timing = getExpandedTimingConfig(timing)
logger.info(`Worker ${workerId} using timing ${JSON.stringify(timing)}`)
logger.info(`Effective timing configuration ${JSON.stringify(timing, null, 2)}`)
validateTimingConfig(timing)
const { timeoutOnSuccess, timeoutOnTxIngestionError, timeoutOnLedgerResolutionError, timeoutOnTxNoFound, jitterRatio } = timing

Expand Down Expand Up @@ -379,4 +382,7 @@ async function createWorkerRtw ({ indyNetworkId, subledger, operationType, itera
}
}

module.exports.createWorkerRtw = createWorkerRtw
module.exports = {
createWorkerRtw,
getExpandedTimingConfig
}
45 changes: 45 additions & 0 deletions indyscan-daemon/test/unit/workers/timing.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
const { getExpandedTimingConfig } = require('../../../src/workers/worker-rtw')
const sleep = require('sleep-promise')

describe('worker timing configuration', () => {
it('should expand timing configuration', async () => {
const timing = getExpandedTimingConfig("FAST")
const expected = {
"timeoutOnSuccess": 1000,
"timeoutOnTxIngestionError": 30000,
"timeoutOnLedgerResolutionError": 30000,
"timeoutOnTxNoFound": 3000,
"jitterRatio": 0.1
}
expect(timing).toStrictEqual(expected)
})

it('should expand timing configuration', async () => {
const timing = {
"timeoutOnSuccess": 1234,
"timeoutOnTxIngestionError": 2345,
"timeoutOnLedgerResolutionError": 3456,
"timeoutOnTxNoFound": 9999,
"jitterRatio": 0.1
}
const expandedTimingConfig = getExpandedTimingConfig(timing)
expect(expandedTimingConfig).toStrictEqual(timing)
})

it('should expand missing timing configuration with defaults', async () => {
const timing = {
"timeoutOnTxIngestionError": 60000,
"timeoutOnLedgerResolutionError": 60000,
"timeoutOnTxNoFound": 9000,
}
const expandedTimingConfig = getExpandedTimingConfig(timing)
const expected = {
"timeoutOnSuccess": 4000,
"timeoutOnTxIngestionError": 60000,
"timeoutOnLedgerResolutionError": 60000,
"timeoutOnTxNoFound": 9000,
"jitterRatio": 0.1
}
expect(expandedTimingConfig).toStrictEqual(expected)
})
})
13 changes: 13 additions & 0 deletions indyscan-webapp/nodemon-indyscan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"verbose": true,
"ignore": ["node_modules", ".next"],
"watch": ["server/**/*", "index.js"],
"ext": "js json",
"env": {
"INDYSCAN_API_URL": "https://indyscan.io",
"PORT" : 3707,
"LOG_LEVEL": "debug",
"LOG_HTTP_REQUESTS" : true,
"LOG_HTTP_RESPONSES" : true
}
}
3 changes: 2 additions & 1 deletion indyscan-webapp/nodemon.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"watch": ["server/**/*", "index.js"],
"ext": "js json",
"env": {
"INDYSCAN_API_URL": "https://indyscan.io",
"INDYSCAN_API_URL": "http://localhost:3708",
"DAEMON_WS_URL": "http://localhost:3709",
"PORT" : 3707,
"LOG_LEVEL": "debug",
"LOG_HTTP_REQUESTS" : true,
Expand Down
Loading

0 comments on commit 45952da

Please sign in to comment.