From 14684138d75c597434481d09eea61e64d83316ed Mon Sep 17 00:00:00 2001 From: Jared Leonard Date: Mon, 2 Jul 2018 15:25:38 +0200 Subject: [PATCH 1/7] formatting and added logging --- .gitignore | 2 + .../migration/DynamoDBTableReplicator.java | 1065 +++++++++-------- 2 files changed, 535 insertions(+), 532 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5bd05d6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target +.idea \ No newline at end of file diff --git a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java index 3629bf4..68e1dca 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java @@ -1,5 +1,5 @@ /** - * + * */ package com.citusdata.migration; @@ -69,567 +69,568 @@ /** * @author marco - * */ public class DynamoDBTableReplicator { - private static final Log LOG = LogFactory.getLog(DynamoDBTableReplicator.class); - - public static final String APPLICATION_NAME = "podyn"; - public static final String LEASE_TABLE_PREFIX = "podyn_migration_"; - - final AmazonDynamoDBStreams streamsClient; - final AmazonDynamoDB dynamoDBClient; - final AWSCredentialsProvider awsCredentialsProvider; - final ExecutorService executor; - - final TableEmitter emitter; - final String dynamoTableName; - - boolean addColumnsEnabled; - boolean useCitus; - boolean useLowerCaseColumnNames; - ConversionMode conversionMode; - - TableSchema tableSchema; - - public DynamoDBTableReplicator( - AmazonDynamoDB dynamoDBClient, - AmazonDynamoDBStreams streamsClient, - AWSCredentialsProvider awsCredentialsProvider, - ExecutorService executorService, - TableEmitter emitter, - String tableName) throws SQLException { - this.dynamoDBClient = dynamoDBClient; - this.streamsClient = streamsClient; - this.awsCredentialsProvider = awsCredentialsProvider; - this.executor = executorService; - this.emitter = emitter; - this.dynamoTableName = tableName; - this.addColumnsEnabled = true; - this.useCitus = false; - this.useLowerCaseColumnNames = false; - this.tableSchema = emitter.fetchSchema(this.dynamoTableName); - } - - public void setUseCitus(boolean useCitus) { - this.useCitus = useCitus; - } - - public void setAddColumnEnabled(boolean addColumnEnabled) { - this.addColumnsEnabled = addColumnEnabled; - } - - public void setUseLowerCaseColumnNames(boolean useLowerCaseColumnNames) { - this.useLowerCaseColumnNames = useLowerCaseColumnNames; - } - - public void setConversionMode(ConversionMode conversionMode) { - this.conversionMode = conversionMode; - } - - String dynamoKeyToColumnName(String keyName) { - if (useLowerCaseColumnNames) { - return keyName.toLowerCase(); - } else { - return keyName; - } - } - - public void replicateSchema() throws TableExistsException { - if (tableSchema != null) { - throw new TableExistsException("relation %s already exists", dynamoTableName); - } - - tableSchema = fetchSourceSchema(); - emitter.createTable(tableSchema); - } - - TableSchema fetchSourceSchema() { - TableSchema tableSchema = new TableSchema(dynamoTableName); - - DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); - TableDescription tableDescription = describeTableResult.getTable(); + private static final Log LOG = LogFactory.getLog(DynamoDBTableReplicator.class); + + public static final String APPLICATION_NAME = "podyn"; + public static final String LEASE_TABLE_PREFIX = "podyn_migration_"; + + final AmazonDynamoDBStreams streamsClient; + final AmazonDynamoDB dynamoDBClient; + final AWSCredentialsProvider awsCredentialsProvider; + final ExecutorService executor; + + final TableEmitter emitter; + final String dynamoTableName; + + boolean addColumnsEnabled; + boolean useCitus; + boolean useLowerCaseColumnNames; + ConversionMode conversionMode; + + TableSchema tableSchema; + + public DynamoDBTableReplicator( + AmazonDynamoDB dynamoDBClient, + AmazonDynamoDBStreams streamsClient, + AWSCredentialsProvider awsCredentialsProvider, + ExecutorService executorService, + TableEmitter emitter, + String tableName) throws SQLException { + this.dynamoDBClient = dynamoDBClient; + this.streamsClient = streamsClient; + this.awsCredentialsProvider = awsCredentialsProvider; + this.executor = executorService; + this.emitter = emitter; + this.dynamoTableName = tableName; + this.addColumnsEnabled = true; + this.useCitus = false; + this.useLowerCaseColumnNames = false; + this.tableSchema = emitter.fetchSchema(this.dynamoTableName); + } + + public void setUseCitus(boolean useCitus) { + this.useCitus = useCitus; + } + + public void setAddColumnEnabled(boolean addColumnEnabled) { + this.addColumnsEnabled = addColumnEnabled; + } + + public void setUseLowerCaseColumnNames(boolean useLowerCaseColumnNames) { + this.useLowerCaseColumnNames = useLowerCaseColumnNames; + } + + public void setConversionMode(ConversionMode conversionMode) { + this.conversionMode = conversionMode; + } + + String dynamoKeyToColumnName(String keyName) { + if (useLowerCaseColumnNames) { + return keyName.toLowerCase(); + } else { + return keyName; + } + } + + public void replicateSchema() throws TableExistsException { + if (tableSchema != null) { + throw new TableExistsException("relation %s already exists", dynamoTableName); + } + + tableSchema = fetchSourceSchema(); + emitter.createTable(tableSchema); + } + + TableSchema fetchSourceSchema() { + TableSchema tableSchema = new TableSchema(dynamoTableName); + + DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); + TableDescription tableDescription = describeTableResult.getTable(); - List attributeDefinitions = tableDescription.getAttributeDefinitions(); + List attributeDefinitions = tableDescription.getAttributeDefinitions(); - for (AttributeDefinition attributeDefinition : attributeDefinitions) { - String keyName = attributeDefinition.getAttributeName(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumnType type = TableColumnType.text; + for (AttributeDefinition attributeDefinition : attributeDefinitions) { + String keyName = attributeDefinition.getAttributeName(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumnType type = TableColumnType.text; - switch(attributeDefinition.getAttributeType()) { - case "N": - type = TableColumnType.numeric; - break; - case "B": - type = TableColumnType.bytea; - break; - } + switch (attributeDefinition.getAttributeType()) { + case "N": + type = TableColumnType.numeric; + break; + case "B": + type = TableColumnType.bytea; + break; + } - tableSchema.addColumn(columnName, type); - } + tableSchema.addColumn(columnName, type); + } - List primaryKey = new ArrayList<>(); - List keySchema = tableDescription.getKeySchema(); + List primaryKey = new ArrayList<>(); + List keySchema = tableDescription.getKeySchema(); - for (KeySchemaElement keySchemaElement : keySchema) { - String keyName = keySchemaElement.getAttributeName(); - String keyType = keySchemaElement.getKeyType(); - String columnName = dynamoKeyToColumnName(keyName); + for (KeySchemaElement keySchemaElement : keySchema) { + String keyName = keySchemaElement.getAttributeName(); + String keyType = keySchemaElement.getKeyType(); + String columnName = dynamoKeyToColumnName(keyName); - TableColumn column = tableSchema.getColumn(columnName); - - if (useCitus && KeyType.fromValue(keyType) == KeyType.HASH) { - tableSchema.setDistributionColumn(columnName); - } - - column.notNull = true; + TableColumn column = tableSchema.getColumn(columnName); + + if (useCitus && KeyType.fromValue(keyType) == KeyType.HASH) { + tableSchema.setDistributionColumn(columnName); + } + + column.notNull = true; - primaryKey.add(columnName); - } - - tableSchema.setPrimaryKey(primaryKey); + primaryKey.add(columnName); + } + + tableSchema.setPrimaryKey(primaryKey); - List secondaryIndexes = tableDescription.getGlobalSecondaryIndexes(); - - if (secondaryIndexes != null) { - for (GlobalSecondaryIndexDescription secondaryIndex : secondaryIndexes) { - String indexName = secondaryIndex.getIndexName(); - List indexColumns = new ArrayList<>(); + List secondaryIndexes = tableDescription.getGlobalSecondaryIndexes(); + + if (secondaryIndexes != null) { + for (GlobalSecondaryIndexDescription secondaryIndex : secondaryIndexes) { + String indexName = secondaryIndex.getIndexName(); + List indexColumns = new ArrayList<>(); - for (KeySchemaElement keySchemaElement : secondaryIndex.getKeySchema()) { - String keyName = keySchemaElement.getAttributeName(); - String columnName = dynamoKeyToColumnName(keyName); + for (KeySchemaElement keySchemaElement : secondaryIndex.getKeySchema()) { + String keyName = keySchemaElement.getAttributeName(); + String columnName = dynamoKeyToColumnName(keyName); - indexColumns.add(columnName); - } + indexColumns.add(columnName); + } - tableSchema.addIndex(indexName, indexColumns); - } - } + tableSchema.addIndex(indexName, indexColumns); + } + } - if(conversionMode == ConversionMode.jsonb) { - tableSchema.addColumn("data", TableColumnType.jsonb); - } + if (conversionMode == ConversionMode.jsonb) { + tableSchema.addColumn("data", TableColumnType.jsonb); + } - return tableSchema; - } + return tableSchema; + } - public Future startReplicatingData(final int maxScanRate) { - return executor.submit(new Callable() { - @Override - public Long call() throws Exception { - return replicateData(maxScanRate); - } - }); - } + public Future startReplicatingData(final int maxScanRate) { + return executor.submit(new Callable() { + @Override + public Long call() throws Exception { + return replicateData(maxScanRate); + } + }); + } - public long replicateData(int maxScanRate) { - RateLimiter rateLimiter = RateLimiter.create(maxScanRate); + public long replicateData(int maxScanRate) { + RateLimiter rateLimiter = RateLimiter.create(maxScanRate); - Map lastEvaluatedScanKey = null; - long numRowsReplicated = 0; + Map lastEvaluatedScanKey = null; + long numRowsReplicated = 0; - while(true) { - ScanResult scanResult = scanWithRetries(lastEvaluatedScanKey); + while (true) { + ScanResult scanResult = scanWithRetries(lastEvaluatedScanKey); - if (addColumnsEnabled) { - for(Map dynamoItem : scanResult.getItems()) { - addNewColumns(dynamoItem); - } - } + if (addColumnsEnabled) { + for (Map dynamoItem : scanResult.getItems()) { + addNewColumns(dynamoItem); + } + } - TableRowBatch tableRowBatch = new TableRowBatch(); + TableRowBatch tableRowBatch = new TableRowBatch(); - for(Map dynamoItem : scanResult.getItems()) { - TableRow tableRow = rowFromDynamoRecord(dynamoItem); + for (Map dynamoItem : scanResult.getItems()) { + TableRow tableRow = rowFromDynamoRecord(dynamoItem); - tableRowBatch.addRow(tableRow); - } + tableRowBatch.addRow(tableRow); + } - LOG.info(String.format("Replicated %d rows to table %s", tableRowBatch.size(), tableSchema.tableName)); + LOG.info(String.format("Replicated %d rows to table %s", tableRowBatch.size(), tableSchema.tableName)); - numRowsReplicated += tableRowBatch.size(); + numRowsReplicated += tableRowBatch.size(); /* load the batch using COPY */ - emitter.copyFromReader(tableSchema, tableRowBatch.asCopyReader()); - - lastEvaluatedScanKey = scanResult.getLastEvaluatedKey(); - - if(lastEvaluatedScanKey == null) { - break; - } - - // Account for the rest of the throughput we consumed, - // now that we know how much that scan request cost - double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits(); - int permitsToConsume = (int)(consumedCapacity - 1.0); - if (permitsToConsume <= 0) { - permitsToConsume = 1; - } - - // Let the rate limiter wait until our desired throughput "recharges" - rateLimiter.acquire(permitsToConsume); - } - - return numRowsReplicated; - } - - private ScanResult scanWithRetries(Map lastEvaluatedScanKey) { - ScanRequest scanRequest = new ScanRequest(). - withTableName(this.dynamoTableName). - withConsistentRead(true). - withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL). - withLimit(100). - withExclusiveStartKey(lastEvaluatedScanKey); - - for (int tryNumber = 1; ; tryNumber++) { - try { - ScanResult scanResult = dynamoDBClient.scan(scanRequest); - return scanResult; - } catch (ProvisionedThroughputExceededException e) { - if (tryNumber == 3) { - throw e; - } - } catch (InternalServerErrorException e) { - if (tryNumber == 3) { - throw e; - } - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - public String getStreamArn() { - DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); - TableDescription tableDescription = describeTableResult.getTable(); - String tableStreamArn = tableDescription.getLatestStreamArn(); - return tableStreamArn; - } - - - public void startReplicatingChanges() throws StreamNotEnabledException { - if (tableSchema == null) { - throw new TableExistsException("table %s does not exist in destination", dynamoTableName); - } - - String tableStreamArn = getStreamArn(); - - if (tableStreamArn == null) { - throw new StreamNotEnabledException("table %s does not have a stream enabled\n", dynamoTableName); - } - - AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsClient); - AmazonCloudWatch cloudWatchClient = AmazonCloudWatchClientBuilder.standard().build(); - - String workerId = generateWorkerId(); - - final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration( - APPLICATION_NAME, tableStreamArn, awsCredentialsProvider, workerId). - withMaxRecords(1000). - withIdleTimeBetweenReadsInMillis(500). - withCallProcessRecordsEvenForEmptyRecordList(false). - withCleanupLeasesUponShardCompletion(false). - withFailoverTimeMillis(20000). - withTableName(LEASE_TABLE_PREFIX + dynamoTableName). - withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); - - Worker worker = new Worker.Builder(). - recordProcessorFactory(recordProcessorFactory). - config(workerConfig). - kinesisClient(adapterClient). - cloudWatchClient(cloudWatchClient). - dynamoDBClient(dynamoDBClient). - execService(executor). - build(); - - executor.execute(worker); - } - - IRecordProcessorFactory recordProcessorFactory = new IRecordProcessorFactory() { - @Override - public IRecordProcessor createProcessor() { - return createStreamProcessor(); - } - }; - - protected IRecordProcessor createStreamProcessor() { - return new IRecordProcessor() { - - @Override - public void initialize(InitializationInput initializationInput) { - } - - public List extractDynamoStreamRecords(List kinesisRecords) { - List dynamoRecords = new ArrayList<>(kinesisRecords.size()); - - for(com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) { - if (kinesisRecord instanceof RecordAdapter) { - Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject(); - dynamoRecords.add(dynamoRecord); - } - } - - return dynamoRecords; - } - - @Override - public void processRecords(ProcessRecordsInput processRecordsInput) { - List records = extractDynamoStreamRecords(processRecordsInput.getRecords()); - - DynamoDBTableReplicator.this.processRecords(records); - - checkpoint(processRecordsInput.getCheckpointer()); - } - - @Override - public void shutdown(ShutdownInput shutdownInput) { - if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { - checkpoint(shutdownInput.getCheckpointer()); - } - } - - void checkpoint(IRecordProcessorCheckpointer checkpointer) { - try { - checkpointer.checkpoint(); - } catch (KinesisClientLibDependencyException|InvalidStateException|ThrottlingException|ShutdownException e) { - LOG.warn(e); - } - } - }; - } - - void processRecords(List records) { - if (addColumnsEnabled) { - for (Record dynamoRecord : records) { - StreamRecord streamRecord = dynamoRecord.getDynamodb(); - Map item = streamRecord.getNewImage(); - - if (item == null) { - continue; - } - - addNewColumns(item); - } - } - - for (Record dynamoRecord : records) { - StreamRecord streamRecord = dynamoRecord.getDynamodb(); - - switch (dynamoRecord.getEventName()) { - case "INSERT": - case "MODIFY": - Map dynamoItem = streamRecord.getNewImage(); - - if(dynamoItem == null) { - LOG.error(String.format("the stream for table %s does not have new images", dynamoTableName)); - System.exit(1); - } - - TableRow tableRow = rowFromDynamoRecord(dynamoItem); - emitter.upsert(tableRow); - LOG.debug(tableRow.toUpsert()); - break; - case "REMOVE": - Map dynamoKeys = streamRecord.getKeys(); - PrimaryKeyValue keyValue = primaryKeyValueFromDynamoKeys(dynamoKeys); - emitter.delete(keyValue); - LOG.debug(keyValue.toDelete()); - break; - } - - LOG.debug(streamRecord); - } - - LOG.info(String.format("Replicated %d changes to table %s", records.size(), tableSchema.tableName)); - } - - void addNewColumns(Map item) { - if(conversionMode == ConversionMode.jsonb) { - /* don't add new columns in jsonb mode */ - return; - } - - for(Map.Entry entry : item.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumn column = tableSchema.getColumn(columnName); - TableColumnType valueType = DynamoDBTableReplicator.columnTypeFromDynamoValue(entry.getValue()); - - if (column == null) { - column = tableSchema.addColumn(columnName, valueType); - LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); - emitter.createColumn(column); - } else if (column.type != valueType) { - columnName = columnName + "_" + valueType; - column = tableSchema.getColumn(columnName); - - if (column == null) { - column = tableSchema.addColumn(columnName, valueType); - LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); - emitter.createColumn(column); - } - } - } - } - - PrimaryKeyValue primaryKeyValueFromDynamoKeys(Map dynamoKeys) { - PrimaryKeyValue keyValue = new PrimaryKeyValue(tableSchema); - - for(Map.Entry entry : dynamoKeys.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - - if (!tableSchema.isInPrimaryKey(columnName)) { - continue; - } - - TableColumnValue columnValue = DynamoDBTableReplicator.columnValueFromDynamoValue(entry.getValue()); - - keyValue.setValue(columnName, columnValue); - } - - return keyValue; - } - - static String generateWorkerId() { - StringBuilder sb = new StringBuilder(); - - try { - sb.append(InetAddress.getLocalHost().getCanonicalHostName()); - } catch (UnknownHostException e) { - } - - sb.append('_'); - sb.append(UUID.randomUUID()); - - return sb.toString(); - } - - public TableRow rowFromDynamoRecord(Map dynamoItem) { - if (conversionMode == ConversionMode.jsonb) { - return rowWithJsonbFromDynamoRecord(dynamoItem); - } else { - return rowWithColumnsFromDynamoRecord(dynamoItem); - - } - } - - public TableRow rowWithJsonbFromDynamoRecord(Map dynamoItem) { - TableRow row = tableSchema.createRow(); - Item item = new Item(); - - for(Map.Entry entry : dynamoItem.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumn column = tableSchema.getColumn(columnName); - AttributeValue typedValue = entry.getValue(); - TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); - - if (column != null) { - row.setValue(columnName, columnValue); - } - - item.with(keyName, columnValue.datum); - } - - row.setValue("data", item.toJSON()); - - return row; - } - - public TableRow rowWithColumnsFromDynamoRecord(Map dynamoItem) { - TableRow row = tableSchema.createRow(); - - for(Map.Entry entry : dynamoItem.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumn column = tableSchema.getColumn(columnName); - - if (column == null) { + emitter.copyFromReader(tableSchema, tableRowBatch.asCopyReader()); + + lastEvaluatedScanKey = scanResult.getLastEvaluatedKey(); + + if (lastEvaluatedScanKey == null) { + break; + } + + // Account for the rest of the throughput we consumed, + // now that we know how much that scan request cost + double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits(); + int permitsToConsume = (int) (consumedCapacity - 1.0); + if (permitsToConsume <= 0) { + permitsToConsume = 1; + } + + // Let the rate limiter wait until our desired throughput "recharges" + rateLimiter.acquire(permitsToConsume); + } + + return numRowsReplicated; + } + + private ScanResult scanWithRetries(Map lastEvaluatedScanKey) { + ScanRequest scanRequest = new ScanRequest(). + withTableName(this.dynamoTableName). + withConsistentRead(true). + withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL). + withLimit(100). + withExclusiveStartKey(lastEvaluatedScanKey); + + for (int tryNumber = 1; ; tryNumber++) { + try { + ScanResult scanResult = dynamoDBClient.scan(scanRequest); + return scanResult; + } catch (ProvisionedThroughputExceededException e) { + if (tryNumber == 3) { + throw e; + } + } catch (InternalServerErrorException e) { + if (tryNumber == 3) { + throw e; + } + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + public String getStreamArn() { + DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); + TableDescription tableDescription = describeTableResult.getTable(); + String tableStreamArn = tableDescription.getLatestStreamArn(); + return tableStreamArn; + } + + + public void startReplicatingChanges() throws StreamNotEnabledException { + if (tableSchema == null) { + throw new TableExistsException("table %s does not exist in destination", dynamoTableName); + } + + String tableStreamArn = getStreamArn(); + + if (tableStreamArn == null) { + throw new StreamNotEnabledException("table %s does not have a stream enabled\n", dynamoTableName); + } + + AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsClient); + AmazonCloudWatch cloudWatchClient = AmazonCloudWatchClientBuilder.standard().build(); + + String workerId = generateWorkerId(); + + final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration( + APPLICATION_NAME, tableStreamArn, awsCredentialsProvider, workerId). + withMaxRecords(1000). + withIdleTimeBetweenReadsInMillis(500). + withCallProcessRecordsEvenForEmptyRecordList(false). + withCleanupLeasesUponShardCompletion(false). + withFailoverTimeMillis(20000). + withTableName(LEASE_TABLE_PREFIX + dynamoTableName). + withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); + + Worker worker = new Worker.Builder(). + recordProcessorFactory(recordProcessorFactory). + config(workerConfig). + kinesisClient(adapterClient). + cloudWatchClient(cloudWatchClient). + dynamoDBClient(dynamoDBClient). + execService(executor). + build(); + + executor.execute(worker); + } + + IRecordProcessorFactory recordProcessorFactory = new IRecordProcessorFactory() { + @Override + public IRecordProcessor createProcessor() { + return createStreamProcessor(); + } + }; + + protected IRecordProcessor createStreamProcessor() { + return new IRecordProcessor() { + + @Override + public void initialize(InitializationInput initializationInput) { + } + + public List extractDynamoStreamRecords(List kinesisRecords) { + List dynamoRecords = new ArrayList<>(kinesisRecords.size()); + + for (com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) { + if (kinesisRecord instanceof RecordAdapter) { + Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject(); + dynamoRecords.add(dynamoRecord); + } + } + + return dynamoRecords; + } + + @Override + public void processRecords(ProcessRecordsInput processRecordsInput) { + List records = extractDynamoStreamRecords(processRecordsInput.getRecords()); + + DynamoDBTableReplicator.this.processRecords(records); + + checkpoint(processRecordsInput.getCheckpointer()); + } + + @Override + public void shutdown(ShutdownInput shutdownInput) { + if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { + checkpoint(shutdownInput.getCheckpointer()); + } + } + + void checkpoint(IRecordProcessorCheckpointer checkpointer) { + try { + checkpointer.checkpoint(); + } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { + LOG.warn(e); + } + } + }; + } + + void processRecords(List records) { + try { + if (addColumnsEnabled) { + for (Record dynamoRecord : records) { + StreamRecord streamRecord = dynamoRecord.getDynamodb(); + Map item = streamRecord.getNewImage(); + + if (item == null) { + continue; + } + addNewColumns(item); + } + } + + for (Record dynamoRecord : records) { + StreamRecord streamRecord = dynamoRecord.getDynamodb(); + + switch (dynamoRecord.getEventName()) { + case "INSERT": + case "MODIFY": + Map dynamoItem = streamRecord.getNewImage(); + + if (dynamoItem == null) { + LOG.error(String.format("the stream for table %s does not have new images", dynamoTableName)); + System.exit(1); + } + + TableRow tableRow = rowFromDynamoRecord(dynamoItem); + emitter.upsert(tableRow); + LOG.debug(tableRow.toUpsert()); + break; + case "REMOVE": + Map dynamoKeys = streamRecord.getKeys(); + PrimaryKeyValue keyValue = primaryKeyValueFromDynamoKeys(dynamoKeys); + emitter.delete(keyValue); + LOG.debug(keyValue.toDelete()); + break; + } + + LOG.debug(streamRecord); + } + + LOG.info(String.format("Replicated %d changes to table %s", records.size(), tableSchema.tableName)); + } catch (Exeception e) { + e.printStackTrace(); + LOG.error(e); + } + } + + void addNewColumns(Map item) { + if (conversionMode == ConversionMode.jsonb) { + /* don't add new columns in jsonb mode */ + return; + } + + for (Map.Entry entry : item.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumn column = tableSchema.getColumn(columnName); + TableColumnType valueType = DynamoDBTableReplicator.columnTypeFromDynamoValue(entry.getValue()); + + if (column == null) { + column = tableSchema.addColumn(columnName, valueType); + LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); + emitter.createColumn(column); + } else if (column.type != valueType) { + columnName = columnName + "_" + valueType; + column = tableSchema.getColumn(columnName); + + if (column == null) { + column = tableSchema.addColumn(columnName, valueType); + LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); + emitter.createColumn(column); + } + } + } + } + + PrimaryKeyValue primaryKeyValueFromDynamoKeys(Map dynamoKeys) { + PrimaryKeyValue keyValue = new PrimaryKeyValue(tableSchema); + + for (Map.Entry entry : dynamoKeys.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + + if (!tableSchema.isInPrimaryKey(columnName)) { + continue; + } + + TableColumnValue columnValue = DynamoDBTableReplicator.columnValueFromDynamoValue(entry.getValue()); + + keyValue.setValue(columnName, columnValue); + } + + return keyValue; + } + + static String generateWorkerId() { + StringBuilder sb = new StringBuilder(); + + try { + sb.append(InetAddress.getLocalHost().getCanonicalHostName()); + } catch (UnknownHostException e) { + } + + sb.append('_'); + sb.append(UUID.randomUUID()); + + return sb.toString(); + } + + public TableRow rowFromDynamoRecord(Map dynamoItem) { + if (conversionMode == ConversionMode.jsonb) { + return rowWithJsonbFromDynamoRecord(dynamoItem); + } else { + return rowWithColumnsFromDynamoRecord(dynamoItem); + } + } + + public TableRow rowWithJsonbFromDynamoRecord(Map dynamoItem) { + TableRow row = tableSchema.createRow(); + Item item = new Item(); + + for (Map.Entry entry : dynamoItem.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumn column = tableSchema.getColumn(columnName); + AttributeValue typedValue = entry.getValue(); + TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); + + if (column != null) { + row.setValue(columnName, columnValue); + } + + item.with(keyName, columnValue.datum); + } + + row.setValue("data", item.toJSON()); + + return row; + } + + public TableRow rowWithColumnsFromDynamoRecord(Map dynamoItem) { + TableRow row = tableSchema.createRow(); + + for (Map.Entry entry : dynamoItem.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumn column = tableSchema.getColumn(columnName); + + if (column == null) { /* skip non-existent columns */ - continue; - } - - AttributeValue typedValue = entry.getValue(); - TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); - - if (columnValue.type == column.type) { - row.setValue(columnName, columnValue); - } else { - - row.setValue(columnName + "_" + columnValue.type, columnValue); - } - } - - return row; - } - - public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedValue) { - if(typedValue.getB() != null) { - ByteBuffer value = typedValue.getB(); - return new TableColumnValue(TableColumnType.bytea, value.array()); - } else if (typedValue.getBOOL() != null) { - Boolean value = typedValue.getBOOL(); - return new TableColumnValue(TableColumnType.bool, value); - } else if (typedValue.getBS() != null) { - List value = typedValue.getBS(); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); - } else if (typedValue.getL() != null) { - List value = typedValue.getL(); - List simpleList = InternalUtils.toSimpleList(value); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(simpleList)); - } else if (typedValue.getM() != null) { - Map value = typedValue.getM(); - Item simpleMap = Item.fromMap(InternalUtils.toSimpleMapValue(value)); - return new TableColumnValue(TableColumnType.jsonb, simpleMap.toJSON()); - } else if (typedValue.getN() != null) { - String value = typedValue.getN(); - return new TableColumnValue(TableColumnType.numeric, value); - } else if (typedValue.getNS() != null) { - List value = typedValue.getNS(); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); - } else if (typedValue.getS() != null) { - String value = typedValue.getS(); - return new TableColumnValue(TableColumnType.text, value); - } else if (typedValue.getSS() != null) { - List value = typedValue.getSS(); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); - } else { - return null; - } - } - - public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValue) { - if(typedValue.getB() != null) { - return TableColumnType.bytea; - } else if (typedValue.getBOOL() != null) { - return TableColumnType.bool; - } else if (typedValue.getBS() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getL() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getM() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getN() != null) { - return TableColumnType.numeric; - } else if (typedValue.getNS() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getS() != null) { - return TableColumnType.text; - } else if (typedValue.getSS() != null) { - return TableColumnType.jsonb; - } else { - return TableColumnType.text; - } - } - + continue; + } + + AttributeValue typedValue = entry.getValue(); + TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); + + if (columnValue.type == column.type) { + row.setValue(columnName, columnValue); + } else { + + row.setValue(columnName + "_" + columnValue.type, columnValue); + } + } + + return row; + } + + public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedValue) { + if (typedValue.getB() != null) { + ByteBuffer value = typedValue.getB(); + return new TableColumnValue(TableColumnType.bytea, value.array()); + } else if (typedValue.getBOOL() != null) { + Boolean value = typedValue.getBOOL(); + return new TableColumnValue(TableColumnType.bool, value); + } else if (typedValue.getBS() != null) { + List value = typedValue.getBS(); + return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); + } else if (typedValue.getL() != null) { + List value = typedValue.getL(); + List simpleList = InternalUtils.toSimpleList(value); + return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(simpleList)); + } else if (typedValue.getM() != null) { + Map value = typedValue.getM(); + Item simpleMap = Item.fromMap(InternalUtils.toSimpleMapValue(value)); + return new TableColumnValue(TableColumnType.jsonb, simpleMap.toJSON()); + } else if (typedValue.getN() != null) { + String value = typedValue.getN(); + return new TableColumnValue(TableColumnType.numeric, value); + } else if (typedValue.getNS() != null) { + List value = typedValue.getNS(); + return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); + } else if (typedValue.getS() != null) { + String value = typedValue.getS(); + return new TableColumnValue(TableColumnType.text, value); + } else if (typedValue.getSS() != null) { + List value = typedValue.getSS(); + return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); + } else { + return null; + } + } + + public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValue) { + if (typedValue.getB() != null) { + return TableColumnType.bytea; + } else if (typedValue.getBOOL() != null) { + return TableColumnType.bool; + } else if (typedValue.getBS() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getL() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getM() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getN() != null) { + return TableColumnType.numeric; + } else if (typedValue.getNS() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getS() != null) { + return TableColumnType.text; + } else if (typedValue.getSS() != null) { + return TableColumnType.jsonb; + } else { + return TableColumnType.text; + } + } } From 4eb122dd10c712b5a194bdf32382323cc67f904a Mon Sep 17 00:00:00 2001 From: Jared Leonard Date: Mon, 2 Jul 2018 15:49:09 +0200 Subject: [PATCH 2/7] Typo --- .../java/com/citusdata/migration/DynamoDBTableReplicator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java index 68e1dca..d9f141b 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java @@ -447,7 +447,7 @@ void processRecords(List records) { } LOG.info(String.format("Replicated %d changes to table %s", records.size(), tableSchema.tableName)); - } catch (Exeception e) { + } catch (Exception e) { e.printStackTrace(); LOG.error(e); } From 3f0904fd766c9354c6742112ffbe7365247448d8 Mon Sep 17 00:00:00 2001 From: Jared Leonard Date: Tue, 3 Jul 2018 16:45:01 +0200 Subject: [PATCH 3/7] added more logging --- .../java/com/citusdata/migration/DynamoDBReplicator.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java index cce6d8b..eeacfe0 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java @@ -214,13 +214,16 @@ public void run() { } } catch (ParseException e) { + e.printStackTrace(); LOG.error(e.getMessage()); formatter.printHelp("podyn", options); System.exit(3); } catch (TableExistsException|NonExistingTableException e) { + e.printStackTrace(); LOG.error(e.getMessage()); System.exit(1); } catch (ExecutionException e) { + e.printStackTrace(); Throwable cause = e.getCause(); if (cause.getCause() != null) { @@ -230,6 +233,7 @@ public void run() { } System.exit(1); } catch (EmissionException e) { + e.printStackTrace(); if (e.getCause() != null) { LOG.error(e.getCause().getMessage()); } else { @@ -240,6 +244,7 @@ public void run() { e.printStackTrace(); System.exit(2); } catch (Exception e) { + e.printStackTrace(); LOG.error(e); System.exit(1); } From 2a0572d749d6b990a82879da8aac0f0af9c80bbd Mon Sep 17 00:00:00 2001 From: Jared Leonard Date: Tue, 3 Jul 2018 19:52:00 +0200 Subject: [PATCH 4/7] null check on columnValue --- .../java/com/citusdata/migration/DynamoDBTableReplicator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java index d9f141b..2f42b90 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java @@ -562,10 +562,9 @@ public TableRow rowWithColumnsFromDynamoRecord(Map dynam AttributeValue typedValue = entry.getValue(); TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); - if (columnValue.type == column.type) { + if (columnValue != null && columnValue.type == column.type) { row.setValue(columnName, columnValue); } else { - row.setValue(columnName + "_" + columnValue.type, columnValue); } } From 91f402b81cd04c71f8a6a410f923b8ba8355545d Mon Sep 17 00:00:00 2001 From: Jared Leonard Date: Tue, 3 Jul 2018 20:41:55 +0200 Subject: [PATCH 5/7] added getNULL --- .../com/citusdata/migration/DynamoDBTableReplicator.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java index 2f42b90..f2bf03e 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java @@ -562,7 +562,7 @@ public TableRow rowWithColumnsFromDynamoRecord(Map dynam AttributeValue typedValue = entry.getValue(); TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); - if (columnValue != null && columnValue.type == column.type) { + if (columnValue.type == column.type) { row.setValue(columnName, columnValue); } else { row.setValue(columnName + "_" + columnValue.type, columnValue); @@ -601,8 +601,11 @@ public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedVa return new TableColumnValue(TableColumnType.text, value); } else if (typedValue.getSS() != null) { List value = typedValue.getSS(); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); - } else { + return new TableColumnValue(TableColumnType.text, Jackson.toJsonString(value)); + } else if (typedValue.getNULL() != null) { + return new TableColumnValue(TableColumnType.text, ""); + } + else { return null; } } From 44726a5b0c3165c2a527682b31666de5e22fa33b Mon Sep 17 00:00:00 2001 From: Jared Leonard Date: Wed, 11 Jul 2018 12:04:36 +0200 Subject: [PATCH 6/7] added a new option to pydon time to test --- .gitignore | 10 +- .../migration/DynamoDBReplicator.java | 7 +- .../migration/DynamoDBTableReplicator.java | 133 +++++++++--------- 3 files changed, 80 insertions(+), 70 deletions(-) diff --git a/.gitignore b/.gitignore index 5bd05d6..c5dbb08 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,10 @@ target -.idea \ No newline at end of file +.idea +.classpath +.factorypath +.project +dependency-reduced-pom.xml +.settings/org.eclipse.jdt.apt.core.prefs +.settings/org.eclipse.jdt.apt.core.prefs +.settings/org.eclipse.jdt.core.prefs +.settings/org.eclipse.m2e.core.prefs diff --git a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java index eeacfe0..6d9b2dc 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java @@ -89,6 +89,10 @@ public static void main(String[] args) throws SQLException, IOException { maxScanRateOption.setRequired(false); options.addOption(maxScanRateOption); + Option convertNumberToText = new Option("ntx", "convert-numbers-to-text", false, "Convert Number Types to text"); + convertNumberToText.setRequired(false); + options.addOption(convertNumberToText); + CommandLineParser parser = new DefaultParser(); HelpFormatter formatter = new HelpFormatter(); formatter.setWidth(120); @@ -108,6 +112,7 @@ public static void main(String[] args) throws SQLException, IOException { boolean useCitus = cmd.hasOption("citus"); boolean useLowerCaseColumnNames = cmd.hasOption("lower-case-column-names"); + boolean convertNumbersToText = cmd.hasOption("convert-numbers-to-text"); int maxScanRate = Integer.parseInt(cmd.getOptionValue("scan-rate", "25")); int dbConnectionCount = Integer.parseInt(cmd.getOptionValue("num-connections", "16")); String tableNamesString = cmd.getOptionValue("table"); @@ -177,8 +182,8 @@ public void run() { replicator.setAddColumnEnabled(true); replicator.setUseCitus(useCitus); replicator.setUseLowerCaseColumnNames(useLowerCaseColumnNames); + DynamoDBTableReplicator.setConvertNumberTypesToText(convertNumbersToText); replicator.setConversionMode(conversionMode); - replicators.add(replicator); } diff --git a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java index f2bf03e..ac54daa 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java @@ -88,16 +88,13 @@ public class DynamoDBTableReplicator { boolean addColumnsEnabled; boolean useCitus; boolean useLowerCaseColumnNames; + private static boolean _convertNumbersToText = false; ConversionMode conversionMode; TableSchema tableSchema; - public DynamoDBTableReplicator( - AmazonDynamoDB dynamoDBClient, - AmazonDynamoDBStreams streamsClient, - AWSCredentialsProvider awsCredentialsProvider, - ExecutorService executorService, - TableEmitter emitter, + public DynamoDBTableReplicator(AmazonDynamoDB dynamoDBClient, AmazonDynamoDBStreams streamsClient, + AWSCredentialsProvider awsCredentialsProvider, ExecutorService executorService, TableEmitter emitter, String tableName) throws SQLException { this.dynamoDBClient = dynamoDBClient; this.streamsClient = streamsClient; @@ -123,6 +120,10 @@ public void setUseLowerCaseColumnNames(boolean useLowerCaseColumnNames) { this.useLowerCaseColumnNames = useLowerCaseColumnNames; } + public static void setConvertNumberTypesToText(boolean convertNumbersToText) { + _convertNumbersToText = convertNumbersToText; + } + public void setConversionMode(ConversionMode conversionMode) { this.conversionMode = conversionMode; } @@ -158,12 +159,12 @@ TableSchema fetchSourceSchema() { TableColumnType type = TableColumnType.text; switch (attributeDefinition.getAttributeType()) { - case "N": - type = TableColumnType.numeric; - break; - case "B": - type = TableColumnType.bytea; - break; + case "N": + type = TableColumnType.numeric; + break; + case "B": + type = TableColumnType.bytea; + break; } tableSchema.addColumn(columnName, type); @@ -251,7 +252,7 @@ public long replicateData(int maxScanRate) { numRowsReplicated += tableRowBatch.size(); - /* load the batch using COPY */ + /* load the batch using COPY */ emitter.copyFromReader(tableSchema, tableRowBatch.asCopyReader()); lastEvaluatedScanKey = scanResult.getLastEvaluatedKey(); @@ -276,14 +277,11 @@ public long replicateData(int maxScanRate) { } private ScanResult scanWithRetries(Map lastEvaluatedScanKey) { - ScanRequest scanRequest = new ScanRequest(). - withTableName(this.dynamoTableName). - withConsistentRead(true). - withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL). - withLimit(100). - withExclusiveStartKey(lastEvaluatedScanKey); - - for (int tryNumber = 1; ; tryNumber++) { + ScanRequest scanRequest = new ScanRequest().withTableName(this.dynamoTableName).withConsistentRead(true) + .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withLimit(100) + .withExclusiveStartKey(lastEvaluatedScanKey); + + for (int tryNumber = 1;; tryNumber++) { try { ScanResult scanResult = dynamoDBClient.scan(scanRequest); return scanResult; @@ -312,7 +310,6 @@ public String getStreamArn() { return tableStreamArn; } - public void startReplicatingChanges() throws StreamNotEnabledException { if (tableSchema == null) { throw new TableExistsException("table %s does not exist in destination", dynamoTableName); @@ -329,24 +326,16 @@ public void startReplicatingChanges() throws StreamNotEnabledException { String workerId = generateWorkerId(); - final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration( - APPLICATION_NAME, tableStreamArn, awsCredentialsProvider, workerId). - withMaxRecords(1000). - withIdleTimeBetweenReadsInMillis(500). - withCallProcessRecordsEvenForEmptyRecordList(false). - withCleanupLeasesUponShardCompletion(false). - withFailoverTimeMillis(20000). - withTableName(LEASE_TABLE_PREFIX + dynamoTableName). - withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); - - Worker worker = new Worker.Builder(). - recordProcessorFactory(recordProcessorFactory). - config(workerConfig). - kinesisClient(adapterClient). - cloudWatchClient(cloudWatchClient). - dynamoDBClient(dynamoDBClient). - execService(executor). - build(); + final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration(APPLICATION_NAME, + tableStreamArn, awsCredentialsProvider, workerId).withMaxRecords(1000) + .withIdleTimeBetweenReadsInMillis(500).withCallProcessRecordsEvenForEmptyRecordList(false) + .withCleanupLeasesUponShardCompletion(false).withFailoverTimeMillis(20000) + .withTableName(LEASE_TABLE_PREFIX + dynamoTableName) + .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); + + Worker worker = new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(workerConfig) + .kinesisClient(adapterClient).cloudWatchClient(cloudWatchClient).dynamoDBClient(dynamoDBClient) + .execService(executor).build(); executor.execute(worker); } @@ -365,7 +354,8 @@ protected IRecordProcessor createStreamProcessor() { public void initialize(InitializationInput initializationInput) { } - public List extractDynamoStreamRecords(List kinesisRecords) { + public List extractDynamoStreamRecords( + List kinesisRecords) { List dynamoRecords = new ArrayList<>(kinesisRecords.size()); for (com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) { @@ -397,7 +387,8 @@ public void shutdown(ShutdownInput shutdownInput) { void checkpoint(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); - } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { + } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException + | ShutdownException e) { LOG.warn(e); } } @@ -422,25 +413,25 @@ void processRecords(List records) { StreamRecord streamRecord = dynamoRecord.getDynamodb(); switch (dynamoRecord.getEventName()) { - case "INSERT": - case "MODIFY": - Map dynamoItem = streamRecord.getNewImage(); - - if (dynamoItem == null) { - LOG.error(String.format("the stream for table %s does not have new images", dynamoTableName)); - System.exit(1); - } - - TableRow tableRow = rowFromDynamoRecord(dynamoItem); - emitter.upsert(tableRow); - LOG.debug(tableRow.toUpsert()); - break; - case "REMOVE": - Map dynamoKeys = streamRecord.getKeys(); - PrimaryKeyValue keyValue = primaryKeyValueFromDynamoKeys(dynamoKeys); - emitter.delete(keyValue); - LOG.debug(keyValue.toDelete()); - break; + case "INSERT": + case "MODIFY": + Map dynamoItem = streamRecord.getNewImage(); + + if (dynamoItem == null) { + LOG.error(String.format("the stream for table %s does not have new images", dynamoTableName)); + System.exit(1); + } + + TableRow tableRow = rowFromDynamoRecord(dynamoItem); + emitter.upsert(tableRow); + LOG.debug(tableRow.toUpsert()); + break; + case "REMOVE": + Map dynamoKeys = streamRecord.getKeys(); + PrimaryKeyValue keyValue = primaryKeyValueFromDynamoKeys(dynamoKeys); + emitter.delete(keyValue); + LOG.debug(keyValue.toDelete()); + break; } LOG.debug(streamRecord); @@ -555,7 +546,7 @@ public TableRow rowWithColumnsFromDynamoRecord(Map dynam TableColumn column = tableSchema.getColumn(columnName); if (column == null) { - /* skip non-existent columns */ + /* skip non-existent columns */ continue; } @@ -591,8 +582,13 @@ public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedVa Item simpleMap = Item.fromMap(InternalUtils.toSimpleMapValue(value)); return new TableColumnValue(TableColumnType.jsonb, simpleMap.toJSON()); } else if (typedValue.getN() != null) { - String value = typedValue.getN(); - return new TableColumnValue(TableColumnType.numeric, value); + if (_convertNumbersToText == true) { + String value = typedValue.getN(); + return new TableColumnValue(TableColumnType.text, value); + } else { + String value = typedValue.getN(); + return new TableColumnValue(TableColumnType.numeric, value); + } } else if (typedValue.getNS() != null) { List value = typedValue.getNS(); return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); @@ -604,8 +600,7 @@ public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedVa return new TableColumnValue(TableColumnType.text, Jackson.toJsonString(value)); } else if (typedValue.getNULL() != null) { return new TableColumnValue(TableColumnType.text, ""); - } - else { + } else { return null; } } @@ -622,7 +617,10 @@ public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValu } else if (typedValue.getM() != null) { return TableColumnType.jsonb; } else if (typedValue.getN() != null) { - return TableColumnType.numeric; + if (_convertNumbersToText == true) { + return TableColumnType.text; + } else + return TableColumnType.numeric; } else if (typedValue.getNS() != null) { return TableColumnType.jsonb; } else if (typedValue.getS() != null) { @@ -634,5 +632,4 @@ public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValu } } - } From be52cfde9ee3abc09a6209bdad2383b0f793770e Mon Sep 17 00:00:00 2001 From: Jared Leonard Date: Wed, 11 Jul 2018 14:19:03 +0200 Subject: [PATCH 7/7] small refactor --- .../migration/DynamoDBReplicator.java | 2 +- .../migration/DynamoDBTableReplicator.java | 25 +++++++++++-------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java index 6d9b2dc..b7c9a98 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java @@ -182,7 +182,7 @@ public void run() { replicator.setAddColumnEnabled(true); replicator.setUseCitus(useCitus); replicator.setUseLowerCaseColumnNames(useLowerCaseColumnNames); - DynamoDBTableReplicator.setConvertNumberTypesToText(convertNumbersToText); + replicator.setConvertNumberTypesToText(convertNumbersToText); replicator.setConversionMode(conversionMode); replicators.add(replicator); } diff --git a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java index ac54daa..56719b9 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java @@ -88,7 +88,7 @@ public class DynamoDBTableReplicator { boolean addColumnsEnabled; boolean useCitus; boolean useLowerCaseColumnNames; - private static boolean _convertNumbersToText = false; + boolean convertNumbersToText; ConversionMode conversionMode; TableSchema tableSchema; @@ -105,6 +105,7 @@ public DynamoDBTableReplicator(AmazonDynamoDB dynamoDBClient, AmazonDynamoDBStre this.addColumnsEnabled = true; this.useCitus = false; this.useLowerCaseColumnNames = false; + this.convertNumbersToText = false; this.tableSchema = emitter.fetchSchema(this.dynamoTableName); } @@ -120,8 +121,8 @@ public void setUseLowerCaseColumnNames(boolean useLowerCaseColumnNames) { this.useLowerCaseColumnNames = useLowerCaseColumnNames; } - public static void setConvertNumberTypesToText(boolean convertNumbersToText) { - _convertNumbersToText = convertNumbersToText; + public void setConvertNumberTypesToText(boolean convertNumbersToText) { + this.convertNumbersToText = convertNumbersToText; } public void setConversionMode(ConversionMode conversionMode) { @@ -454,7 +455,7 @@ void addNewColumns(Map item) { String keyName = entry.getKey(); String columnName = dynamoKeyToColumnName(keyName); TableColumn column = tableSchema.getColumn(columnName); - TableColumnType valueType = DynamoDBTableReplicator.columnTypeFromDynamoValue(entry.getValue()); + TableColumnType valueType = DynamoDBTableReplicator.columnTypeFromDynamoValue(entry.getValue(), this.convertNumbersToText); if (column == null) { column = tableSchema.addColumn(columnName, valueType); @@ -484,7 +485,9 @@ PrimaryKeyValue primaryKeyValueFromDynamoKeys(Map dynamo continue; } - TableColumnValue columnValue = DynamoDBTableReplicator.columnValueFromDynamoValue(entry.getValue()); + + + TableColumnValue columnValue = DynamoDBTableReplicator.columnValueFromDynamoValue(entry.getValue(), this.convertNumbersToText); keyValue.setValue(columnName, columnValue); } @@ -523,7 +526,7 @@ public TableRow rowWithJsonbFromDynamoRecord(Map dynamoI String columnName = dynamoKeyToColumnName(keyName); TableColumn column = tableSchema.getColumn(columnName); AttributeValue typedValue = entry.getValue(); - TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); + TableColumnValue columnValue = columnValueFromDynamoValue(typedValue, this.convertNumbersToText); if (column != null) { row.setValue(columnName, columnValue); @@ -551,7 +554,7 @@ public TableRow rowWithColumnsFromDynamoRecord(Map dynam } AttributeValue typedValue = entry.getValue(); - TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); + TableColumnValue columnValue = columnValueFromDynamoValue(typedValue, this.convertNumbersToText); if (columnValue.type == column.type) { row.setValue(columnName, columnValue); @@ -563,7 +566,7 @@ public TableRow rowWithColumnsFromDynamoRecord(Map dynam return row; } - public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedValue) { + public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedValue, boolean convertNumberToText) { if (typedValue.getB() != null) { ByteBuffer value = typedValue.getB(); return new TableColumnValue(TableColumnType.bytea, value.array()); @@ -582,7 +585,7 @@ public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedVa Item simpleMap = Item.fromMap(InternalUtils.toSimpleMapValue(value)); return new TableColumnValue(TableColumnType.jsonb, simpleMap.toJSON()); } else if (typedValue.getN() != null) { - if (_convertNumbersToText == true) { + if (convertNumberToText == true) { String value = typedValue.getN(); return new TableColumnValue(TableColumnType.text, value); } else { @@ -605,7 +608,7 @@ public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedVa } } - public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValue) { + public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValue, boolean convertNumberToText) { if (typedValue.getB() != null) { return TableColumnType.bytea; } else if (typedValue.getBOOL() != null) { @@ -617,7 +620,7 @@ public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValu } else if (typedValue.getM() != null) { return TableColumnType.jsonb; } else if (typedValue.getN() != null) { - if (_convertNumbersToText == true) { + if (convertNumberToText == true) { return TableColumnType.text; } else return TableColumnType.numeric;