Skip to content

Commit

Permalink
Add GET command (#221)
Browse files Browse the repository at this point in the history
* Check if command being executed is GET

* Add GET functionality to file transfer agent

* Add GET functionality to remote storage util

* Add download function to S3 client

* Fix decryption when downloading more than one file

* Add Azure client for GET

* Add GCS client for GET

* Add local client for GET

* Add testing for GET command

* Add temp debug msg

* Remove temp debug msg

* Modify GET query on Windows OS
  • Loading branch information
SimbaGithub authored Dec 13, 2021
1 parent d1533d3 commit 0bc8779
Show file tree
Hide file tree
Showing 10 changed files with 650 additions and 44 deletions.
2 changes: 1 addition & 1 deletion lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ exports.createStatementPreExec = function (
var context = createContextPreExec(
options, services, connectionConfig);

if (Util.isPutCommand(options.sqlText))
if (Util.isPutCommand(options.sqlText) || Util.isGetCommand(options.sqlText))
{
return createFileStatementPreExec(
options, context, services, connectionConfig);
Expand Down
60 changes: 59 additions & 1 deletion lib/file_transfer_agent/azure_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,65 @@ function azure_util(azure, filestream)
meta['resultStatus'] = resultStatus.UPLOADED;
}

/**
* Download the file blob then write the file.
*
* @param {String} dataFile
* @param {Object} meta
* @param {Object} encryptionMetadata
* @param {Number} maxConcurrency
*
* @returns {null}
*/
this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency)
{
var stageInfo = meta['stageInfo'];
var client = this.createClient(stageInfo);
var azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
var blobName = azureLocation.path + meta['srcFileName'];

var containerClient = client.getContainerClient(azureLocation.containerName);
var blockBlobClient = containerClient.getBlockBlobClient(blobName);

try
{
var downloadBlockBlobResponse = await blockBlobClient.download(0);
var readableStream = downloadBlockBlobResponse.readableStreamBody;

await new Promise((resolve, reject) =>
{
var writer = fs.createWriteStream(fullDstPath);
readableStream.on("data", (data) =>
{
writer.write(data);
});
readableStream.on("end", () =>
{
writer.end();
resolve();
});
readableStream.on("error", reject);
});
}
catch (err)
{
if (err['statusCode'] == 403 && detectAzureTokenExpireError(err))
{
meta['lastError'] = err;
meta['resultStatus'] = resultStatus.RENEW_TOKEN;
return;
}
else
{
meta['lastError'] = err;
meta['resultStatus'] = resultStatus.NEED_RETRY;
}
return;
}

meta['resultStatus'] = resultStatus.DOWNLOADED;
}

/**
* Detect if the Azure token has expired.
*
Expand All @@ -253,7 +312,6 @@ function azure_util(azure, filestream)
return errstr.includes("Signature not valid in the specified time frame") ||
errstr.includes("Server failed to authenticate the request.");
}

}

module.exports = azure_util;
27 changes: 23 additions & 4 deletions lib/file_transfer_agent/encrypt_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,18 @@ function encrypt_util(encrypt, filestream, temp)
var ivBytes = new Buffer.from(ivBase64, BASE64);

// Create temp file
var tmpobj = tmp.fileSync({ dir: tmpDir, prefix: path.basename(inFileName) + '#' });
var tempOutputFileName = tmpobj.name;
var tempFd = tmpobj.fd;
var tempOutputFileName;
var tempFd;
await new Promise((resolve) =>
{
tmp.file({ dir: tmpDir, prefix: path.basename(inFileName) + '#' }, (err, path, fd) =>
{
if (err) reject(err);
tempOutputFileName = path;
tempFd = fd;
resolve();
});
});

// Create key decipher with decoded key and AES ECB
var decipher = crypto.createDecipheriv(AES_ECB, decodedKey, null);
Expand Down Expand Up @@ -211,12 +220,22 @@ function encrypt_util(encrypt, filestream, temp)
{
outfile.write(decipher.final());
outfile.close();
});
outfile.on('close', () =>
{
resolve();
});
});

// Close temp file
fs.closeSync(tempFd);
await new Promise((resolve) =>
{
fs.close(tempFd, (err) =>
{
if (err) reject(err);
resolve();
});
});

return tempOutputFileName;
}
Expand Down
Loading

0 comments on commit 0bc8779

Please sign in to comment.