diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c5dbb08 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +target +.idea +.classpath +.factorypath +.project +dependency-reduced-pom.xml +.settings/org.eclipse.jdt.apt.core.prefs +.settings/org.eclipse.jdt.apt.core.prefs +.settings/org.eclipse.jdt.core.prefs +.settings/org.eclipse.m2e.core.prefs diff --git a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java index cce6d8b..b7c9a98 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBReplicator.java @@ -89,6 +89,10 @@ public static void main(String[] args) throws SQLException, IOException { maxScanRateOption.setRequired(false); options.addOption(maxScanRateOption); + Option convertNumberToText = new Option("ntx", "convert-numbers-to-text", false, "Convert Number Types to text"); + convertNumberToText.setRequired(false); + options.addOption(convertNumberToText); + CommandLineParser parser = new DefaultParser(); HelpFormatter formatter = new HelpFormatter(); formatter.setWidth(120); @@ -108,6 +112,7 @@ public static void main(String[] args) throws SQLException, IOException { boolean useCitus = cmd.hasOption("citus"); boolean useLowerCaseColumnNames = cmd.hasOption("lower-case-column-names"); + boolean convertNumbersToText = cmd.hasOption("convert-numbers-to-text"); int maxScanRate = Integer.parseInt(cmd.getOptionValue("scan-rate", "25")); int dbConnectionCount = Integer.parseInt(cmd.getOptionValue("num-connections", "16")); String tableNamesString = cmd.getOptionValue("table"); @@ -177,8 +182,8 @@ public void run() { replicator.setAddColumnEnabled(true); replicator.setUseCitus(useCitus); replicator.setUseLowerCaseColumnNames(useLowerCaseColumnNames); + replicator.setConvertNumberTypesToText(convertNumbersToText); replicator.setConversionMode(conversionMode); - replicators.add(replicator); } @@ -214,13 +219,16 @@ public void run() { } } catch (ParseException e) { + e.printStackTrace(); LOG.error(e.getMessage()); formatter.printHelp("podyn", options); System.exit(3); } catch (TableExistsException|NonExistingTableException e) { + e.printStackTrace(); LOG.error(e.getMessage()); System.exit(1); } catch (ExecutionException e) { + e.printStackTrace(); Throwable cause = e.getCause(); if (cause.getCause() != null) { @@ -230,6 +238,7 @@ public void run() { } System.exit(1); } catch (EmissionException e) { + e.printStackTrace(); if (e.getCause() != null) { LOG.error(e.getCause().getMessage()); } else { @@ -240,6 +249,7 @@ public void run() { e.printStackTrace(); System.exit(2); } catch (Exception e) { + e.printStackTrace(); LOG.error(e); System.exit(1); } diff --git a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java index 3629bf4..56719b9 100644 --- a/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java +++ b/src/main/java/com/citusdata/migration/DynamoDBTableReplicator.java @@ -1,5 +1,5 @@ /** - * + * */ package com.citusdata.migration; @@ -69,567 +69,570 @@ /** * @author marco - * */ public class DynamoDBTableReplicator { - private static final Log LOG = LogFactory.getLog(DynamoDBTableReplicator.class); - - public static final String APPLICATION_NAME = "podyn"; - public static final String LEASE_TABLE_PREFIX = "podyn_migration_"; - - final AmazonDynamoDBStreams streamsClient; - final AmazonDynamoDB dynamoDBClient; - final AWSCredentialsProvider awsCredentialsProvider; - final ExecutorService executor; - - final TableEmitter emitter; - final String dynamoTableName; - - boolean addColumnsEnabled; - boolean useCitus; - boolean useLowerCaseColumnNames; - ConversionMode conversionMode; - - TableSchema tableSchema; - - public DynamoDBTableReplicator( - AmazonDynamoDB dynamoDBClient, - AmazonDynamoDBStreams streamsClient, - AWSCredentialsProvider awsCredentialsProvider, - ExecutorService executorService, - TableEmitter emitter, - String tableName) throws SQLException { - this.dynamoDBClient = dynamoDBClient; - this.streamsClient = streamsClient; - this.awsCredentialsProvider = awsCredentialsProvider; - this.executor = executorService; - this.emitter = emitter; - this.dynamoTableName = tableName; - this.addColumnsEnabled = true; - this.useCitus = false; - this.useLowerCaseColumnNames = false; - this.tableSchema = emitter.fetchSchema(this.dynamoTableName); - } - - public void setUseCitus(boolean useCitus) { - this.useCitus = useCitus; - } - - public void setAddColumnEnabled(boolean addColumnEnabled) { - this.addColumnsEnabled = addColumnEnabled; - } - - public void setUseLowerCaseColumnNames(boolean useLowerCaseColumnNames) { - this.useLowerCaseColumnNames = useLowerCaseColumnNames; - } - - public void setConversionMode(ConversionMode conversionMode) { - this.conversionMode = conversionMode; - } - - String dynamoKeyToColumnName(String keyName) { - if (useLowerCaseColumnNames) { - return keyName.toLowerCase(); - } else { - return keyName; - } - } - - public void replicateSchema() throws TableExistsException { - if (tableSchema != null) { - throw new TableExistsException("relation %s already exists", dynamoTableName); - } - - tableSchema = fetchSourceSchema(); - emitter.createTable(tableSchema); - } - - TableSchema fetchSourceSchema() { - TableSchema tableSchema = new TableSchema(dynamoTableName); - - DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); - TableDescription tableDescription = describeTableResult.getTable(); + private static final Log LOG = LogFactory.getLog(DynamoDBTableReplicator.class); + + public static final String APPLICATION_NAME = "podyn"; + public static final String LEASE_TABLE_PREFIX = "podyn_migration_"; + + final AmazonDynamoDBStreams streamsClient; + final AmazonDynamoDB dynamoDBClient; + final AWSCredentialsProvider awsCredentialsProvider; + final ExecutorService executor; + + final TableEmitter emitter; + final String dynamoTableName; + + boolean addColumnsEnabled; + boolean useCitus; + boolean useLowerCaseColumnNames; + boolean convertNumbersToText; + ConversionMode conversionMode; + + TableSchema tableSchema; + + public DynamoDBTableReplicator(AmazonDynamoDB dynamoDBClient, AmazonDynamoDBStreams streamsClient, + AWSCredentialsProvider awsCredentialsProvider, ExecutorService executorService, TableEmitter emitter, + String tableName) throws SQLException { + this.dynamoDBClient = dynamoDBClient; + this.streamsClient = streamsClient; + this.awsCredentialsProvider = awsCredentialsProvider; + this.executor = executorService; + this.emitter = emitter; + this.dynamoTableName = tableName; + this.addColumnsEnabled = true; + this.useCitus = false; + this.useLowerCaseColumnNames = false; + this.convertNumbersToText = false; + this.tableSchema = emitter.fetchSchema(this.dynamoTableName); + } + + public void setUseCitus(boolean useCitus) { + this.useCitus = useCitus; + } + + public void setAddColumnEnabled(boolean addColumnEnabled) { + this.addColumnsEnabled = addColumnEnabled; + } + + public void setUseLowerCaseColumnNames(boolean useLowerCaseColumnNames) { + this.useLowerCaseColumnNames = useLowerCaseColumnNames; + } + + public void setConvertNumberTypesToText(boolean convertNumbersToText) { + this.convertNumbersToText = convertNumbersToText; + } + + public void setConversionMode(ConversionMode conversionMode) { + this.conversionMode = conversionMode; + } + + String dynamoKeyToColumnName(String keyName) { + if (useLowerCaseColumnNames) { + return keyName.toLowerCase(); + } else { + return keyName; + } + } + + public void replicateSchema() throws TableExistsException { + if (tableSchema != null) { + throw new TableExistsException("relation %s already exists", dynamoTableName); + } + + tableSchema = fetchSourceSchema(); + emitter.createTable(tableSchema); + } + + TableSchema fetchSourceSchema() { + TableSchema tableSchema = new TableSchema(dynamoTableName); - List attributeDefinitions = tableDescription.getAttributeDefinitions(); + DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); + TableDescription tableDescription = describeTableResult.getTable(); - for (AttributeDefinition attributeDefinition : attributeDefinitions) { - String keyName = attributeDefinition.getAttributeName(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumnType type = TableColumnType.text; + List attributeDefinitions = tableDescription.getAttributeDefinitions(); - switch(attributeDefinition.getAttributeType()) { - case "N": - type = TableColumnType.numeric; - break; - case "B": - type = TableColumnType.bytea; - break; - } + for (AttributeDefinition attributeDefinition : attributeDefinitions) { + String keyName = attributeDefinition.getAttributeName(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumnType type = TableColumnType.text; - tableSchema.addColumn(columnName, type); - } + switch (attributeDefinition.getAttributeType()) { + case "N": + type = TableColumnType.numeric; + break; + case "B": + type = TableColumnType.bytea; + break; + } - List primaryKey = new ArrayList<>(); - List keySchema = tableDescription.getKeySchema(); + tableSchema.addColumn(columnName, type); + } - for (KeySchemaElement keySchemaElement : keySchema) { - String keyName = keySchemaElement.getAttributeName(); - String keyType = keySchemaElement.getKeyType(); - String columnName = dynamoKeyToColumnName(keyName); - - TableColumn column = tableSchema.getColumn(columnName); - - if (useCitus && KeyType.fromValue(keyType) == KeyType.HASH) { - tableSchema.setDistributionColumn(columnName); - } - - column.notNull = true; - - primaryKey.add(columnName); - } - - tableSchema.setPrimaryKey(primaryKey); - - List secondaryIndexes = tableDescription.getGlobalSecondaryIndexes(); - - if (secondaryIndexes != null) { - for (GlobalSecondaryIndexDescription secondaryIndex : secondaryIndexes) { - String indexName = secondaryIndex.getIndexName(); - List indexColumns = new ArrayList<>(); - - for (KeySchemaElement keySchemaElement : secondaryIndex.getKeySchema()) { - String keyName = keySchemaElement.getAttributeName(); - String columnName = dynamoKeyToColumnName(keyName); - - indexColumns.add(columnName); - } - - tableSchema.addIndex(indexName, indexColumns); - } - } - - if(conversionMode == ConversionMode.jsonb) { - tableSchema.addColumn("data", TableColumnType.jsonb); - } - - return tableSchema; - } - - public Future startReplicatingData(final int maxScanRate) { - return executor.submit(new Callable() { - @Override - public Long call() throws Exception { - return replicateData(maxScanRate); - } - }); - } - - public long replicateData(int maxScanRate) { - RateLimiter rateLimiter = RateLimiter.create(maxScanRate); - - Map lastEvaluatedScanKey = null; - long numRowsReplicated = 0; - - while(true) { - ScanResult scanResult = scanWithRetries(lastEvaluatedScanKey); - - if (addColumnsEnabled) { - for(Map dynamoItem : scanResult.getItems()) { - addNewColumns(dynamoItem); - } - } - - TableRowBatch tableRowBatch = new TableRowBatch(); - - for(Map dynamoItem : scanResult.getItems()) { - TableRow tableRow = rowFromDynamoRecord(dynamoItem); - - tableRowBatch.addRow(tableRow); - } - - LOG.info(String.format("Replicated %d rows to table %s", tableRowBatch.size(), tableSchema.tableName)); - - numRowsReplicated += tableRowBatch.size(); - - /* load the batch using COPY */ - emitter.copyFromReader(tableSchema, tableRowBatch.asCopyReader()); - - lastEvaluatedScanKey = scanResult.getLastEvaluatedKey(); - - if(lastEvaluatedScanKey == null) { - break; - } - - // Account for the rest of the throughput we consumed, - // now that we know how much that scan request cost - double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits(); - int permitsToConsume = (int)(consumedCapacity - 1.0); - if (permitsToConsume <= 0) { - permitsToConsume = 1; - } - - // Let the rate limiter wait until our desired throughput "recharges" - rateLimiter.acquire(permitsToConsume); - } - - return numRowsReplicated; - } - - private ScanResult scanWithRetries(Map lastEvaluatedScanKey) { - ScanRequest scanRequest = new ScanRequest(). - withTableName(this.dynamoTableName). - withConsistentRead(true). - withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL). - withLimit(100). - withExclusiveStartKey(lastEvaluatedScanKey); - - for (int tryNumber = 1; ; tryNumber++) { - try { - ScanResult scanResult = dynamoDBClient.scan(scanRequest); - return scanResult; - } catch (ProvisionedThroughputExceededException e) { - if (tryNumber == 3) { - throw e; - } - } catch (InternalServerErrorException e) { - if (tryNumber == 3) { - throw e; - } - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - public String getStreamArn() { - DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); - TableDescription tableDescription = describeTableResult.getTable(); - String tableStreamArn = tableDescription.getLatestStreamArn(); - return tableStreamArn; - } - - - public void startReplicatingChanges() throws StreamNotEnabledException { - if (tableSchema == null) { - throw new TableExistsException("table %s does not exist in destination", dynamoTableName); - } - - String tableStreamArn = getStreamArn(); - - if (tableStreamArn == null) { - throw new StreamNotEnabledException("table %s does not have a stream enabled\n", dynamoTableName); - } - - AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsClient); - AmazonCloudWatch cloudWatchClient = AmazonCloudWatchClientBuilder.standard().build(); - - String workerId = generateWorkerId(); - - final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration( - APPLICATION_NAME, tableStreamArn, awsCredentialsProvider, workerId). - withMaxRecords(1000). - withIdleTimeBetweenReadsInMillis(500). - withCallProcessRecordsEvenForEmptyRecordList(false). - withCleanupLeasesUponShardCompletion(false). - withFailoverTimeMillis(20000). - withTableName(LEASE_TABLE_PREFIX + dynamoTableName). - withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); - - Worker worker = new Worker.Builder(). - recordProcessorFactory(recordProcessorFactory). - config(workerConfig). - kinesisClient(adapterClient). - cloudWatchClient(cloudWatchClient). - dynamoDBClient(dynamoDBClient). - execService(executor). - build(); - - executor.execute(worker); - } - - IRecordProcessorFactory recordProcessorFactory = new IRecordProcessorFactory() { - @Override - public IRecordProcessor createProcessor() { - return createStreamProcessor(); - } - }; - - protected IRecordProcessor createStreamProcessor() { - return new IRecordProcessor() { - - @Override - public void initialize(InitializationInput initializationInput) { - } - - public List extractDynamoStreamRecords(List kinesisRecords) { - List dynamoRecords = new ArrayList<>(kinesisRecords.size()); - - for(com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) { - if (kinesisRecord instanceof RecordAdapter) { - Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject(); - dynamoRecords.add(dynamoRecord); - } - } - - return dynamoRecords; - } - - @Override - public void processRecords(ProcessRecordsInput processRecordsInput) { - List records = extractDynamoStreamRecords(processRecordsInput.getRecords()); - - DynamoDBTableReplicator.this.processRecords(records); - - checkpoint(processRecordsInput.getCheckpointer()); - } - - @Override - public void shutdown(ShutdownInput shutdownInput) { - if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { - checkpoint(shutdownInput.getCheckpointer()); - } - } - - void checkpoint(IRecordProcessorCheckpointer checkpointer) { - try { - checkpointer.checkpoint(); - } catch (KinesisClientLibDependencyException|InvalidStateException|ThrottlingException|ShutdownException e) { - LOG.warn(e); - } - } - }; - } - - void processRecords(List records) { - if (addColumnsEnabled) { - for (Record dynamoRecord : records) { - StreamRecord streamRecord = dynamoRecord.getDynamodb(); - Map item = streamRecord.getNewImage(); - - if (item == null) { - continue; - } - - addNewColumns(item); - } - } - - for (Record dynamoRecord : records) { - StreamRecord streamRecord = dynamoRecord.getDynamodb(); - - switch (dynamoRecord.getEventName()) { - case "INSERT": - case "MODIFY": - Map dynamoItem = streamRecord.getNewImage(); - - if(dynamoItem == null) { - LOG.error(String.format("the stream for table %s does not have new images", dynamoTableName)); - System.exit(1); - } - - TableRow tableRow = rowFromDynamoRecord(dynamoItem); - emitter.upsert(tableRow); - LOG.debug(tableRow.toUpsert()); - break; - case "REMOVE": - Map dynamoKeys = streamRecord.getKeys(); - PrimaryKeyValue keyValue = primaryKeyValueFromDynamoKeys(dynamoKeys); - emitter.delete(keyValue); - LOG.debug(keyValue.toDelete()); - break; - } - - LOG.debug(streamRecord); - } - - LOG.info(String.format("Replicated %d changes to table %s", records.size(), tableSchema.tableName)); - } - - void addNewColumns(Map item) { - if(conversionMode == ConversionMode.jsonb) { - /* don't add new columns in jsonb mode */ - return; - } - - for(Map.Entry entry : item.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumn column = tableSchema.getColumn(columnName); - TableColumnType valueType = DynamoDBTableReplicator.columnTypeFromDynamoValue(entry.getValue()); - - if (column == null) { - column = tableSchema.addColumn(columnName, valueType); - LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); - emitter.createColumn(column); - } else if (column.type != valueType) { - columnName = columnName + "_" + valueType; - column = tableSchema.getColumn(columnName); - - if (column == null) { - column = tableSchema.addColumn(columnName, valueType); - LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); - emitter.createColumn(column); - } - } - } - } - - PrimaryKeyValue primaryKeyValueFromDynamoKeys(Map dynamoKeys) { - PrimaryKeyValue keyValue = new PrimaryKeyValue(tableSchema); - - for(Map.Entry entry : dynamoKeys.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - - if (!tableSchema.isInPrimaryKey(columnName)) { - continue; - } - - TableColumnValue columnValue = DynamoDBTableReplicator.columnValueFromDynamoValue(entry.getValue()); - - keyValue.setValue(columnName, columnValue); - } - - return keyValue; - } - - static String generateWorkerId() { - StringBuilder sb = new StringBuilder(); - - try { - sb.append(InetAddress.getLocalHost().getCanonicalHostName()); - } catch (UnknownHostException e) { - } - - sb.append('_'); - sb.append(UUID.randomUUID()); - - return sb.toString(); - } - - public TableRow rowFromDynamoRecord(Map dynamoItem) { - if (conversionMode == ConversionMode.jsonb) { - return rowWithJsonbFromDynamoRecord(dynamoItem); - } else { - return rowWithColumnsFromDynamoRecord(dynamoItem); - - } - } - - public TableRow rowWithJsonbFromDynamoRecord(Map dynamoItem) { - TableRow row = tableSchema.createRow(); - Item item = new Item(); - - for(Map.Entry entry : dynamoItem.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumn column = tableSchema.getColumn(columnName); - AttributeValue typedValue = entry.getValue(); - TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); - - if (column != null) { - row.setValue(columnName, columnValue); - } - - item.with(keyName, columnValue.datum); - } - - row.setValue("data", item.toJSON()); - - return row; - } - - public TableRow rowWithColumnsFromDynamoRecord(Map dynamoItem) { - TableRow row = tableSchema.createRow(); - - for(Map.Entry entry : dynamoItem.entrySet()) { - String keyName = entry.getKey(); - String columnName = dynamoKeyToColumnName(keyName); - TableColumn column = tableSchema.getColumn(columnName); - - if (column == null) { - /* skip non-existent columns */ - continue; - } - - AttributeValue typedValue = entry.getValue(); - TableColumnValue columnValue = columnValueFromDynamoValue(typedValue); - - if (columnValue.type == column.type) { - row.setValue(columnName, columnValue); - } else { - - row.setValue(columnName + "_" + columnValue.type, columnValue); - } - } - - return row; - } - - public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedValue) { - if(typedValue.getB() != null) { - ByteBuffer value = typedValue.getB(); - return new TableColumnValue(TableColumnType.bytea, value.array()); - } else if (typedValue.getBOOL() != null) { - Boolean value = typedValue.getBOOL(); - return new TableColumnValue(TableColumnType.bool, value); - } else if (typedValue.getBS() != null) { - List value = typedValue.getBS(); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); - } else if (typedValue.getL() != null) { - List value = typedValue.getL(); - List simpleList = InternalUtils.toSimpleList(value); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(simpleList)); - } else if (typedValue.getM() != null) { - Map value = typedValue.getM(); - Item simpleMap = Item.fromMap(InternalUtils.toSimpleMapValue(value)); - return new TableColumnValue(TableColumnType.jsonb, simpleMap.toJSON()); - } else if (typedValue.getN() != null) { - String value = typedValue.getN(); - return new TableColumnValue(TableColumnType.numeric, value); - } else if (typedValue.getNS() != null) { - List value = typedValue.getNS(); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); - } else if (typedValue.getS() != null) { - String value = typedValue.getS(); - return new TableColumnValue(TableColumnType.text, value); - } else if (typedValue.getSS() != null) { - List value = typedValue.getSS(); - return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); - } else { - return null; - } - } - - public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValue) { - if(typedValue.getB() != null) { - return TableColumnType.bytea; - } else if (typedValue.getBOOL() != null) { - return TableColumnType.bool; - } else if (typedValue.getBS() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getL() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getM() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getN() != null) { - return TableColumnType.numeric; - } else if (typedValue.getNS() != null) { - return TableColumnType.jsonb; - } else if (typedValue.getS() != null) { - return TableColumnType.text; - } else if (typedValue.getSS() != null) { - return TableColumnType.jsonb; - } else { - return TableColumnType.text; - } - } + List primaryKey = new ArrayList<>(); + List keySchema = tableDescription.getKeySchema(); + for (KeySchemaElement keySchemaElement : keySchema) { + String keyName = keySchemaElement.getAttributeName(); + String keyType = keySchemaElement.getKeyType(); + String columnName = dynamoKeyToColumnName(keyName); + + TableColumn column = tableSchema.getColumn(columnName); + + if (useCitus && KeyType.fromValue(keyType) == KeyType.HASH) { + tableSchema.setDistributionColumn(columnName); + } + + column.notNull = true; + + primaryKey.add(columnName); + } + + tableSchema.setPrimaryKey(primaryKey); + + List secondaryIndexes = tableDescription.getGlobalSecondaryIndexes(); + + if (secondaryIndexes != null) { + for (GlobalSecondaryIndexDescription secondaryIndex : secondaryIndexes) { + String indexName = secondaryIndex.getIndexName(); + List indexColumns = new ArrayList<>(); + + for (KeySchemaElement keySchemaElement : secondaryIndex.getKeySchema()) { + String keyName = keySchemaElement.getAttributeName(); + String columnName = dynamoKeyToColumnName(keyName); + + indexColumns.add(columnName); + } + + tableSchema.addIndex(indexName, indexColumns); + } + } + + if (conversionMode == ConversionMode.jsonb) { + tableSchema.addColumn("data", TableColumnType.jsonb); + } + + return tableSchema; + } + + public Future startReplicatingData(final int maxScanRate) { + return executor.submit(new Callable() { + @Override + public Long call() throws Exception { + return replicateData(maxScanRate); + } + }); + } + + public long replicateData(int maxScanRate) { + RateLimiter rateLimiter = RateLimiter.create(maxScanRate); + + Map lastEvaluatedScanKey = null; + long numRowsReplicated = 0; + + while (true) { + ScanResult scanResult = scanWithRetries(lastEvaluatedScanKey); + + if (addColumnsEnabled) { + for (Map dynamoItem : scanResult.getItems()) { + addNewColumns(dynamoItem); + } + } + TableRowBatch tableRowBatch = new TableRowBatch(); + + for (Map dynamoItem : scanResult.getItems()) { + TableRow tableRow = rowFromDynamoRecord(dynamoItem); + + tableRowBatch.addRow(tableRow); + } + + LOG.info(String.format("Replicated %d rows to table %s", tableRowBatch.size(), tableSchema.tableName)); + + numRowsReplicated += tableRowBatch.size(); + + /* load the batch using COPY */ + emitter.copyFromReader(tableSchema, tableRowBatch.asCopyReader()); + + lastEvaluatedScanKey = scanResult.getLastEvaluatedKey(); + + if (lastEvaluatedScanKey == null) { + break; + } + + // Account for the rest of the throughput we consumed, + // now that we know how much that scan request cost + double consumedCapacity = scanResult.getConsumedCapacity().getCapacityUnits(); + int permitsToConsume = (int) (consumedCapacity - 1.0); + if (permitsToConsume <= 0) { + permitsToConsume = 1; + } + + // Let the rate limiter wait until our desired throughput "recharges" + rateLimiter.acquire(permitsToConsume); + } + + return numRowsReplicated; + } + + private ScanResult scanWithRetries(Map lastEvaluatedScanKey) { + ScanRequest scanRequest = new ScanRequest().withTableName(this.dynamoTableName).withConsistentRead(true) + .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withLimit(100) + .withExclusiveStartKey(lastEvaluatedScanKey); + + for (int tryNumber = 1;; tryNumber++) { + try { + ScanResult scanResult = dynamoDBClient.scan(scanRequest); + return scanResult; + } catch (ProvisionedThroughputExceededException e) { + if (tryNumber == 3) { + throw e; + } + } catch (InternalServerErrorException e) { + if (tryNumber == 3) { + throw e; + } + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + public String getStreamArn() { + DescribeTableResult describeTableResult = dynamoDBClient.describeTable(dynamoTableName); + TableDescription tableDescription = describeTableResult.getTable(); + String tableStreamArn = tableDescription.getLatestStreamArn(); + return tableStreamArn; + } + + public void startReplicatingChanges() throws StreamNotEnabledException { + if (tableSchema == null) { + throw new TableExistsException("table %s does not exist in destination", dynamoTableName); + } + + String tableStreamArn = getStreamArn(); + + if (tableStreamArn == null) { + throw new StreamNotEnabledException("table %s does not have a stream enabled\n", dynamoTableName); + } + + AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsClient); + AmazonCloudWatch cloudWatchClient = AmazonCloudWatchClientBuilder.standard().build(); + + String workerId = generateWorkerId(); + + final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration(APPLICATION_NAME, + tableStreamArn, awsCredentialsProvider, workerId).withMaxRecords(1000) + .withIdleTimeBetweenReadsInMillis(500).withCallProcessRecordsEvenForEmptyRecordList(false) + .withCleanupLeasesUponShardCompletion(false).withFailoverTimeMillis(20000) + .withTableName(LEASE_TABLE_PREFIX + dynamoTableName) + .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); + + Worker worker = new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(workerConfig) + .kinesisClient(adapterClient).cloudWatchClient(cloudWatchClient).dynamoDBClient(dynamoDBClient) + .execService(executor).build(); + + executor.execute(worker); + } + + IRecordProcessorFactory recordProcessorFactory = new IRecordProcessorFactory() { + @Override + public IRecordProcessor createProcessor() { + return createStreamProcessor(); + } + }; + + protected IRecordProcessor createStreamProcessor() { + return new IRecordProcessor() { + + @Override + public void initialize(InitializationInput initializationInput) { + } + + public List extractDynamoStreamRecords( + List kinesisRecords) { + List dynamoRecords = new ArrayList<>(kinesisRecords.size()); + + for (com.amazonaws.services.kinesis.model.Record kinesisRecord : kinesisRecords) { + if (kinesisRecord instanceof RecordAdapter) { + Record dynamoRecord = ((RecordAdapter) kinesisRecord).getInternalObject(); + dynamoRecords.add(dynamoRecord); + } + } + + return dynamoRecords; + } + + @Override + public void processRecords(ProcessRecordsInput processRecordsInput) { + List records = extractDynamoStreamRecords(processRecordsInput.getRecords()); + + DynamoDBTableReplicator.this.processRecords(records); + + checkpoint(processRecordsInput.getCheckpointer()); + } + + @Override + public void shutdown(ShutdownInput shutdownInput) { + if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { + checkpoint(shutdownInput.getCheckpointer()); + } + } + + void checkpoint(IRecordProcessorCheckpointer checkpointer) { + try { + checkpointer.checkpoint(); + } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException + | ShutdownException e) { + LOG.warn(e); + } + } + }; + } + + void processRecords(List records) { + try { + if (addColumnsEnabled) { + for (Record dynamoRecord : records) { + StreamRecord streamRecord = dynamoRecord.getDynamodb(); + Map item = streamRecord.getNewImage(); + + if (item == null) { + continue; + } + addNewColumns(item); + } + } + + for (Record dynamoRecord : records) { + StreamRecord streamRecord = dynamoRecord.getDynamodb(); + + switch (dynamoRecord.getEventName()) { + case "INSERT": + case "MODIFY": + Map dynamoItem = streamRecord.getNewImage(); + + if (dynamoItem == null) { + LOG.error(String.format("the stream for table %s does not have new images", dynamoTableName)); + System.exit(1); + } + + TableRow tableRow = rowFromDynamoRecord(dynamoItem); + emitter.upsert(tableRow); + LOG.debug(tableRow.toUpsert()); + break; + case "REMOVE": + Map dynamoKeys = streamRecord.getKeys(); + PrimaryKeyValue keyValue = primaryKeyValueFromDynamoKeys(dynamoKeys); + emitter.delete(keyValue); + LOG.debug(keyValue.toDelete()); + break; + } + + LOG.debug(streamRecord); + } + + LOG.info(String.format("Replicated %d changes to table %s", records.size(), tableSchema.tableName)); + } catch (Exception e) { + e.printStackTrace(); + LOG.error(e); + } + } + + void addNewColumns(Map item) { + if (conversionMode == ConversionMode.jsonb) { + /* don't add new columns in jsonb mode */ + return; + } + + for (Map.Entry entry : item.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumn column = tableSchema.getColumn(columnName); + TableColumnType valueType = DynamoDBTableReplicator.columnTypeFromDynamoValue(entry.getValue(), this.convertNumbersToText); + + if (column == null) { + column = tableSchema.addColumn(columnName, valueType); + LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); + emitter.createColumn(column); + } else if (column.type != valueType) { + columnName = columnName + "_" + valueType; + column = tableSchema.getColumn(columnName); + + if (column == null) { + column = tableSchema.addColumn(columnName, valueType); + LOG.info(String.format("Adding new column to table %s: %s", tableSchema.tableName, column)); + emitter.createColumn(column); + } + } + } + } + + PrimaryKeyValue primaryKeyValueFromDynamoKeys(Map dynamoKeys) { + PrimaryKeyValue keyValue = new PrimaryKeyValue(tableSchema); + + for (Map.Entry entry : dynamoKeys.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + + if (!tableSchema.isInPrimaryKey(columnName)) { + continue; + } + + + + TableColumnValue columnValue = DynamoDBTableReplicator.columnValueFromDynamoValue(entry.getValue(), this.convertNumbersToText); + + keyValue.setValue(columnName, columnValue); + } + + return keyValue; + } + + static String generateWorkerId() { + StringBuilder sb = new StringBuilder(); + + try { + sb.append(InetAddress.getLocalHost().getCanonicalHostName()); + } catch (UnknownHostException e) { + } + + sb.append('_'); + sb.append(UUID.randomUUID()); + + return sb.toString(); + } + + public TableRow rowFromDynamoRecord(Map dynamoItem) { + if (conversionMode == ConversionMode.jsonb) { + return rowWithJsonbFromDynamoRecord(dynamoItem); + } else { + return rowWithColumnsFromDynamoRecord(dynamoItem); + } + } + + public TableRow rowWithJsonbFromDynamoRecord(Map dynamoItem) { + TableRow row = tableSchema.createRow(); + Item item = new Item(); + + for (Map.Entry entry : dynamoItem.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumn column = tableSchema.getColumn(columnName); + AttributeValue typedValue = entry.getValue(); + TableColumnValue columnValue = columnValueFromDynamoValue(typedValue, this.convertNumbersToText); + + if (column != null) { + row.setValue(columnName, columnValue); + } + + item.with(keyName, columnValue.datum); + } + + row.setValue("data", item.toJSON()); + + return row; + } + + public TableRow rowWithColumnsFromDynamoRecord(Map dynamoItem) { + TableRow row = tableSchema.createRow(); + + for (Map.Entry entry : dynamoItem.entrySet()) { + String keyName = entry.getKey(); + String columnName = dynamoKeyToColumnName(keyName); + TableColumn column = tableSchema.getColumn(columnName); + + if (column == null) { + /* skip non-existent columns */ + continue; + } + + AttributeValue typedValue = entry.getValue(); + TableColumnValue columnValue = columnValueFromDynamoValue(typedValue, this.convertNumbersToText); + + if (columnValue.type == column.type) { + row.setValue(columnName, columnValue); + } else { + row.setValue(columnName + "_" + columnValue.type, columnValue); + } + } + + return row; + } + + public static TableColumnValue columnValueFromDynamoValue(AttributeValue typedValue, boolean convertNumberToText) { + if (typedValue.getB() != null) { + ByteBuffer value = typedValue.getB(); + return new TableColumnValue(TableColumnType.bytea, value.array()); + } else if (typedValue.getBOOL() != null) { + Boolean value = typedValue.getBOOL(); + return new TableColumnValue(TableColumnType.bool, value); + } else if (typedValue.getBS() != null) { + List value = typedValue.getBS(); + return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); + } else if (typedValue.getL() != null) { + List value = typedValue.getL(); + List simpleList = InternalUtils.toSimpleList(value); + return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(simpleList)); + } else if (typedValue.getM() != null) { + Map value = typedValue.getM(); + Item simpleMap = Item.fromMap(InternalUtils.toSimpleMapValue(value)); + return new TableColumnValue(TableColumnType.jsonb, simpleMap.toJSON()); + } else if (typedValue.getN() != null) { + if (convertNumberToText == true) { + String value = typedValue.getN(); + return new TableColumnValue(TableColumnType.text, value); + } else { + String value = typedValue.getN(); + return new TableColumnValue(TableColumnType.numeric, value); + } + } else if (typedValue.getNS() != null) { + List value = typedValue.getNS(); + return new TableColumnValue(TableColumnType.jsonb, Jackson.toJsonString(value)); + } else if (typedValue.getS() != null) { + String value = typedValue.getS(); + return new TableColumnValue(TableColumnType.text, value); + } else if (typedValue.getSS() != null) { + List value = typedValue.getSS(); + return new TableColumnValue(TableColumnType.text, Jackson.toJsonString(value)); + } else if (typedValue.getNULL() != null) { + return new TableColumnValue(TableColumnType.text, ""); + } else { + return null; + } + } + + public static TableColumnType columnTypeFromDynamoValue(AttributeValue typedValue, boolean convertNumberToText) { + if (typedValue.getB() != null) { + return TableColumnType.bytea; + } else if (typedValue.getBOOL() != null) { + return TableColumnType.bool; + } else if (typedValue.getBS() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getL() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getM() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getN() != null) { + if (convertNumberToText == true) { + return TableColumnType.text; + } else + return TableColumnType.numeric; + } else if (typedValue.getNS() != null) { + return TableColumnType.jsonb; + } else if (typedValue.getS() != null) { + return TableColumnType.text; + } else if (typedValue.getSS() != null) { + return TableColumnType.jsonb; + } else { + return TableColumnType.text; + } + } }