From 0bc8779aac4d83ded02461130f32abf2385aaebc Mon Sep 17 00:00:00 2001 From: SimbaGithub <48035983+SimbaGithub@users.noreply.github.com> Date: Mon, 13 Dec 2021 13:41:59 -0800 Subject: [PATCH] Add GET command (#221) * Check if command being executed is GET * Add GET functionality to file transfer agent * Add GET functionality to remote storage util * Add download function to S3 client * Fix decryption when downloading more than one file * Add Azure client for GET * Add GCS client for GET * Add local client for GET * Add testing for GET command * Add temp debug msg * Remove temp debug msg * Modify GET query on Windows OS --- lib/connection/statement.js | 2 +- lib/file_transfer_agent/azure_util.js | 60 ++++- lib/file_transfer_agent/encrypt_util.js | 27 +- .../file_transfer_agent.js | 236 +++++++++++++++++- lib/file_transfer_agent/gcs_util.js | 100 ++++++++ lib/file_transfer_agent/local_util.js | 39 +++ .../remote_storage_util.js | 91 +++++-- lib/file_transfer_agent/s3_util.js | 64 ++++- lib/util.js | 11 + .../integration/{testPut.js => testPutGet.js} | 64 ++++- 10 files changed, 650 insertions(+), 44 deletions(-) rename test/integration/{testPut.js => testPutGet.js} (75%) diff --git a/lib/connection/statement.js b/lib/connection/statement.js index c1f6ee0a4..98de7573d 100644 --- a/lib/connection/statement.js +++ b/lib/connection/statement.js @@ -72,7 +72,7 @@ exports.createStatementPreExec = function ( var context = createContextPreExec( options, services, connectionConfig); - if (Util.isPutCommand(options.sqlText)) + if (Util.isPutCommand(options.sqlText) || Util.isGetCommand(options.sqlText)) { return createFileStatementPreExec( options, context, services, connectionConfig); diff --git a/lib/file_transfer_agent/azure_util.js b/lib/file_transfer_agent/azure_util.js index 3f1a477bb..98858a3b1 100644 --- a/lib/file_transfer_agent/azure_util.js +++ b/lib/file_transfer_agent/azure_util.js @@ -236,6 +236,65 @@ function azure_util(azure, filestream) meta['resultStatus'] = resultStatus.UPLOADED; } + /** + * Download the file blob then write the file. + * + * @param {String} dataFile + * @param {Object} meta + * @param {Object} encryptionMetadata + * @param {Number} maxConcurrency + * + * @returns {null} + */ + this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) + { + var stageInfo = meta['stageInfo']; + var client = this.createClient(stageInfo); + var azureLocation = this.extractContainerNameAndPath(stageInfo['location']); + var blobName = azureLocation.path + meta['srcFileName']; + + var containerClient = client.getContainerClient(azureLocation.containerName); + var blockBlobClient = containerClient.getBlockBlobClient(blobName); + + try + { + var downloadBlockBlobResponse = await blockBlobClient.download(0); + var readableStream = downloadBlockBlobResponse.readableStreamBody; + + await new Promise((resolve, reject) => + { + var writer = fs.createWriteStream(fullDstPath); + readableStream.on("data", (data) => + { + writer.write(data); + }); + readableStream.on("end", () => + { + writer.end(); + resolve(); + }); + readableStream.on("error", reject); + }); + } + catch (err) + { + if (err['statusCode'] == 403 && detectAzureTokenExpireError(err)) + { + meta['lastError'] = err; + meta['resultStatus'] = resultStatus.RENEW_TOKEN; + return; + } + else + { + meta['lastError'] = err; + meta['resultStatus'] = resultStatus.NEED_RETRY; + } + return; + } + + meta['resultStatus'] = resultStatus.DOWNLOADED; + } + /** * Detect if the Azure token has expired. * @@ -253,7 +312,6 @@ function azure_util(azure, filestream) return errstr.includes("Signature not valid in the specified time frame") || errstr.includes("Server failed to authenticate the request."); } - } module.exports = azure_util; diff --git a/lib/file_transfer_agent/encrypt_util.js b/lib/file_transfer_agent/encrypt_util.js index fb8c57662..702e53d91 100644 --- a/lib/file_transfer_agent/encrypt_util.js +++ b/lib/file_transfer_agent/encrypt_util.js @@ -181,9 +181,18 @@ function encrypt_util(encrypt, filestream, temp) var ivBytes = new Buffer.from(ivBase64, BASE64); // Create temp file - var tmpobj = tmp.fileSync({ dir: tmpDir, prefix: path.basename(inFileName) + '#' }); - var tempOutputFileName = tmpobj.name; - var tempFd = tmpobj.fd; + var tempOutputFileName; + var tempFd; + await new Promise((resolve) => + { + tmp.file({ dir: tmpDir, prefix: path.basename(inFileName) + '#' }, (err, path, fd) => + { + if (err) reject(err); + tempOutputFileName = path; + tempFd = fd; + resolve(); + }); + }); // Create key decipher with decoded key and AES ECB var decipher = crypto.createDecipheriv(AES_ECB, decodedKey, null); @@ -211,12 +220,22 @@ function encrypt_util(encrypt, filestream, temp) { outfile.write(decipher.final()); outfile.close(); + }); + outfile.on('close', () => + { resolve(); }); }); // Close temp file - fs.closeSync(tempFd); + await new Promise((resolve) => + { + fs.close(tempFd, (err) => + { + if (err) reject(err); + resolve(); + }); + }); return tempOutputFileName; } diff --git a/lib/file_transfer_agent/file_transfer_agent.js b/lib/file_transfer_agent/file_transfer_agent.js index 033382aa4..e96e4f8c4 100644 --- a/lib/file_transfer_agent/file_transfer_agent.js +++ b/lib/file_transfer_agent/file_transfer_agent.js @@ -11,6 +11,7 @@ var path = require('path'); var statement = require('../connection/statement'); var fileCompressionType = require('./file_compression_type'); +var expandTilde = require('expand-tilde'); var SnowflakeFileUtil = new (require('./file_util').file_util)(); var SnowflakeRemoteStorageUtil = new (require('./remote_storage_util').remote_storage_util)(); var SnowflakeFileEncryptionMaterial = require('./remote_storage_util').SnowflakeFileEncryptionMaterial; @@ -75,11 +76,14 @@ function file_transfer_agent(context) var parallel; var stageInfo; var stageLocationType; - var presignedUrl; - var localLocation; + var presignedUrls; var useAccelerateEndpoint = false; + var srcFiles; + var srcFilesToEncryptionMaterial = {}; + var localLocation; + var results = []; // Store info of files retrieved @@ -99,17 +103,36 @@ function file_transfer_agent(context) { parseCommand(); - if (filesToPut.length === 0) - { - throw new Error('No file found for: ' + fileName); - } initFileMetadata(); if (commandType === CMD_TYPE_UPLOAD) { + if (filesToPut.length === 0) + { + throw new Error('No file found for: ' + fileName); + } + processFileCompressionType(); } + if (commandType === CMD_TYPE_DOWNLOAD) + { + if (!fs.existsSync(localLocation)) + { + fs.mkdirSync(localLocation); + } + + } + + if (stageLocationType === LOCAL_FS) + { + process.umask(0); + if (!fs.existsSync(stageInfo['location'])) + { + fs.mkdirSync(stageInfo['location'], { mode: 0o777, recursive: true }); + } + } + await transferAccelerateConfig(); await updateFileMetasWithPresignedUrl(); @@ -134,6 +157,11 @@ function file_transfer_agent(context) { await upload(largeFileMetas, smallFileMetas) } + + if (commandType === CMD_TYPE_DOWNLOAD) + { + await download(largeFileMetas, smallFileMetas) + } } /** @@ -205,6 +233,37 @@ function file_transfer_agent(context) ] }; } + else if (commandType === CMD_TYPE_DOWNLOAD) + { + var dstFileSize; + var errorDetails; + + if (results) + { + for (var meta of results) + { + errorDetails = meta['errorDetails']; + dstFileSize = meta['dstFileSize']; + + rowset.push([ + meta['dstFileName'], + dstFileSize, + meta['resultStatus'], + errorDetails + ]); + } + } + + return { + 'rowset': rowset, + 'rowtype': [ + RESULT_TEXT_COLUMN_DESC('file'), + RESULT_FIXED_COLUMN_DESC('size'), + RESULT_TEXT_COLUMN_DESC('status'), + RESULT_TEXT_COLUMN_DESC('message') + ] + }; + } } /** @@ -336,6 +395,113 @@ function file_transfer_agent(context) return meta; } + /** + * Download files in the metadata list. + * + * @returns {null} + */ + async function download(largeFileMetas, smallFileMetas) + { + var storageClient = getStorageClient(stageLocationType); + var client = storageClient.createClient(stageInfo, false); + + for (var meta of smallFileMetas) + { + meta['client'] = client; + } + for (var meta of largeFileMetas) + { + meta['client'] = client; + } + + if (smallFileMetas.length > 0) + { + //await downloadFilesinParallel(smallFileMetas); + await downloadFilesinSequential(smallFileMetas); + } + if (largeFileMetas.length > 0) + { + await downloadFilesinSequential(largeFileMetas); + } + } + + /** + * Download a file sequentially. + * + * @param {Object} fileMeta + * + * @returns {null} + */ + async function downloadFilesinSequential(fileMeta) + { + var index = 0; + var fileMetaLen = fileMeta.length; + + while (index < fileMetaLen) + { + var result = await downloadOneFile(fileMeta[index]); + if (result['resultStatus'] == resultStatus.RENEW_TOKEN) + { + var client = renewExpiredClient(); + for (var index2 = index; index2 < fileMetaLen; index2++) + { + fileMeta[index2]['client'] = client; + } + continue; + } + else if (result['resultStatus'] == resultStatus.RENEW_PRESIGNED_URL) + { + await updateFileMetasWithPresignedUrl() + continue; + } + results.push(result); + index += 1; + if (INJECT_WAIT_IN_PUT > 0) + { + await new Promise(resolve => setTimeout(resolve, INJECT_WAIT_IN_PUT)); + } + } + } + + /** + * Download a file and place into the target directory. + * + * @param {Object} meta + * + * @returns {Object} + */ + async function downloadOneFile(meta) + { + var tmpDir = await new Promise((resolve, reject) => + { + fs.mkdtemp(path.join(os.tmpdir(), 'tmp'), (err, dir) => + { + if (err) reject(err); + resolve(dir); + }); + }); + + meta['tmpDir'] = tmpDir; + try + { + var storageClient = getStorageClient(meta['stageLocationType']); + await storageClient.downloadOneFile(meta); + } + catch (err) + { + meta['dstFileSize'] = -1; + if (meta['resultStatus']) + { + meta['resultStatus'] = resultStatus.ERROR; + + } + meta['errorDetails'] = err.toString(); + meta['errorDetails'] += ` file=${meta['dstFileName']}`; + } + + return meta; + } + /** * Determine whether to acceleration configuration for S3 clients. * @@ -400,10 +566,7 @@ function file_transfer_agent(context) { for (var index = 0; index < fileMetadata.length; index++) { - if (presignedUrl.length > index) - { - meta['presignedUrl'] = presignedUrl[index]; - } + fileMetadata[index]['presignedUrl'] = presignedUrls[index]; } } } @@ -527,11 +690,34 @@ function file_transfer_agent(context) autoCompress = data['autoCompress']; sourceCompression = data['sourceCompression']; } - + else if (commandType === CMD_TYPE_DOWNLOAD) + { + srcFiles = data['src_locations']; + + if (srcFiles.length == encryptionMaterial.length) + { + for (const idx in srcFiles) + { + srcFilesToEncryptionMaterial[srcFiles[idx]] = encryptionMaterial[idx]; + } + } + else if (encryptionMaterial.length !== 0) + { + // some encryption material exists. Zero means no encryption + throw new Error("The number of downloading files doesn't match"); + } + localLocation = expandTilde(data["localLocation"]); + var dir = fs.statSync(localLocation); + if (!dir.isDirectory()) + { + throw new Error("The local path is not a directory: " + localLocation); + } + } + parallel = data['parallel']; stageInfo = data['stageInfo']; stageLocationType = stageInfo['locationType']; - presignedUrl = stageInfo['presignedUrl']; + presignedUrls = data['presignedUrls']; } /** @@ -552,6 +738,16 @@ function file_transfer_agent(context) rootNode['queryId'], rootNode['smkId'])); } + else if (commandType === CMD_TYPE_DOWNLOAD) + { + for (const elem in rootNode) + { + encryptionMaterial.push(new SnowflakeFileEncryptionMaterial( + rootNode[elem]['queryStageMasterKey'], + rootNode[elem]['queryId'], + rootNode[elem]['smkId'])); + } + } } } @@ -576,6 +772,22 @@ function file_transfer_agent(context) fileMetadata.push(currFileObj); } } + else if (commandType === CMD_TYPE_DOWNLOAD) + { + for (var fileName of srcFiles) + { + var currFileObj = {}; + currFileObj['srcFileName'] = fileName; + currFileObj['dstFileName'] = fileName; + currFileObj['stageLocationType'] = stageLocationType; + currFileObj['stageInfo'] = stageInfo; + currFileObj['useAccelerateEndpoint'] = useAccelerateEndpoint; + currFileObj['localLocation'] = localLocation; + currFileObj['encryptionMaterial'] = srcFilesToEncryptionMaterial[fileName]; + + fileMetadata.push(currFileObj); + } + } if (encryptionMaterial.length > 0) { diff --git a/lib/file_transfer_agent/gcs_util.js b/lib/file_transfer_agent/gcs_util.js index e1eba3f2c..7bcc457a7 100644 --- a/lib/file_transfer_agent/gcs_util.js +++ b/lib/file_transfer_agent/gcs_util.js @@ -15,6 +15,7 @@ const GCS_FILE_HEADER_ENCRYPTION_METADATA = 'gcs-file-header-encryption-metadata const CONTENT_CHUNK_SIZE = 10 * 1024; const HTTP_HEADER_CONTENT_ENCODING = 'Content-Encoding'; +const HTTP_HEADER_ACCEPT_ENCODING = 'Accept-Encoding'; const resultStatus = require('./file_util').resultStatus; // GCS Location @@ -289,6 +290,105 @@ function gcs_util(httpclient, filestream) meta[GCS_FILE_HEADER_ENCRYPTION_METADATA] = gcsHeaders[GCS_METADATA_ENCRYPTIONDATAPROP]; } + + /** + * Download the file. + * + * @param {String} dataFile + * @param {Object} meta + * @param {Object} encryptionMetadata + * @param {Number} maxConcurrency + * + * @returns {null} + */ + this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) + { + var downloadUrl = meta['presignedUrl']; + var accessToken = null; + var gcsHeaders = {}; + + if (!downloadUrl) + { + downloadUrl = this.generateFileURL( + meta.stageInfo["location"], lstrip(meta['srcFileName'], "/") + ) + accessToken = meta['client']; + gcsHeaders = { 'Authorization': `Bearer ${accessToken}` }; + } + + var response; + try + { + await axios({ + method: 'get', + url: downloadUrl, + headers: gcsHeaders, + responseType: 'stream' + }).then(async (res) => + { + response = res; + await new Promise((resolve, reject) => + { + const writer = fs.createWriteStream(fullDstPath); + response.data.pipe(writer); + writer.on('error', err => + { + writer.close(); + reject(err); + }); + writer.on('close', () => + { + resolve(); + }); + }); + }); + } + catch (err) + { + if (err['code'] == EXPIRED_TOKEN) + { + meta['resultStatus'] = resultStatus.RENEW_TOKEN; + } + else + { + meta['lastError'] = err; + if (err['code'] == ERRORNO_WSAECONNABORTED) + { + meta['resultStatus'] = resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY; + } + else + { + meta['resultStatus'] = resultStatus.NEED_RETRY; + } + } + return; + } + + var encryptionData; + if (response.headers[GCS_METADATA_ENCRYPTIONDATAPROP]) + { + encryptionData = JSON.parse(response.headers[GCS_METADATA_ENCRYPTIONDATAPROP]); + } + + var encryptionMetadata; + if (encryptionData) + { + encryptionMetadata = EncryptionMetadata( + encryptionData["WrappedContentKey"]["EncryptedKey"], + encryptionData["ContentEncryptionIV"], + response.headers[GCS_METADATA_MATDESC_KEY] + ); + } + var fileInfo = fs.statSync(fullDstPath); + meta['srcFileSize'] = fileInfo.size; + + meta['resultStatus'] = resultStatus.DOWNLOADED; + + meta[GCS_FILE_HEADER_DIGEST] = response.headers[GCS_METADATA_SFC_DIGEST]; + meta[GCS_FILE_HEADER_CONTENT_LENGTH] = response.headers['content-length']; + meta[GCS_FILE_HEADER_ENCRYPTION_METADATA] = encryptionMetadata; + } + /** * Generate file URL based on bucket. * diff --git a/lib/file_transfer_agent/local_util.js b/lib/file_transfer_agent/local_util.js index 1042b3d53..e268a34ab 100644 --- a/lib/file_transfer_agent/local_util.js +++ b/lib/file_transfer_agent/local_util.js @@ -59,6 +59,45 @@ function local_util() meta['dstFileSize'] = meta['uploadSize']; meta['resultStatus'] = resultStatus.UPLOADED; } + + /** + * Write file to download. + * + * @param {Object} meta + * + * @returns {null} + */ + this.downloadOneFile = async function (meta) + { + await new Promise(function (resolve) + { + const srcFilePath = expandTilde(meta['stageInfo']['location']); + + // Create stream object for reader and writer + var realSrcFilePath = path.join(srcFilePath, meta['srcFileName']); + var reader = fs.createReadStream(realSrcFilePath); + + // Create directory if doesn't exist + if (!fs.existsSync(meta['localLocation'])) + { + fs.mkdirSync(meta['localLocation'], { recursive: true }); + } + + var output = path.join(meta['localLocation'], meta['dstFileName']); + + var writer = fs.createWriteStream(output); + // Write file + var result = reader.pipe(writer); + result.on('finish', function () + { + resolve(); + }); + }); + + var fileStat = fs.statSync(output) + meta['dstFileSize'] = fileStat.size; + meta['resultStatus'] = resultStatus.DOWNLOADED; + } } exports.local_util = local_util; diff --git a/lib/file_transfer_agent/remote_storage_util.js b/lib/file_transfer_agent/remote_storage_util.js index d20d84ec3..5d42a7a3d 100644 --- a/lib/file_transfer_agent/remote_storage_util.js +++ b/lib/file_transfer_agent/remote_storage_util.js @@ -233,18 +233,39 @@ function remote_storage_util() this.downloadOneFile = async function (meta) { // Downloads a file from S3 - var fullDstPath = path.join(meta['localLocation'], path.basename(meta['dstFileName'])); - fullDstPath = fs.realpathSync(fullDstPath); + var fullDstPath = meta['localLocation']; + await new Promise((resolve, reject) => + { + fs.realpath(fullDstPath, (err, basePath) => + { + if (err) reject(err); + fullDstPath = path.join(basePath, path.basename(meta['dstFileName'])); + resolve(); + }); + }); // TODO: validate fullDstPath is under the writable directory var baseDir = path.dirname(fullDstPath); - if (!fs.existsSync(baseDir)) + await new Promise((resolve) => { - fs.mkdirSync(baseDir); - } + fs.exists(baseDir, (exists) => + { + if (!exists) + { + fs.mkdir(baseDir, () => + { + resolve(); + }); + } + else + { + resolve(); + } + }); + }); - var utilClass = getForStorageType(meta['stageInfo']['locationType']); - var fileHeader = await utilClass.getFileHeader(meta, meta['srcFilePath']); + var utilClass = this.getForStorageType(meta['stageInfo']['locationType']); + var fileHeader = await utilClass.getFileHeader(meta, meta['srcFileName']); if (fileHeader) { @@ -257,11 +278,12 @@ function remote_storage_util() for (var retry = 0; retry < maxRetry; retry++) { - utilClass.nativeDownloadFile(meta, fullDstPath, maxConcurrency); + // Download the file + await utilClass.nativeDownloadFile(meta, fullDstPath, maxConcurrency); - if (meta['result_status'] == ResultStatus.DOWNLOADED) + if (meta['resultStatus'] == resultStatus.DOWNLOADED) { - if (meta['encryption_material']) + if (meta['encryptionMaterial']) { /** * For storage utils that do not have the privilege of @@ -278,21 +300,54 @@ function remote_storage_util() fileHeader = await utilClass.getFileHeader(meta, meta['srcFilePath']) } - var tmpDstName = SnowflakeEncryptionUtil.decryptFile( + var tmpDstName = await SnowflakeEncryptionUtil.decryptFile( fileHeader.encryptionMetadata, meta['encryptionMaterial'], fullDstPath, - undefined, meta['tmpDir']); - fs.copyFileSync(tmpDstName, fullDstPath); + // Copy decrypted tmp file to target destination path + await new Promise((resolve, reject) => + { + fs.copyFile(tmpDstName, fullDstPath, async (err) => + { + if (err) reject(err); + resolve(); + }); + }); + + // Delete tmp file + await new Promise((resolve, reject) => { + fs.unlink(tmpDstName, (err) => + { + if (err) reject(err); + resolve(); + }); + }); + + // Delete tmp folder + await new Promise((resolve, reject) => + { + fs.rmdir(meta['tmpDir'], (err) => + { + if (err) + { + reject(err); + } + resolve(); + }); + }); } - else + await new Promise((resolve) => { - var fileInfo = fs.statSync(fullDstPath); - meta['dstFileSize'] = fileInfo.size; - return - } + fs.stat(fullDstPath, (err, stat) => + { + meta['dstFileSize'] = stat.size; + resolve(); + }); + }); + + return; } else if (meta['resultStatus'] == resultStatus.RENEW_TOKEN) { diff --git a/lib/file_transfer_agent/s3_util.js b/lib/file_transfer_agent/s3_util.js index 7b8305572..1465430ee 100644 --- a/lib/file_transfer_agent/s3_util.js +++ b/lib/file_transfer_agent/s3_util.js @@ -182,7 +182,7 @@ function s3_util(s3, filestream) return FileHeader( akey.Metadata[SFC_DIGEST], - akey.contentLength, + akey.ContentLength, encryptionMetadata ); } @@ -255,6 +255,68 @@ function s3_util(s3, filestream) meta['dstFileSize'] = meta['uploadSize']; meta['resultStatus'] = resultStatus.UPLOADED; } + + /** + * Download the file. + * + * @param {String} dataFile + * @param {Object} meta + * @param {Object} encryptionMetadata + * @param {Number} maxConcurrency + * + * @returns {null} + */ + this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) + { + var stageInfo = meta['stageInfo']; + var client = this.createClient(stageInfo); + + var s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']); + + var params = { + Bucket: s3location.bucketName, + Key: s3location.s3path + meta['dstFileName'], + }; + + // call S3 to download file to specified bucket + try + { + await client.getObject(params) + .promise() + .then((data) => + { + return new Promise((resolve, reject) => + { + fs.writeFile(fullDstPath, data.Body, 'binary', (err) => + { + if (err) reject(err); + resolve(); + }); + }); + }); + } + catch (err) + { + if (err['code'] == EXPIRED_TOKEN) + { + meta['resultStatus'] = resultStatus.RENEW_TOKEN; + } + else + { + meta['lastError'] = err; + if (err['code'] == ERRORNO_WSAECONNABORTED) + { + meta['resultStatus'] = resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY; + } + else + { + meta['resultStatus'] = resultStatus.NEED_RETRY; + } + } + return; + } + meta['resultStatus'] = resultStatus.DOWNLOADED; + } } module.exports = s3_util; diff --git a/lib/util.js b/lib/util.js index de59a4fd9..b46a1e4eb 100644 --- a/lib/util.js +++ b/lib/util.js @@ -500,3 +500,14 @@ exports.isPutCommand = function (sqlText) { return (sqlText.substring(0, 3).toUpperCase() === "PUT"); }; + +/** + * Returns if command is a GET command + * + * @param sqlText the query command + * @returns {boolean} + */ +exports.isGetCommand = function (sqlText) +{ + return (sqlText.substring(0, 3).toUpperCase() === "GET"); +}; diff --git a/test/integration/testPut.js b/test/integration/testPutGet.js similarity index 75% rename from test/integration/testPut.js rename to test/integration/testPutGet.js index e351bd09d..03ddce769 100644 --- a/test/integration/testPut.js +++ b/test/integration/testPutGet.js @@ -9,12 +9,15 @@ const fileCompressionType = require('./../../lib/file_transfer_agent/file_compre const fs = require('fs'); const testUtil = require('./testUtil'); const tmp = require('tmp'); +const os = require('os'); +const path = require('path'); const DATABASE_NAME = connOption.valid.database; const SCHEMA_NAME = connOption.valid.schema; const TEMP_TABLE_NAME = 'TEMP_TABLE'; const UPLOADED = "UPLOADED"; +const DOWNLOADED = "DOWNLOADED"; const COL1 = 'C1'; const COL2 = 'C2'; @@ -28,7 +31,7 @@ const ROW_DATA = COL1_DATA + "," + COL2_DATA + "," + COL3_DATA + "\n" + COL1_DATA + "," + COL2_DATA + "," + COL3_DATA + "\n"; -describe('PUT test', function () +describe('PUT GET test', function () { this.timeout(100000); @@ -59,27 +62,27 @@ describe('PUT test', function () var testCases = [ { - name: 'PUT command - gzip', + name: 'gzip', encoding: fileCompressionType.lookupByMimeSubType('gzip'), }, { - name: 'PUT command - bzip2', + name: 'bzip2', encoding: fileCompressionType.lookupByMimeSubType('bz2'), }, { - name: 'PUT command - brotli', + name: 'brotli', encoding: fileCompressionType.lookupByMimeSubType('br'), }, { - name: 'PUT command - deflate', + name: 'deflate', encoding: fileCompressionType.lookupByMimeSubType('deflate'), }, { - name: 'PUT command - raw deflate', + name: 'raw deflate', encoding: fileCompressionType.lookupByMimeSubType('raw_deflate'), }, { - name: 'PUT command - zstd', + name: 'zstd', encoding: fileCompressionType.lookupByMimeSubType('zstd'), } ]; @@ -102,6 +105,18 @@ describe('PUT test', function () putQuery = `PUT file://${process.env.USERPROFILE}\\AppData\\Local\\Temp\\${fileName} @${DATABASE_NAME}.${SCHEMA_NAME}.%${TEMP_TABLE_NAME}`; } + // Create a tmp folder for downloaded files + var tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'get')); + var fileSize; + + var getQuery = `GET @${DATABASE_NAME}.${SCHEMA_NAME}.%${TEMP_TABLE_NAME} file://${tmpDir}`; + // Windows user contains a '~' in the path which causes an error + if (process.platform == "win32") + { + var dirName = tmpDir.substring(tmpDir.lastIndexOf('\\') + 1); + getQuery = `GET @${DATABASE_NAME}.${SCHEMA_NAME}.%${TEMP_TABLE_NAME} file://${process.env.USERPROFILE}\\AppData\\Local\\Temp\\${dirName}`; + } + async.series( [ function (callback) @@ -123,6 +138,7 @@ describe('PUT test', function () }); stream.on('data', function (row) { + fileSize = row.targetSize; // Check the file is correctly uploaded assert.strictEqual(row['status'], UPLOADED); // Check the target encoding is correct @@ -191,6 +207,40 @@ describe('PUT test', function () }); }, function (callback) + { + // Run GET command + var statement = connection.execute({ + sqlText: getQuery, + complete: function (err, stmt, rows) + { + var stream = statement.streamRows(); + stream.on('error', function (err) + { + done(err); + }); + stream.on('data', function (row) + { + assert.strictEqual(row.status, DOWNLOADED); + assert.strictEqual(row.size, fileSize); + // Delete the downloaded file + fs.unlink(path.join(tmpDir, row.file), (err) => + { + if (err) throw (err); + // Delete the temporary folder + fs.rmdir(tmpDir, (err) => + { + if (err) throw (err); + }); + }); + }); + stream.on('end', function (row) + { + callback(); + }); + } + }); + }, + function (callback) { // Remove files from staging testUtil.executeCmd(connection, removeFile, callback);